You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2017/03/09 20:40:44 UTC
[1/3] ignite git commit: ignite-1.9 - SQL related fixes and
improvements: - Sorted MERGE index - EXPLAIN fixes - Replicated subqueries
fixes
Repository: ignite
Updated Branches:
refs/heads/master a61a98ad3 -> 32506d69c
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 4cae6ac..8d3ece9 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSortColumn;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
@@ -86,6 +87,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
import org.h2.command.ddl.CreateTableData;
import org.h2.engine.Session;
import org.h2.index.Cursor;
+import org.h2.index.Index;
import org.h2.jdbc.JdbcConnection;
import org.h2.jdbc.JdbcResultSet;
import org.h2.jdbc.JdbcStatement;
@@ -97,6 +99,7 @@ import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import static java.util.Collections.singletonList;
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.setupConnection;
@@ -115,6 +118,12 @@ public class GridReduceQueryExecutor {
private static final IgniteProductVersion DISTRIBUTED_JOIN_SINCE = IgniteProductVersion.fromString("1.7.0");
/** */
+ private static final String MERGE_INDEX_UNSORTED = "merge_scan";
+
+ /** */
+ private static final String MERGE_INDEX_SORTED = "merge_sorted";
+
+ /** */
private GridKernalContext ctx;
/** */
@@ -354,7 +363,7 @@ public class GridReduceQueryExecutor {
if (msg.retry() != null)
retry(r, msg.retry(), node.id());
- else if (msg.allRows() != -1) // Only the first page contains row count.
+ else if (msg.page() == 0) // Do count down on each first page received.
r.latch.countDown();
}
@@ -518,7 +527,7 @@ public class GridReduceQueryExecutor {
Map<ClusterNode, IntArray> partsMap = null;
if (qry.isLocal())
- nodes = Collections.singleton(ctx.discovery().localNode());
+ nodes = singletonList(ctx.discovery().localNode());
else {
if (isPreloadingActive(cctx, extraSpaces)) {
if (cctx.isReplicated())
@@ -542,17 +551,17 @@ public class GridReduceQueryExecutor {
"We must be on a client node.";
// Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
- nodes = Collections.singleton(F.rand(nodes));
+ nodes = singletonList(F.rand(nodes));
}
}
- final Collection<ClusterNode> finalNodes = nodes;
-
int tblIdx = 0;
final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable();
- final int segmentsPerIndex = cctx.config().getQueryParallelism();
+ final int segmentsPerIndex = qry.explain() ? 1 : cctx.config().getQueryParallelism();
+
+ int replicatedQrysCnt = 0;
for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
GridMergeIndex idx;
@@ -567,19 +576,33 @@ public class GridReduceQueryExecutor {
throw new IgniteException(e);
}
- idx = tbl.getScanIndex(null);
+ idx = tbl.getMergeIndex();
fakeTable(r.conn, tblIdx++).innerTable(tbl);
}
else
idx = GridMergeIndexUnsorted.createDummy(ctx);
- idx.setSources(nodes, segmentsPerIndex);
+ // If the query has only replicated tables, we have to run it on a single node only.
+ if (!mapQry.isPartitioned()) {
+ ClusterNode node = F.rand(nodes);
+
+ mapQry.node(node.id());
+
+ replicatedQrysCnt++;
+
+ idx.setSources(singletonList(node), 1); // Replicated tables can have only 1 segment.
+ }
+ else
+ idx.setSources(nodes, segmentsPerIndex);
+
+ idx.setPageSize(r.pageSize);
r.idxs.add(idx);
}
- r.latch = new CountDownLatch(r.idxs.size() * nodes.size() * segmentsPerIndex);
+ r.latch = new CountDownLatch(
+ (r.idxs.size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt);
runs.put(qryReqId, r);
@@ -607,6 +630,8 @@ public class GridReduceQueryExecutor {
final boolean oldStyle = minNodeVer.compareToIgnoreTimestamp(DISTRIBUTED_JOIN_SINCE) < 0;
final boolean distributedJoins = qry.distributedJoins();
+ final Collection<ClusterNode> finalNodes = nodes;
+
cancel.set(new Runnable() {
@Override public void run() {
send(finalNodes, new GridQueryCancelRequest(qryReqId), null, false);
@@ -627,6 +652,9 @@ public class GridReduceQueryExecutor {
if (qry.isLocal())
flags |= GridH2QueryRequest.FLAG_IS_LOCAL;
+ if (qry.explain())
+ flags |= GridH2QueryRequest.FLAG_EXPLAIN;
+
if (send(nodes,
oldStyle ?
new GridQueryRequest(qryReqId,
@@ -724,7 +752,7 @@ public class GridReduceQueryExecutor {
r.conn,
rdc.query(),
F.asList(rdc.parameters()),
- false,
+ false, // The statement will cache some extra thread local objects.
timeoutMillis,
cancel);
@@ -783,19 +811,6 @@ public class GridReduceQueryExecutor {
}
/**
- * @param idxs Merge indexes.
- * @return {@code true} If all remote data was fetched.
- */
- private static boolean allIndexesFetched(List<GridMergeIndex> idxs) {
- for (int i = 0; i < idxs.size(); i++) {
- if (!idxs.get(i).fetchedAll())
- return false;
- }
-
- return true;
- }
-
- /**
* Returns true if the exception is triggered by query cancel.
*
* @param e Exception.
@@ -1256,6 +1271,7 @@ public class GridReduceQueryExecutor {
* @return Table.
* @throws IgniteCheckedException If failed.
*/
+ @SuppressWarnings("unchecked")
private GridMergeTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain)
throws IgniteCheckedException {
try {
@@ -1290,7 +1306,31 @@ public class GridReduceQueryExecutor {
else
data.columns = planColumns();
- return new GridMergeTable(data, ctx);
+ boolean sortedIndex = !F.isEmpty(qry.sortColumns());
+
+ GridMergeTable tbl = new GridMergeTable(data);
+
+ ArrayList<Index> idxs = new ArrayList<>(2);
+
+ if (explain) {
+ idxs.add(new GridMergeIndexUnsorted(ctx, tbl,
+ sortedIndex ? MERGE_INDEX_SORTED : MERGE_INDEX_UNSORTED));
+ }
+ else if (sortedIndex) {
+ List<GridSqlSortColumn> sortCols = (List<GridSqlSortColumn>)qry.sortColumns();
+
+ GridMergeIndexSorted sortedMergeIdx = new GridMergeIndexSorted(ctx, tbl, MERGE_INDEX_SORTED,
+ GridSqlSortColumn.toIndexColumns(tbl, sortCols));
+
+ idxs.add(GridMergeTable.createScanIndex(sortedMergeIdx));
+ idxs.add(sortedMergeIdx);
+ }
+ else
+ idxs.add(new GridMergeIndexUnsorted(ctx, tbl, MERGE_INDEX_UNSORTED));
+
+ tbl.indexes(idxs);
+
+ return tbl;
}
catch (Exception e) {
U.closeQuiet(conn);
@@ -1407,22 +1447,17 @@ public class GridReduceQueryExecutor {
while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
latch.countDown();
+ CacheException e = o instanceof CacheException ? (CacheException) o : null;
+
for (GridMergeIndex idx : idxs) // Fail all merge indexes.
- idx.fail(nodeId, o instanceof CacheException ? (CacheException) o : null);
+ idx.fail(nodeId, e);
}
/**
* @param e Error.
*/
void disconnected(CacheException e) {
- if (!state.compareAndSet(null, e))
- return;
-
- while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
- latch.countDown();
-
- for (GridMergeIndex idx : idxs) // Fail all merge indexes.
- idx.fail(e);
+ state(e, null);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
index 2057330..103084e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
@@ -40,7 +40,7 @@ public class GridResultPage {
private final UUID src;
/** */
- protected final GridQueryNextPageResponse res;
+ private final GridQueryNextPageResponse res;
/** */
private final int rowsInPage;
@@ -48,6 +48,9 @@ public class GridResultPage {
/** */
private Iterator<Value[]> rows;
+ /** */
+ private boolean last;
+
/**
* @param ctx Kernal context.
* @param src Source.
@@ -119,10 +122,28 @@ public class GridResultPage {
}
/**
- * @return {@code true} If this is a dummy last page for all the sources.
+ * @return {@code true} If this is either a real last page for a source or
+ * a dummy terminating page with no rows.
*/
public boolean isLast() {
- return false;
+ return last;
+ }
+
+ /**
+ * @return {@code true} If it is a dummy last page.
+ */
+ public boolean isDummyLast() {
+ return last && res == null;
+ }
+
+ /**
+ * @param last Last page for a source.
+ * @return {@code this}.
+ */
+ public GridResultPage setLast(boolean last) {
+ this.last = last;
+
+ return this;
}
/**
@@ -153,6 +174,13 @@ public class GridResultPage {
}
/**
+ * @return Segment ID.
+ */
+ public int segmentId() {
+ return res.segmentId();
+ }
+
+ /**
* @return Response.
*/
public GridQueryNextPageResponse response() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 0ad534c..0aa788b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -48,17 +48,22 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
* Map query will not destroy context until explicit query cancel request will be received because distributed join
* requests can be received.
*/
- public static int FLAG_DISTRIBUTED_JOINS = 1;
+ public static final int FLAG_DISTRIBUTED_JOINS = 1;
/**
* Remote map query executor will enforce join order for the received map queries.
*/
- public static int FLAG_ENFORCE_JOIN_ORDER = 1 << 1;
+ public static final int FLAG_ENFORCE_JOIN_ORDER = 1 << 1;
/**
* Restrict distributed joins range-requests to local index segments. Range requests to other nodes will not be sent.
*/
- public static int FLAG_IS_LOCAL = 1 << 2;
+ public static final int FLAG_IS_LOCAL = 1 << 2;
+
+ /**
+ * Set when the query is an EXPLAIN command.
+ */
+ public static final int FLAG_EXPLAIN = 1 << 3;
/** */
private long reqId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index 81c28a3..842b947 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -395,10 +395,7 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
Collection<List<?>> res = qry.getAll();
- if (cacheMode() == REPLICATED)
- assertEquals(1, res.size());
- else
- assertEquals(gridCount(), res.size());
+ assertEquals(1, res.size());
List<?> row = res.iterator().next();
@@ -410,10 +407,7 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
res = qry.getAll();
- if (cacheMode() == REPLICATED)
- assertEquals(1, res.size());
- else
- assertEquals(gridCount(), res.size());
+ assertEquals(1, res.size());
row = res.iterator().next();
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 69f66a5..8eae549 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -36,13 +36,12 @@ import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
-import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.h2.twostep.GridMergeIndex;
import org.apache.ignite.internal.util.GridRandom;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -156,6 +155,70 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
+ public void testSortedMergeIndex() throws Exception {
+ IgniteCache<Integer,Value> c = ignite(0).getOrCreateCache(cacheConfig("v", true,
+ Integer.class, Value.class));
+
+ try {
+ GridTestUtils.setFieldValue(null, GridMergeIndex.class, "PREFETCH_SIZE", 8);
+
+ Random rnd = new GridRandom();
+
+ int cnt = 1000;
+
+ for (int i = 0; i < cnt; i++) {
+ c.put(i, new Value(
+ rnd.nextInt(5) == 0 ? null: rnd.nextInt(100),
+ rnd.nextInt(8) == 0 ? null: rnd.nextInt(2000)));
+ }
+
+ List<List<?>> plan = c.query(new SqlFieldsQuery(
+ "explain select snd from Value order by fst desc")).getAll();
+ String rdcPlan = (String)plan.get(1).get(0);
+
+ assertTrue(rdcPlan.contains("merge_sorted"));
+ assertTrue(rdcPlan.contains("/* index sorted */"));
+
+ plan = c.query(new SqlFieldsQuery(
+ "explain select snd from Value")).getAll();
+ rdcPlan = (String)plan.get(1).get(0);
+
+ assertTrue(rdcPlan.contains("merge_scan"));
+ assertFalse(rdcPlan.contains("/* index sorted */"));
+
+ for (int i = 0; i < 10; i++) {
+ X.println(" --> " + i);
+
+ List<List<?>> res = c.query(new SqlFieldsQuery(
+ "select fst from Value order by fst").setPageSize(5)
+ ).getAll();
+
+ assertEquals(cnt, res.size());
+
+ Integer p = null;
+
+ for (List<?> row : res) {
+ Integer x = (Integer)row.get(0);
+
+ if (x != null) {
+ if (p != null)
+ assertTrue(x + " >= " + p, x >= p);
+
+ p = x;
+ }
+ }
+ }
+ }
+ finally {
+ GridTestUtils.setFieldValue(null, GridMergeIndex.class, "PREFETCH_SIZE", 1024);
+
+ c.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testGroupIndexOperations() throws Exception {
IgniteCache<Integer, GroupIndexTestValue> c = ignite(0).getOrCreateCache(cacheConfig("grp", false,
Integer.class, GroupIndexTestValue.class));
@@ -848,13 +911,44 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key";
- final SqlFieldsQuery qry = new SqlFieldsQuery(select0);
+ SqlFieldsQuery qry = new SqlFieldsQuery(select0);
qry.setDistributedJoins(true);
List<List<?>> results = c1.query(qry).getAll();
assertEquals(2, results.size());
+
+ select0 += " order by n2 desc";
+
+ qry = new SqlFieldsQuery(select0);
+
+ qry.setDistributedJoins(true);
+
+ results = c1.query(qry).getAll();
+
+ assertEquals(2, results.size());
+
+ assertEquals("p2", results.get(0).get(1));
+ assertEquals("p1", results.get(1).get(1));
+
+ // Test for replicated subquery with aggregate.
+ select0 = "select p.name " +
+ "from \"pers\".Person2 p, " +
+ "(select max(_key) orgId from \"org\".Organization) o " +
+ "where p.orgId = o.orgId";
+
+ X.println("Plan: \n" +
+ c1.query(new SqlFieldsQuery("explain " + select0).setDistributedJoins(true)).getAll());
+
+ qry = new SqlFieldsQuery(select0);
+
+ qry.setDistributedJoins(true);
+
+ results = c1.query(qry).getAll();
+
+ assertEquals(1, results.size());
+ assertEquals("p2", results.get(0).get(0));
}
finally {
c1.destroy();
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java
index f8526a8..30dd80d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/H2CompareBigQueryTest.java
@@ -249,11 +249,13 @@ public class H2CompareBigQueryTest extends AbstractH2CompareQueryTest {
X.println(bigQry);
X.println();
- X.println("Plan: " + pCache.query(new SqlFieldsQuery("EXPLAIN " + bigQry)
+ X.println(" Plan: \n" + pCache.query(new SqlFieldsQuery("EXPLAIN " + bigQry)
.setDistributedJoins(distributedJoins())).getAll());
List<List<?>> res = compareQueryRes0(pCache, bigQry, distributedJoins(), new Object[0], Ordering.RANDOM);
+ X.println(" Result size: " + res.size());
+
assertTrue(!res.isEmpty()); // Ensure we set good testing data at database.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index b417b0a..0c74f12 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -106,6 +106,7 @@ import org.apache.ignite.internal.processors.query.h2.GridH2IndexingOffheapSelfT
import org.apache.ignite.internal.processors.query.h2.opt.GridH2TableSelfTest;
import org.apache.ignite.internal.processors.query.h2.sql.BaseH2CompareQueryTest;
import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest;
+import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryDistributedJoinsTest;
import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryTest;
import org.apache.ignite.spi.communication.tcp.GridOrderedMessageCancelSelfTest;
import org.apache.ignite.testframework.IgniteTestSuite;
@@ -214,6 +215,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
// Ignite cache and H2 comparison.
suite.addTestSuite(BaseH2CompareQueryTest.class);
suite.addTestSuite(H2CompareBigQueryTest.class);
+ suite.addTestSuite(H2CompareBigQueryDistributedJoinsTest.class);
// Cache query metrics.
suite.addTestSuite(CacheLocalQueryMetricsSelfTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/query/h2/sql/bigQuery.sql
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/query/h2/sql/bigQuery.sql b/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/query/h2/sql/bigQuery.sql
index 8d42d44..64b7e2a 100644
--- a/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/query/h2/sql/bigQuery.sql
+++ b/modules/indexing/src/test/resources/org/apache/ignite/internal/processors/query/h2/sql/bigQuery.sql
@@ -26,21 +26,25 @@ from (
select date, orderId, rootOrderId, refOrderId as origOrderId, archSeq, alias
from "part".ReplaceOrder where alias='CUSTOM'
- ) co, "part".OrderParams op
- where co.date = op.date and co.orderId = op.orderId and co.archSeq = -- TODO: replace with 'dateToLong(co.date)+archSeq'.
- (
- select max(archSeq) -- TODO: replace with 'dateToLong(co.date)+archSeq'.
- from (
- select date, orderId, rootOrderId, origOrderId, archSeq, alias
- from "part".CustOrder where alias='CUSTOM'
-
- union all
-
- select date, orderId, rootOrderId, refOrderId as origOrderId, archSeq, alias
- from "part".ReplaceOrder where alias='CUSTOM'
- )
- where origOrderId = co.origOrderId and date = co.date
- ) and co.alias='CUSTOM'
+ ) co,
+ "part".OrderParams op,
+ (
+ select origOrderId, date, max(archSeq) maxArchSeq
+ from (
+ select date, orderId, rootOrderId, origOrderId, archSeq, alias
+ from "part".CustOrder where alias='CUSTOM'
+
+ union all
+
+ select date, orderId, rootOrderId, refOrderId as origOrderId, archSeq, alias
+ from "part".ReplaceOrder where alias='CUSTOM'
+ )
+ group by origOrderId, date
+ ) h
+ where co.date = op.date and co.orderId = op.orderId
+ and h.origOrderId = co.origOrderId and h.date = co.date
+ and co.archSeq = h.maxArchSeq
+ and co.alias='CUSTOM'
) cop
inner join (
select e.date, e.rootOrderId as eRootOrderId, e.rootOrderId, sum(e.execShares) as execShares,
[2/3] ignite git commit: ignite-1.9 - SQL related fixes and
improvements: - Sorted MERGE index - EXPLAIN fixes - Replicated subqueries
fixes
Posted by se...@apache.org.
ignite-1.9 - SQL related fixes and improvements:
- Sorted MERGE index
- EXPLAIN fixes
- Replicated subqueries fixes
Squashed commit of the following:
commit 423c2155c85ed9be8dffb3517b7331b753e1ce5c
Author: Sergi Vladykin <se...@gmail.com>
Date: Thu Mar 9 23:21:38 2017 +0300
ignite-1.9.1 - test fix
commit ff3c1f2967905b0bcac7661014656d1c080fa803
Author: Sergi Vladykin <se...@gmail.com>
Date: Thu Mar 9 11:08:34 2017 +0300
ignite-1.9.0 - replicated subqueries fix
commit bc0801a3c976f5d87cab2c414f76f69dc28b43d7
Author: Sergi Vladykin <se...@gmail.com>
Date: Wed Mar 8 16:03:40 2017 +0300
ignite-1.9.0 - fix for distributed join test
commit f1f1d96c6babaadab9e3ed1fbb3c9740c94d8209
Author: Sergi Vladykin <se...@gmail.com>
Date: Wed Mar 8 15:28:44 2017 +0300
ignite-1.9.0 - fix for distributed join test
commit a8751d535b3e025a804c441204465e94035a5247
Author: Sergi Vladykin <se...@gmail.com>
Date: Tue Feb 28 18:46:07 2017 +0300
ignite-1.9 - splitter fixes
commit 0601ce6e291eb4689d526e922b02fd9e21df5b08
Author: Sergi Vladykin <se...@gmail.com>
Date: Sun Feb 26 23:24:14 2017 +0300
ignite-1.9 - merge index test
commit 4ad048e248157d799a325b3ce9975d4ad8a9fb49
Author: Sergi Vladykin <se...@gmail.com>
Date: Sun Feb 26 23:19:49 2017 +0300
ignite-1.9 - merge index
commit 4ea63d7335000b8f30bfbd1bb907e411cd62a5e8
Author: Sergi Vladykin <se...@gmail.com>
Date: Sun Feb 26 22:44:51 2017 +0300
ignite-1.9 - unsorted index fixed
commit a639bff6f25a8397e49a892f830c9de23c847127
Author: Sergi Vladykin <se...@gmail.com>
Date: Sun Feb 26 20:08:26 2017 +0300
ignite-1.9 - sorted index fixes2
commit ee9d524f5a0d6f1c416345822e8201c327f1e562
Author: Sergi Vladykin <se...@gmail.com>
Date: Fri Feb 24 16:00:26 2017 +0300
ignite-1.9 - sorted index fixes
commit fc42406a9e55851d53d9dfed8e6cf3c8b12af345
Author: Sergi Vladykin <se...@gmail.com>
Date: Thu Feb 23 16:46:39 2017 +0300
ignite-1.9 - sorted index
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8817190e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8817190e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8817190e
Branch: refs/heads/master
Commit: 8817190e1dd31d869682df0167bb3e82fb597aad
Parents: 8362fe7
Author: Sergi Vladykin <se...@gmail.com>
Authored: Thu Mar 9 23:30:09 2017 +0300
Committer: Sergi Vladykin <se...@gmail.com>
Committed: Thu Mar 9 23:30:09 2017 +0300
----------------------------------------------------------------------
.../cache/query/GridCacheSqlQuery.java | 82 ++++-
.../processors/query/h2/IgniteH2Indexing.java | 4 +-
.../query/h2/opt/GridH2CollocationModel.java | 6 +-
.../query/h2/opt/GridH2ScanIndex.java | 273 +++++++++++++++++
.../processors/query/h2/opt/GridH2Table.java | 244 +--------------
.../processors/query/h2/sql/GridSqlQuery.java | 17 --
.../query/h2/sql/GridSqlQueryParser.java | 4 +-
.../query/h2/sql/GridSqlQuerySplitter.java | 38 ++-
.../query/h2/sql/GridSqlSortColumn.java | 41 +++
.../query/h2/twostep/GridMapQueryExecutor.java | 83 +++--
.../query/h2/twostep/GridMergeIndex.java | 300 +++++++++++--------
.../query/h2/twostep/GridMergeIndexSorted.java | 172 ++++++++---
.../h2/twostep/GridMergeIndexUnsorted.java | 67 ++++-
.../query/h2/twostep/GridMergeTable.java | 70 ++++-
.../h2/twostep/GridReduceQueryExecutor.java | 101 +++++--
.../query/h2/twostep/GridResultPage.java | 34 ++-
.../h2/twostep/msg/GridH2QueryRequest.java | 11 +-
.../cache/IgniteCacheAbstractQuerySelfTest.java | 10 +-
.../query/IgniteSqlSplitterSelfTest.java | 100 ++++++-
.../query/h2/sql/H2CompareBigQueryTest.java | 4 +-
.../IgniteCacheQuerySelfTestSuite.java | 2 +
.../processors/query/h2/sql/bigQuery.sql | 34 ++-
22 files changed, 1138 insertions(+), 559 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
index 18688b7..c4bb205 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.processors.cache.query;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridDirectTransient;
@@ -74,6 +76,19 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
/** Field kept for backward compatibility. */
private String alias;
+ /** Sort columns. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private transient List<?> sort;
+
+ /** Whether this query contains partitioned tables. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private transient boolean partitioned;
+
+ /** Single node to execute the query on. */
+ private UUID node;
+
/**
* For {@link Message}.
*/
@@ -218,12 +233,18 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
writer.incrementState();
case 1:
- if (!writer.writeByteArray("paramsBytes", paramsBytes))
+ if (!writer.writeUuid("node", node))
return false;
writer.incrementState();
case 2:
+ if (!writer.writeByteArray("paramsBytes", paramsBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
if (!writer.writeString("qry", qry))
return false;
@@ -251,7 +272,7 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 1:
- paramsBytes = reader.readByteArray("paramsBytes");
+ node = reader.readUuid("node");
if (!reader.isLastRead())
return false;
@@ -259,6 +280,14 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 2:
+ paramsBytes = reader.readByteArray("paramsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
qry = reader.readString("qry");
if (!reader.isLastRead())
@@ -278,7 +307,7 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 3;
+ return 4;
}
/**
@@ -292,6 +321,8 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
cp.cols = cols;
cp.paramIdxs = paramIdxs;
cp.paramsSize = paramsSize;
+ cp.sort = sort;
+ cp.partitioned = partitioned;
if (F.isEmpty(args))
cp.params = EMPTY_PARAMS;
@@ -304,4 +335,49 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
return cp;
}
+
+ /**
+ * @param sort Sort columns.
+ */
+ public void sortColumns(List<?> sort) {
+ this.sort = sort;
+ }
+
+ /**
+ * @return Sort columns.
+ */
+ public List<?> sortColumns() {
+ return sort;
+ }
+
+ /**
+ * @param partitioned If the query contains partitioned tables.
+ */
+ public void partitioned(boolean partitioned) {
+ this.partitioned = partitioned;
+ }
+
+ /**
+ * @return {@code true} If the query contains partitioned tables.
+ */
+ public boolean isPartitioned() {
+ return partitioned;
+ }
+
+ /**
+ * @return Single node to execute the query on or {@code null} if the query must be executed on all the nodes.
+ */
+ public UUID node() {
+ return node;
+ }
+
+ /**
+ * @param node Single node to execute the query on or {@code null} if the query must be executed on all the nodes.
+ * @return {@code this}.
+ */
+ public GridCacheSqlQuery node(UUID node) {
+ this.node = node;
+
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index b4bf608..8de8dc4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -85,7 +85,6 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
@@ -155,7 +154,6 @@ import org.h2.result.SortOrder;
import org.h2.server.web.WebServer;
import org.h2.table.Column;
import org.h2.table.IndexColumn;
-import org.h2.table.Table;
import org.h2.tools.Server;
import org.h2.util.JdbcUtils;
import org.h2.value.DataType;
@@ -1453,7 +1451,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
- Prepared prepared = GridSqlQueryParser.prepared((JdbcPreparedStatement) stmt);
+ Prepared prepared = GridSqlQueryParser.prepared(stmt);
if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery) qry).isQuery() != prepared.isQuery())
throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver",
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
index ce11fd5..4df355e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
@@ -300,10 +300,10 @@ public final class GridH2CollocationModel {
assert childFilters == null;
// We are at table instance.
- GridH2Table tbl = (GridH2Table)filter().getTable();
+ Table tbl = filter().getTable();
// Only partitioned tables will do distributed joins.
- if (!tbl.isPartitioned()) {
+ if (!(tbl instanceof GridH2Table) || !((GridH2Table)tbl).isPartitioned()) {
type = Type.REPLICATED;
multiplier = MULTIPLIER_COLLOCATED;
@@ -593,7 +593,7 @@ public final class GridH2CollocationModel {
private GridH2CollocationModel child(int i, boolean create) {
GridH2CollocationModel child = children[i];
- if (child == null && create && isChildTableOrView(i, null)) {
+ if (child == null && create) {
TableFilter f = childFilters[i];
if (f.getTable().isView()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ScanIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ScanIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ScanIndex.java
new file mode 100644
index 0000000..3ddd490
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ScanIndex.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import java.util.ArrayList;
+import org.h2.engine.Database;
+import org.h2.engine.DbObject;
+import org.h2.engine.Session;
+import org.h2.index.BaseIndex;
+import org.h2.index.Cursor;
+import org.h2.index.IndexLookupBatch;
+import org.h2.index.IndexType;
+import org.h2.message.DbException;
+import org.h2.result.Row;
+import org.h2.result.SearchRow;
+import org.h2.schema.Schema;
+import org.h2.table.Column;
+import org.h2.table.IndexColumn;
+import org.h2.table.Table;
+import org.h2.table.TableFilter;
+
+/**
+ * Scan index base class.
+ */
+public abstract class GridH2ScanIndex<D extends BaseIndex> extends BaseIndex {
+ /** */
+ private static final IndexType TYPE = IndexType.createScan(false);
+
+ /** */
+ protected final D delegate;
+
+ /**
+ * @param delegate Delegate.
+ */
+ public GridH2ScanIndex(D delegate) {
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getDiskSpaceUsed() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void add(Session ses, Row row) {
+ delegate.add(ses, row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canFindNext() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canGetFirstOrLast() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canScan() {
+ return delegate.canScan();
+ }
+
+ /** {@inheritDoc} */
+ @Override public final void close(Session ses) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void commit(int operation, Row row) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareRows(SearchRow rowData, SearchRow compare) {
+ return delegate.compareRows(rowData, compare);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor find(TableFilter filter, SearchRow first, SearchRow last) {
+ return find(filter.getSession(), first, last);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor find(Session ses, SearchRow first, SearchRow last) {
+ return delegate.find(ses, null, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor findFirstOrLast(Session ses, boolean first) {
+ throw DbException.getUnsupportedException("SCAN");
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor findNext(Session ses, SearchRow higherThan, SearchRow last) {
+ throw DbException.throwInternalError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getColumnIndex(Column col) {
+ return -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Column[] getColumns() {
+ return delegate.getColumns();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IndexColumn[] getIndexColumns() {
+ return delegate.getIndexColumns();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IndexType getIndexType() {
+ return TYPE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Row getRow(Session ses, long key) {
+ return delegate.getRow(ses, key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getRowCount(Session ses) {
+ return delegate.getRowCount(ses);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getRowCountApproximation() {
+ return delegate.getRowCountApproximation();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Table getTable() {
+ return delegate.getTable();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isRowIdIndex() {
+ return delegate.isRowIdIndex();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean needRebuild() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(Session ses) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(Session ses, Row row) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setSortedInsertMode(boolean sortedInsertMode) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public IndexLookupBatch createLookupBatch(TableFilter filter) {
+ return delegate.createLookupBatch(filter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void truncate(Session ses) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Schema getSchema() {
+ return delegate.getSchema();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isHidden() {
+ return delegate.isHidden();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkRename() {
+ throw DbException.getUnsupportedException("rename");
+ }
+
+ /** {@inheritDoc} */
+ @Override public ArrayList<DbObject> getChildren() {
+ return delegate.getChildren();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getComment() {
+ return delegate.getComment();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getCreateSQL() {
+ return null; // Scan should return null.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getCreateSQLForCopy(Table tbl, String quotedName) {
+ return delegate.getCreateSQLForCopy(tbl, quotedName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Database getDatabase() {
+ return delegate.getDatabase();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getDropSQL() {
+ return delegate.getDropSQL();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getId() {
+ return delegate.getId();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getSQL() {
+ return delegate.getSQL();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getType() {
+ return delegate.getType();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isTemporary() {
+ return delegate.isTemporary();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeChildrenAndResources(Session ses) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void rename(String newName) {
+ throw DbException.getUnsupportedException("rename");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setComment(String comment) {
+ throw DbException.getUnsupportedException("comment");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setTemporary(boolean temporary) {
+ throw DbException.getUnsupportedException("temporary");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 8d080ae..4d5ea4b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -34,22 +34,14 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.h2.api.TableEngine;
import org.h2.command.ddl.CreateTableData;
-import org.h2.engine.Database;
-import org.h2.engine.DbObject;
import org.h2.engine.Session;
-import org.h2.index.BaseIndex;
-import org.h2.index.Cursor;
import org.h2.index.Index;
-import org.h2.index.IndexLookupBatch;
import org.h2.index.IndexType;
import org.h2.message.DbException;
import org.h2.result.Row;
import org.h2.result.SearchRow;
import org.h2.result.SortOrder;
-import org.h2.schema.Schema;
-import org.h2.table.Column;
import org.h2.table.IndexColumn;
-import org.h2.table.Table;
import org.h2.table.TableBase;
import org.h2.table.TableFilter;
import org.h2.value.Value;
@@ -857,93 +849,15 @@ public class GridH2Table extends TableBase {
* Wrapper type for primary key.
*/
@SuppressWarnings("PackageVisibleInnerClass")
- static class ScanIndex extends BaseIndex {
+ static class ScanIndex extends GridH2ScanIndex<GridH2IndexBase> {
/** */
static final String SCAN_INDEX_NAME_SUFFIX = "__SCAN_";
- /** */
- private static final IndexType TYPE = IndexType.createScan(false);
-
- /** */
- private final GridH2IndexBase delegate;
-
/**
- * Constructor.
- *
- * @param delegate Index delegate to.
+ * @param delegate Delegate.
*/
- private ScanIndex(GridH2IndexBase delegate) {
- this.delegate = delegate;
- }
-
- /** {@inheritDoc} */
- @Override public long getDiskSpaceUsed() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public void add(Session ses, Row row) {
- delegate.add(ses, row);
- }
-
- /** {@inheritDoc} */
- @Override public boolean canFindNext() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean canGetFirstOrLast() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean canScan() {
- return delegate.canScan();
- }
-
- /** {@inheritDoc} */
- @Override public final void close(Session ses) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void commit(int operation, Row row) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public int compareRows(SearchRow rowData, SearchRow compare) {
- return delegate.compareRows(rowData, compare);
- }
-
- /** {@inheritDoc} */
- @Override public Cursor find(TableFilter filter, SearchRow first, SearchRow last) {
- return find(filter.getSession(), first, last);
- }
-
- /** {@inheritDoc} */
- @Override public Cursor find(Session ses, SearchRow first, SearchRow last) {
- return delegate.find(ses, null, null);
- }
-
- /** {@inheritDoc} */
- @Override public Cursor findFirstOrLast(Session ses, boolean first) {
- throw DbException.getUnsupportedException("SCAN");
- }
-
- /** {@inheritDoc} */
- @Override public Cursor findNext(Session ses, SearchRow higherThan, SearchRow last) {
- throw DbException.throwInternalError();
- }
-
- /** {@inheritDoc} */
- @Override public int getColumnIndex(Column col) {
- return -1;
- }
-
- /** {@inheritDoc} */
- @Override public Column[] getColumns() {
- return delegate.getColumns();
+ public ScanIndex(GridH2IndexBase delegate) {
+ super(delegate);
}
/** {@inheritDoc} */
@@ -957,163 +871,13 @@ public class GridH2Table extends TableBase {
}
/** {@inheritDoc} */
- @Override public IndexColumn[] getIndexColumns() {
- return delegate.getIndexColumns();
- }
-
- /** {@inheritDoc} */
- @Override public IndexType getIndexType() {
- return TYPE;
- }
-
- /** {@inheritDoc} */
@Override public String getPlanSQL() {
return delegate.getTable().getSQL() + "." + SCAN_INDEX_NAME_SUFFIX;
}
/** {@inheritDoc} */
- @Override public Row getRow(Session ses, long key) {
- return delegate.getRow(ses, key);
- }
-
- /** {@inheritDoc} */
- @Override public long getRowCount(Session ses) {
- return delegate.getRowCount(ses);
- }
-
- /** {@inheritDoc} */
- @Override public long getRowCountApproximation() {
- return delegate.getRowCountApproximation();
- }
-
- /** {@inheritDoc} */
- @Override public Table getTable() {
- return delegate.getTable();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isRowIdIndex() {
- return delegate.isRowIdIndex();
- }
-
- /** {@inheritDoc} */
- @Override public boolean needRebuild() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public void remove(Session ses) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void remove(Session ses, Row row) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void setSortedInsertMode(boolean sortedInsertMode) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public IndexLookupBatch createLookupBatch(TableFilter filter) {
- return delegate.createLookupBatch(filter);
- }
-
- /** {@inheritDoc} */
- @Override public void truncate(Session ses) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public Schema getSchema() {
- return delegate.getSchema();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isHidden() {
- return delegate.isHidden();
- }
-
- /** {@inheritDoc} */
- @Override public void checkRename() {
- throw DbException.getUnsupportedException("rename");
- }
-
- /** {@inheritDoc} */
- @Override public ArrayList<DbObject> getChildren() {
- return delegate.getChildren();
- }
-
- /** {@inheritDoc} */
- @Override public String getComment() {
- return delegate.getComment();
- }
-
- /** {@inheritDoc} */
- @Override public String getCreateSQL() {
- return null; // Scan should return null.
- }
-
- /** {@inheritDoc} */
- @Override public String getCreateSQLForCopy(Table tbl, String quotedName) {
- return delegate.getCreateSQLForCopy(tbl, quotedName);
- }
-
- /** {@inheritDoc} */
- @Override public Database getDatabase() {
- return delegate.getDatabase();
- }
-
- /** {@inheritDoc} */
- @Override public String getDropSQL() {
- return delegate.getDropSQL();
- }
-
- /** {@inheritDoc} */
- @Override public int getId() {
- return delegate.getId();
- }
-
- /** {@inheritDoc} */
@Override public String getName() {
return delegate.getName() + SCAN_INDEX_NAME_SUFFIX;
}
-
- /** {@inheritDoc} */
- @Override public String getSQL() {
- return delegate.getSQL();
- }
-
- /** {@inheritDoc} */
- @Override public int getType() {
- return delegate.getType();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isTemporary() {
- return delegate.isTemporary();
- }
-
- /** {@inheritDoc} */
- @Override public void removeChildrenAndResources(Session ses) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void rename(String newName) {
- throw DbException.getUnsupportedException("rename");
- }
-
- /** {@inheritDoc} */
- @Override public void setComment(String comment) {
- throw DbException.getUnsupportedException("comment");
- }
-
- /** {@inheritDoc} */
- @Override public void setTemporary(boolean temporary) {
- throw DbException.getUnsupportedException("temporary");
- }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
index 7d4b7f0..9511866 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
@@ -38,9 +38,6 @@ public abstract class GridSqlQuery extends GridSqlStatement implements GridSqlAs
/** */
private GridSqlAst offset;
- /** */
- private boolean distinct;
-
/**
* @return Offset.
*/
@@ -56,20 +53,6 @@ public abstract class GridSqlQuery extends GridSqlStatement implements GridSqlAs
}
/**
- * @return Distinct.
- */
- public boolean distinct() {
- return distinct;
- }
-
- /**
- * @param distinct New distinct.
- */
- public void distinct(boolean distinct) {
- this.distinct = distinct;
- }
-
- /**
* @return Sort.
*/
public List<GridSqlSortColumn> sort() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index 024529c..16d7105 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -415,7 +415,7 @@ public class GridSqlQueryParser {
res.distinct(select.isDistinct());
Expression where = CONDITION.get(select);
- res.where(parseExpression(where, false));
+ res.where(parseExpression(where, true));
ArrayList<TableFilter> tableFilters = new ArrayList<>();
@@ -447,7 +447,7 @@ public class GridSqlQueryParser {
GridSqlElement gridFilter = parseTableFilter(f);
from = from == null ? gridFilter : new GridSqlJoin(from, gridFilter, f.isJoinOuter(),
- parseExpression(f.getJoinCondition(), false));
+ parseExpression(f.getJoinCondition(), true));
}
res.from(from);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 277cabc..aec0b36 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -207,7 +207,6 @@ public class GridSqlQuerySplitter {
// If we have distributed joins, then we have to optimize all MAP side queries
// to have a correct join order with respect to batched joins and check if we need
// distributed joins at all.
- // TODO Also we need to have a list of table aliases to filter by primary or explicit partitions.
if (distributedJoins) {
boolean allCollocated = true;
@@ -220,7 +219,7 @@ public class GridSqlQuerySplitter {
mapSqlQry.query(parse(prepared, true).getSQL());
}
- // We do not need distributed joins if all MAP queries are colocated.
+ // We do not need distributed joins if all MAP queries are collocated.
if (allCollocated)
distributedJoins = false;
}
@@ -861,6 +860,7 @@ public class GridSqlQuerySplitter {
if (!tblAliases.contains(tblAlias))
return;
+ GridSqlType resType = col.resultType();
String uniqueColAlias = uniqueColumnAlias(col);
GridSqlAlias colAlias = cols.get(uniqueColAlias);
@@ -874,6 +874,7 @@ public class GridSqlQuerySplitter {
col = column(uniqueColAlias);
// col.tableAlias(wrapAlias.alias());
col.expressionInFrom(wrapAlias);
+ col.resultType(resType);
prnt.child(childIdx, col);
}
@@ -1066,7 +1067,7 @@ public class GridSqlQuerySplitter {
else if (qrym.type == Type.UNION) {
// If it is not a UNION ALL, then we have to split because otherwise we can produce duplicates or
// wrong results for UNION DISTINCT, EXCEPT, INTERSECT queries.
- if (!qrym.needSplitChild && !qrym.unionAll)
+ if (!qrym.needSplitChild && (!qrym.unionAll || hasOffsetLimit(qrym.<GridSqlUnion>ast())))
qrym.needSplitChild = true;
// If we have to split some child SELECT in this UNION, then we have to enforce split
@@ -1151,6 +1152,14 @@ public class GridSqlQuerySplitter {
}
/**
+ * @param qry Query.
+ * @return {@code true} If the query has a LIMIT or an OFFSET.
+ */
+ private static boolean hasOffsetLimit(GridSqlQuery qry) {
+ return qry.limit() != null || qry.offset() != null;
+ }
+
+ /**
* @param select Select to check.
* @return {@code true} If we need to split this select.
*/
@@ -1158,6 +1167,9 @@ public class GridSqlQuerySplitter {
if (select.distinct())
return true;
+ if (hasOffsetLimit(select))
+ return true;
+
if (collocatedGrpBy)
return false;
@@ -1304,11 +1316,29 @@ public class GridSqlQuerySplitter {
setupParameters(map, mapQry, params);
map.columns(collectColumns(mapExps));
+ map.sortColumns(mapQry.sort());
+ map.partitioned(hasPartitionedTables(mapQry));
mapSqlQrys.add(map);
}
/**
+ * @param ast Map query AST.
+ * @return {@code true} If the given AST has partitioned tables.
+ */
+ private static boolean hasPartitionedTables(GridSqlAst ast) {
+ if (ast instanceof GridSqlTable)
+ return ((GridSqlTable)ast).dataTable().isPartitioned();
+
+ for (int i = 0; i < ast.size(); i++) {
+ if (hasPartitionedTables(ast.child(i)))
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
* @param sqlQry Query.
* @param qryAst Select AST.
* @param params All parameters.
@@ -1333,7 +1363,7 @@ public class GridSqlQuerySplitter {
GridSqlType t = col.resultType();
if (t == null)
- throw new NullPointerException("Column type.");
+ throw new NullPointerException("Column type: " + col);
if (t == GridSqlType.UNKNOWN)
throw new IllegalStateException("Unknown type: " + col);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java
index 8e8947f..d870ac5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java
@@ -17,6 +17,13 @@
package org.apache.ignite.internal.processors.query.h2.sql;
+import java.util.List;
+import org.apache.ignite.internal.util.typedef.F;
+import org.h2.result.SortOrder;
+import org.h2.table.Column;
+import org.h2.table.IndexColumn;
+import org.h2.table.Table;
+
/**
* Sort order for ORDER BY clause.
*/
@@ -47,6 +54,40 @@ public class GridSqlSortColumn {
}
/**
+ * @param tbl Table.
+ * @param sortCols Sort columns.
+ * @return Index columns.
+ */
+ public static IndexColumn[] toIndexColumns(Table tbl, List<GridSqlSortColumn> sortCols) {
+ assert !F.isEmpty(sortCols);
+
+ IndexColumn[] res = new IndexColumn[sortCols.size()];
+
+ for (int i = 0; i < res.length; i++) {
+ GridSqlSortColumn sc = sortCols.get(i);
+
+ Column col = tbl.getColumn(sc.column());
+
+ IndexColumn c = new IndexColumn();
+
+ c.column = col;
+ c.columnName = col.getName();
+
+ c.sortType = sc.asc ? SortOrder.ASCENDING : SortOrder.DESCENDING;
+
+ if (sc.nullsFirst)
+ c.sortType |= SortOrder.NULLS_FIRST;
+
+ if (sc.nullsLast)
+ c.sortType |= SortOrder.NULLS_LAST;
+
+ res[i] = c;
+ }
+
+ return res;
+ }
+
+ /**
* @return Column index.
*/
public int column() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index f002a5e..6416b21 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -459,8 +459,11 @@ public class GridMapQueryExecutor {
req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
+ final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
- for (int i = 1; i < mainCctx.config().getQueryParallelism(); i++) {
+ int segments = explain ? 1 : mainCctx.config().getQueryParallelism();
+
+ for (int i = 1; i < segments; i++) {
final int segment = i;
ctx.closure().callLocal(
@@ -587,7 +590,6 @@ public class GridMapQueryExecutor {
Connection conn = h2.connectionForSpace(mainCctx.name());
- // Here we enforce join order to have the same behavior on all the nodes.
setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
GridH2QueryContext.set(qctx);
@@ -610,28 +612,34 @@ public class GridMapQueryExecutor {
boolean evt = ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED);
for (GridCacheSqlQuery qry : qrys) {
- ResultSet rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(),
- F.asList(qry.parameters()), true,
- timeout,
- qr.cancels[qryIdx]);
-
- if (evt) {
- ctx.event().record(new CacheQueryExecutedEvent<>(
- node,
- "SQL query executed.",
- EVT_CACHE_QUERY_EXECUTED,
- CacheQueryType.SQL.name(),
- mainCctx.namex(),
- null,
- qry.query(),
- null,
- null,
- qry.parameters(),
- node.id(),
- null));
- }
+ ResultSet rs = null;
+
+ // If we are not the target node for this replicated query, just ignore it.
+ if (qry.node() == null ||
+ (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
+ rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(),
+ F.asList(qry.parameters()), true,
+ timeout,
+ qr.cancels[qryIdx]);
+
+ if (evt) {
+ ctx.event().record(new CacheQueryExecutedEvent<>(
+ node,
+ "SQL query executed.",
+ EVT_CACHE_QUERY_EXECUTED,
+ CacheQueryType.SQL.name(),
+ mainCctx.namex(),
+ null,
+ qry.query(),
+ null,
+ null,
+ qry.parameters(),
+ node.id(),
+ null));
+ }
- assert rs instanceof JdbcResultSet : rs.getClass();
+ assert rs instanceof JdbcResultSet : rs.getClass();
+ }
qr.addResult(qryIdx, qry, node.id(), rs);
@@ -751,6 +759,9 @@ public class GridMapQueryExecutor {
assert res != null;
+ if (res.closed)
+ return;
+
int page = res.page;
List<Value[]> rows = new ArrayList<>(Math.min(64, pageSize));
@@ -1081,21 +1092,31 @@ public class GridMapQueryExecutor {
* @param qry Query.
*/
private QueryResult(ResultSet rs, GridCacheContext<?, ?> cctx, UUID qrySrcNodeId, GridCacheSqlQuery qry) {
- this.rs = rs;
this.cctx = cctx;
this.qry = qry;
this.qrySrcNodeId = qrySrcNodeId;
this.cpNeeded = cctx.isLocalNode(qrySrcNodeId);
- try {
- res = (ResultInterface)RESULT_FIELD.get(rs);
- }
- catch (IllegalAccessException e) {
- throw new IllegalStateException(e); // Must not happen.
+ if (rs != null) {
+ this.rs = rs;
+ try {
+ res = (ResultInterface)RESULT_FIELD.get(rs);
+ }
+ catch (IllegalAccessException e) {
+ throw new IllegalStateException(e); // Must not happen.
+ }
+
+ rowCnt = res.getRowCount();
+ cols = res.getVisibleColumnCount();
}
+ else {
+ this.rs = null;
+ this.res = null;
+ this.cols = -1;
+ this.rowCnt = -1;
- rowCnt = res.getRowCount();
- cols = res.getVisibleColumnCount();
+ closed = true;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 6a6e045..27622bb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -22,20 +22,22 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.RandomAccess;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.CacheException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.h2.engine.Session;
import org.h2.index.BaseIndex;
@@ -44,13 +46,10 @@ import org.h2.index.IndexType;
import org.h2.message.DbException;
import org.h2.result.Row;
import org.h2.result.SearchRow;
-import org.h2.result.SortOrder;
import org.h2.table.IndexColumn;
-import org.h2.table.TableFilter;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
-import static java.util.Collections.emptyIterator;
import static java.util.Objects.requireNonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_MAX_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE;
@@ -67,6 +66,22 @@ public abstract class GridMergeIndex extends BaseIndex {
private static final int PREFETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE, 1024);
/** */
+ private static final AtomicReferenceFieldUpdater<GridMergeIndex, ConcurrentMap> lastPagesUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(GridMergeIndex.class, ConcurrentMap.class, "lastPages");
+
+ static {
+ if (!U.isPow2(PREFETCH_SIZE)) {
+ throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE +
+ ") must be positive and a power of 2.");
+ }
+
+ if (PREFETCH_SIZE >= MAX_FETCH_SIZE) {
+ throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE +
+ ") must be less than " + IGNITE_SQL_MERGE_TABLE_MAX_SIZE + " (" + MAX_FETCH_SIZE + ").");
+ }
+ }
+
+ /** */
protected final Comparator<SearchRow> firstRowCmp = new Comparator<SearchRow>() {
@Override public int compare(SearchRow rowInList, SearchRow searchRow) {
int res = compareRows(rowInList, searchRow);
@@ -84,14 +99,11 @@ public abstract class GridMergeIndex extends BaseIndex {
}
};
- /** All rows number. */
- private final AtomicInteger expRowsCnt = new AtomicInteger(0);
-
- /** Remaining rows per source node ID. */
- private Map<UUID, Counter[]> remainingRows;
+ /** Row source nodes. */
+ private Set<UUID> sources;
/** */
- private final AtomicBoolean lastSubmitted = new AtomicBoolean();
+ private int pageSize;
/**
* Will be r/w from query execution thread only, does not need to be threadsafe.
@@ -107,6 +119,9 @@ public abstract class GridMergeIndex extends BaseIndex {
/** */
private final GridKernalContext ctx;
+ /** */
+ private volatile ConcurrentMap<SourceKey, Integer> lastPages;
+
/**
* @param ctx Context.
* @param tbl Table.
@@ -129,16 +144,6 @@ public abstract class GridMergeIndex extends BaseIndex {
* @param ctx Context.
*/
protected GridMergeIndex(GridKernalContext ctx) {
- if (!U.isPow2(PREFETCH_SIZE)) {
- throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE +
- ") must be positive and a power of 2.");
- }
-
- if (PREFETCH_SIZE >= MAX_FETCH_SIZE) {
- throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE +
- ") must be less than " + IGNITE_SQL_MERGE_TABLE_MAX_SIZE + " (" + MAX_FETCH_SIZE + ").");
- }
-
this.ctx = ctx;
fetched = new BlockList<>(PREFETCH_SIZE);
@@ -148,7 +153,7 @@ public abstract class GridMergeIndex extends BaseIndex {
* @return Return source nodes for this merge index.
*/
public Set<UUID> sources() {
- return remainingRows.keySet();
+ return sources;
}
/**
@@ -169,17 +174,24 @@ public abstract class GridMergeIndex extends BaseIndex {
* @return {@code true} If this index needs data from the given source node.
*/
public boolean hasSource(UUID nodeId) {
- return remainingRows.containsKey(nodeId);
+ return sources.contains(nodeId);
}
/** {@inheritDoc} */
@Override public long getRowCount(Session ses) {
- return expRowsCnt.get();
+ Cursor c = find(ses, null, null);
+
+ long cnt = 0;
+
+ while (c.next())
+ cnt++;
+
+ return cnt;
}
/** {@inheritDoc} */
@Override public long getRowCountApproximation() {
- return getRowCount(null);
+ return 10_000;
}
/**
@@ -189,27 +201,28 @@ public abstract class GridMergeIndex extends BaseIndex {
* @param segmentsCnt Index segments per table.
*/
public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
- assert remainingRows == null;
+ assert sources == null;
- remainingRows = U.newHashMap(nodes.size());
+ sources = new HashSet<>();
for (ClusterNode node : nodes) {
- Counter[] counters = new Counter[segmentsCnt];
-
- for (int i = 0; i < segmentsCnt; i++)
- counters[i] = new Counter();
-
- if (remainingRows.put(node.id(), counters) != null)
- throw new IllegalStateException("Duplicate node id: " + node.id());
-
+ if (!sources.add(node.id()))
+ throw new IllegalStateException();
}
}
/**
+ * @param pageSize Page size.
+ */
+ public void setPageSize(int pageSize) {
+ this.pageSize = pageSize;
+ }
+
+ /**
* @param queue Queue to poll.
* @return Next page.
*/
- private GridResultPage takeNextPage(BlockingQueue<GridResultPage> queue) {
+ private GridResultPage takeNextPage(Pollable<GridResultPage> queue) {
GridResultPage page;
for (;;) {
@@ -234,16 +247,17 @@ public abstract class GridMergeIndex extends BaseIndex {
* @param iter Current iterator.
* @return The same or new iterator.
*/
- protected final Iterator<Value[]> pollNextIterator(BlockingQueue<GridResultPage> queue, Iterator<Value[]> iter) {
- while (!iter.hasNext()) {
+ protected final Iterator<Value[]> pollNextIterator(Pollable<GridResultPage> queue, Iterator<Value[]> iter) {
+ if (!iter.hasNext()) {
GridResultPage page = takeNextPage(queue);
- if (page.isLast())
- return emptyIterator(); // We are done.
-
- fetchNextPage(page);
+ if (!page.isLast())
+ page.fetchNextPage(); // Failed will throw an exception here.
iter = page.rows();
+
+ // The received iterator must be empty in the dummy last page or on failure.
+ assert iter.hasNext() || page.isDummyLast() || page.isFail();
}
return iter;
@@ -253,23 +267,18 @@ public abstract class GridMergeIndex extends BaseIndex {
* @param e Error.
*/
public void fail(final CacheException e) {
- for (UUID nodeId0 : remainingRows.keySet()) {
- addPage0(new GridResultPage(null, nodeId0, null) {
- @Override public boolean isFail() {
- return true;
- }
-
- @Override public void fetchNextPage() {
- throw e;
- }
- });
- }
+ for (UUID nodeId : sources)
+ fail(nodeId, e);
}
/**
* @param nodeId Node ID.
+ * @param e Exception.
*/
public void fail(UUID nodeId, final CacheException e) {
+ if (nodeId == null)
+ nodeId = F.first(sources);
+
addPage0(new GridResultPage(null, nodeId, null) {
@Override public boolean isFail() {
return true;
@@ -285,91 +294,88 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/**
- * @param page Page.
+ * @param nodeId Node ID.
+ * @param res Response.
*/
- public final void addPage(GridResultPage page) {
- int pageRowsCnt = page.rowsInPage();
+ private void initLastPages(UUID nodeId, GridQueryNextPageResponse res) {
+ int allRows = res.allRows();
- Counter cnt = remainingRows.get(page.source())[page.res.segmentId()];
+ // In the old protocol we send the total row count in page 0; other pages have -1.
+ // In the new protocol we do not know it and always have -1, except for the terminating
+ // page, which has -2. Thus we init the page counters only when we receive a non-negative
+ // value in the first page.
+ if (allRows < 0 || res.page() != 0)
+ return;
- // RemainingRowsCount should be updated before page adding to avoid race
- // in GridMergeIndexUnsorted cursor iterator
- int remainingRowsCount;
+ ConcurrentMap<SourceKey,Integer> lp = lastPages;
- int allRows = page.response().allRows();
+ if (lp == null && !lastPagesUpdater.compareAndSet(this, null, lp = new ConcurrentHashMap<>()))
+ lp = lastPages;
- if (allRows != -1) { // Only the first page contains allRows count and is allowed to init counter.
- assert cnt.state == State.UNINITIALIZED : "Counter is already initialized.";
+ assert pageSize > 0: pageSize;
- remainingRowsCount = cnt.addAndGet(allRows - pageRowsCnt);
+ int lastPage = allRows == 0 ? 0 : (allRows - 1) / pageSize;
- expRowsCnt.addAndGet(allRows);
+ assert lastPage >= 0: lastPage;
- // Add page before setting initialized flag to avoid race condition with adding last page
- if (pageRowsCnt > 0)
- addPage0(page);
+ if (lp.put(new SourceKey(nodeId, res.segmentId()), lastPage) != null)
+ throw new IllegalStateException();
+ }
- // We need this separate flag to handle case when the first source contains only one page
- // and it will signal that all remaining counters are zero and fetch is finished.
- cnt.state = State.INITIALIZED;
- }
- else {
- remainingRowsCount = cnt.addAndGet(-pageRowsCnt);
+ /**
+ * @param page Page.
+ */
+ private void markLastPage(GridResultPage page) {
+ GridQueryNextPageResponse res = page.response();
- if (pageRowsCnt > 0)
- addPage0(page);
- }
+ if (res.allRows() != -2) { // -2 means the last page.
+ UUID nodeId = page.source();
- if (remainingRowsCount == 0) { // Result can be negative in case of race between messages, it is ok.
- if (cnt.state == State.UNINITIALIZED)
- return;
+ initLastPages(nodeId, res);
- // Guarantee that finished state possible only if counter is zero and all pages was added
- cnt.state = State.FINISHED;
+ ConcurrentMap<SourceKey,Integer> lp = lastPages;
- for (Counter[] cntrs : remainingRows.values()) { // Check all the sources.
- for(int i = 0; i < cntrs.length; i++) {
- if (cntrs[i].state != State.FINISHED)
- return;
- }
- }
+ if (lp == null)
+ return; // It was not initialized --> wait for -2.
- if (lastSubmitted.compareAndSet(false, true)) {
- addPage0(new GridResultPage(null, page.source(), null) {
- @Override public boolean isLast() {
- return true;
- }
- });
+ Integer lastPage = lp.get(new SourceKey(nodeId, res.segmentId()));
+
+ if (lastPage == null)
+ return; // This node may use the new protocol --> wait for -2.
+
+ if (lastPage != res.page()) {
+ assert lastPage > res.page();
+
+ return; // This is not the last page.
}
}
+
+ page.setLast(true);
}
/**
* @param page Page.
*/
- protected abstract void addPage0(GridResultPage page);
+ public final void addPage(GridResultPage page) {
+ markLastPage(page);
+ addPage0(page);
+ }
/**
- * @param page Page.
+ * @param lastPage Real last page.
+ * @return Created dummy page.
*/
- protected void fetchNextPage(GridResultPage page) {
- assert !page.isLast();
-
- if(page.isFail())
- page.fetchNextPage(); //rethrow exceptions
-
- assert page.res != null;
-
- Counter[] counters = remainingRows.get(page.source());
+ protected final GridResultPage createDummyLastPage(GridResultPage lastPage) {
+ assert !lastPage.isDummyLast(); // It must be a real last page.
- int segId = page.res.segmentId();
-
- Counter counter = counters[segId];
-
- if (counter.get() != 0)
- page.fetchNextPage();
+ return new GridResultPage(ctx, lastPage.source(), null).setLast(true);
}
+ /**
+ * @param page Page.
+ */
+ protected abstract void addPage0(GridResultPage page);
+
/** {@inheritDoc} */
@Override public final Cursor find(Session ses, SearchRow first, SearchRow last) {
checkBounds(lastEvictedRow, first, last);
@@ -381,11 +387,9 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/**
- * @return {@code true} If we have fetched all the remote rows.
+ * @return {@code true} If we have fetched all the remote rows into a fetched list.
*/
- public boolean fetchedAll() {
- return fetchedCnt == expRowsCnt.get();
- }
+ public abstract boolean fetchedAll();
/**
* @param lastEvictedRow Last evicted fetched row.
@@ -433,11 +437,6 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/** {@inheritDoc} */
- @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) {
- return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, true);
- }
-
- /** {@inheritDoc} */
@Override public void remove(Session ses) {
throw DbException.getUnsupportedException("remove index");
}
@@ -683,14 +682,6 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/**
- * Counter with initialization flag.
- */
- private static class Counter extends AtomicInteger {
- /** */
- volatile State state = State.UNINITIALIZED;
- }
-
- /**
*/
private static final class BlockList<Z> extends AbstractList<Z> implements RandomAccess {
/** */
@@ -766,4 +757,53 @@ public abstract class GridMergeIndex extends BaseIndex {
return res;
}
}
+
+ /**
+ * Pollable.
+ */
+ protected static interface Pollable<E> {
+ /**
+ * @param timeout Timeout.
+ * @param unit Time unit.
+ * @return Polled value or {@code null} if none.
+ * @throws InterruptedException If interrupted.
+ */
+ E poll(long timeout, TimeUnit unit) throws InterruptedException;
+ }
+
+ /**
+ */
+ private static class SourceKey {
+ final UUID nodeId;
+
+ /** */
+ final int segment;
+
+ /**
+ * @param nodeId Node ID.
+ * @param segment Segment.
+ */
+ SourceKey(UUID nodeId, int segment) {
+ this.nodeId = nodeId;
+ this.segment = segment;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ SourceKey sourceKey = (SourceKey)o;
+
+ if (segment != sourceKey.segment) return false;
+ return nodeId.equals(sourceKey.nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int result = nodeId.hashCode();
+ result = 31 * result + segment;
+ return result;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
index 32c676d..361bb2d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
@@ -25,18 +25,24 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.engine.Session;
import org.h2.index.Cursor;
import org.h2.index.IndexType;
import org.h2.result.Row;
import org.h2.result.SearchRow;
+import org.h2.result.SortOrder;
import org.h2.table.IndexColumn;
+import org.h2.table.TableFilter;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
@@ -48,6 +54,9 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase
*/
public final class GridMergeIndexSorted extends GridMergeIndex {
/** */
+ private static final IndexType TYPE = IndexType.createNonUnique(false);
+
+ /** */
private final Comparator<RowStream> streamCmp = new Comparator<RowStream>() {
@Override public int compare(RowStream o1, RowStream o2) {
// Nulls at the beginning.
@@ -62,26 +71,33 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
};
/** */
- private Map<UUID,RowStream> streamsMap;
+ private Map<UUID,RowStream[]> streamsMap;
/** */
- private RowStream[] streams;
+ private final Lock lock = new ReentrantLock();
+
+ /** */
+ private final Condition notEmpty = lock.newCondition();
+
+ /** */
+ private GridResultPage failPage;
+
+ /** */
+ private MergeStreamIterator it;
/**
* @param ctx Kernal context.
* @param tbl Table.
* @param name Index name,
- * @param type Index type.
* @param cols Columns.
*/
public GridMergeIndexSorted(
GridKernalContext ctx,
GridMergeTable tbl,
String name,
- IndexType type,
IndexColumn[] cols
) {
- super(ctx, tbl, name, type, cols);
+ super(ctx, tbl, name, TYPE, cols);
}
/** {@inheritDoc} */
@@ -89,33 +105,48 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
super.setSources(nodes, segmentsCnt);
streamsMap = U.newHashMap(nodes.size());
- streams = new RowStream[nodes.size()];
+ RowStream[] streams = new RowStream[nodes.size() * segmentsCnt];
int i = 0;
for (ClusterNode node : nodes) {
- RowStream stream = new RowStream(node.id());
+ RowStream[] segments = new RowStream[segmentsCnt];
- streams[i] = stream;
+ for (int s = 0; s < segmentsCnt; s++)
+ streams[i++] = segments[s] = new RowStream();
- if (streamsMap.put(stream.src, stream) != null)
+ if (streamsMap.put(node.id(), segments) != null)
throw new IllegalStateException();
}
+
+ it = new MergeStreamIterator(streams);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean fetchedAll() {
+ return it.fetchedAll();
}
/** {@inheritDoc} */
@Override protected void addPage0(GridResultPage page) {
- if (page.isLast() || page.isFail()) {
- // Finish all the streams.
- for (RowStream stream : streams)
- stream.addPage(page);
+ if (page.isFail()) {
+ lock.lock();
+
+ try {
+ if (failPage == null) {
+ failPage = page;
+
+ notEmpty.signalAll();
+ }
+ }
+ finally {
+ lock.unlock();
+ }
}
else {
- assert page.rowsInPage() > 0;
-
UUID src = page.source();
- streamsMap.get(src).addPage(page);
+ streamsMap.get(src)[page.segmentId()].addPage(page);
}
}
@@ -153,8 +184,13 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
}
/** {@inheritDoc} */
+ @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) {
+ return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, false);
+ }
+
+ /** {@inheritDoc} */
@Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) {
- return new FetchingCursor(first, last, new MergeStreamIterator());
+ return new FetchingCursor(first, last, it);
}
/**
@@ -165,17 +201,42 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
private boolean first = true;
/** */
- private int off;
+ private volatile int off;
/** */
private boolean hasNext;
+ /** */
+ private final RowStream[] streams;
+
+ /**
+ * @param streams Streams.
+ */
+ MergeStreamIterator(RowStream[] streams) {
+ assert !F.isEmpty(streams);
+
+ this.streams = streams;
+ }
+
+ /**
+ * @return {@code true} If fetched all.
+ */
+ private boolean fetchedAll() {
+ return off == streams.length;
+ }
+
/**
*
*/
private void goFirst() {
+ assert first;
+
+ first = false;
+
for (int i = 0; i < streams.length; i++) {
- if (!streams[i].next()) {
+ RowStream s = streams[i];
+
+ if (!s.next()) {
streams[i] = null;
off++; // Move left bound.
}
@@ -183,8 +244,6 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
if (off < streams.length)
Arrays.sort(streams, streamCmp);
-
- first = false;
}
/**
@@ -229,31 +288,68 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
/**
* Row stream.
*/
- private final class RowStream {
- /** */
- final UUID src;
-
- /** */
- final BlockingQueue<GridResultPage> queue = new ArrayBlockingQueue<>(8);
-
+ private final class RowStream implements Pollable<GridResultPage> {
/** */
Iterator<Value[]> iter = emptyIterator();
/** */
Row cur;
- /**
- * @param src Source.
- */
- private RowStream(UUID src) {
- this.src = src;
- }
+ /** */
+ GridResultPage nextPage;
/**
* @param page Page.
*/
private void addPage(GridResultPage page) {
- queue.offer(page);
+ assert !page.isFail();
+
+ if (page.isLast() && page.rowsInPage() == 0)
+ page = createDummyLastPage(page); // Terminate.
+
+ lock.lock();
+
+ try {
+ // We can fetch the next page only when we have polled the previous one.
+ assert nextPage == null;
+
+ nextPage = page;
+
+ notEmpty.signalAll();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridResultPage poll(long timeout, TimeUnit unit) throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+
+ lock.lock();
+
+ try {
+ for (;;) {
+ if (failPage != null)
+ return failPage;
+
+ GridResultPage page = nextPage;
+
+ if (page != null) {
+ // isLast && !isDummyLast
+ nextPage = page.isLast() && page.response() != null
+ ? createDummyLastPage(page) : null; // Terminate with empty iterator.
+
+ return page;
+ }
+
+ if ((nanos = notEmpty.awaitNanos(nanos)) <= 0)
+ return null;
+ }
+ }
+ finally {
+ lock.unlock();
+ }
}
/**
@@ -262,7 +358,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
private boolean next() {
cur = null;
- iter = pollNextIterator(queue, iter);
+ iter = pollNextIterator(this, iter);
if (!iter.hasNext())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index b69c898..430a687 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -17,19 +17,24 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory;
+import org.h2.engine.Session;
import org.h2.index.Cursor;
import org.h2.index.IndexType;
import org.h2.result.Row;
import org.h2.result.SearchRow;
+import org.h2.result.SortOrder;
import org.h2.table.IndexColumn;
+import org.h2.table.TableFilter;
import org.h2.value.Value;
/**
@@ -37,7 +42,16 @@ import org.h2.value.Value;
*/
public final class GridMergeIndexUnsorted extends GridMergeIndex {
/** */
- private final BlockingQueue<GridResultPage> queue = new LinkedBlockingQueue<>();
+ private static final IndexType TYPE = IndexType.createScan(false);
+
+ /** */
+ private final PollableQueue<GridResultPage> queue = new PollableQueue<>();
+
+ /** */
+ private final AtomicInteger activeSources = new AtomicInteger(-1);
+
+ /** */
+ private Iterator<Value[]> iter = Collections.emptyIterator();
/**
* @param ctx Context.
@@ -45,7 +59,7 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
* @param name Index name.
*/
public GridMergeIndexUnsorted(GridKernalContext ctx, GridMergeTable tbl, String name) {
- super(ctx, tbl, name, IndexType.createScan(false), IndexColumn.wrap(tbl.getColumns()));
+ super(ctx, tbl, name, TYPE, IndexColumn.wrap(tbl.getColumns()));
}
/**
@@ -64,10 +78,46 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
}
/** {@inheritDoc} */
+ @Override public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
+ super.setSources(nodes, segmentsCnt);
+
+ int x = nodes.size() * segmentsCnt;
+
+ assert x > 0: x;
+
+ activeSources.set(x);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean fetchedAll() {
+ int x = activeSources.get();
+
+ assert x >= 0: x; // This method must not be called if the sources were not set.
+
+ return x == 0 && queue.isEmpty();
+ }
+
+ /** {@inheritDoc} */
@Override protected void addPage0(GridResultPage page) {
assert page.rowsInPage() > 0 || page.isLast() || page.isFail();
- queue.add(page);
+ // Do not add empty page to avoid premature stream termination.
+ if (page.rowsInPage() != 0 || page.isFail())
+ queue.add(page);
+
+ if (page.isLast()) {
+ int x = activeSources.decrementAndGet();
+
+ assert x >= 0: x;
+
+ if (x == 0) // Always terminate with empty iterator.
+ queue.add(createDummyLastPage(page));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) {
+ return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, true);
}
/** {@inheritDoc} */
@@ -80,9 +130,6 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
@Override protected Cursor findInStream(SearchRow first, SearchRow last) {
// This index is unsorted: have to ignore bounds.
return new FetchingCursor(null, null, new Iterator<Row>() {
- /** */
- Iterator<Value[]> iter = Collections.emptyIterator();
-
@Override public boolean hasNext() {
iter = pollNextIterator(queue, iter);
@@ -98,4 +145,10 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
}
});
}
+
+ /**
+ */
+ private static class PollableQueue<X> extends LinkedBlockingQueue<X> implements Pollable<X> {
+ // No-op.
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
index 1489021..f7495c0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
@@ -18,35 +18,55 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
import java.util.ArrayList;
-import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2ScanIndex;
+import org.apache.ignite.internal.util.typedef.F;
import org.h2.command.ddl.CreateTableData;
import org.h2.engine.Session;
import org.h2.index.Index;
import org.h2.index.IndexType;
import org.h2.message.DbException;
import org.h2.result.Row;
+import org.h2.result.SortOrder;
import org.h2.table.IndexColumn;
import org.h2.table.TableBase;
+import org.h2.table.TableFilter;
/**
* Merge table for distributed queries.
*/
public class GridMergeTable extends TableBase {
/** */
- private final GridKernalContext ctx;
-
- /** */
- private final GridMergeIndex idx;
+ private ArrayList<Index> idxs;
/**
* @param data Data.
- * @param ctx Kernal context.
*/
- public GridMergeTable(CreateTableData data, GridKernalContext ctx) {
+ public GridMergeTable(CreateTableData data) {
super(data);
+ }
+
+ /**
+ * @param idxs Indexes.
+ */
+ public void indexes(ArrayList<Index> idxs) {
+ assert !F.isEmpty(idxs);
+
+ this.idxs = idxs;
+ }
- this.ctx = ctx;
- idx = new GridMergeIndexUnsorted(ctx, this, "merge_scan");
+ /**
+ * @return Merge index.
+ */
+ public GridMergeIndex getMergeIndex() {
+ return (GridMergeIndex)idxs.get(idxs.size() - 1); // Sorted index must be the last.
+ }
+
+ /**
+ * @param idx Index.
+ * @return Scan index.
+ */
+ public static GridH2ScanIndex<GridMergeIndex> createScanIndex(GridMergeIndex idx) {
+ return new ScanIndex(idx);
}
/** {@inheritDoc} */
@@ -56,7 +76,7 @@ public class GridMergeTable extends TableBase {
/** {@inheritDoc} */
@Override public void close(Session ses) {
- idx.close(ses);
+ // No-op.
}
/** {@inheritDoc} */
@@ -96,8 +116,8 @@ public class GridMergeTable extends TableBase {
}
/** {@inheritDoc} */
- @Override public GridMergeIndex getScanIndex(Session session) {
- return idx;
+ @Override public Index getScanIndex(Session session) {
+ return idxs.get(0); // Must be always at 0.
}
/** {@inheritDoc} */
@@ -107,7 +127,7 @@ public class GridMergeTable extends TableBase {
/** {@inheritDoc} */
@Override public ArrayList<Index> getIndexes() {
- return null;
+ return idxs;
}
/** {@inheritDoc} */
@@ -137,12 +157,12 @@ public class GridMergeTable extends TableBase {
/** {@inheritDoc} */
@Override public long getRowCount(Session ses) {
- return idx.getRowCount(ses);
+ return getScanIndex(ses).getRowCount(ses);
}
/** {@inheritDoc} */
@Override public long getRowCountApproximation() {
- return idx.getRowCountApproximation();
+ return getScanIndex(null).getRowCountApproximation();
}
/** {@inheritDoc} */
@@ -154,4 +174,24 @@ public class GridMergeTable extends TableBase {
@Override public void checkRename() {
throw DbException.getUnsupportedException("rename");
}
+
+ /**
+ * Scan index wrapper.
+ */
+ private static class ScanIndex extends GridH2ScanIndex<GridMergeIndex> {
+ /**
+ * @param delegate Delegate.
+ */
+ public ScanIndex(GridMergeIndex delegate) {
+ super(delegate);
+ }
+
+ /** {@inheritDoc} */
+ @Override public double getCost(Session session, int[] masks, TableFilter[] filters, int filter,
+ SortOrder sortOrder) {
+ long rows = getRowCountApproximation();
+
+ return getCostRangeIndex(masks, rows, filters, filter, sortOrder, true);
+ }
+ }
}
\ No newline at end of file
[3/3] ignite git commit: Merge branch 'ignite-1.9.2' of
https://github.com/gridgain/apache-ignite
Posted by se...@apache.org.
Merge branch 'ignite-1.9.2' of https://github.com/gridgain/apache-ignite
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/32506d69
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/32506d69
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/32506d69
Branch: refs/heads/master
Commit: 32506d69c6b2654a3547a6797254c7527c6ef239
Parents: a61a98a 8817190
Author: Sergi Vladykin <se...@gmail.com>
Authored: Thu Mar 9 23:39:48 2017 +0300
Committer: Sergi Vladykin <se...@gmail.com>
Committed: Thu Mar 9 23:39:48 2017 +0300
----------------------------------------------------------------------
.../cache/query/GridCacheSqlQuery.java | 82 ++++-
.../processors/query/h2/IgniteH2Indexing.java | 4 +-
.../query/h2/opt/GridH2CollocationModel.java | 6 +-
.../query/h2/opt/GridH2ScanIndex.java | 273 +++++++++++++++++
.../processors/query/h2/opt/GridH2Table.java | 244 +--------------
.../processors/query/h2/sql/GridSqlQuery.java | 17 --
.../query/h2/sql/GridSqlQueryParser.java | 4 +-
.../query/h2/sql/GridSqlQuerySplitter.java | 38 ++-
.../query/h2/sql/GridSqlSortColumn.java | 41 +++
.../query/h2/twostep/GridMapQueryExecutor.java | 83 +++--
.../query/h2/twostep/GridMergeIndex.java | 300 +++++++++++--------
.../query/h2/twostep/GridMergeIndexSorted.java | 172 ++++++++---
.../h2/twostep/GridMergeIndexUnsorted.java | 67 ++++-
.../query/h2/twostep/GridMergeTable.java | 70 ++++-
.../h2/twostep/GridReduceQueryExecutor.java | 101 +++++--
.../query/h2/twostep/GridResultPage.java | 34 ++-
.../h2/twostep/msg/GridH2QueryRequest.java | 11 +-
.../cache/IgniteCacheAbstractQuerySelfTest.java | 10 +-
.../query/IgniteSqlSplitterSelfTest.java | 100 ++++++-
.../query/h2/sql/H2CompareBigQueryTest.java | 4 +-
.../IgniteCacheQuerySelfTestSuite.java | 2 +
.../processors/query/h2/sql/bigQuery.sql | 34 ++-
22 files changed, 1138 insertions(+), 559 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/32506d69/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/32506d69/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------