You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2014/12/19 13:12:49 UTC
[6/8] incubator-ignite git commit: ignite-gg9499 - fixes
ignite-gg9499 - fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1226b240
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1226b240
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1226b240
Branch: refs/heads/ignite-gg9499
Commit: 1226b2401b58e0e75d80e01787c0cee39ce5aeee
Parents: 29823e6
Author: S.Vladykin <sv...@gridgain.com>
Authored: Fri Dec 19 15:09:04 2014 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Fri Dec 19 15:09:04 2014 +0300
----------------------------------------------------------------------
.../query/h2/twostep/GridMapQueryExecutor.java | 11 +-
.../query/h2/twostep/GridMergeIndex.java | 116 ++++++++++++++-----
.../h2/twostep/GridMergeIndexUnsorted.java | 66 ++++++-----
.../query/h2/twostep/GridMergeTable.java | 4 +-
.../h2/twostep/GridReduceQueryExecutor.java | 34 +++---
.../query/h2/twostep/GridResultPage.java | 53 +++------
.../twostep/messages/GridNextPageResponse.java | 29 ++++-
.../cache/GridCacheCrossCacheQuerySelfTest.java | 34 +++++-
8 files changed, 240 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
index b86caf1..92d1875 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -211,19 +211,26 @@ public class GridMapQueryExecutor {
assert res != null;
+ boolean last = false;
+
synchronized (res) {
page = qr.pages[qry]++;
for (int i = 0 ; i < pageSize; i++) {
- if (!res.next())
+ if (!res.next()) {
+ last = true;
+
break;
+ }
rows.add(res.currentRow());
}
}
try {
- ctx.io().sendUserMessage(F.asList(node), new GridNextPageResponse(qr.qryReqId, qry, page, allRows, rows));
+ ctx.io().sendUserMessage(F.asList(node),
+ new GridNextPageResponse(qr.qryReqId, qry, page, allRows, last, rows),
+ GridTopic.TOPIC_QUERY, false, 0);
}
catch (IgniteCheckedException e) {
log.error("Failed to send message.", e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
index 16ba15d..5d51b32 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
@@ -10,11 +10,14 @@
package org.gridgain.grid.kernal.processors.query.h2.twostep;
import org.apache.ignite.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
import org.h2.engine.*;
import org.h2.index.*;
import org.h2.message.*;
import org.h2.result.*;
import org.h2.table.*;
+import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -25,16 +28,32 @@ import java.util.concurrent.atomic.*;
*/
public abstract class GridMergeIndex extends BaseIndex {
/** */
+ protected final GridResultPage<?> END = new GridResultPage<Object>(null, null);
+
+ /** */
private static final int MAX_FETCH_SIZE = 100000;
/** */
private final AtomicInteger cnt = new AtomicInteger(0);
+ /** Number of result sources that have not yet delivered their last page. */
+ private final AtomicInteger srcs = new AtomicInteger(0);
+
/**
* Will be r/w from query execution thread only, does not need to be threadsafe.
*/
private ArrayList<Row> fetched = new ArrayList<>();
+ /**
+ * @param tbl Table.
+ * @param name Index name.
+ * @param type Type.
+ * @param cols Columns.
+ */
+ public GridMergeIndex(GridMergeTable tbl, String name, IndexType type, IndexColumn[] cols) {
+ initBaseIndex(tbl, 0, name, cols, type);
+ }
+
/** {@inheritDoc} */
@Override public long getRowCount(Session session) {
return cnt.get();
@@ -46,6 +65,13 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/**
+ * @param srcs Number of sources.
+ */
+ public void setNumberOfSources(int srcs) {
+ this.srcs.set(srcs);
+ }
+
+ /**
* @param cnt Count.
*/
public void addCount(int cnt) {
@@ -55,7 +81,26 @@ public abstract class GridMergeIndex extends BaseIndex {
/**
* @param page Page.
*/
- public abstract void addPage(GridResultPage<?> page);
+ public final void addPage(GridResultPage<?> page) {
+ if (!page.response().rows().isEmpty())
+ addPage0(page);
+ else
+ assert page.response().isLast();
+
+ if (page.response().isLast()) {
+ int srcs0 = srcs.decrementAndGet();
+
+ assert srcs0 >= 0;
+
+ if (srcs0 == 0)
+ addPage0(END); // We've fetched all.
+ }
+ }
+
+ /**
+ * @param page Page.
+ */
+ protected abstract void addPage0(GridResultPage<?> page);
/** {@inheritDoc} */
@Override public Cursor find(Session session, SearchRow first, SearchRow last) {
@@ -145,7 +190,7 @@ public abstract class GridMergeIndex extends BaseIndex {
*/
protected class IteratorCursor implements Cursor {
/** */
- private Iterator<Row> iter;
+ protected Iterator<Row> iter;
/** */
protected Row cur;
@@ -185,47 +230,66 @@ public abstract class GridMergeIndex extends BaseIndex {
/**
* Fetching cursor.
*/
- protected abstract class FetchingCursor extends IteratorCursor {
+ protected class FetchingCursor extends IteratorCursor {
/** */
- private boolean canFetch = true;
+ private Iterator<Row> stream;
/**
*/
- public FetchingCursor() {
- super(fetched == null ? Collections.<Row>emptyIterator() : fetched.iterator());
- }
+ public FetchingCursor(Iterator<Row> stream) {
+ super(new FetchedIterator());
- /**
- * @return Next row or {@code null} if none available.
- */
- @Nullable protected abstract Row fetchNext();
+ assert stream != null;
+
+ this.stream = stream;
+ }
/** {@inheritDoc} */
@Override public boolean next() {
- if (super.next())
+ if (super.next()) {
+ assert cur != null;
+
+ if (iter == stream && fetched != null) { // Cache fetched rows for reuse.
+ if (fetched.size() == MAX_FETCH_SIZE)
+ fetched = null; // Throw away fetched result if it is too large.
+ else
+ fetched.add(cur);
+ }
+
+ X.println("__ row: " + Arrays.toString(cur.getValueList()));
+
return true;
+ }
- if (!canFetch)
+ if (iter == stream) // We've fetched the stream.
return false;
- cur = fetchNext();
-
- if (cur == null) { // No more results to fetch.
- assert fetched == null || fetched.size() == cnt.get() : fetched.size() + " <> " + cnt.get();
+ iter = stream; // Switch from cached to stream.
- canFetch = false;
+ return next();
+ }
+ }
- return false;
- }
+ /**
+ * List iterator without {@link ConcurrentModificationException}.
+ */
+ private class FetchedIterator implements Iterator<Row> {
+ /** */
+ private int idx;
- if (fetched != null) { // Try to reuse fetched result.
- fetched.add(cur);
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return fetched != null && idx < fetched.size();
+ }
- if (fetched.size() == MAX_FETCH_SIZE)
- fetched = null; // Throw away fetched result if it is too large.
- }
+ /** {@inheritDoc} */
+ @Override public Row next() {
+ return fetched.get(idx++);
+ }
- return true;
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index c6aaea9..b93c6ec 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -10,8 +10,10 @@
package org.gridgain.grid.kernal.processors.query.h2.twostep;
import org.apache.ignite.*;
+import org.gridgain.grid.util.typedef.*;
import org.h2.index.*;
import org.h2.result.*;
+import org.h2.table.*;
import org.h2.value.*;
import org.jetbrains.annotations.*;
@@ -25,50 +27,60 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
/** */
private final BlockingQueue<GridResultPage<?>> queue = new LinkedBlockingQueue<>();
+ /**
+ * @param tbl Table.
+ * @param name Index name.
+ */
+ public GridMergeIndexUnsorted(GridMergeTable tbl, String name) {
+ super(tbl, name, IndexType.createScan(false), IndexColumn.wrap(tbl.getColumns()));
+ }
+
/** {@inheritDoc} */
- @Override public void addPage(GridResultPage<?> page) {
+ @Override public void addPage0(GridResultPage<?> page) {
queue.add(page);
}
/** {@inheritDoc} */
@Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) {
- final GridResultPage<?> p = queue.poll();
-
- assert p != null; // First page must be already fetched.
+ return new FetchingCursor(new Iterator<Row>() {
+ /** */
+ Iterator<Value[]> iter = Collections.emptyIterator();
- if (p.isEmpty())
- return new IteratorCursor(Collections.<Row>emptyIterator());
+ @Override public boolean hasNext() {
+ if (iter.hasNext())
+ return true;
- p.fetchNextPage(); // We always request next page before reading this one.
+ GridResultPage<?> page;
- return new FetchingCursor() {
- /** */
- Iterator<Value[]> iter = p.rows().iterator();
+ try {
+ page = queue.take();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException("Query execution was interrupted.", e);
+ }
- @Nullable @Override protected Row fetchNext() {
- if (!iter.hasNext()) {
- GridResultPage<?> page;
+ if (page == END) {
+ assert queue.isEmpty() : "It must be the last page: " + queue;
- try {
- page = queue.take();
- }
- catch (InterruptedException e) {
- throw new IgniteException("Query execution was interrupted.", e);
- }
+ return false; // We are done.
+ }
- if (page.isEmpty()) {
- assert queue.isEmpty() : "It must be the last page.";
+ page.fetchNextPage();
- return null; // Empty page - we are done.
- }
+ iter = page.response().rows().iterator();
- page.fetchNextPage();
+ assert iter.hasNext();
- iter = page.rows().iterator();
- }
+ return true;
+ }
+ @Override public Row next() {
return new Row(iter.next(), 0);
}
- };
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
index a1f2213..683bb54 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
@@ -27,7 +27,7 @@ public class GridMergeTable extends TableBase {
private final ArrayList<Index> idxs = new ArrayList<>(1);
/** */
- private final GridMergeIndex idx = new GridMergeIndexUnsorted();
+ private final GridMergeIndex idx;
/**
* @param data Data.
@@ -35,6 +35,8 @@ public class GridMergeTable extends TableBase {
public GridMergeTable(CreateTableData data) {
super(data);
+ idx = new GridMergeIndexUnsorted(this, "merge_scan");
+
idxs.add(idx);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3e7e12c..a12b4f9 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -59,8 +59,8 @@ public class GridReduceQueryExecutor {
// TODO handle node failure.
- ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
+ ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() {
+ @Override public boolean apply(UUID nodeId, Object msg) {
assert msg != null;
ClusterNode node = ctx.discovery().node(nodeId);
@@ -69,6 +69,8 @@ public class GridReduceQueryExecutor {
onNextPage(node, (GridNextPageResponse)msg);
else if (msg instanceof GridQueryFailResponse)
onFail(node, (GridQueryFailResponse)msg);
+
+ return true;
}
});
}
@@ -86,7 +88,13 @@ public class GridReduceQueryExecutor {
GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null);
- idx.addPage(new GridResultPage<Object>(node.id(), msg.query(), msg.rows()) {
+ if (msg.allRows() != -1) { // Only the first page contains row count.
+ idx.addCount(msg.allRows());
+
+ r.latch.countDown();
+ }
+
+ idx.addPage(new GridResultPage<UUID>(node.id(), msg) {
@Override public void fetchNextPage() {
try {
ctx.io().sendUserMessage(F.asList(node), new GridNextPageRequest(qryReqId, qry, pageSize));
@@ -96,12 +104,6 @@ public class GridReduceQueryExecutor {
}
}
});
-
- if (msg.allRows() != -1) { // Only the first page contains row count.
- idx.addCount(msg.allRows());
-
- r.latch.countDown();
- }
}
public IgniteFuture<GridCacheSqlResult> query(GridCacheTwoStepQuery qry) {
@@ -118,17 +120,23 @@ public class GridReduceQueryExecutor {
throw new IgniteException(e);
}
- for (GridCacheSqlQuery mapQry : qry.mapQueries())
- r.tbls.add(createTable(r.conn, mapQry));
-
Collection<ClusterNode> nodes = ctx.grid().cluster().nodes(); // TODO filter nodes somehow?
+ for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
+ GridMergeTable tbl = createTable(r.conn, mapQry);
+
+ tbl.getScanIndex(null).setNumberOfSources(nodes.size());
+
+ r.tbls.add(tbl);
+ }
+
r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
this.runs.put(qryReqId, r);
try {
- ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, qry.mapQueries())); // TODO conf page size
+ ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, qry.mapQueries()), // TODO conf page size
+ GridTopic.TOPIC_QUERY, false, 0);
r.latch.await();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
index 6c6ab6a..ef52805 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
@@ -9,68 +9,51 @@
package org.gridgain.grid.kernal.processors.query.h2.twostep;
-import org.h2.result.*;
-import org.h2.value.*;
-
-import java.util.*;
+import org.gridgain.grid.kernal.processors.query.h2.twostep.messages.*;
+import org.gridgain.grid.util.typedef.internal.*;
/**
* Page result.
*/
-public abstract class GridResultPage<S> {
- /** */
- private final S src;
-
+public class GridResultPage<Z> {
/** */
- private final Collection<Value[]> rows;
+ private final Z src;
/** */
- private final int page;
+ private final GridNextPageResponse res;
/**
* @param src Source.
- * @param page Page.
- * @param rows Page rows.
+ * @param res Response.
*/
- protected GridResultPage(S src, int page, Collection<Value[]> rows) {
- assert src != null;
- assert rows != null;
-
+ protected GridResultPage(Z src, GridNextPageResponse res) {
this.src = src;
- this.page = page;
- this.rows = rows;
+ this.res = res;
}
/**
* @return Result source.
*/
- public S source() {
+ public Z source() {
return src;
}
/**
- * @return Page.
+ * @return Response.
*/
- public int page() {
- return page;
+ public GridNextPageResponse response() {
+ return res;
}
/**
- * @return {@code true} If result is empty.
+ * Request next page.
*/
- public boolean isEmpty() {
- return rows.isEmpty();
+ public void fetchNextPage() {
+ throw new UnsupportedOperationException();
}
- /**
- * @return Page rows.
- */
- public Collection<Value[]> rows() {
- return rows;
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridResultPage.class, this);
}
-
- /**
- * Request next page.
- */
- public abstract void fetchNextPage();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
index de38172..0834f4c 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
@@ -9,6 +9,7 @@
package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+import org.gridgain.grid.util.typedef.internal.*;
import org.h2.store.*;
import org.h2.value.*;
@@ -34,20 +35,32 @@ public class GridNextPageResponse implements Externalizable {
/** */
private Collection<Value[]> rows;
+ /** */
+ private boolean last;
+
+ /**
+ * For {@link Externalizable}.
+ */
+ public GridNextPageResponse() {
+ // No-op.
+ }
+
/**
* @param qryReqId Query request ID.
* @param qry Query.
* @param page Page.
* @param allRows All rows count.
+ * @param last {@code true} if this is the last page.
* @param rows Rows.
*/
- public GridNextPageResponse(long qryReqId, int qry, int page, int allRows, Collection<Value[]> rows) {
+ public GridNextPageResponse(long qryReqId, int qry, int page, int allRows, boolean last, Collection<Value[]> rows) {
assert rows != null;
this.qryReqId = qryReqId;
this.qry = qry;
this.page = page;
this.allRows = allRows;
+ this.last = last;
this.rows = rows;
}
@@ -80,6 +93,13 @@ public class GridNextPageResponse implements Externalizable {
}
/**
+ * @return {@code true} If this is the last page.
+ */
+ public boolean isLast() {
+ return last;
+ }
+
+ /**
* @return Rows.
*/
public Collection<Value[]> rows() {
@@ -91,6 +111,7 @@ public class GridNextPageResponse implements Externalizable {
out.writeLong(qryReqId);
out.writeInt(qry);
out.writeInt(page);
+ out.writeBoolean(last);
out.writeInt(allRows);
out.writeInt(rows.size());
@@ -122,6 +143,7 @@ public class GridNextPageResponse implements Externalizable {
qryReqId = in.readLong();
qry = in.readInt();
page = in.readInt();
+ last = in.readBoolean();
allRows = in.readInt();
int rowCnt = in.readInt();
@@ -146,4 +168,9 @@ public class GridNextPageResponse implements Externalizable {
}
}
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNextPageResponse.class, this);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1226b240/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index bc9dcf8..90a2d4c 100644
--- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -18,6 +18,8 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.query.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
+import org.gridgain.grid.util.typedef.*;
import org.gridgain.testframework.junits.common.*;
import java.util.*;
@@ -88,11 +90,37 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
return cc;
}
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTwoStep() throws Exception {
+ fillCaches();
+
+ GridCacheTwoStepQuery q = new GridCacheTwoStepQuery("select * from _cnts_");
+
+ q.addMapQuery("_cnts_", "select count(*) cnt from \"partitioned\".FactPurchase");
+
+ GridCacheQueriesEx<Integer, FactPurchase> qx =
+ (GridCacheQueriesEx<Integer, FactPurchase>)ignite.<Integer, FactPurchase>cache("partitioned").queries();
+
+ for (List<?> row : qx.execute(q).get())
+ X.println("__ " + row);
+
+
+
+// Object cnt = .next().get(0);
+//
+// assertEquals(10L, cnt);
+ }
+
/** @throws Exception If failed. */
public void testOnProjection() throws Exception {
+ fillCaches();
+
GridCacheProjection<Integer, FactPurchase> prj = ignite.<Integer, FactPurchase>cache("partitioned").projection(
new IgnitePredicate<GridCacheEntry<Integer, FactPurchase>>() {
- @Override public boolean apply(GridCacheEntry<Integer, FactPurchase> e) {
+ @Override
+ public boolean apply(GridCacheEntry<Integer, FactPurchase> e) {
return e.getKey() > 12;
}
});
@@ -105,7 +133,9 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
/**
* @throws IgniteCheckedException If failed.
*/
- private void fillCaches() throws IgniteCheckedException {
+ private void fillCaches() throws IgniteCheckedException, InterruptedException {
+ awaitPartitionMapExchange();
+
int idGen = 0;
GridCache<Integer, Object> dimCache = ignite.cache("replicated");