You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2020/09/22 10:00:37 UTC

[ignite] branch ignite-12248 updated: IGNITE-13463: Calcite improvements. Rework RootNode rows exchange. This closes #8261

This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new 74230a4  IGNITE-13463: Calcite improvements. Rework RootNode rows exchange. This closes #8261
74230a4 is described below

commit 74230a45e0c5213bf48bcda1be333389702af52d
Author: zstan <st...@gmail.com>
AuthorDate: Tue Sep 22 13:00:16 2020 +0300

    IGNITE-13463: Calcite improvements. Rework RootNode rows exchange. This closes #8261
---
 .../query/calcite/exec/rel/RootNode.java           | 176 +++++++++------------
 .../query/calcite/CalciteQueryProcessorTest.java   |  50 ++++++
 .../processors/query/calcite/CancelTest.java       |   4 +-
 3 files changed, 126 insertions(+), 104 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index f38c613..75a6962 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -25,7 +25,6 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -33,39 +32,43 @@ import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
+import static org.apache.ignite.cache.query.QueryCancelledException.ERR_MSG;
+
 /**
  * Client iterator.
  */
 public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row>, Iterator<Row> {
     /** */
-    private final ReentrantLock lock;
+    private final ReentrantLock lock = new ReentrantLock();
 
     /** */
-    private final Condition cond;
+    private final Condition cond = lock.newCondition();
 
     /** */
-    private final Deque<Row> buff;
+    private final Runnable onClose;
 
     /** */
-    private final Runnable onClose;
+    private final AtomicReference<Throwable> ex = new AtomicReference<>();
 
     /** */
-    private volatile State state = State.RUNNING;
+    private int waiting;
 
     /** */
-    private final AtomicReference<Throwable> ex = new AtomicReference<>();
+    private Deque<Row> inBuff = new ArrayDeque<>(IN_BUFFER_SIZE);
 
     /** */
-    private Row row;
+    private Deque<Row> outBuff = new ArrayDeque<>(IN_BUFFER_SIZE);
 
     /** */
-    private int waiting;
+    private volatile boolean closed;
 
     /**
      * @param ctx Execution context.
      */
     public RootNode(ExecutionContext<Row> ctx) {
-        this(ctx, null);
+        super(ctx);
+
+        onClose = this::closeInternal;
     }
 
     /**
@@ -74,10 +77,6 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
     public RootNode(ExecutionContext<Row> ctx, Runnable onClose) {
         super(ctx);
 
-        buff = new ArrayDeque<>(IN_BUFFER_SIZE);
-        lock = new ReentrantLock();
-        cond = lock.newCondition();
-
         this.onClose = onClose;
     }
 
@@ -88,14 +87,15 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
 
     /** {@inheritDoc} */
     @Override public void close() {
+        if (closed)
+            return;
+
         lock.lock();
         try {
-            if (state == State.RUNNING)
-                state = State.CANCELLED;
-            else if (state == State.END)
-                state = State.CLOSED;
-            else
-                return;
+            if (waiting != -1)
+                ex.compareAndSet(null, new IgniteSQLException(ERR_MSG, IgniteQueryErrorCode.QUERY_CANCELED));
+
+            closed = true; // an exception has to be set first to get right check order
 
             cond.signalAll();
         }
@@ -103,84 +103,60 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
             lock.unlock();
         }
 
-        if (onClose == null)
-            closeInternal();
-        else
-            onClose.run();
+        onClose.run();
     }
 
     /** {@inheritDoc} */
     @Override protected boolean isClosed() {
-        return state == State.CANCELLED || state == State.CLOSED;
+        return closed;
     }
 
     /** {@inheritDoc} */
     @Override public void closeInternal() {
-        context().execute(() -> {
-            buff.clear();
-
-            U.closeQuiet(super::close);
-        });
+        context().execute(() -> sources().forEach(U::closeQuiet));
     }
 
     /** {@inheritDoc} */
     @Override public void push(Row row) {
         assert waiting > 0;
 
+        lock.lock();
         try {
             checkState();
 
-            int req = 0;
-
-            lock.lock();
-            try {
-                if (state != State.RUNNING)
-                    return;
-
-                waiting--;
-
-                buff.offer(row);
+            waiting--;
 
-                if (waiting == 0)
-                    waiting = req = IN_BUFFER_SIZE - buff.size();
+            inBuff.offer(row);
 
+            if (inBuff.size() == IN_BUFFER_SIZE)
                 cond.signalAll();
-            }
-            finally {
-                lock.unlock();
-            }
-
-            if (req > 0)
-                source().request(req);
         }
         catch (Exception e) {
             onError(e);
         }
+        finally {
+            lock.unlock();
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void end() {
+        assert waiting > 0;
+
+        lock.lock();
         try {
             checkState();
 
-            lock.lock();
-            try {
-                assert waiting > 0 : "waiting=" + waiting;
-
-                waiting = -1;
+            waiting = -1;
 
-                if (state != State.RUNNING)
-                    return;
-
-                cond.signalAll();
-            }
-            finally {
-                lock.unlock();
-            }
+            cond.signalAll();
         }
         catch (Exception e) {
             onError(e);
         }
+        finally {
+            lock.unlock();
+        }
     }
 
     /** {@inheritDoc} */
@@ -193,12 +169,17 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
 
     /** {@inheritDoc} */
     @Override public boolean hasNext() {
-        if (row != null)
+        checkException();
+
+        if (!outBuff.isEmpty())
             return true;
-        else if (state == State.END || state == State.CLOSED)
+
+        if (closed && ex.get() == null)
             return false;
-        else
-            return (row = take()) != null;
+
+        exchangeBuffers();
+
+        return !outBuff.isEmpty();
     }
 
     /** {@inheritDoc} */
@@ -206,10 +187,7 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
         if (!hasNext())
             throw new NoSuchElementException();
 
-        Row cur0 = row;
-        row = null;
-
-        return cur0;
+        return outBuff.remove();
     }
 
     /** {@inheritDoc} */
@@ -236,61 +214,53 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
     }
 
     /** */
-    private Row take() {
+    private void exchangeBuffers() {
         assert !F.isEmpty(sources()) && sources().size() == 1;
 
         lock.lock();
         try {
-            while (true) {
-                checkCancelled();
-                assert state == State.RUNNING;
+            while (ex.get() == null) {
+                assert outBuff.isEmpty();
 
-                if (!buff.isEmpty())
-                    return buff.poll();
-                else if (waiting == -1)
-                    break;
-                else if (waiting == 0) {
+                if (inBuff.size() == IN_BUFFER_SIZE || waiting == -1) {
+                    Deque<Row> tmp = inBuff;
+                    inBuff = outBuff;
+                    outBuff = tmp;
+                }
+
+                if (waiting == -1)
+                    close();
+                else if (inBuff.isEmpty() && waiting == 0) {
                     int req = waiting = IN_BUFFER_SIZE;
                     context().execute(() -> source().request(req));
                 }
 
+                if (!outBuff.isEmpty() || waiting == -1)
+                    break;
+
                 cond.await();
             }
-
-            state = State.END;
         }
         catch (InterruptedException e) {
-            throw new IgniteInterruptedException(e);
+            onError(new IgniteInterruptedException(e));
         }
         finally {
             lock.unlock();
         }
 
-        assert state == State.END;
-
-        close();
-        
-        return null;
+        checkException();
     }
 
     /** */
-    private void checkCancelled() {
-        if (state == State.CANCELLED) {
-            ex.compareAndSet(null, new IgniteSQLException("The query was cancelled while executing.", IgniteQueryErrorCode.QUERY_CANCELED));
+    private void checkException() {
+        Throwable e = ex.get();
 
-            throw sqlException(ex.get());
-        }
-    }
+        if (e == null)
+            return;
 
-    /** */
-    private IgniteSQLException sqlException(Throwable e) {
-        return e instanceof IgniteSQLException
-            ? (IgniteSQLException)e
-            : new IgniteSQLException("An error occurred while query executing.", IgniteQueryErrorCode.UNKNOWN, e);
-    }
-
-    /** */
-    private enum State {
-        RUNNING, CANCELLED, END, CLOSED
+        if (e instanceof IgniteSQLException)
+            throw (IgniteSQLException)e;
+        else
+            throw new IgniteSQLException("An error occurred while query executing.", IgniteQueryErrorCode.UNKNOWN, e);
     }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 2886ad9..12448a9 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -25,6 +25,7 @@ import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteEx;
@@ -407,6 +408,55 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
         assertNull(row);
     }
+    
+    public void testThroughput() {
+        IgniteCache<Integer, Developer> developer = ignite.getOrCreateCache(new CacheConfiguration<Integer, Developer>()
+            .setCacheMode(CacheMode.REPLICATED)
+            .setName("developer")
+            .setSqlSchema("PUBLIC")
+            .setIndexedTypes(Integer.class, Developer.class)
+            .setBackups(2)
+        );
+
+        int numIterations = 1000;
+
+        int prId = -1;
+
+        for (int i = 0; i < 5000; i++) {
+            if (i % 1000 == 0)
+                prId++;
+
+            developer.put(i, new Developer("Name" + i, prId));
+        }
+
+        QueryEngine engine = Commons.lookupComponent(ignite.context(), QueryEngine.class);
+
+        // warmup
+        for (int i = 0; i < numIterations; i++) {
+            List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC", "select * from DEVELOPER");
+            query.get(0).getAll();
+        }
+
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < numIterations; i++) {
+            List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC", "select * from DEVELOPER");
+            query.get(0).getAll();
+        }
+        System.out.println("Calcite duration = " + (System.currentTimeMillis() - start));
+
+        // warmup
+        for (int i = 0; i < numIterations; i++) {
+            List<FieldsQueryCursor<List<?>>> query = ignite.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false);
+            query.get(0).getAll();
+        }
+
+        start = System.currentTimeMillis();
+        for (int i = 0; i < numIterations; i++) {
+            List<FieldsQueryCursor<List<?>>> query = ignite.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false);
+            query.get(0).getAll();
+        }
+        System.out.println("H2 duration = " + (System.currentTimeMillis() - start));
+    }
 
     /** */
     public static class Key {
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
index 0ddac9e..48af61c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ignite.internal.processors.query.calcite;
 
 import java.util.Iterator;
@@ -43,6 +44,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
 import static java.util.Collections.singletonList;
+import static org.apache.ignite.cache.query.QueryCancelledException.ERR_MSG;
 
 /**
  * Cancel query test.
@@ -110,7 +112,7 @@ public class CancelTest extends GridCommonAbstractTest {
 
                 return null;
             },
-            IgniteSQLException.class, "The query was cancelled while executing"
+            IgniteSQLException.class, ERR_MSG
         );
 
         awaitReservationsRelease("TEST");