You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2020/12/29 08:42:25 UTC

[ignite] branch master updated: IGNITE-13904 Don't use rows buffers by reduce index for simple (plain) query that uses one-way unsorted reducer (#8607)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new b5c807d  IGNITE-13904 Don't use rows buffers by reduce index for simple (plain) query that uses one-way unsorted reducer (#8607)
b5c807d is described below

commit b5c807dddf11a2d3a905199c29b2907dca962c0e
Author: tledkov <tl...@gridgain.com>
AuthorDate: Tue Dec 29 11:42:07 2020 +0300

    IGNITE-13904 Don't use rows buffers by reduce index for simple (plain) query that uses one-way unsorted reducer (#8607)
---
 .../apache/ignite/testframework/GridTestUtils.java |  27 ++++-
 .../query/h2/twostep/GridReduceQueryExecutor.java  |   2 +-
 .../query/h2/twostep/UnsortedBaseReducer.java      | 108 +++++++++++++++++++
 .../query/h2/twostep/UnsortedOneWayReducer.java    | 118 +++++++++++++++++++++
 .../query/h2/twostep/UnsortedReducer.java          |  81 +-------------
 .../processors/query/ReducerRowsBufferTest.java    | 108 +++++++++++++++++++
 .../IgniteBinaryCacheQueryTestSuite2.java          |   3 +
 7 files changed, 362 insertions(+), 85 deletions(-)

diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 0b19056..ea7b0ea 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -1609,14 +1609,31 @@ public final class GridTestUtils {
     private static Object findField(Class<?> cls, Object obj,
         String fieldName) throws NoSuchFieldException, IllegalAccessException {
         // Resolve inner field.
-        Field field = cls.getDeclaredField(fieldName);
 
-        boolean accessible = field.isAccessible();
+        NoSuchFieldException ex = null;
 
-        if (!accessible)
-            field.setAccessible(true);
+        while (cls != null) {
+            try {
+                Field field = cls.getDeclaredField(fieldName);
+
+                boolean accessible = field.isAccessible();
+
+                if (!accessible)
+                    field.setAccessible(true);
+
+                return field.get(obj);
+            }
+            catch (NoSuchFieldException ex0) {
+                if (ex == null)
+                    ex = ex0;
+                else
+                    ex.addSuppressed(ex0);
+
+                cls = cls.getSuperclass();
+            }
+        }
 
-        return field.get(obj);
+        throw ex;
     }
 
     /**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index ea02bc3..c248330 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -722,7 +722,7 @@ public class GridReduceQueryExecutor {
             Reducer reducer;
 
             if (skipMergeTbl)
-                reducer = UnsortedReducer.createDummy(ctx);
+                reducer = UnsortedOneWayReducer.createDummy(ctx);
             else {
                 ReduceTable tbl;
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedBaseReducer.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedBaseReducer.java
new file mode 100644
index 0000000..c0970a0
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedBaseReducer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
+import org.h2.index.Cursor;
+import org.h2.result.Row;
+import org.h2.result.SearchRow;
+import org.h2.value.Value;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Base unsorted merge index.
+ */
+public abstract class UnsortedBaseReducer extends AbstractReducer {
+    /** */
+    protected final AtomicInteger activeSourcesCnt = new AtomicInteger(-1);
+
+    /** */
+    protected final PollableQueue<ReduceResultPage> queue = new PollableQueue<>();
+
+    /** */
+    protected Iterator<Value[]> iter = Collections.emptyIterator();
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Context.
+     */
+    public UnsortedBaseReducer(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
+        super.setSources(nodes, segmentsCnt);
+
+        int x = srcNodes.size() * segmentsCnt;
+
+        assert x > 0 : x;
+
+        activeSourcesCnt.set(x);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean fetchedAll() {
+        int x = activeSourcesCnt.get();
+
+        assert x >= 0 : x; // This method must not be called if the sources were not set.
+
+        return x == 0 && queue.isEmpty();
+    }
+
+    /**
+     * @param page Page.
+     */
+    @Override protected void addPage0(ReduceResultPage page) {
+        assert page.rowsInPage() > 0 || page.isLast() || page.isFail();
+
+        // Do not add empty page to avoid premature stream termination.
+        if (page.rowsInPage() != 0 || page.isFail())
+            queue.add(page);
+
+        if (page.isLast()) {
+            int x = activeSourcesCnt.decrementAndGet();
+
+            assert x >= 0 : x;
+
+            if (x == 0) // Always terminate with empty iterator.
+                queue.add(createDummyLastPage(page));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last) {
+        // This index is unsorted: have to ignore bounds.
+        return new GridH2Cursor(fetched.iterator());
+    }
+
+    /** */
+    private static class PollableQueue<X> extends LinkedBlockingQueue<X> implements AbstractReducer.Pollable<X> {
+        // No-op.
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedOneWayReducer.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedOneWayReducer.java
new file mode 100644
index 0000000..c0b9bfb
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedOneWayReducer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import java.util.Iterator;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.h2.opt.H2PlainRowFactory;
+import org.h2.index.Cursor;
+import org.h2.message.DbException;
+import org.h2.result.Row;
+import org.h2.result.SearchRow;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Unsorted one-way merge index.
+ */
+public class UnsortedOneWayReducer extends UnsortedBaseReducer {
+    /**
+     * Constructor.
+     *
+     * @param ctx Context.
+     */
+    public UnsortedOneWayReducer(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /**
+     * @param ctx Context.
+     * @return Dummy index instance.
+     */
+    public static UnsortedOneWayReducer createDummy(GridKernalContext ctx) {
+        return new UnsortedOneWayReducer(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Cursor findInStream(SearchRow first, SearchRow last) {
+        assert first == null && last == null : "Invalid usage dummy reducer: [first=" + first + ", last=" + last + ']';
+
+        return new OneWayFetchingCursor(new Iterator<Row>() {
+            @Override public boolean hasNext() {
+                iter = pollNextIterator(queue, iter);
+
+                return iter.hasNext();
+            }
+
+            @Override public Row next() {
+                return H2PlainRowFactory.create(iter.next());
+            }
+
+            @Override public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        });
+    }
+
+    /**
+     * Fetching cursor.
+     */
+    private class OneWayFetchingCursor implements Cursor {
+        /** */
+        private Iterator<Row> stream;
+
+        /** */
+        private Row cur;
+
+        /**
+         * @param stream Stream of all the rows from remote nodes.
+         */
+        OneWayFetchingCursor(Iterator<Row> stream) {
+            assert stream != null;
+
+            this.stream = stream;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            if (!stream.hasNext())
+                return false;
+
+            cur = requireNonNull(stream.next());
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Row get() {
+            return cur;
+        }
+
+        /** {@inheritDoc} */
+        @Override public SearchRow getSearchRow() {
+            return get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean previous() {
+            // Should never be called.
+            throw DbException.getUnsupportedException("previous");
+        }
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedReducer.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedReducer.java
index 7c7d428..91f00b6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedReducer.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedReducer.java
@@ -17,38 +17,22 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.cluster.ClusterNode;
+
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
 import org.apache.ignite.internal.processors.query.h2.opt.H2PlainRowFactory;
 import org.h2.index.Cursor;
 import org.h2.message.DbException;
 import org.h2.result.Row;
 import org.h2.result.SearchRow;
-import org.h2.value.Value;
-import org.jetbrains.annotations.Nullable;
 
 import static java.util.Objects.requireNonNull;
 
 /**
  * Unsorted merge index.
  */
-public class UnsortedReducer extends AbstractReducer {
-    /** */
-    private final PollableQueue<ReduceResultPage> queue = new PollableQueue<>();
-
-    /** */
-    private final AtomicInteger activeSourcesCnt = new AtomicInteger(-1);
-
-    /** */
-    private Iterator<Value[]> iter = Collections.emptyIterator();
-
+public class UnsortedReducer extends UnsortedBaseReducer {
     /**
      * Constructor.
      *
@@ -58,60 +42,6 @@ public class UnsortedReducer extends AbstractReducer {
         super(ctx);
     }
 
-    /**
-     * @param ctx Context.
-     * @return Dummy index instance.
-     */
-    public static UnsortedReducer createDummy(GridKernalContext ctx) {
-        return new UnsortedReducer(ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
-        super.setSources(nodes, segmentsCnt);
-
-        int x = srcNodes.size() * segmentsCnt;
-
-        assert x > 0 : x;
-
-        activeSourcesCnt.set(x);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean fetchedAll() {
-        int x = activeSourcesCnt.get();
-
-        assert x >= 0 : x; // This method must not be called if the sources were not set.
-
-        return x == 0 && queue.isEmpty();
-    }
-
-    /**
-     * @param page Page.
-     */
-    @Override protected void addPage0(ReduceResultPage page) {
-        assert page.rowsInPage() > 0 || page.isLast() || page.isFail();
-
-        // Do not add empty page to avoid premature stream termination.
-        if (page.rowsInPage() != 0 || page.isFail())
-            queue.add(page);
-
-        if (page.isLast()) {
-            int x = activeSourcesCnt.decrementAndGet();
-
-            assert x >= 0 : x;
-
-            if (x == 0) // Always terminate with empty iterator.
-                queue.add(createDummyLastPage(page));
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last) {
-        // This index is unsorted: have to ignore bounds.
-        return new GridH2Cursor(fetched.iterator());
-    }
-
     /** {@inheritDoc} */
     @Override protected Cursor findInStream(SearchRow first, SearchRow last) {
         // This index is unsorted: have to ignore bounds.
@@ -210,11 +140,4 @@ public class UnsortedReducer extends AbstractReducer {
             throw DbException.getUnsupportedException("previous");
         }
     }
-
-    /**
-     *
-     */
-    private static class PollableQueue<X> extends LinkedBlockingQueue<X> implements AbstractReducer.Pollable<X> {
-        // No-op.
-    }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/ReducerRowsBufferTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/ReducerRowsBufferTest.java
new file mode 100644
index 0000000..df098e7
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/ReducerRowsBufferTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.processors.query.h2.twostep.ReduceBlockList;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.h2.result.Row;
+import org.junit.Test;
+
+/** */
+public class ReducerRowsBufferTest extends GridCommonAbstractTest {
+    /** Table size. */
+    private static final int TBL_SIZE = 1_000;
+
+    /** */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(3);
+
+        createSchema();
+
+        populateData();
+    }
+
+    /** */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+    }
+
+    /** */
+    @Test
+    public void plainQuery() {
+        Iterator<List<?>> it = query("select * from TEST", true).iterator();
+
+        it.next();
+
+        ReduceBlockList<Row> fetched = GridTestUtils.getFieldValue(
+            it,
+            "cursor", "iter", "iter", "cursor", "this$0", "fetched");
+
+        int cnt = 1;
+        while (it.hasNext()) {
+            it.next();
+
+            assertEquals(0, fetched.size());
+
+            cnt++;
+        }
+
+        assertEquals(TBL_SIZE, cnt);
+    }
+
+    /** */
+    private void populateData() {
+        for (int i = 0; i < TBL_SIZE; ++i)
+            execSql("insert into TEST VALUES (?, ?, ?)", i, i % 100, UUID.randomUUID().toString());
+    }
+
+    /** */
+    private void createSchema() {
+        execSql("create table TEST (id int primary key, ref_key int, name varchar)");
+    }
+
+    /**
+     * @param sql SQL query
+     * @param args Query parameters.
+     */
+    private void execSql(String sql, Object... args) {
+        grid(0).context().query().querySqlFields(
+            new SqlFieldsQuery(sql).setArgs(args), false).getAll();
+    }
+
+    /**
+     * @param sql SQL query
+     * @return Results set.
+     */
+    FieldsQueryCursor<List<?>> query(String sql, boolean lazy) {
+        return grid(0).context().query().querySqlFields(
+            new SqlFieldsQueryEx(sql, null)
+                .setLazy(lazy)
+                .setEnforceJoinOrder(true)
+                .setPageSize(100), false);
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
index 6676e47..5e1e560 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.query.IgniteSqlCreateTableTemplateT
 import org.apache.ignite.internal.processors.query.LazyOnDmlTest;
 import org.apache.ignite.internal.processors.query.LocalQueryLazyTest;
 import org.apache.ignite.internal.processors.query.LongRunningQueryTest;
+import org.apache.ignite.internal.processors.query.ReducerRowsBufferTest;
 import org.apache.ignite.internal.processors.query.SqlIndexConsistencyAfterInterruptAtomicCacheOperationTest;
 import org.apache.ignite.internal.processors.query.SqlIndexConsistencyAfterInterruptTxCacheOperationTest;
 import org.apache.ignite.internal.processors.query.SqlLocalQueryConnectionAndStatementTest;
@@ -93,6 +94,8 @@ import org.junit.runners.Suite;
  */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
+    ReducerRowsBufferTest.class,
+
     LazyOnDmlTest.class,
 
     DefaultQueryTimeoutTestSuite.class,