You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/08/14 10:30:53 UTC

[43/50] ignite git commit: WIP on better abstractions.

WIP on better abstractions.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/74e25e31
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/74e25e31
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/74e25e31

Branch: refs/heads/ignite-5991-6019
Commit: 74e25e319c55f182a34f74c729c88d7625ca5fcd
Parents: 8917957
Author: devozerov <vo...@gridgain.com>
Authored: Mon Aug 14 13:03:45 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Aug 14 13:03:45 2017 +0300

----------------------------------------------------------------------
 .../h2/twostep/GridMergeIndexesIterator.java    |  84 +++++---
 .../h2/twostep/GridReduceQueryExecutor.java     |  31 +--
 .../GridMergeIndexesIteratorSelfTest.java       | 211 -------------------
 .../IgniteCacheQuerySelfTestSuite.java          |   3 -
 4 files changed, 65 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/74e25e31/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexesIterator.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexesIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexesIterator.java
index 9a13d49..1bb97ed 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexesIterator.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexesIterator.java
@@ -18,13 +18,14 @@
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
-import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapterEx;
-import org.apache.ignite.lang.IgniteOutClosure;
 import org.h2.index.Cursor;
 import org.h2.result.Row;
 
@@ -32,6 +33,21 @@ import org.h2.result.Row;
  * Iterator that transparently and sequentially traverses a bunch of {@link GridMergeIndex}es.
  */
 class GridMergeIndexesIterator extends GridCloseableIteratorAdapterEx<List<?>> {
+    /** Reduce query executor. */
+    private final GridReduceQueryExecutor rdcExec;
+
+    /** Participating nodes. */
+    private final Collection<ClusterNode> nodes;
+
+    /** Query run. */
+    private final ReduceQueryRun run;
+
+    /** Query request ID. */
+    private final long qryReqId;
+
+    /** Distributed joins flag. */
+    private final boolean distributedJoins;
+
     /** Iterator over indexes. */
     private final Iterator<GridMergeIndex> idxs;
 
@@ -41,25 +57,31 @@ class GridMergeIndexesIterator extends GridCloseableIteratorAdapterEx<List<?>> {
     /** Next row to return. */
     private List<Object> next;
 
-    /** Closure to run when this iterator is closed, or when all indexes have been traversed. */
-    private final IgniteOutClosure<?> clo;
-
-    /** Cleanup flag - {@code null} iff {@link #clo} is null. */
-    private final AtomicBoolean cleanupDone;
+    /** Whether remote resources have already been released by this iterator. */
+    private boolean closed;
 
     /**
      * Constructor.
-     * @param idxs Indexes to iterate over.
-     * @param clo Closure to run when this iterator is closed, or when all indexes have been traversed.
+     *
+     * @param rdcExec Reduce query executor.
+     * @param nodes Participating nodes.
+     * @param run Query run.
+     * @param qryReqId Query request ID.
+     * @param distributedJoins Distributed joins flag.
      * @throws IgniteCheckedException if failed.
      */
-    GridMergeIndexesIterator(Iterable<GridMergeIndex> idxs, IgniteOutClosure<?> clo) throws IgniteCheckedException {
-        this.idxs = idxs.iterator();
-        this.clo = clo;
-
-        cleanupDone = (clo != null ? new AtomicBoolean() : null);
-
-        goNext();
+    GridMergeIndexesIterator(GridReduceQueryExecutor rdcExec, Collection<ClusterNode> nodes, ReduceQueryRun run,
+        long qryReqId, boolean distributedJoins)
+        throws IgniteCheckedException {
+        this.rdcExec = rdcExec;
+        this.nodes = nodes;
+        this.run = run;
+        this.qryReqId = qryReqId;
+        this.distributedJoins = distributedJoins;
+
+        this.idxs = run.indexes().iterator();
+
+        next0();
     }
 
     /** {@inheritDoc} */
@@ -74,7 +96,7 @@ class GridMergeIndexesIterator extends GridCloseableIteratorAdapterEx<List<?>> {
         if (res == null)
             throw new NoSuchElementException();
 
-        goNext();
+        next0();
 
         return res;
     }
@@ -83,24 +105,23 @@ class GridMergeIndexesIterator extends GridCloseableIteratorAdapterEx<List<?>> {
      * Retrieve next row.
      * @throws IgniteCheckedException if failed.
      */
-    private void goNext() throws IgniteCheckedException {
+    private void next0() throws IgniteCheckedException {
         next = null;
 
         try {
-            boolean gotNext = false;
+            boolean hasNext = false;
 
-            while (cur == null || !(gotNext = cur.next())) {
+            while (cur == null || !(hasNext = cur.next())) {
                 if (idxs.hasNext())
                     cur = idxs.next().findInStream(null, null);
                 else {
-                    // We're out of records, let's clean up.
-                    doCleanup();
+                    close0();
 
                     break;
                 }
             }
 
-            if (gotNext) {
+            if (hasNext) {
                 Row row = cur.get();
 
                 int cols = row.getColumnCount();
@@ -114,21 +135,26 @@ class GridMergeIndexesIterator extends GridCloseableIteratorAdapterEx<List<?>> {
             }
         }
         catch (Throwable e) {
-            doCleanup();
+            close0();
 
             throw new IgniteCheckedException(e);
         }
     }
 
-    /** Run {@link #clo} if it's present and hasn't been invoked yet. */
-    private void doCleanup() {
-        if (clo != null && cleanupDone.compareAndSet(false, true))
-            clo.apply();
+    /**
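+     * Releases remote resources through the reduce query executor; repeated calls after a successful release are no-ops.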
+     */
+    private void close0() {
+        if (!closed) {
+            rdcExec.releaseRemoteResources(nodes, run, qryReqId, distributedJoins);
+
+            closed = true;
+        }
     }
 
     /** {@inheritDoc} */
     @Override protected void onClose() throws IgniteCheckedException {
-        doCleanup();
+        close0();
 
         super.onClose();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/74e25e31/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 1a9066f..1091883 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -84,7 +84,6 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.command.ddl.CreateTableData;
 import org.h2.engine.Session;
@@ -168,6 +167,7 @@ public class GridReduceQueryExecutor {
         log = ctx.log(GridReduceQueryExecutor.class);
 
         ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
+            @SuppressWarnings("deprecation")
             @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
                 if (!busyLock.enterBusy())
                     return;
@@ -623,19 +623,6 @@ public class GridReduceQueryExecutor {
 
             final Collection<ClusterNode> finalNodes = nodes;
 
-            // This will be executed either at the end of lazy iteration or after merge table is filled.
-            final IgniteOutClosure<Void> clo = new IgniteOutClosure<Void>() {
-                @Override public Void apply() {
-                    // Make sure any activity related to current attempt is cancelled.
-                    cancelRemoteQueriesIfNeeded(finalNodes, r, qryReqId, qry.distributedJoins());
-
-                    if (!runs.remove(qryReqId, r))
-                        U.warn(log, "Query run was already removed: " + qryReqId);
-
-                    return null;
-                }
-            };
-
             for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
                 GridMergeIndex idx;
 
@@ -769,7 +756,7 @@ public class GridReduceQueryExecutor {
 
                 if (!retry) {
                     if (skipMergeTbl)
-                        resIter = new GridMergeIndexesIterator(r.indexes(), clo);
+                        resIter = new GridMergeIndexesIterator(this, finalNodes, r, qryReqId, qry.distributedJoins());
                     else {
                         cancel.checkCancelled();
 
@@ -835,7 +822,7 @@ public class GridReduceQueryExecutor {
             finally {
                 // If we have fetched all data to merge table, then let's free all resources here.
                 if (!skipMergeTbl) {
-                    clo.apply();
+                    releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins());
 
                     for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
                         fakeTable(null, i).innerTable(null); // Drop all merge tables.
@@ -873,16 +860,15 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * Release remote resources if needed and remove the query run from {@code runs}.
+     *
      * @param nodes Query nodes.
      * @param r Query run.
      * @param qryReqId Query id.
      * @param distributedJoins Distributed join flag.
      */
-    private void cancelRemoteQueriesIfNeeded(Collection<ClusterNode> nodes,
-        ReduceQueryRun r,
-        long qryReqId,
-        boolean distributedJoins)
-    {
+    public void releaseRemoteResources(Collection<ClusterNode> nodes, ReduceQueryRun r, long qryReqId,
+        boolean distributedJoins) {
         // For distributedJoins need always send cancel request to cleanup resources.
         if (distributedJoins)
             send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
@@ -895,6 +881,9 @@ public class GridReduceQueryExecutor {
                 }
             }
         }
+
+        if (!runs.remove(qryReqId, r))
+            U.warn(log, "Query run was already removed: " + qryReqId);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/74e25e31/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexesIteratorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexesIteratorSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexesIteratorSelfTest.java
deleted file mode 100644
index a9dfe02..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexesIteratorSelfTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.twostep;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.h2.engine.Session;
-import org.h2.result.Row;
-import org.h2.result.RowImpl;
-import org.h2.result.SearchRow;
-import org.h2.result.SortOrder;
-import org.h2.table.Column;
-import org.h2.table.TableFilter;
-import org.h2.value.Value;
-import org.h2.value.ValueString;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Test to check correct work of {@link GridMergeIndexesIterator}.
- * Note that it checks transparent iteration over data in all indexes while being agnostic to
- * what's happening inside {@link GridMergeIndex} implementations themselves - it goes along well with how
- * this iterator is used in {@link GridReduceQueryExecutor}.
- */
-@SuppressWarnings("unchecked")
-public class GridMergeIndexesIteratorSelfTest extends GridCommonAbstractTest {
-    /**
-     * Test no data cases.
-     */
-    public void testEmpty() {
-        doTest();
-
-        doTest(e());
-
-        doTest(e(), e(), e());
-    }
-
-    /**
-     * Test simple case.
-     */
-    public void testSimple() {
-        doTest(l("A"), l("B", "C", "D"), l("E", "F"));
-    }
-
-    /**
-     * Test complex case with few non-empty item lists with some empty ones in between.
-     */
-    public void testEmptyCombinations() {
-        doTest(e(), l("A"), e(), e(), l("B", "C", "D"), e(), l("E", "F"), e(), e(), e(), l("E", "F"), e());
-    }
-
-    /**
-     * Test that contents of iterator build on top of indexes built on top of given sets of items
-     * match those of an iterator created from list containing all items.
-     * @param data data to fill iterator.
-     */
-    private void doTest(List<String>... data) {
-        List<String> exList = new ArrayList<>();
-
-        List<GridMergeIndex> idxs = new ArrayList<>(data.length);
-
-        for (List<String> aData : data) {
-            exList.addAll(aData);
-
-            idxs.add(new Index(aData));
-        }
-
-        GridMergeIndexesIterator it;
-
-        try {
-            it = new GridMergeIndexesIterator(idxs, null);
-        }
-        catch (IgniteCheckedException e) {
-            throw new AssertionError(e);
-        }
-
-        Iterator<String> exIt = exList.iterator();
-
-        while (it.hasNext() && exIt.hasNext())
-            assertEquals(exIt.next(), it.next().get(0));
-
-        assertFalse(it.hasNext());
-
-        assertFalse(exIt.hasNext());
-    }
-
-    /**
-     * @param items Items to wrap into list.
-     * @return List of {@code items}.
-     */
-    private static List<String> l(String... items) {
-        return Arrays.asList(items);
-    }
-
-    /**
-     * @return Empty list.
-     */
-    private static List<String> e() {
-        return Collections.emptyList();
-    }
-
-    /**
-     * Dummy index.
-     */
-    private final static class Index extends GridMergeIndex {
-        /** Cursor. */
-        private final Cursor cur;
-
-        /**
-         * Constructor.
-         * @param items items.
-         */
-        protected Index(Iterable<String> items) {
-            super(null);
-
-            cur = new Cursor(items);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void addPage0(GridResultPage page) {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean fetchedAll() {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override protected org.h2.index.Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) {
-            return cur;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected org.h2.index.Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first,
-            @Nullable SearchRow last) {
-            return cur;
-        }
-
-        /** {@inheritDoc} */
-        @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter,
-            SortOrder sortOrder, HashSet<Column> allColsSet) {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    /**
-     * Dummy cursor.
-     */
-    private final static class Cursor implements org.h2.index.Cursor {
-        /** Current item. */
-        private String cur;
-
-        /** Items iterator. */
-        private final Iterator<String> it;
-
-        /**
-         * Constructor.
-         * @param items items.
-         */
-        Cursor(Iterable<String> items) {
-            it = items.iterator();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Row get() {
-            return new RowImpl(new Value[] { ValueString.get(cur) }, 0);
-        }
-
-        /** {@inheritDoc} */
-        @Override public SearchRow getSearchRow() {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            if (!it.hasNext())
-                return false;
-
-            cur = it.next();
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean previous() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/74e25e31/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 5829bd8..decc7d5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -134,7 +134,6 @@ import org.apache.ignite.internal.processors.query.h2.sql.BaseH2CompareQueryTest
 import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest;
 import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryDistributedJoinsTest;
 import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryTest;
-import org.apache.ignite.internal.processors.query.h2.twostep.GridMergeIndexesIteratorSelfTest;
 import org.apache.ignite.internal.processors.sql.SqlConnectorConfigurationValidationSelfTest;
 import org.apache.ignite.spi.communication.tcp.GridOrderedMessageCancelSelfTest;
 import org.apache.ignite.testframework.IgniteTestSuite;
@@ -225,8 +224,6 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
 
         suite.addTestSuite(IgniteCacheMultipleIndexedTypesTest.class);
 
-        suite.addTestSuite(GridMergeIndexesIteratorSelfTest.class);
-
         // DML.
         suite.addTestSuite(IgniteCacheMergeSqlQuerySelfTest.class);
         suite.addTestSuite(IgniteCacheInsertSqlQuerySelfTest.class);