You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/01/12 08:01:37 UTC

[11/24] ignite git commit: ignite-gg-11414 Add iterator wrapper for cursor

ignite-gg-11414 Add iterator wrapper for cursor


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1c15a6d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1c15a6d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1c15a6d

Branch: refs/heads/ignite-gg-11810
Commit: c1c15a6db0bd84de49c0f828cdeab1aed4bcd863
Parents: 8852c80
Author: Dmitriy Govorukhin <dg...@grigain.com>
Authored: Tue Jan 3 19:59:44 2017 +0300
Committer: Dmitriy Govorukhin <dg...@grigain.com>
Committed: Tue Jan 3 19:59:44 2017 +0300

----------------------------------------------------------------------
 .../util/GridCursorIteratorWrapper.java         |   8 +
 .../ignite/internal/util/lang/GridCursor.java   |   1 -
 .../query/h2/database/H2PkHashIndex.java        |  30 ++--
 .../query/h2/database/H2TreeIndex.java          |   5 +-
 .../query/h2/opt/GridH2IndexBase.java           | 163 ++++++++++++++-----
 .../query/h2/opt/GridH2TreeIndex.java           |  12 +-
 .../IgniteDistributedJoinTestSuite.java         |  38 +++++
 7 files changed, 193 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/core/src/main/java/org/apache/ignite/internal/util/GridCursorIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridCursorIteratorWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridCursorIteratorWrapper.java
index bd30ace..927e365 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridCursorIteratorWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridCursorIteratorWrapper.java
@@ -9,17 +9,25 @@ import java.util.*;
  * Wrap {@code Iterator} and adapt it to {@code GridCursor}.
  */
 public class GridCursorIteratorWrapper<V> implements GridCursor<V> {
+    /** Iterator. */
     private Iterator<V> iter;
+
+    /** Next. */
     private V next;
 
+    /**
+     * @param iter Iterator.
+     */
     public GridCursorIteratorWrapper(Iterator<V> iter) {
         this.iter = iter;
     }
 
+    /** {@inheritDoc} */
     @Override public V get() throws IgniteCheckedException {
         return next;
     }
 
+    /** {@inheritDoc} */
     @Override public boolean next() throws IgniteCheckedException {
         next = iter.hasNext() ? iter.next() : null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridCursor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridCursor.java
index da85f99..37d3a48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridCursor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridCursor.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.util.lang;
 
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.cache.database.*;
 
 /**
  * Simple cursor abstraction. Initial state must be "before first".

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
index b0647b6..eb34be6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
@@ -101,9 +101,8 @@ public class H2PkHashIndex extends GridH2IndexBase {
         try {
             List<GridCursor<? extends CacheDataRow>> cursors = new ArrayList<>();
 
-            for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores()) {
+            for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores())
                 cursors.add(store.cursor(lowerObj, upperObj));
-            }
 
             return new H2Cursor(new CompositeGridCursor<>(cursors.iterator()), p);
         }
@@ -123,9 +122,8 @@ public class H2PkHashIndex extends GridH2IndexBase {
             for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores()) {
                 CacheDataRow found = store.find(row.key);
 
-                if (found != null) {
+                if (found != null)
                     tbl.rowDescriptor().createRow(row.key(), row.partition(), row.value(), row.version(), 0);
-                }
             }
 
             return null;
@@ -191,7 +189,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
     }
 
     /** {@inheritDoc} */
-    @Override public Cursor findFirstOrLast(Session session, boolean b) {
+    @Override public Cursor findFirstOrLast(Session ses, boolean b) {
         throw new UnsupportedOperationException();
     }
 
@@ -283,30 +281,30 @@ public class H2PkHashIndex extends GridH2IndexBase {
      */
     private static class CompositeGridCursor<T> implements GridCursor<T> {
         /** */
-        private final Iterator<GridCursor<? extends T>> iterator;
+        private final Iterator<GridCursor<? extends T>> iter;
 
         /** */
-        private GridCursor<? extends T> current;
+        private GridCursor<? extends T> curr;
 
         /**
          *
          */
-        public CompositeGridCursor(Iterator<GridCursor<? extends T>> iterator) {
-            this.iterator = iterator;
+        public CompositeGridCursor(Iterator<GridCursor<? extends T>> iter) {
+            this.iter = iter;
 
-            if (iterator.hasNext())
-                current = iterator.next();
+            if (iter.hasNext())
+                curr = iter.next();
         }
 
         /** {@inheritDoc} */
         @Override public boolean next() throws IgniteCheckedException {
-            if (current.next())
+            if (curr.next())
                 return true;
 
-            while (iterator.hasNext()) {
-                current = iterator.next();
+            while (iter.hasNext()) {
+                curr = iter.next();
 
-                if (current.next())
+                if (curr.next())
                     return true;
             }
 
@@ -315,7 +313,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
 
         /** {@inheritDoc} */
         @Override public T get() throws IgniteCheckedException {
-            return current.get();
+            return curr.get();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index c1eb986..31df27c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -219,11 +219,14 @@ public class H2TreeIndex extends GridH2IndexBase {
         return tree;
     }
 
+    /** {@inheritDoc} */
     protected IgniteTree<SearchRow, GridH2Row> treeForRead() {
         return tree;
     }
 
-    protected GridCursor<GridH2Row> doFind0(IgniteTree t,
+    /** {@inheritDoc} */
+    protected GridCursor<GridH2Row> doFind0(
+        IgniteTree t,
         @Nullable SearchRow first,
         boolean includeFirst,
         @Nullable SearchRow last,

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index bd569df..c5e836a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.query.h2.opt;
 
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -487,6 +486,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
                         maxRows -= range.rows().size();
                 }
 
+                assert !ranges.isEmpty();
+
                 if (src.hasMoreRows()) {
                     // Save source for future fetches.
                     if (msg.bounds() != null)
@@ -497,8 +498,6 @@ public abstract class GridH2IndexBase extends BaseIndex {
                     qctx.putSource(node.id(), msg.batchLookupId(), null);
                 }
 
-                assert !ranges.isEmpty();
-
                 res.ranges(ranges);
                 res.status(STATUS_OK);
             }
@@ -1359,14 +1358,14 @@ public abstract class GridH2IndexBase extends BaseIndex {
         int curRangeId = -1;
 
         /** */
-        GridCursor<GridH2Row> curRange = EMPTY_CURSOR;
-
-        /** */
         final IgniteTree tree;
 
         /** */
         final IndexingQueryFilter filter;
 
+        /** Iterator. */
+        Iterator<GridH2Row> iter = emptyIterator();
+
         /**
          * @param bounds Bounds.
          * @param tree Snapshot.
@@ -1386,7 +1385,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
          * @return {@code true} If there are more rows in this source.
          */
         public boolean hasMoreRows() throws IgniteCheckedException {
-            return boundsIter.hasNext() || curRange.next();
+            return boundsIter.hasNext() || iter.hasNext();
         }
 
         /**
@@ -1396,60 +1395,56 @@ public abstract class GridH2IndexBase extends BaseIndex {
         public GridH2RowRange next(int maxRows) {
             assert maxRows > 0 : maxRows;
 
-            try {
-                for (;;) {
-                    if (curRange.next()) {
-                        // Here we are getting last rows from previously partially fetched range.
-                        List<GridH2RowMessage> rows = new ArrayList<>();
+            for (; ; ) {
+                if (iter.hasNext()) {
+                    // Here we are getting last rows from previously partially fetched range.
+                    List<GridH2RowMessage> rows = new ArrayList<>();
 
-                        GridH2RowRange nextRange = new GridH2RowRange();
+                    GridH2RowRange nextRange = new GridH2RowRange();
 
-                        nextRange.rangeId(curRangeId);
-                        nextRange.rows(rows);
+                    nextRange.rangeId(curRangeId);
+                    nextRange.rows(rows);
 
-                        do {
-                            rows.add(toRowMessage(curRange.get()));
-                        }
-                        while (rows.size() < maxRows && curRange.next());
+                    do {
+                        rows.add(toRowMessage(iter.next()));
+                    }
+                    while (rows.size() < maxRows && iter.hasNext());
 
-                        if (curRange.next())
-                            nextRange.setPartial();
-                        else
-                            curRange = EMPTY_CURSOR;
+                    if (iter.hasNext())
+                        nextRange.setPartial();
+                    else
+                        iter = emptyIterator();
 
-                        return nextRange;
-                    }
+                    return nextRange;
+                }
 
-                    curRange = EMPTY_CURSOR;
+                iter = emptyIterator();
 
-                    if (!boundsIter.hasNext()) {
-                        boundsIter = emptyIterator();
+                if (!boundsIter.hasNext()) {
+                    boundsIter = emptyIterator();
 
-                        return null;
-                    }
+                    return null;
+                }
 
-                    GridH2RowRangeBounds bounds = boundsIter.next();
+                GridH2RowRangeBounds bounds = boundsIter.next();
 
-                    curRangeId = bounds.rangeId();
+                curRangeId = bounds.rangeId();
 
-                    SearchRow first = toSearchRow(bounds.first());
-                    SearchRow last = toSearchRow(bounds.last());
+                SearchRow first = toSearchRow(bounds.first());
+                SearchRow last = toSearchRow(bounds.last());
 
-                    IgniteTree t = tree != null ? tree : treeForRead();
+                IgniteTree t = tree != null ? tree : treeForRead();
 
-                    curRange = doFind0(t, first, true, last, filter);
+                iter = new CursorIteratorWrapper(doFind0(t, first, true, last, filter));
 
-                    if (!curRange.next()) {
-                        // We have to return empty range here.
-                        GridH2RowRange emptyRange = new GridH2RowRange();
+                if (!iter.hasNext()) {
+                    // We have to return empty range here.
+                    GridH2RowRange emptyRange = new GridH2RowRange();
 
-                        emptyRange.rangeId(curRangeId);
+                    emptyRange.rangeId(curRangeId);
 
-                        return emptyRange;
-                    }
+                    return emptyRange;
                 }
-            } catch (IgniteCheckedException e) {
-                throw DbException.convert(e);
             }
         }
     }
@@ -1469,7 +1464,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param filter Filter.
      * @return Iterator over rows in given range.
      */
-    protected GridCursor<GridH2Row> doFind0(IgniteTree t,
+    protected GridCursor<GridH2Row> doFind0(
+        IgniteTree t,
         @Nullable SearchRow first,
         boolean includeFirst,
         @Nullable SearchRow last,
@@ -1492,6 +1488,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
         /** Is value required for filtering predicate? */
         private final boolean isValRequired;
 
+        /** */
         private GridH2Row next;
 
         /**
@@ -1540,6 +1537,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
             return fltr.apply(key, val);
         }
 
+        /** {@inheritDoc} */
         @Override public boolean next() throws IgniteCheckedException {
             next = null;
 
@@ -1555,6 +1553,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
             return false;
         }
 
+        /** {@inheritDoc} */
         @Override public GridH2Row get() throws IgniteCheckedException {
             if (next == null)
                 throw new NoSuchElementException();
@@ -1563,6 +1562,80 @@ public abstract class GridH2IndexBase extends BaseIndex {
         }
     }
 
+    /**
+     *
+     */
+    private static final class CursorIteratorWrapper implements Iterator<GridH2Row> {
+        /** */
+        private final GridCursor<GridH2Row> cursor;
+
+        /** First next. */
+        private GridH2Row firstNext;
+
+        /** Second next. */
+        private GridH2Row secondNext;
+
+        /**
+         * @param cursor Cursor.
+         */
+        private CursorIteratorWrapper(GridCursor<GridH2Row> cursor) {
+            this.cursor = cursor;
+
+            fetch();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            return firstNext != null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridH2Row next() {
+            try {
+                if (firstNext != null) {
+                    GridH2Row res = firstNext;
+
+                    firstNext = secondNext;
+
+                    if (cursor.next())
+                        secondNext = cursor.get();
+                    else
+                        secondNext = null;
+                    return res;
+                }
+                else
+                    return null;
+            }
+            catch (Exception e) {
+                return null;
+            }
+        }
+
+        /**
+         *
+         */
+        private void fetch() {
+            try {
+                if (firstNext == null && secondNext == null) {
+                    if (cursor.next()) {
+                        firstNext = cursor.get();
+
+                        if (cursor.next())
+                            secondNext = cursor.get();
+                    }
+                }
+            }
+            catch (IgniteCheckedException ignored) {
+
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void remove() {
+            throw new UnsupportedOperationException("operation is not supported");
+        }
+    }
+
     /** Empty cursor. */
     protected static final GridCursor<GridH2Row> EMPTY_CURSOR = new GridCursor<GridH2Row>() {
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 002b432..014cf2e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -503,20 +503,27 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
      * Adapter from {@link NavigableMap} to {@link IgniteTree}.
      */
     private final class IgniteNavigableMapTree implements IgniteTree<GridSearchRowPointer, GridH2Row> {
-        private NavigableMap<GridSearchRowPointer, GridH2Row> tree;
+        /** Tree. */
+        private final NavigableMap<GridSearchRowPointer, GridH2Row> tree;
 
+        /**
+         * @param tree Tree.
+         */
         public IgniteNavigableMapTree(NavigableMap<GridSearchRowPointer, GridH2Row> tree) {
             this.tree = tree;
         }
 
+        /** {@inheritDoc} */
         @Override public GridH2Row put(GridH2Row value) throws IgniteCheckedException {
             return tree.put(value, value);
         }
 
+        /** {@inheritDoc} */
         @Override public GridH2Row findOne(GridSearchRowPointer key) throws IgniteCheckedException {
             return tree.get(key);
         }
 
+        /** {@inheritDoc} */
         @Override public GridCursor<GridH2Row> find(GridSearchRowPointer lower, GridSearchRowPointer upper)
             throws IgniteCheckedException {
             if (lower == null || upper == null)
@@ -528,14 +535,17 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
             return new GridCursorIteratorWrapper<GridH2Row>(subMap.values().iterator());
         }
 
+        /** {@inheritDoc} */
         @Override public GridH2Row remove(GridSearchRowPointer key) throws IgniteCheckedException {
             return tree.remove(key);
         }
 
+        /** {@inheritDoc} */
         @Override public long size() throws IgniteCheckedException {
             return tree.size();
         }
 
+        /** {@inheritDoc} */
         @Override public IgniteNavigableMapTree clone() {
             AbstractMap copy;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1c15a6d/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteDistributedJoinTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteDistributedJoinTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteDistributedJoinTestSuite.java
new file mode 100644
index 0000000..dca640f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteDistributedJoinTestSuite.java
@@ -0,0 +1,38 @@
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinCollocatedAndNotTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinCustomAffinityMapper;
+import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinNoIndexTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinPartitionedAndReplicatedTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinQueryConditionsTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartDistributedJoinSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlDistributedJoinSelfTest;
+import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryDistributedJoinsTest;
+
+/**
+ *
+ */
+public class IgniteDistributedJoinTestSuite extends TestSuite {
+    /**
+     *
+     */
+    public static TestSuite suite() {
+        TestSuite suite = new TestSuite("Distributed Joins Test Suite.");
+
+        suite.addTestSuite(H2CompareBigQueryDistributedJoinsTest.class);
+        suite.addTestSuite(IgniteCacheDistributedJoinCollocatedAndNotTest.class);
+        suite.addTestSuite(IgniteCacheDistributedJoinCustomAffinityMapper.class);
+        suite.addTestSuite(IgniteCacheDistributedJoinNoIndexTest.class);
+        suite.addTestSuite(IgniteCacheDistributedJoinPartitionedAndReplicatedTest.class);
+        suite.addTestSuite(IgniteCacheDistributedJoinQueryConditionsTest.class);
+        suite.addTestSuite(IgniteCacheDistributedJoinTest.class);
+        suite.addTestSuite(IgniteCacheQueryNodeRestartDistributedJoinSelfTest.class);
+        suite.addTestSuite(IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.class);
+        suite.addTestSuite(IgniteSqlDistributedJoinSelfTest.class);
+
+        return suite;
+    }
+}