You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/01/12 08:01:37 UTC
[11/24] ignite git commit: ignite-gg-11414 Add iterator wrapper for cursor
ignite-gg-11414 Add iterator wrapper for cursor
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1c15a6d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1c15a6d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1c15a6d
Branch: refs/heads/ignite-gg-11810
Commit: c1c15a6db0bd84de49c0f828cdeab1aed4bcd863
Parents: 8852c80
Author: Dmitriy Govorukhin <dg...@grigain.com>
Authored: Tue Jan 3 19:59:44 2017 +0300
Committer: Dmitriy Govorukhin <dg...@grigain.com>
Committed: Tue Jan 3 19:59:44 2017 +0300
----------------------------------------------------------------------
.../util/GridCursorIteratorWrapper.java | 8 +
.../ignite/internal/util/lang/GridCursor.java | 1 -
.../query/h2/database/H2PkHashIndex.java | 30 ++--
.../query/h2/database/H2TreeIndex.java | 5 +-
.../query/h2/opt/GridH2IndexBase.java | 163 ++++++++++++++-----
.../query/h2/opt/GridH2TreeIndex.java | 12 +-
.../IgniteDistributedJoinTestSuite.java | 38 +++++
7 files changed, 193 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/core/src/main/java/org/apache/ignite/internal/util/GridCursorIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridCursorIteratorWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridCursorIteratorWrapper.java
index bd30ace..927e365 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridCursorIteratorWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridCursorIteratorWrapper.java
@@ -9,17 +9,25 @@ import java.util.*;
* Wrap {@code Iterator} and adapt it to {@code GridCursor}.
*/
public class GridCursorIteratorWrapper<V> implements GridCursor<V> {
+ /** Iterator. */
private Iterator<V> iter;
+
+ /** Next. */
private V next;
+ /**
+ * @param iter Iterator.
+ */
public GridCursorIteratorWrapper(Iterator<V> iter) {
this.iter = iter;
}
+ /** {@inheritDoc} */
@Override public V get() throws IgniteCheckedException {
return next;
}
+ /** {@inheritDoc} */
@Override public boolean next() throws IgniteCheckedException {
next = iter.hasNext() ? iter.next() : null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridCursor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridCursor.java
index da85f99..37d3a48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridCursor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridCursor.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.util.lang;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.cache.database.*;
/**
* Simple cursor abstraction. Initial state must be "before first".
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
index b0647b6..eb34be6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
@@ -101,9 +101,8 @@ public class H2PkHashIndex extends GridH2IndexBase {
try {
List<GridCursor<? extends CacheDataRow>> cursors = new ArrayList<>();
- for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores()) {
+ for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores())
cursors.add(store.cursor(lowerObj, upperObj));
- }
return new H2Cursor(new CompositeGridCursor<>(cursors.iterator()), p);
}
@@ -123,9 +122,8 @@ public class H2PkHashIndex extends GridH2IndexBase {
for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores()) {
CacheDataRow found = store.find(row.key);
- if (found != null) {
+ if (found != null)
tbl.rowDescriptor().createRow(row.key(), row.partition(), row.value(), row.version(), 0);
- }
}
return null;
@@ -191,7 +189,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
}
/** {@inheritDoc} */
- @Override public Cursor findFirstOrLast(Session session, boolean b) {
+ @Override public Cursor findFirstOrLast(Session ses, boolean b) {
throw new UnsupportedOperationException();
}
@@ -283,30 +281,30 @@ public class H2PkHashIndex extends GridH2IndexBase {
*/
private static class CompositeGridCursor<T> implements GridCursor<T> {
/** */
- private final Iterator<GridCursor<? extends T>> iterator;
+ private final Iterator<GridCursor<? extends T>> iter;
/** */
- private GridCursor<? extends T> current;
+ private GridCursor<? extends T> curr;
/**
*
*/
- public CompositeGridCursor(Iterator<GridCursor<? extends T>> iterator) {
- this.iterator = iterator;
+ public CompositeGridCursor(Iterator<GridCursor<? extends T>> iter) {
+ this.iter = iter;
- if (iterator.hasNext())
- current = iterator.next();
+ if (iter.hasNext())
+ curr = iter.next();
}
/** {@inheritDoc} */
@Override public boolean next() throws IgniteCheckedException {
- if (current.next())
+ if (curr.next())
return true;
- while (iterator.hasNext()) {
- current = iterator.next();
+ while (iter.hasNext()) {
+ curr = iter.next();
- if (current.next())
+ if (curr.next())
return true;
}
@@ -315,7 +313,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
/** {@inheritDoc} */
@Override public T get() throws IgniteCheckedException {
- return current.get();
+ return curr.get();
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index c1eb986..31df27c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -219,11 +219,14 @@ public class H2TreeIndex extends GridH2IndexBase {
return tree;
}
+ /** {@inheritDoc} */
protected IgniteTree<SearchRow, GridH2Row> treeForRead() {
return tree;
}
- protected GridCursor<GridH2Row> doFind0(IgniteTree t,
+ /** {@inheritDoc} */
+ protected GridCursor<GridH2Row> doFind0(
+ IgniteTree t,
@Nullable SearchRow first,
boolean includeFirst,
@Nullable SearchRow last,
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index bd569df..c5e836a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.query.h2.opt;
import java.util.*;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -487,6 +486,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
maxRows -= range.rows().size();
}
+ assert !ranges.isEmpty();
+
if (src.hasMoreRows()) {
// Save source for future fetches.
if (msg.bounds() != null)
@@ -497,8 +498,6 @@ public abstract class GridH2IndexBase extends BaseIndex {
qctx.putSource(node.id(), msg.batchLookupId(), null);
}
- assert !ranges.isEmpty();
-
res.ranges(ranges);
res.status(STATUS_OK);
}
@@ -1359,14 +1358,14 @@ public abstract class GridH2IndexBase extends BaseIndex {
int curRangeId = -1;
/** */
- GridCursor<GridH2Row> curRange = EMPTY_CURSOR;
-
- /** */
final IgniteTree tree;
/** */
final IndexingQueryFilter filter;
+ /** Iterator. */
+ Iterator<GridH2Row> iter = emptyIterator();
+
/**
* @param bounds Bounds.
* @param tree Snapshot.
@@ -1386,7 +1385,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
* @return {@code true} If there are more rows in this source.
*/
public boolean hasMoreRows() throws IgniteCheckedException {
- return boundsIter.hasNext() || curRange.next();
+ return boundsIter.hasNext() || iter.hasNext();
}
/**
@@ -1396,60 +1395,56 @@ public abstract class GridH2IndexBase extends BaseIndex {
public GridH2RowRange next(int maxRows) {
assert maxRows > 0 : maxRows;
- try {
- for (;;) {
- if (curRange.next()) {
- // Here we are getting last rows from previously partially fetched range.
- List<GridH2RowMessage> rows = new ArrayList<>();
+ for (; ; ) {
+ if (iter.hasNext()) {
+ // Here we are getting last rows from previously partially fetched range.
+ List<GridH2RowMessage> rows = new ArrayList<>();
- GridH2RowRange nextRange = new GridH2RowRange();
+ GridH2RowRange nextRange = new GridH2RowRange();
- nextRange.rangeId(curRangeId);
- nextRange.rows(rows);
+ nextRange.rangeId(curRangeId);
+ nextRange.rows(rows);
- do {
- rows.add(toRowMessage(curRange.get()));
- }
- while (rows.size() < maxRows && curRange.next());
+ do {
+ rows.add(toRowMessage(iter.next()));
+ }
+ while (rows.size() < maxRows && iter.hasNext());
- if (curRange.next())
- nextRange.setPartial();
- else
- curRange = EMPTY_CURSOR;
+ if (iter.hasNext())
+ nextRange.setPartial();
+ else
+ iter = emptyIterator();
- return nextRange;
- }
+ return nextRange;
+ }
- curRange = EMPTY_CURSOR;
+ iter = emptyIterator();
- if (!boundsIter.hasNext()) {
- boundsIter = emptyIterator();
+ if (!boundsIter.hasNext()) {
+ boundsIter = emptyIterator();
- return null;
- }
+ return null;
+ }
- GridH2RowRangeBounds bounds = boundsIter.next();
+ GridH2RowRangeBounds bounds = boundsIter.next();
- curRangeId = bounds.rangeId();
+ curRangeId = bounds.rangeId();
- SearchRow first = toSearchRow(bounds.first());
- SearchRow last = toSearchRow(bounds.last());
+ SearchRow first = toSearchRow(bounds.first());
+ SearchRow last = toSearchRow(bounds.last());
- IgniteTree t = tree != null ? tree : treeForRead();
+ IgniteTree t = tree != null ? tree : treeForRead();
- curRange = doFind0(t, first, true, last, filter);
+ iter = new CursorIteratorWrapper(doFind0(t, first, true, last, filter));
- if (!curRange.next()) {
- // We have to return empty range here.
- GridH2RowRange emptyRange = new GridH2RowRange();
+ if (!iter.hasNext()) {
+ // We have to return empty range here.
+ GridH2RowRange emptyRange = new GridH2RowRange();
- emptyRange.rangeId(curRangeId);
+ emptyRange.rangeId(curRangeId);
- return emptyRange;
- }
+ return emptyRange;
}
- } catch (IgniteCheckedException e) {
- throw DbException.convert(e);
}
}
}
@@ -1469,7 +1464,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
* @param filter Filter.
* @return Iterator over rows in given range.
*/
- protected GridCursor<GridH2Row> doFind0(IgniteTree t,
+ protected GridCursor<GridH2Row> doFind0(
+ IgniteTree t,
@Nullable SearchRow first,
boolean includeFirst,
@Nullable SearchRow last,
@@ -1492,6 +1488,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
/** Is value required for filtering predicate? */
private final boolean isValRequired;
+ /** */
private GridH2Row next;
/**
@@ -1540,6 +1537,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
return fltr.apply(key, val);
}
+ /** {@inheritDoc} */
@Override public boolean next() throws IgniteCheckedException {
next = null;
@@ -1555,6 +1553,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
return false;
}
+ /** {@inheritDoc} */
@Override public GridH2Row get() throws IgniteCheckedException {
if (next == null)
throw new NoSuchElementException();
@@ -1563,6 +1562,80 @@ public abstract class GridH2IndexBase extends BaseIndex {
}
}
+ /**
+ * Adapts a {@code GridCursor} of rows to {@code Iterator}, reading up to two rows ahead of the caller.
+ */
+ private static final class CursorIteratorWrapper implements Iterator<GridH2Row> {
+ /** Underlying cursor the rows are read from. */
+ private final GridCursor<GridH2Row> cursor;
+
+ /** Row that the next call to {@code next()} returns; {@code null} means {@code hasNext()} is false. */
+ private GridH2Row firstNext;
+
+ /** Row read ahead after {@code firstNext}; {@code null} if the cursor had no further row. */
+ private GridH2Row secondNext;
+
+ /**
+ * @param cursor Cursor to wrap; it is advanced immediately by {@code fetch()}.
+ */
+ private CursorIteratorWrapper(GridCursor<GridH2Row> cursor) {
+ this.cursor = cursor;
+
+ fetch();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return firstNext != null;
+ }
+
+ /** {@inheritDoc} NOTE(review): returns {@code null} when exhausted or when the cursor throws; the exception is discarded. */
+ @Override public GridH2Row next() {
+ try {
+ if (firstNext != null) {
+ GridH2Row res = firstNext;
+
+ firstNext = secondNext;
+
+ if (cursor.next())
+ secondNext = cursor.get();
+ else
+ secondNext = null;
+ return res;
+ }
+ else
+ return null;
+ }
+ catch (Exception e) {
+ return null;
+ }
+ }
+
+ /**
+ * Fills {@code firstNext} and {@code secondNext} from the cursor when both are {@code null}; an {@code IgniteCheckedException} is silently ignored.
+ */
+ private void fetch() {
+ try {
+ if (firstNext == null && secondNext == null) {
+ if (cursor.next()) {
+ firstNext = cursor.get();
+
+ if (cursor.next())
+ secondNext = cursor.get();
+ }
+ }
+ }
+ catch (IgniteCheckedException ignored) {
+
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ throw new UnsupportedOperationException("operation is not supported");
+ }
+ }
+
/** Empty cursor. */
protected static final GridCursor<GridH2Row> EMPTY_CURSOR = new GridCursor<GridH2Row>() {
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 002b432..014cf2e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -503,20 +503,27 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
* Adapter from {@link NavigableMap} to {@link IgniteTree}.
*/
private final class IgniteNavigableMapTree implements IgniteTree<GridSearchRowPointer, GridH2Row> {
- private NavigableMap<GridSearchRowPointer, GridH2Row> tree;
+ /** Tree. */
+ private final NavigableMap<GridSearchRowPointer, GridH2Row> tree;
+ /**
+ * @param tree Tree.
+ */
public IgniteNavigableMapTree(NavigableMap<GridSearchRowPointer, GridH2Row> tree) {
this.tree = tree;
}
+ /** {@inheritDoc} */
@Override public GridH2Row put(GridH2Row value) throws IgniteCheckedException {
return tree.put(value, value);
}
+ /** {@inheritDoc} */
@Override public GridH2Row findOne(GridSearchRowPointer key) throws IgniteCheckedException {
return tree.get(key);
}
+ /** {@inheritDoc} */
@Override public GridCursor<GridH2Row> find(GridSearchRowPointer lower, GridSearchRowPointer upper)
throws IgniteCheckedException {
if (lower == null || upper == null)
@@ -528,14 +535,17 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
return new GridCursorIteratorWrapper<GridH2Row>(subMap.values().iterator());
}
+ /** {@inheritDoc} */
@Override public GridH2Row remove(GridSearchRowPointer key) throws IgniteCheckedException {
return tree.remove(key);
}
+ /** {@inheritDoc} */
@Override public long size() throws IgniteCheckedException {
return tree.size();
}
+ /** {@inheritDoc} */
@Override public IgniteNavigableMapTree clone() {
AbstractMap copy;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteDistributedJoinTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteDistributedJoinTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteDistributedJoinTestSuite.java
new file mode 100644
index 0000000..dca640f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteDistributedJoinTestSuite.java
@@ -0,0 +1,38 @@
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinCollocatedAndNotTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinCustomAffinityMapper;
+import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinNoIndexTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinPartitionedAndReplicatedTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinQueryConditionsTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartDistributedJoinSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlDistributedJoinSelfTest;
+import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryDistributedJoinsTest;
+
+/**
+ * Test suite that groups the distributed join test classes.
+ */
+public class IgniteDistributedJoinTestSuite extends TestSuite {
+ /**
+ * @return Suite named "Distributed Joins Test Suite." with every test class registered below.
+ */
+ public static TestSuite suite() {
+ TestSuite suite = new TestSuite("Distributed Joins Test Suite.");
+
+ suite.addTestSuite(H2CompareBigQueryDistributedJoinsTest.class);
+ suite.addTestSuite(IgniteCacheDistributedJoinCollocatedAndNotTest.class);
+ suite.addTestSuite(IgniteCacheDistributedJoinCustomAffinityMapper.class);
+ suite.addTestSuite(IgniteCacheDistributedJoinNoIndexTest.class);
+ suite.addTestSuite(IgniteCacheDistributedJoinPartitionedAndReplicatedTest.class);
+ suite.addTestSuite(IgniteCacheDistributedJoinQueryConditionsTest.class);
+ suite.addTestSuite(IgniteCacheDistributedJoinTest.class);
+ suite.addTestSuite(IgniteCacheQueryNodeRestartDistributedJoinSelfTest.class);
+ suite.addTestSuite(IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.class);
+ suite.addTestSuite(IgniteSqlDistributedJoinSelfTest.class);
+
+ return suite;
+ }
+}