You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/13 10:28:08 UTC
[34/50] [abbrv] ignite git commit: ignite-1.9 - SQL related fixes and
improvements: - Sorted MERGE index - EXPLAIN fixes - Replicated subqueries
fixes
ignite-1.9 - SQL related fixes and improvements:
- Sorted MERGE index
- EXPLAIN fixes
- Replicated subqueries fixes
Squashed commit of the following:
commit 423c2155c85ed9be8dffb3517b7331b753e1ce5c
Author: Sergi Vladykin <se...@gmail.com>
Date: Thu Mar 9 23:21:38 2017 +0300
ignite-1.9.1 - test fix
commit ff3c1f2967905b0bcac7661014656d1c080fa803
Author: Sergi Vladykin <se...@gmail.com>
Date: Thu Mar 9 11:08:34 2017 +0300
ignite-1.9.0 - replicated subqueries fix
commit bc0801a3c976f5d87cab2c414f76f69dc28b43d7
Author: Sergi Vladykin <se...@gmail.com>
Date: Wed Mar 8 16:03:40 2017 +0300
ignite-1.9.0 - fix for distributed join test
commit f1f1d96c6babaadab9e3ed1fbb3c9740c94d8209
Author: Sergi Vladykin <se...@gmail.com>
Date: Wed Mar 8 15:28:44 2017 +0300
ignite-1.9.0 - fix for distributed join test
commit a8751d535b3e025a804c441204465e94035a5247
Author: Sergi Vladykin <se...@gmail.com>
Date: Tue Feb 28 18:46:07 2017 +0300
ignite-1.9 - splitter fixes
commit 0601ce6e291eb4689d526e922b02fd9e21df5b08
Author: Sergi Vladykin <se...@gmail.com>
Date: Sun Feb 26 23:24:14 2017 +0300
ignite-1.9 - merge index test
commit 4ad048e248157d799a325b3ce9975d4ad8a9fb49
Author: Sergi Vladykin <se...@gmail.com>
Date: Sun Feb 26 23:19:49 2017 +0300
ignite-1.9 - merge index
commit 4ea63d7335000b8f30bfbd1bb907e411cd62a5e8
Author: Sergi Vladykin <se...@gmail.com>
Date: Sun Feb 26 22:44:51 2017 +0300
ignite-1.9 - unsorted index fixed
commit a639bff6f25a8397e49a892f830c9de23c847127
Author: Sergi Vladykin <se...@gmail.com>
Date: Sun Feb 26 20:08:26 2017 +0300
ignite-1.9 - sorted index fixes2
commit ee9d524f5a0d6f1c416345822e8201c327f1e562
Author: Sergi Vladykin <se...@gmail.com>
Date: Fri Feb 24 16:00:26 2017 +0300
ignite-1.9 - sorted index fixes
commit fc42406a9e55851d53d9dfed8e6cf3c8b12af345
Author: Sergi Vladykin <se...@gmail.com>
Date: Thu Feb 23 16:46:39 2017 +0300
ignite-1.9 - sorted index
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8817190e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8817190e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8817190e
Branch: refs/heads/ignite-4565-ddl
Commit: 8817190e1dd31d869682df0167bb3e82fb597aad
Parents: 8362fe7
Author: Sergi Vladykin <se...@gmail.com>
Authored: Thu Mar 9 23:30:09 2017 +0300
Committer: Sergi Vladykin <se...@gmail.com>
Committed: Thu Mar 9 23:30:09 2017 +0300
----------------------------------------------------------------------
.../cache/query/GridCacheSqlQuery.java | 82 ++++-
.../processors/query/h2/IgniteH2Indexing.java | 4 +-
.../query/h2/opt/GridH2CollocationModel.java | 6 +-
.../query/h2/opt/GridH2ScanIndex.java | 273 +++++++++++++++++
.../processors/query/h2/opt/GridH2Table.java | 244 +--------------
.../processors/query/h2/sql/GridSqlQuery.java | 17 --
.../query/h2/sql/GridSqlQueryParser.java | 4 +-
.../query/h2/sql/GridSqlQuerySplitter.java | 38 ++-
.../query/h2/sql/GridSqlSortColumn.java | 41 +++
.../query/h2/twostep/GridMapQueryExecutor.java | 83 +++--
.../query/h2/twostep/GridMergeIndex.java | 300 +++++++++++--------
.../query/h2/twostep/GridMergeIndexSorted.java | 172 ++++++++---
.../h2/twostep/GridMergeIndexUnsorted.java | 67 ++++-
.../query/h2/twostep/GridMergeTable.java | 70 ++++-
.../h2/twostep/GridReduceQueryExecutor.java | 101 +++++--
.../query/h2/twostep/GridResultPage.java | 34 ++-
.../h2/twostep/msg/GridH2QueryRequest.java | 11 +-
.../cache/IgniteCacheAbstractQuerySelfTest.java | 10 +-
.../query/IgniteSqlSplitterSelfTest.java | 100 ++++++-
.../query/h2/sql/H2CompareBigQueryTest.java | 4 +-
.../IgniteCacheQuerySelfTestSuite.java | 2 +
.../processors/query/h2/sql/bigQuery.sql | 34 ++-
22 files changed, 1138 insertions(+), 559 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
index 18688b7..c4bb205 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.processors.cache.query;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridDirectTransient;
@@ -74,6 +76,19 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
/** Field kept for backward compatibility. */
private String alias;
+ /** Sort columns. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private transient List<?> sort;
+
+ /** If we have partitioned tables in this query. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private transient boolean partitioned;
+
+ /** Single node to execute the query on. */
+ private UUID node;
+
/**
* For {@link Message}.
*/
@@ -218,12 +233,18 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
writer.incrementState();
case 1:
- if (!writer.writeByteArray("paramsBytes", paramsBytes))
+ if (!writer.writeUuid("node", node))
return false;
writer.incrementState();
case 2:
+ if (!writer.writeByteArray("paramsBytes", paramsBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
if (!writer.writeString("qry", qry))
return false;
@@ -251,7 +272,7 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 1:
- paramsBytes = reader.readByteArray("paramsBytes");
+ node = reader.readUuid("node");
if (!reader.isLastRead())
return false;
@@ -259,6 +280,14 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 2:
+ paramsBytes = reader.readByteArray("paramsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
qry = reader.readString("qry");
if (!reader.isLastRead())
@@ -278,7 +307,7 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 3;
+ return 4;
}
/**
@@ -292,6 +321,8 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
cp.cols = cols;
cp.paramIdxs = paramIdxs;
cp.paramsSize = paramsSize;
+ cp.sort = sort;
+ cp.partitioned = partitioned;
if (F.isEmpty(args))
cp.params = EMPTY_PARAMS;
@@ -304,4 +335,49 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
return cp;
}
+
+ /**
+ * @param sort Sort columns.
+ */
+ public void sortColumns(List<?> sort) {
+ this.sort = sort;
+ }
+
+ /**
+ * @return Sort columns.
+ */
+ public List<?> sortColumns() {
+ return sort;
+ }
+
+ /**
+ * @param partitioned If the query contains partitioned tables.
+ */
+ public void partitioned(boolean partitioned) {
+ this.partitioned = partitioned;
+ }
+
+ /**
+ * @return {@code true} if the query contains partitioned tables.
+ */
+ public boolean isPartitioned() {
+ return partitioned;
+ }
+
+ /**
+ * @return Single node to execute the query on, or {@code null} if the query must be executed on all nodes.
+ */
+ public UUID node() {
+ return node;
+ }
+
+ /**
+ * @param node Single node to execute the query on, or {@code null} if the query must be executed on all nodes.
+ * @return {@code this}.
+ */
+ public GridCacheSqlQuery node(UUID node) {
+ this.node = node;
+
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index b4bf608..8de8dc4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -85,7 +85,6 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
@@ -155,7 +154,6 @@ import org.h2.result.SortOrder;
import org.h2.server.web.WebServer;
import org.h2.table.Column;
import org.h2.table.IndexColumn;
-import org.h2.table.Table;
import org.h2.tools.Server;
import org.h2.util.JdbcUtils;
import org.h2.value.DataType;
@@ -1453,7 +1451,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
- Prepared prepared = GridSqlQueryParser.prepared((JdbcPreparedStatement) stmt);
+ Prepared prepared = GridSqlQueryParser.prepared(stmt);
if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery) qry).isQuery() != prepared.isQuery())
throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver",
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
index ce11fd5..4df355e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
@@ -300,10 +300,10 @@ public final class GridH2CollocationModel {
assert childFilters == null;
// We are at table instance.
- GridH2Table tbl = (GridH2Table)filter().getTable();
+ Table tbl = filter().getTable();
// Only partitioned tables will do distributed joins.
- if (!tbl.isPartitioned()) {
+ if (!(tbl instanceof GridH2Table) || !((GridH2Table)tbl).isPartitioned()) {
type = Type.REPLICATED;
multiplier = MULTIPLIER_COLLOCATED;
@@ -593,7 +593,7 @@ public final class GridH2CollocationModel {
private GridH2CollocationModel child(int i, boolean create) {
GridH2CollocationModel child = children[i];
- if (child == null && create && isChildTableOrView(i, null)) {
+ if (child == null && create) {
TableFilter f = childFilters[i];
if (f.getTable().isView()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ScanIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ScanIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ScanIndex.java
new file mode 100644
index 0000000..3ddd490
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ScanIndex.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import java.util.ArrayList;
+import org.h2.engine.Database;
+import org.h2.engine.DbObject;
+import org.h2.engine.Session;
+import org.h2.index.BaseIndex;
+import org.h2.index.Cursor;
+import org.h2.index.IndexLookupBatch;
+import org.h2.index.IndexType;
+import org.h2.message.DbException;
+import org.h2.result.Row;
+import org.h2.result.SearchRow;
+import org.h2.schema.Schema;
+import org.h2.table.Column;
+import org.h2.table.IndexColumn;
+import org.h2.table.Table;
+import org.h2.table.TableFilter;
+
+/**
+ * Scan index base class.
+ */
+public abstract class GridH2ScanIndex<D extends BaseIndex> extends BaseIndex {
+ /** */
+ private static final IndexType TYPE = IndexType.createScan(false);
+
+ /** */
+ protected final D delegate;
+
+ /**
+ * @param delegate Delegate.
+ */
+ public GridH2ScanIndex(D delegate) {
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getDiskSpaceUsed() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void add(Session ses, Row row) {
+ delegate.add(ses, row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canFindNext() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canGetFirstOrLast() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canScan() {
+ return delegate.canScan();
+ }
+
+ /** {@inheritDoc} */
+ @Override public final void close(Session ses) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void commit(int operation, Row row) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareRows(SearchRow rowData, SearchRow compare) {
+ return delegate.compareRows(rowData, compare);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor find(TableFilter filter, SearchRow first, SearchRow last) {
+ return find(filter.getSession(), first, last);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor find(Session ses, SearchRow first, SearchRow last) {
+ return delegate.find(ses, null, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor findFirstOrLast(Session ses, boolean first) {
+ throw DbException.getUnsupportedException("SCAN");
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor findNext(Session ses, SearchRow higherThan, SearchRow last) {
+ throw DbException.throwInternalError();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getColumnIndex(Column col) {
+ return -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Column[] getColumns() {
+ return delegate.getColumns();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IndexColumn[] getIndexColumns() {
+ return delegate.getIndexColumns();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IndexType getIndexType() {
+ return TYPE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Row getRow(Session ses, long key) {
+ return delegate.getRow(ses, key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getRowCount(Session ses) {
+ return delegate.getRowCount(ses);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getRowCountApproximation() {
+ return delegate.getRowCountApproximation();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Table getTable() {
+ return delegate.getTable();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isRowIdIndex() {
+ return delegate.isRowIdIndex();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean needRebuild() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(Session ses) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(Session ses, Row row) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setSortedInsertMode(boolean sortedInsertMode) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public IndexLookupBatch createLookupBatch(TableFilter filter) {
+ return delegate.createLookupBatch(filter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void truncate(Session ses) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Schema getSchema() {
+ return delegate.getSchema();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isHidden() {
+ return delegate.isHidden();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkRename() {
+ throw DbException.getUnsupportedException("rename");
+ }
+
+ /** {@inheritDoc} */
+ @Override public ArrayList<DbObject> getChildren() {
+ return delegate.getChildren();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getComment() {
+ return delegate.getComment();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getCreateSQL() {
+ return null; // Scan should return null.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getCreateSQLForCopy(Table tbl, String quotedName) {
+ return delegate.getCreateSQLForCopy(tbl, quotedName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Database getDatabase() {
+ return delegate.getDatabase();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getDropSQL() {
+ return delegate.getDropSQL();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getId() {
+ return delegate.getId();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getSQL() {
+ return delegate.getSQL();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getType() {
+ return delegate.getType();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isTemporary() {
+ return delegate.isTemporary();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeChildrenAndResources(Session ses) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void rename(String newName) {
+ throw DbException.getUnsupportedException("rename");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setComment(String comment) {
+ throw DbException.getUnsupportedException("comment");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setTemporary(boolean temporary) {
+ throw DbException.getUnsupportedException("temporary");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 8d080ae..4d5ea4b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -34,22 +34,14 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.h2.api.TableEngine;
import org.h2.command.ddl.CreateTableData;
-import org.h2.engine.Database;
-import org.h2.engine.DbObject;
import org.h2.engine.Session;
-import org.h2.index.BaseIndex;
-import org.h2.index.Cursor;
import org.h2.index.Index;
-import org.h2.index.IndexLookupBatch;
import org.h2.index.IndexType;
import org.h2.message.DbException;
import org.h2.result.Row;
import org.h2.result.SearchRow;
import org.h2.result.SortOrder;
-import org.h2.schema.Schema;
-import org.h2.table.Column;
import org.h2.table.IndexColumn;
-import org.h2.table.Table;
import org.h2.table.TableBase;
import org.h2.table.TableFilter;
import org.h2.value.Value;
@@ -857,93 +849,15 @@ public class GridH2Table extends TableBase {
* Wrapper type for primary key.
*/
@SuppressWarnings("PackageVisibleInnerClass")
- static class ScanIndex extends BaseIndex {
+ static class ScanIndex extends GridH2ScanIndex<GridH2IndexBase> {
/** */
static final String SCAN_INDEX_NAME_SUFFIX = "__SCAN_";
- /** */
- private static final IndexType TYPE = IndexType.createScan(false);
-
- /** */
- private final GridH2IndexBase delegate;
-
/**
- * Constructor.
- *
- * @param delegate Index delegate to.
+ * @param delegate Delegate.
*/
- private ScanIndex(GridH2IndexBase delegate) {
- this.delegate = delegate;
- }
-
- /** {@inheritDoc} */
- @Override public long getDiskSpaceUsed() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public void add(Session ses, Row row) {
- delegate.add(ses, row);
- }
-
- /** {@inheritDoc} */
- @Override public boolean canFindNext() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean canGetFirstOrLast() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean canScan() {
- return delegate.canScan();
- }
-
- /** {@inheritDoc} */
- @Override public final void close(Session ses) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void commit(int operation, Row row) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public int compareRows(SearchRow rowData, SearchRow compare) {
- return delegate.compareRows(rowData, compare);
- }
-
- /** {@inheritDoc} */
- @Override public Cursor find(TableFilter filter, SearchRow first, SearchRow last) {
- return find(filter.getSession(), first, last);
- }
-
- /** {@inheritDoc} */
- @Override public Cursor find(Session ses, SearchRow first, SearchRow last) {
- return delegate.find(ses, null, null);
- }
-
- /** {@inheritDoc} */
- @Override public Cursor findFirstOrLast(Session ses, boolean first) {
- throw DbException.getUnsupportedException("SCAN");
- }
-
- /** {@inheritDoc} */
- @Override public Cursor findNext(Session ses, SearchRow higherThan, SearchRow last) {
- throw DbException.throwInternalError();
- }
-
- /** {@inheritDoc} */
- @Override public int getColumnIndex(Column col) {
- return -1;
- }
-
- /** {@inheritDoc} */
- @Override public Column[] getColumns() {
- return delegate.getColumns();
+ public ScanIndex(GridH2IndexBase delegate) {
+ super(delegate);
}
/** {@inheritDoc} */
@@ -957,163 +871,13 @@ public class GridH2Table extends TableBase {
}
/** {@inheritDoc} */
- @Override public IndexColumn[] getIndexColumns() {
- return delegate.getIndexColumns();
- }
-
- /** {@inheritDoc} */
- @Override public IndexType getIndexType() {
- return TYPE;
- }
-
- /** {@inheritDoc} */
@Override public String getPlanSQL() {
return delegate.getTable().getSQL() + "." + SCAN_INDEX_NAME_SUFFIX;
}
/** {@inheritDoc} */
- @Override public Row getRow(Session ses, long key) {
- return delegate.getRow(ses, key);
- }
-
- /** {@inheritDoc} */
- @Override public long getRowCount(Session ses) {
- return delegate.getRowCount(ses);
- }
-
- /** {@inheritDoc} */
- @Override public long getRowCountApproximation() {
- return delegate.getRowCountApproximation();
- }
-
- /** {@inheritDoc} */
- @Override public Table getTable() {
- return delegate.getTable();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isRowIdIndex() {
- return delegate.isRowIdIndex();
- }
-
- /** {@inheritDoc} */
- @Override public boolean needRebuild() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public void remove(Session ses) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void remove(Session ses, Row row) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void setSortedInsertMode(boolean sortedInsertMode) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public IndexLookupBatch createLookupBatch(TableFilter filter) {
- return delegate.createLookupBatch(filter);
- }
-
- /** {@inheritDoc} */
- @Override public void truncate(Session ses) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public Schema getSchema() {
- return delegate.getSchema();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isHidden() {
- return delegate.isHidden();
- }
-
- /** {@inheritDoc} */
- @Override public void checkRename() {
- throw DbException.getUnsupportedException("rename");
- }
-
- /** {@inheritDoc} */
- @Override public ArrayList<DbObject> getChildren() {
- return delegate.getChildren();
- }
-
- /** {@inheritDoc} */
- @Override public String getComment() {
- return delegate.getComment();
- }
-
- /** {@inheritDoc} */
- @Override public String getCreateSQL() {
- return null; // Scan should return null.
- }
-
- /** {@inheritDoc} */
- @Override public String getCreateSQLForCopy(Table tbl, String quotedName) {
- return delegate.getCreateSQLForCopy(tbl, quotedName);
- }
-
- /** {@inheritDoc} */
- @Override public Database getDatabase() {
- return delegate.getDatabase();
- }
-
- /** {@inheritDoc} */
- @Override public String getDropSQL() {
- return delegate.getDropSQL();
- }
-
- /** {@inheritDoc} */
- @Override public int getId() {
- return delegate.getId();
- }
-
- /** {@inheritDoc} */
@Override public String getName() {
return delegate.getName() + SCAN_INDEX_NAME_SUFFIX;
}
-
- /** {@inheritDoc} */
- @Override public String getSQL() {
- return delegate.getSQL();
- }
-
- /** {@inheritDoc} */
- @Override public int getType() {
- return delegate.getType();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isTemporary() {
- return delegate.isTemporary();
- }
-
- /** {@inheritDoc} */
- @Override public void removeChildrenAndResources(Session ses) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void rename(String newName) {
- throw DbException.getUnsupportedException("rename");
- }
-
- /** {@inheritDoc} */
- @Override public void setComment(String comment) {
- throw DbException.getUnsupportedException("comment");
- }
-
- /** {@inheritDoc} */
- @Override public void setTemporary(boolean temporary) {
- throw DbException.getUnsupportedException("temporary");
- }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
index 7d4b7f0..9511866 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
@@ -38,9 +38,6 @@ public abstract class GridSqlQuery extends GridSqlStatement implements GridSqlAs
/** */
private GridSqlAst offset;
- /** */
- private boolean distinct;
-
/**
* @return Offset.
*/
@@ -56,20 +53,6 @@ public abstract class GridSqlQuery extends GridSqlStatement implements GridSqlAs
}
/**
- * @return Distinct.
- */
- public boolean distinct() {
- return distinct;
- }
-
- /**
- * @param distinct New distinct.
- */
- public void distinct(boolean distinct) {
- this.distinct = distinct;
- }
-
- /**
* @return Sort.
*/
public List<GridSqlSortColumn> sort() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index 024529c..16d7105 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -415,7 +415,7 @@ public class GridSqlQueryParser {
res.distinct(select.isDistinct());
Expression where = CONDITION.get(select);
- res.where(parseExpression(where, false));
+ res.where(parseExpression(where, true));
ArrayList<TableFilter> tableFilters = new ArrayList<>();
@@ -447,7 +447,7 @@ public class GridSqlQueryParser {
GridSqlElement gridFilter = parseTableFilter(f);
from = from == null ? gridFilter : new GridSqlJoin(from, gridFilter, f.isJoinOuter(),
- parseExpression(f.getJoinCondition(), false));
+ parseExpression(f.getJoinCondition(), true));
}
res.from(from);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 277cabc..aec0b36 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -207,7 +207,6 @@ public class GridSqlQuerySplitter {
// If we have distributed joins, then we have to optimize all MAP side queries
// to have a correct join order with respect to batched joins and check if we need
// distributed joins at all.
- // TODO Also we need to have a list of table aliases to filter by primary or explicit partitions.
if (distributedJoins) {
boolean allCollocated = true;
@@ -220,7 +219,7 @@ public class GridSqlQuerySplitter {
mapSqlQry.query(parse(prepared, true).getSQL());
}
- // We do not need distributed joins if all MAP queries are colocated.
+ // We do not need distributed joins if all MAP queries are collocated.
if (allCollocated)
distributedJoins = false;
}
@@ -861,6 +860,7 @@ public class GridSqlQuerySplitter {
if (!tblAliases.contains(tblAlias))
return;
+ GridSqlType resType = col.resultType();
String uniqueColAlias = uniqueColumnAlias(col);
GridSqlAlias colAlias = cols.get(uniqueColAlias);
@@ -874,6 +874,7 @@ public class GridSqlQuerySplitter {
col = column(uniqueColAlias);
// col.tableAlias(wrapAlias.alias());
col.expressionInFrom(wrapAlias);
+ col.resultType(resType);
prnt.child(childIdx, col);
}
@@ -1066,7 +1067,7 @@ public class GridSqlQuerySplitter {
else if (qrym.type == Type.UNION) {
// If it is not a UNION ALL, then we have to split because otherwise we can produce duplicates or
// wrong results for UNION DISTINCT, EXCEPT, INTERSECT queries.
- if (!qrym.needSplitChild && !qrym.unionAll)
+ if (!qrym.needSplitChild && (!qrym.unionAll || hasOffsetLimit(qrym.<GridSqlUnion>ast())))
qrym.needSplitChild = true;
// If we have to split some child SELECT in this UNION, then we have to enforce split
@@ -1151,6 +1152,14 @@ public class GridSqlQuerySplitter {
}
/**
+ * @param qry Query.
+ * @return {@code true} if the query has a LIMIT or an OFFSET.
+ */
+ private static boolean hasOffsetLimit(GridSqlQuery qry) {
+ return qry.limit() != null || qry.offset() != null;
+ }
+
+ /**
* @param select Select to check.
* @return {@code true} If we need to split this select.
*/
@@ -1158,6 +1167,9 @@ public class GridSqlQuerySplitter {
if (select.distinct())
return true;
+ if (hasOffsetLimit(select))
+ return true;
+
if (collocatedGrpBy)
return false;
@@ -1304,11 +1316,29 @@ public class GridSqlQuerySplitter {
setupParameters(map, mapQry, params);
map.columns(collectColumns(mapExps));
+ map.sortColumns(mapQry.sort());
+ map.partitioned(hasPartitionedTables(mapQry));
mapSqlQrys.add(map);
}
/**
+ * @param ast Map query AST.
+ * @return {@code true} If the given AST has partitioned tables.
+ */
+ private static boolean hasPartitionedTables(GridSqlAst ast) {
+ if (ast instanceof GridSqlTable)
+ return ((GridSqlTable)ast).dataTable().isPartitioned();
+
+ for (int i = 0; i < ast.size(); i++) {
+ if (hasPartitionedTables(ast.child(i)))
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
* @param sqlQry Query.
* @param qryAst Select AST.
* @param params All parameters.
@@ -1333,7 +1363,7 @@ public class GridSqlQuerySplitter {
GridSqlType t = col.resultType();
if (t == null)
- throw new NullPointerException("Column type.");
+ throw new NullPointerException("Column type: " + col);
if (t == GridSqlType.UNKNOWN)
throw new IllegalStateException("Unknown type: " + col);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java
index 8e8947f..d870ac5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java
@@ -17,6 +17,13 @@
package org.apache.ignite.internal.processors.query.h2.sql;
+import java.util.List;
+import org.apache.ignite.internal.util.typedef.F;
+import org.h2.result.SortOrder;
+import org.h2.table.Column;
+import org.h2.table.IndexColumn;
+import org.h2.table.Table;
+
/**
* Sort order for ORDER BY clause.
*/
@@ -47,6 +54,40 @@ public class GridSqlSortColumn {
}
/**
+ * @param tbl Table.
+ * @param sortCols Sort columns.
+ * @return Index columns.
+ */
+ public static IndexColumn[] toIndexColumns(Table tbl, List<GridSqlSortColumn> sortCols) {
+ assert !F.isEmpty(sortCols);
+
+ IndexColumn[] res = new IndexColumn[sortCols.size()];
+
+ for (int i = 0; i < res.length; i++) {
+ GridSqlSortColumn sc = sortCols.get(i);
+
+ Column col = tbl.getColumn(sc.column());
+
+ IndexColumn c = new IndexColumn();
+
+ c.column = col;
+ c.columnName = col.getName();
+
+ c.sortType = sc.asc ? SortOrder.ASCENDING : SortOrder.DESCENDING;
+
+ if (sc.nullsFirst)
+ c.sortType |= SortOrder.NULLS_FIRST;
+
+ if (sc.nullsLast)
+ c.sortType |= SortOrder.NULLS_LAST;
+
+ res[i] = c;
+ }
+
+ return res;
+ }
+
+ /**
* @return Column index.
*/
public int column() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index f002a5e..6416b21 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -459,8 +459,11 @@ public class GridMapQueryExecutor {
req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
+ final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
- for (int i = 1; i < mainCctx.config().getQueryParallelism(); i++) {
+ int segments = explain ? 1 : mainCctx.config().getQueryParallelism();
+
+ for (int i = 1; i < segments; i++) {
final int segment = i;
ctx.closure().callLocal(
@@ -587,7 +590,6 @@ public class GridMapQueryExecutor {
Connection conn = h2.connectionForSpace(mainCctx.name());
- // Here we enforce join order to have the same behavior on all the nodes.
setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
GridH2QueryContext.set(qctx);
@@ -610,28 +612,34 @@ public class GridMapQueryExecutor {
boolean evt = ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED);
for (GridCacheSqlQuery qry : qrys) {
- ResultSet rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(),
- F.asList(qry.parameters()), true,
- timeout,
- qr.cancels[qryIdx]);
-
- if (evt) {
- ctx.event().record(new CacheQueryExecutedEvent<>(
- node,
- "SQL query executed.",
- EVT_CACHE_QUERY_EXECUTED,
- CacheQueryType.SQL.name(),
- mainCctx.namex(),
- null,
- qry.query(),
- null,
- null,
- qry.parameters(),
- node.id(),
- null));
- }
+ ResultSet rs = null;
+
+ // If we are not the target node for this replicated query, just ignore it.
+ if (qry.node() == null ||
+ (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
+ rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(),
+ F.asList(qry.parameters()), true,
+ timeout,
+ qr.cancels[qryIdx]);
+
+ if (evt) {
+ ctx.event().record(new CacheQueryExecutedEvent<>(
+ node,
+ "SQL query executed.",
+ EVT_CACHE_QUERY_EXECUTED,
+ CacheQueryType.SQL.name(),
+ mainCctx.namex(),
+ null,
+ qry.query(),
+ null,
+ null,
+ qry.parameters(),
+ node.id(),
+ null));
+ }
- assert rs instanceof JdbcResultSet : rs.getClass();
+ assert rs instanceof JdbcResultSet : rs.getClass();
+ }
qr.addResult(qryIdx, qry, node.id(), rs);
@@ -751,6 +759,9 @@ public class GridMapQueryExecutor {
assert res != null;
+ if (res.closed)
+ return;
+
int page = res.page;
List<Value[]> rows = new ArrayList<>(Math.min(64, pageSize));
@@ -1081,21 +1092,31 @@ public class GridMapQueryExecutor {
* @param qry Query.
*/
private QueryResult(ResultSet rs, GridCacheContext<?, ?> cctx, UUID qrySrcNodeId, GridCacheSqlQuery qry) {
- this.rs = rs;
this.cctx = cctx;
this.qry = qry;
this.qrySrcNodeId = qrySrcNodeId;
this.cpNeeded = cctx.isLocalNode(qrySrcNodeId);
- try {
- res = (ResultInterface)RESULT_FIELD.get(rs);
- }
- catch (IllegalAccessException e) {
- throw new IllegalStateException(e); // Must not happen.
+ if (rs != null) {
+ this.rs = rs;
+ try {
+ res = (ResultInterface)RESULT_FIELD.get(rs);
+ }
+ catch (IllegalAccessException e) {
+ throw new IllegalStateException(e); // Must not happen.
+ }
+
+ rowCnt = res.getRowCount();
+ cols = res.getVisibleColumnCount();
}
+ else {
+ this.rs = null;
+ this.res = null;
+ this.cols = -1;
+ this.rowCnt = -1;
- rowCnt = res.getRowCount();
- cols = res.getVisibleColumnCount();
+ closed = true;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 6a6e045..27622bb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -22,20 +22,22 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.RandomAccess;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.CacheException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.h2.engine.Session;
import org.h2.index.BaseIndex;
@@ -44,13 +46,10 @@ import org.h2.index.IndexType;
import org.h2.message.DbException;
import org.h2.result.Row;
import org.h2.result.SearchRow;
-import org.h2.result.SortOrder;
import org.h2.table.IndexColumn;
-import org.h2.table.TableFilter;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
-import static java.util.Collections.emptyIterator;
import static java.util.Objects.requireNonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_MAX_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE;
@@ -67,6 +66,22 @@ public abstract class GridMergeIndex extends BaseIndex {
private static final int PREFETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE, 1024);
/** */
+ private static final AtomicReferenceFieldUpdater<GridMergeIndex, ConcurrentMap> lastPagesUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(GridMergeIndex.class, ConcurrentMap.class, "lastPages");
+
+ static {
+ if (!U.isPow2(PREFETCH_SIZE)) {
+ throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE +
+ ") must be positive and a power of 2.");
+ }
+
+ if (PREFETCH_SIZE >= MAX_FETCH_SIZE) {
+ throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE +
+ ") must be less than " + IGNITE_SQL_MERGE_TABLE_MAX_SIZE + " (" + MAX_FETCH_SIZE + ").");
+ }
+ }
+
+ /** */
protected final Comparator<SearchRow> firstRowCmp = new Comparator<SearchRow>() {
@Override public int compare(SearchRow rowInList, SearchRow searchRow) {
int res = compareRows(rowInList, searchRow);
@@ -84,14 +99,11 @@ public abstract class GridMergeIndex extends BaseIndex {
}
};
- /** All rows number. */
- private final AtomicInteger expRowsCnt = new AtomicInteger(0);
-
- /** Remaining rows per source node ID. */
- private Map<UUID, Counter[]> remainingRows;
+ /** Row source nodes. */
+ private Set<UUID> sources;
/** */
- private final AtomicBoolean lastSubmitted = new AtomicBoolean();
+ private int pageSize;
/**
* Will be r/w from query execution thread only, does not need to be threadsafe.
@@ -107,6 +119,9 @@ public abstract class GridMergeIndex extends BaseIndex {
/** */
private final GridKernalContext ctx;
+ /** */
+ private volatile ConcurrentMap<SourceKey, Integer> lastPages;
+
/**
* @param ctx Context.
* @param tbl Table.
@@ -129,16 +144,6 @@ public abstract class GridMergeIndex extends BaseIndex {
* @param ctx Context.
*/
protected GridMergeIndex(GridKernalContext ctx) {
- if (!U.isPow2(PREFETCH_SIZE)) {
- throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE +
- ") must be positive and a power of 2.");
- }
-
- if (PREFETCH_SIZE >= MAX_FETCH_SIZE) {
- throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE +
- ") must be less than " + IGNITE_SQL_MERGE_TABLE_MAX_SIZE + " (" + MAX_FETCH_SIZE + ").");
- }
-
this.ctx = ctx;
fetched = new BlockList<>(PREFETCH_SIZE);
@@ -148,7 +153,7 @@ public abstract class GridMergeIndex extends BaseIndex {
* @return Return source nodes for this merge index.
*/
public Set<UUID> sources() {
- return remainingRows.keySet();
+ return sources;
}
/**
@@ -169,17 +174,24 @@ public abstract class GridMergeIndex extends BaseIndex {
* @return {@code true} If this index needs data from the given source node.
*/
public boolean hasSource(UUID nodeId) {
- return remainingRows.containsKey(nodeId);
+ return sources.contains(nodeId);
}
/** {@inheritDoc} */
@Override public long getRowCount(Session ses) {
- return expRowsCnt.get();
+ Cursor c = find(ses, null, null);
+
+ long cnt = 0;
+
+ while (c.next())
+ cnt++;
+
+ return cnt;
}
/** {@inheritDoc} */
@Override public long getRowCountApproximation() {
- return getRowCount(null);
+ return 10_000;
}
/**
@@ -189,27 +201,28 @@ public abstract class GridMergeIndex extends BaseIndex {
* @param segmentsCnt Index segments per table.
*/
public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
- assert remainingRows == null;
+ assert sources == null;
- remainingRows = U.newHashMap(nodes.size());
+ sources = new HashSet<>();
for (ClusterNode node : nodes) {
- Counter[] counters = new Counter[segmentsCnt];
-
- for (int i = 0; i < segmentsCnt; i++)
- counters[i] = new Counter();
-
- if (remainingRows.put(node.id(), counters) != null)
- throw new IllegalStateException("Duplicate node id: " + node.id());
-
+ if (!sources.add(node.id()))
+ throw new IllegalStateException();
}
}
/**
+ * @param pageSize Page size.
+ */
+ public void setPageSize(int pageSize) {
+ this.pageSize = pageSize;
+ }
+
+ /**
* @param queue Queue to poll.
* @return Next page.
*/
- private GridResultPage takeNextPage(BlockingQueue<GridResultPage> queue) {
+ private GridResultPage takeNextPage(Pollable<GridResultPage> queue) {
GridResultPage page;
for (;;) {
@@ -234,16 +247,17 @@ public abstract class GridMergeIndex extends BaseIndex {
* @param iter Current iterator.
* @return The same or new iterator.
*/
- protected final Iterator<Value[]> pollNextIterator(BlockingQueue<GridResultPage> queue, Iterator<Value[]> iter) {
- while (!iter.hasNext()) {
+ protected final Iterator<Value[]> pollNextIterator(Pollable<GridResultPage> queue, Iterator<Value[]> iter) {
+ if (!iter.hasNext()) {
GridResultPage page = takeNextPage(queue);
- if (page.isLast())
- return emptyIterator(); // We are done.
-
- fetchNextPage(page);
+ if (!page.isLast())
+ page.fetchNextPage(); // A failed page will throw an exception here.
iter = page.rows();
+
+ // The received iterator must be empty in the dummy last page or on failure.
+ assert iter.hasNext() || page.isDummyLast() || page.isFail();
}
return iter;
@@ -253,23 +267,18 @@ public abstract class GridMergeIndex extends BaseIndex {
* @param e Error.
*/
public void fail(final CacheException e) {
- for (UUID nodeId0 : remainingRows.keySet()) {
- addPage0(new GridResultPage(null, nodeId0, null) {
- @Override public boolean isFail() {
- return true;
- }
-
- @Override public void fetchNextPage() {
- throw e;
- }
- });
- }
+ for (UUID nodeId : sources)
+ fail(nodeId, e);
}
/**
* @param nodeId Node ID.
+ * @param e Exception.
*/
public void fail(UUID nodeId, final CacheException e) {
+ if (nodeId == null)
+ nodeId = F.first(sources);
+
addPage0(new GridResultPage(null, nodeId, null) {
@Override public boolean isFail() {
return true;
@@ -285,91 +294,88 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/**
- * @param page Page.
+ * @param nodeId Node ID.
+ * @param res Response.
*/
- public final void addPage(GridResultPage page) {
- int pageRowsCnt = page.rowsInPage();
+ private void initLastPages(UUID nodeId, GridQueryNextPageResponse res) {
+ int allRows = res.allRows();
- Counter cnt = remainingRows.get(page.source())[page.res.segmentId()];
+ // In the old protocol we send the total number of rows in page 0; other pages have -1.
+ // In the new protocol we do not know it and always have -1, except for the terminating page,
+ // which has -2. Thus we have to init page counters only when we receive a non-negative value
+ // in the first page.
+ if (allRows < 0 || res.page() != 0)
+ return;
- // RemainingRowsCount should be updated before page adding to avoid race
- // in GridMergeIndexUnsorted cursor iterator
- int remainingRowsCount;
+ ConcurrentMap<SourceKey,Integer> lp = lastPages;
- int allRows = page.response().allRows();
+ if (lp == null && !lastPagesUpdater.compareAndSet(this, null, lp = new ConcurrentHashMap<>()))
+ lp = lastPages;
- if (allRows != -1) { // Only the first page contains allRows count and is allowed to init counter.
- assert cnt.state == State.UNINITIALIZED : "Counter is already initialized.";
+ assert pageSize > 0: pageSize;
- remainingRowsCount = cnt.addAndGet(allRows - pageRowsCnt);
+ int lastPage = allRows == 0 ? 0 : (allRows - 1) / pageSize;
- expRowsCnt.addAndGet(allRows);
+ assert lastPage >= 0: lastPage;
- // Add page before setting initialized flag to avoid race condition with adding last page
- if (pageRowsCnt > 0)
- addPage0(page);
+ if (lp.put(new SourceKey(nodeId, res.segmentId()), lastPage) != null)
+ throw new IllegalStateException();
+ }
- // We need this separate flag to handle case when the first source contains only one page
- // and it will signal that all remaining counters are zero and fetch is finished.
- cnt.state = State.INITIALIZED;
- }
- else {
- remainingRowsCount = cnt.addAndGet(-pageRowsCnt);
+ /**
+ * @param page Page.
+ */
+ private void markLastPage(GridResultPage page) {
+ GridQueryNextPageResponse res = page.response();
- if (pageRowsCnt > 0)
- addPage0(page);
- }
+ if (res.allRows() != -2) { // -2 means the last page.
+ UUID nodeId = page.source();
- if (remainingRowsCount == 0) { // Result can be negative in case of race between messages, it is ok.
- if (cnt.state == State.UNINITIALIZED)
- return;
+ initLastPages(nodeId, res);
- // Guarantee that finished state possible only if counter is zero and all pages was added
- cnt.state = State.FINISHED;
+ ConcurrentMap<SourceKey,Integer> lp = lastPages;
- for (Counter[] cntrs : remainingRows.values()) { // Check all the sources.
- for(int i = 0; i < cntrs.length; i++) {
- if (cntrs[i].state != State.FINISHED)
- return;
- }
- }
+ if (lp == null)
+ return; // It was not initialized --> wait for -2.
- if (lastSubmitted.compareAndSet(false, true)) {
- addPage0(new GridResultPage(null, page.source(), null) {
- @Override public boolean isLast() {
- return true;
- }
- });
+ Integer lastPage = lp.get(new SourceKey(nodeId, res.segmentId()));
+
+ if (lastPage == null)
+ return; // This node may use the new protocol --> wait for -2.
+
+ if (lastPage != res.page()) {
+ assert lastPage > res.page();
+
+ return; // This is not the last page.
}
}
+
+ page.setLast(true);
}
/**
* @param page Page.
*/
- protected abstract void addPage0(GridResultPage page);
+ public final void addPage(GridResultPage page) {
+ markLastPage(page);
+ addPage0(page);
+ }
/**
- * @param page Page.
+ * @param lastPage Real last page.
+ * @return Created dummy page.
*/
- protected void fetchNextPage(GridResultPage page) {
- assert !page.isLast();
-
- if(page.isFail())
- page.fetchNextPage(); //rethrow exceptions
-
- assert page.res != null;
-
- Counter[] counters = remainingRows.get(page.source());
+ protected final GridResultPage createDummyLastPage(GridResultPage lastPage) {
+ assert !lastPage.isDummyLast(); // It must be a real last page.
- int segId = page.res.segmentId();
-
- Counter counter = counters[segId];
-
- if (counter.get() != 0)
- page.fetchNextPage();
+ return new GridResultPage(ctx, lastPage.source(), null).setLast(true);
}
+ /**
+ * @param page Page.
+ */
+ protected abstract void addPage0(GridResultPage page);
+
/** {@inheritDoc} */
@Override public final Cursor find(Session ses, SearchRow first, SearchRow last) {
checkBounds(lastEvictedRow, first, last);
@@ -381,11 +387,9 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/**
- * @return {@code true} If we have fetched all the remote rows.
+ * @return {@code true} If we have fetched all the remote rows into a fetched list.
*/
- public boolean fetchedAll() {
- return fetchedCnt == expRowsCnt.get();
- }
+ public abstract boolean fetchedAll();
/**
* @param lastEvictedRow Last evicted fetched row.
@@ -433,11 +437,6 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/** {@inheritDoc} */
- @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) {
- return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, true);
- }
-
- /** {@inheritDoc} */
@Override public void remove(Session ses) {
throw DbException.getUnsupportedException("remove index");
}
@@ -683,14 +682,6 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/**
- * Counter with initialization flag.
- */
- private static class Counter extends AtomicInteger {
- /** */
- volatile State state = State.UNINITIALIZED;
- }
-
- /**
*/
private static final class BlockList<Z> extends AbstractList<Z> implements RandomAccess {
/** */
@@ -766,4 +757,53 @@ public abstract class GridMergeIndex extends BaseIndex {
return res;
}
}
+
+ /**
+ * A source of elements that can be polled with a timeout.
+ */
+ protected static interface Pollable<E> {
+ /**
+ * @param timeout Timeout.
+ * @param unit Time unit.
+ * @return Polled value or {@code null} if none.
+ * @throws InterruptedException If interrupted.
+ */
+ E poll(long timeout, TimeUnit unit) throws InterruptedException;
+ }
+
+ /**
+ */
+ private static class SourceKey {
+ final UUID nodeId;
+
+ /** */
+ final int segment;
+
+ /**
+ * @param nodeId Node ID.
+ * @param segment Segment.
+ */
+ SourceKey(UUID nodeId, int segment) {
+ this.nodeId = nodeId;
+ this.segment = segment;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ SourceKey sourceKey = (SourceKey)o;
+
+ if (segment != sourceKey.segment) return false;
+ return nodeId.equals(sourceKey.nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int result = nodeId.hashCode();
+ result = 31 * result + segment;
+ return result;
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
index 32c676d..361bb2d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
@@ -25,18 +25,24 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.engine.Session;
import org.h2.index.Cursor;
import org.h2.index.IndexType;
import org.h2.result.Row;
import org.h2.result.SearchRow;
+import org.h2.result.SortOrder;
import org.h2.table.IndexColumn;
+import org.h2.table.TableFilter;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
@@ -48,6 +54,9 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase
*/
public final class GridMergeIndexSorted extends GridMergeIndex {
/** */
+ private static final IndexType TYPE = IndexType.createNonUnique(false);
+
+ /** */
private final Comparator<RowStream> streamCmp = new Comparator<RowStream>() {
@Override public int compare(RowStream o1, RowStream o2) {
// Nulls at the beginning.
@@ -62,26 +71,33 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
};
/** */
- private Map<UUID,RowStream> streamsMap;
+ private Map<UUID,RowStream[]> streamsMap;
/** */
- private RowStream[] streams;
+ private final Lock lock = new ReentrantLock();
+
+ /** */
+ private final Condition notEmpty = lock.newCondition();
+
+ /** */
+ private GridResultPage failPage;
+
+ /** */
+ private MergeStreamIterator it;
/**
* @param ctx Kernal context.
* @param tbl Table.
* @param name Index name,
- * @param type Index type.
* @param cols Columns.
*/
public GridMergeIndexSorted(
GridKernalContext ctx,
GridMergeTable tbl,
String name,
- IndexType type,
IndexColumn[] cols
) {
- super(ctx, tbl, name, type, cols);
+ super(ctx, tbl, name, TYPE, cols);
}
/** {@inheritDoc} */
@@ -89,33 +105,48 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
super.setSources(nodes, segmentsCnt);
streamsMap = U.newHashMap(nodes.size());
- streams = new RowStream[nodes.size()];
+ RowStream[] streams = new RowStream[nodes.size() * segmentsCnt];
int i = 0;
for (ClusterNode node : nodes) {
- RowStream stream = new RowStream(node.id());
+ RowStream[] segments = new RowStream[segmentsCnt];
- streams[i] = stream;
+ for (int s = 0; s < segmentsCnt; s++)
+ streams[i++] = segments[s] = new RowStream();
- if (streamsMap.put(stream.src, stream) != null)
+ if (streamsMap.put(node.id(), segments) != null)
throw new IllegalStateException();
}
+
+ it = new MergeStreamIterator(streams);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean fetchedAll() {
+ return it.fetchedAll();
}
/** {@inheritDoc} */
@Override protected void addPage0(GridResultPage page) {
- if (page.isLast() || page.isFail()) {
- // Finish all the streams.
- for (RowStream stream : streams)
- stream.addPage(page);
+ if (page.isFail()) {
+ lock.lock();
+
+ try {
+ if (failPage == null) {
+ failPage = page;
+
+ notEmpty.signalAll();
+ }
+ }
+ finally {
+ lock.unlock();
+ }
}
else {
- assert page.rowsInPage() > 0;
-
UUID src = page.source();
- streamsMap.get(src).addPage(page);
+ streamsMap.get(src)[page.segmentId()].addPage(page);
}
}
@@ -153,8 +184,13 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
}
/** {@inheritDoc} */
+ @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) {
+ return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, false);
+ }
+
+ /** {@inheritDoc} */
@Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) {
- return new FetchingCursor(first, last, new MergeStreamIterator());
+ return new FetchingCursor(first, last, it);
}
/**
@@ -165,17 +201,42 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
private boolean first = true;
/** */
- private int off;
+ private volatile int off;
/** */
private boolean hasNext;
+ /** */
+ private final RowStream[] streams;
+
+ /**
+ * @param streams Streams.
+ */
+ MergeStreamIterator(RowStream[] streams) {
+ assert !F.isEmpty(streams);
+
+ this.streams = streams;
+ }
+
+ /**
+ * @return {@code true} If fetched all.
+ */
+ private boolean fetchedAll() {
+ return off == streams.length;
+ }
+
/**
*
*/
private void goFirst() {
+ assert first;
+
+ first = false;
+
for (int i = 0; i < streams.length; i++) {
- if (!streams[i].next()) {
+ RowStream s = streams[i];
+
+ if (!s.next()) {
streams[i] = null;
off++; // Move left bound.
}
@@ -183,8 +244,6 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
if (off < streams.length)
Arrays.sort(streams, streamCmp);
-
- first = false;
}
/**
@@ -229,31 +288,68 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
/**
* Row stream.
*/
- private final class RowStream {
- /** */
- final UUID src;
-
- /** */
- final BlockingQueue<GridResultPage> queue = new ArrayBlockingQueue<>(8);
-
+ private final class RowStream implements Pollable<GridResultPage> {
/** */
Iterator<Value[]> iter = emptyIterator();
/** */
Row cur;
- /**
- * @param src Source.
- */
- private RowStream(UUID src) {
- this.src = src;
- }
+ /** */
+ GridResultPage nextPage;
/**
* @param page Page.
*/
private void addPage(GridResultPage page) {
- queue.offer(page);
+ assert !page.isFail();
+
+ if (page.isLast() && page.rowsInPage() == 0)
+ page = createDummyLastPage(page); // Terminate.
+
+ lock.lock();
+
+ try {
+ // We can fetch the next page only when we have polled the previous one.
+ assert nextPage == null;
+
+ nextPage = page;
+
+ notEmpty.signalAll();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridResultPage poll(long timeout, TimeUnit unit) throws InterruptedException {
+ long nanos = unit.toNanos(timeout);
+
+ lock.lock();
+
+ try {
+ for (;;) {
+ if (failPage != null)
+ return failPage;
+
+ GridResultPage page = nextPage;
+
+ if (page != null) {
+ // isLast && !isDummyLast
+ nextPage = page.isLast() && page.response() != null
+ ? createDummyLastPage(page) : null; // Terminate with empty iterator.
+
+ return page;
+ }
+
+ if ((nanos = notEmpty.awaitNanos(nanos)) <= 0)
+ return null;
+ }
+ }
+ finally {
+ lock.unlock();
+ }
}
/**
@@ -262,7 +358,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
private boolean next() {
cur = null;
- iter = pollNextIterator(queue, iter);
+ iter = pollNextIterator(this, iter);
if (!iter.hasNext())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index b69c898..430a687 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -17,19 +17,24 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory;
+import org.h2.engine.Session;
import org.h2.index.Cursor;
import org.h2.index.IndexType;
import org.h2.result.Row;
import org.h2.result.SearchRow;
+import org.h2.result.SortOrder;
import org.h2.table.IndexColumn;
+import org.h2.table.TableFilter;
import org.h2.value.Value;
/**
@@ -37,7 +42,16 @@ import org.h2.value.Value;
*/
public final class GridMergeIndexUnsorted extends GridMergeIndex {
/** */
- private final BlockingQueue<GridResultPage> queue = new LinkedBlockingQueue<>();
+ private static final IndexType TYPE = IndexType.createScan(false);
+
+ /** */
+ private final PollableQueue<GridResultPage> queue = new PollableQueue<>();
+
+ /** */
+ private final AtomicInteger activeSources = new AtomicInteger(-1);
+
+ /** */
+ private Iterator<Value[]> iter = Collections.emptyIterator();
/**
* @param ctx Context.
@@ -45,7 +59,7 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
* @param name Index name.
*/
public GridMergeIndexUnsorted(GridKernalContext ctx, GridMergeTable tbl, String name) {
- super(ctx, tbl, name, IndexType.createScan(false), IndexColumn.wrap(tbl.getColumns()));
+ super(ctx, tbl, name, TYPE, IndexColumn.wrap(tbl.getColumns()));
}
/**
@@ -64,10 +78,46 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
}
/** {@inheritDoc} */
+ @Override public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
+ super.setSources(nodes, segmentsCnt);
+
+ int x = nodes.size() * segmentsCnt; // Sources in total: segmentsCnt per node.
+
+ assert x > 0: x;
+
+ activeSources.set(x); // Decremented in addPage0() each time a last page arrives.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean fetchedAll() {
+ int x = activeSources.get();
+
+ assert x >= 0: x; // This method must not be called if the sources were not set.
+
+ return x == 0 && queue.isEmpty(); // No source is still active and no page is left in the queue.
+ }
+
+ /** {@inheritDoc} */
@Override protected void addPage0(GridResultPage page) {
assert page.rowsInPage() > 0 || page.isLast() || page.isFail();
- queue.add(page);
+ // Skip empty non-failed pages: an empty page in the queue would terminate the stream prematurely.
+ if (page.rowsInPage() != 0 || page.isFail())
+ queue.add(page);
+
+ if (page.isLast()) {
+ int x = activeSources.decrementAndGet(); // One more source has delivered its last page.
+
+ assert x >= 0: x;
+
+ if (x == 0) // That was the last active source: always terminate with empty iterator.
+ queue.add(createDummyLastPage(page));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) {
+ return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, true);
}
/** {@inheritDoc} */
@@ -80,9 +130,6 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
@Override protected Cursor findInStream(SearchRow first, SearchRow last) {
// This index is unsorted: have to ignore bounds.
return new FetchingCursor(null, null, new Iterator<Row>() {
- /** */
- Iterator<Value[]> iter = Collections.emptyIterator();
-
@Override public boolean hasNext() {
iter = pollNextIterator(queue, iter);
@@ -98,4 +145,10 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
}
});
}
+
+ /**
+ */
+ private static class PollableQueue<X> extends LinkedBlockingQueue<X> implements Pollable<X> {
+ // No-op: only combines LinkedBlockingQueue with the Pollable interface.
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
index 1489021..f7495c0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
@@ -18,35 +18,55 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
import java.util.ArrayList;
-import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2ScanIndex;
+import org.apache.ignite.internal.util.typedef.F;
import org.h2.command.ddl.CreateTableData;
import org.h2.engine.Session;
import org.h2.index.Index;
import org.h2.index.IndexType;
import org.h2.message.DbException;
import org.h2.result.Row;
+import org.h2.result.SortOrder;
import org.h2.table.IndexColumn;
import org.h2.table.TableBase;
+import org.h2.table.TableFilter;
/**
* Merge table for distributed queries.
*/
public class GridMergeTable extends TableBase {
/** */
- private final GridKernalContext ctx;
-
- /** */
- private final GridMergeIndex idx;
+ private ArrayList<Index> idxs;
/**
* @param data Data.
- * @param ctx Kernal context.
*/
- public GridMergeTable(CreateTableData data, GridKernalContext ctx) {
+ public GridMergeTable(CreateTableData data) {
super(data);
+ }
+
+ /**
+ * @param idxs Non-empty list of indexes: scan index first, merge index last.
+ */
+ public void indexes(ArrayList<Index> idxs) {
+ assert !F.isEmpty(idxs);
+
+ this.idxs = idxs; // Stored as is (no copy); the same list is returned from getIndexes().
+ }
- this.ctx = ctx;
- idx = new GridMergeIndexUnsorted(ctx, this, "merge_scan");
+ /**
+ * @return Merge index, taken from the last position of the index list.
+ */
+ public GridMergeIndex getMergeIndex() {
+ return (GridMergeIndex)idxs.get(idxs.size() - 1); // Sorted index must be the last.
+ }
+
+ /**
+ * @param idx Merge index to wrap.
+ * @return New {@code ScanIndex} created with the given index as its delegate.
+ */
+ public static GridH2ScanIndex<GridMergeIndex> createScanIndex(GridMergeIndex idx) {
+ return new ScanIndex(idx);
}
/** {@inheritDoc} */
@@ -56,7 +76,7 @@ public class GridMergeTable extends TableBase {
/** {@inheritDoc} */
@Override public void close(Session ses) {
- idx.close(ses);
+ // No-op.
}
/** {@inheritDoc} */
@@ -96,8 +116,8 @@ public class GridMergeTable extends TableBase {
}
/** {@inheritDoc} */
- @Override public GridMergeIndex getScanIndex(Session session) {
- return idx;
+ @Override public Index getScanIndex(Session session) {
+ return idxs.get(0); // The scan index must always be at position 0.
}
/** {@inheritDoc} */
@@ -107,7 +127,7 @@ public class GridMergeTable extends TableBase {
/** {@inheritDoc} */
@Override public ArrayList<Index> getIndexes() {
- return null;
+ return idxs;
}
/** {@inheritDoc} */
@@ -137,12 +157,12 @@ public class GridMergeTable extends TableBase {
/** {@inheritDoc} */
@Override public long getRowCount(Session ses) {
- return idx.getRowCount(ses);
+ return getScanIndex(ses).getRowCount(ses);
}
/** {@inheritDoc} */
@Override public long getRowCountApproximation() {
- return idx.getRowCountApproximation();
+ return getScanIndex(null).getRowCountApproximation(); // getScanIndex() does not use the session.
}
/** {@inheritDoc} */
@@ -154,4 +174,24 @@ public class GridMergeTable extends TableBase {
@Override public void checkRename() {
throw DbException.getUnsupportedException("rename");
}
+
+ /**
+ * Scan index wrapper over a {@link GridMergeIndex} delegate that supplies its own cost estimate.
+ */
+ private static class ScanIndex extends GridH2ScanIndex<GridMergeIndex> {
+ /**
+ * @param delegate Merge index to wrap.
+ */
+ public ScanIndex(GridMergeIndex delegate) {
+ super(delegate);
+ }
+
+ /** {@inheritDoc} */
+ @Override public double getCost(Session session, int[] masks, TableFilter[] filters, int filter,
+ SortOrder sortOrder) {
+ long rows = getRowCountApproximation(); // Approximate row count feeds the cost estimate below.
+
+ return getCostRangeIndex(masks, rows, filters, filter, sortOrder, true);
+ }
+ }
}
\ No newline at end of file