You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2020/12/29 08:42:25 UTC
[ignite] branch master updated: IGNITE-13904 Don't use rows buffers
by reduce index for simple (plain) query that uses one-way unsorted reducer
(#8607)
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b5c807d IGNITE-13904 Don't use rows buffers by reduce index for simple (plain) query that uses one-way unsorted reducer (#8607)
b5c807d is described below
commit b5c807dddf11a2d3a905199c29b2907dca962c0e
Author: tledkov <tl...@gridgain.com>
AuthorDate: Tue Dec 29 11:42:07 2020 +0300
IGNITE-13904 Don't use rows buffers by reduce index for simple (plain) query that uses one-way unsorted reducer (#8607)
---
.../apache/ignite/testframework/GridTestUtils.java | 27 ++++-
.../query/h2/twostep/GridReduceQueryExecutor.java | 2 +-
.../query/h2/twostep/UnsortedBaseReducer.java | 108 +++++++++++++++++++
.../query/h2/twostep/UnsortedOneWayReducer.java | 118 +++++++++++++++++++++
.../query/h2/twostep/UnsortedReducer.java | 81 +-------------
.../processors/query/ReducerRowsBufferTest.java | 108 +++++++++++++++++++
.../IgniteBinaryCacheQueryTestSuite2.java | 3 +
7 files changed, 362 insertions(+), 85 deletions(-)
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 0b19056..ea7b0ea 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -1609,14 +1609,31 @@ public final class GridTestUtils {
private static Object findField(Class<?> cls, Object obj,
String fieldName) throws NoSuchFieldException, IllegalAccessException {
// Resolve inner field.
- Field field = cls.getDeclaredField(fieldName);
- boolean accessible = field.isAccessible();
+ NoSuchFieldException ex = null;
- if (!accessible)
- field.setAccessible(true);
+ while (cls != null) {
+ try {
+ Field field = cls.getDeclaredField(fieldName);
+
+ boolean accessible = field.isAccessible();
+
+ if (!accessible)
+ field.setAccessible(true);
+
+ return field.get(obj);
+ }
+ catch (NoSuchFieldException ex0) {
+ if (ex == null)
+ ex = ex0;
+ else
+ ex.addSuppressed(ex0);
+
+ cls = cls.getSuperclass();
+ }
+ }
- return field.get(obj);
+ throw ex;
}
/**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index ea02bc3..c248330 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -722,7 +722,7 @@ public class GridReduceQueryExecutor {
Reducer reducer;
if (skipMergeTbl)
- reducer = UnsortedReducer.createDummy(ctx);
+ reducer = UnsortedOneWayReducer.createDummy(ctx);
else {
ReduceTable tbl;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedBaseReducer.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedBaseReducer.java
new file mode 100644
index 0000000..c0970a0
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedBaseReducer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
+import org.h2.index.Cursor;
+import org.h2.result.Row;
+import org.h2.result.SearchRow;
+import org.h2.value.Value;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Base class for unsorted reducers: queues incoming result pages and counts the sources still sending.
+ */
+public abstract class UnsortedBaseReducer extends AbstractReducer {
+ /** Sources that have not sent their last page yet; stays -1 until {@link #setSources} stores the real count. */
+ protected final AtomicInteger activeSourcesCnt = new AtomicInteger(-1);
+
+ /** Pages accepted by {@link #addPage0}, plus the dummy last page added once the counter reaches zero. */
+ protected final PollableQueue<ReduceResultPage> queue = new PollableQueue<>();
+
+ /** Iterator over row value arrays; starts empty and is not used by this base class itself. */
+ protected Iterator<Value[]> iter = Collections.emptyIterator();
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Context.
+ */
+ public UnsortedBaseReducer(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
+ super.setSources(nodes, segmentsCnt);
+
+ int x = srcNodes.size() * segmentsCnt; // One source per (node, segment) pair.
+
+ assert x > 0 : x;
+
+ activeSourcesCnt.set(x);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean fetchedAll() {
+ int x = activeSourcesCnt.get();
+
+ assert x >= 0 : x; // This method must not be called if the sources were not set.
+
+ return x == 0 && queue.isEmpty(); // Done only when every source finished and no page is left queued.
+ }
+
+ /**
+ * @param page Received page; queued unless it is empty and not failed, and a last page decrements the source counter.
+ */
+ @Override protected void addPage0(ReduceResultPage page) {
+ assert page.rowsInPage() > 0 || page.isLast() || page.isFail();
+
+ // Do not add empty page to avoid premature stream termination.
+ if (page.rowsInPage() != 0 || page.isFail())
+ queue.add(page);
+
+ if (page.isLast()) {
+ int x = activeSourcesCnt.decrementAndGet();
+
+ assert x >= 0 : x;
+
+ if (x == 0) // Always terminate with empty iterator.
+ queue.add(createDummyLastPage(page));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last) {
+ // This index is unsorted: have to ignore bounds.
+ return new GridH2Cursor(fetched.iterator());
+ }
+
+ /** {@link LinkedBlockingQueue} that additionally implements {@link AbstractReducer.Pollable}. */
+ private static class PollableQueue<X> extends LinkedBlockingQueue<X> implements AbstractReducer.Pollable<X> {
+ // No-op.
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedOneWayReducer.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedOneWayReducer.java
new file mode 100644
index 0000000..c0b9bfb
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedOneWayReducer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import java.util.Iterator;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.h2.opt.H2PlainRowFactory;
+import org.h2.index.Cursor;
+import org.h2.message.DbException;
+import org.h2.result.Row;
+import org.h2.result.SearchRow;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Unsorted reducer whose stream cursor only moves forward and keeps a reference to the current row only.
+ */
+public class UnsortedOneWayReducer extends UnsortedBaseReducer {
+ /**
+ * Constructor.
+ *
+ * @param ctx Context.
+ */
+ public UnsortedOneWayReducer(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+ /**
+ * @param ctx Context.
+ * @return New reducer instance built with the plain constructor.
+ */
+ public static UnsortedOneWayReducer createDummy(GridKernalContext ctx) {
+ return new UnsortedOneWayReducer(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Cursor findInStream(SearchRow first, SearchRow last) {
+ assert first == null && last == null : "Invalid usage dummy reducer: [first=" + first + ", last=" + last + ']';
+
+ return new OneWayFetchingCursor(new Iterator<Row>() {
+ @Override public boolean hasNext() {
+ iter = pollNextIterator(queue, iter); // NOTE(review): inherited helper, not shown here; presumably moves to the next queued page.
+
+ return iter.hasNext();
+ }
+
+ @Override public Row next() {
+ return H2PlainRowFactory.create(iter.next()); // Wraps the raw value array in an H2 row.
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ });
+ }
+
+ /**
+ * Forward-only fetching cursor. NOTE(review): ReducerRowsBufferTest appears to read its {@code this$0}; keep it non-static.
+ */
+ private class OneWayFetchingCursor implements Cursor {
+ /** Source of rows; assigned once in the constructor. */
+ private Iterator<Row> stream;
+
+ /** Row set by the most recent successful {@link #next()}; {@code null} before the first one. */
+ private Row cur;
+
+ /**
+ * @param stream Stream of all the rows from remote nodes.
+ */
+ OneWayFetchingCursor(Iterator<Row> stream) {
+ assert stream != null;
+
+ this.stream = stream;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean next() {
+ if (!stream.hasNext())
+ return false;
+
+ cur = requireNonNull(stream.next()); // Previous row reference is simply overwritten.
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Row get() {
+ return cur;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SearchRow getSearchRow() {
+ return get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean previous() {
+ // Should never be called.
+ throw DbException.getUnsupportedException("previous");
+ }
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedReducer.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedReducer.java
index 7c7d428..91f00b6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedReducer.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedReducer.java
@@ -17,38 +17,22 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
-import java.util.Collection;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.cluster.ClusterNode;
+
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
import org.apache.ignite.internal.processors.query.h2.opt.H2PlainRowFactory;
import org.h2.index.Cursor;
import org.h2.message.DbException;
import org.h2.result.Row;
import org.h2.result.SearchRow;
-import org.h2.value.Value;
-import org.jetbrains.annotations.Nullable;
import static java.util.Objects.requireNonNull;
/**
* Unsorted merge index.
*/
-public class UnsortedReducer extends AbstractReducer {
- /** */
- private final PollableQueue<ReduceResultPage> queue = new PollableQueue<>();
-
- /** */
- private final AtomicInteger activeSourcesCnt = new AtomicInteger(-1);
-
- /** */
- private Iterator<Value[]> iter = Collections.emptyIterator();
-
+public class UnsortedReducer extends UnsortedBaseReducer {
/**
* Constructor.
*
@@ -58,60 +42,6 @@ public class UnsortedReducer extends AbstractReducer {
super(ctx);
}
- /**
- * @param ctx Context.
- * @return Dummy index instance.
- */
- public static UnsortedReducer createDummy(GridKernalContext ctx) {
- return new UnsortedReducer(ctx);
- }
-
- /** {@inheritDoc} */
- @Override public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
- super.setSources(nodes, segmentsCnt);
-
- int x = srcNodes.size() * segmentsCnt;
-
- assert x > 0 : x;
-
- activeSourcesCnt.set(x);
- }
-
- /** {@inheritDoc} */
- @Override public boolean fetchedAll() {
- int x = activeSourcesCnt.get();
-
- assert x >= 0 : x; // This method must not be called if the sources were not set.
-
- return x == 0 && queue.isEmpty();
- }
-
- /**
- * @param page Page.
- */
- @Override protected void addPage0(ReduceResultPage page) {
- assert page.rowsInPage() > 0 || page.isLast() || page.isFail();
-
- // Do not add empty page to avoid premature stream termination.
- if (page.rowsInPage() != 0 || page.isFail())
- queue.add(page);
-
- if (page.isLast()) {
- int x = activeSourcesCnt.decrementAndGet();
-
- assert x >= 0 : x;
-
- if (x == 0) // Always terminate with empty iterator.
- queue.add(createDummyLastPage(page));
- }
- }
-
- /** {@inheritDoc} */
- @Override protected Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last) {
- // This index is unsorted: have to ignore bounds.
- return new GridH2Cursor(fetched.iterator());
- }
-
/** {@inheritDoc} */
@Override protected Cursor findInStream(SearchRow first, SearchRow last) {
// This index is unsorted: have to ignore bounds.
@@ -210,11 +140,4 @@ public class UnsortedReducer extends AbstractReducer {
throw DbException.getUnsupportedException("previous");
}
}
-
- /**
- *
- */
- private static class PollableQueue<X> extends LinkedBlockingQueue<X> implements AbstractReducer.Pollable<X> {
- // No-op.
- }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/ReducerRowsBufferTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/ReducerRowsBufferTest.java
new file mode 100644
index 0000000..df098e7
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/ReducerRowsBufferTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.processors.query.h2.twostep.ReduceBlockList;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.h2.result.Row;
+import org.junit.Test;
+
+/** Checks that iterating a lazy plain query leaves the reflectively obtained {@code fetched} rows list empty. */
+public class ReducerRowsBufferTest extends GridCommonAbstractTest {
+ /** Table size. */
+ private static final int TBL_SIZE = 1_000;
+
+ /** Starts three grids, then creates and fills the TEST table. */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(3);
+
+ createSchema();
+
+ populateData();
+ }
+
+ /** Only delegates to the superclass; no extra per-test setup. */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+ }
+
+ /** Iterates {@code select * from TEST} lazily and asserts the fetched list stays empty and all rows arrive. */
+ @Test
+ public void plainQuery() {
+ Iterator<List<?>> it = query("select * from TEST", true).iterator();
+
+ it.next();
+
+ ReduceBlockList<Row> fetched = GridTestUtils.getFieldValue(
+ it,
+ "cursor", "iter", "iter", "cursor", "this$0", "fetched"); // NOTE(review): depends on private field names.
+
+ int cnt = 1; // One row was already consumed above.
+ while (it.hasNext()) {
+ it.next();
+
+ assertEquals(0, fetched.size()); // Nothing may pile up in the list while iterating.
+
+ cnt++;
+ }
+
+ assertEquals(TBL_SIZE, cnt);
+ }
+
+ /** Inserts {@link #TBL_SIZE} rows: id, id modulo 100 and a random UUID string. */
+ private void populateData() {
+ for (int i = 0; i < TBL_SIZE; ++i)
+ execSql("insert into TEST VALUES (?, ?, ?)", i, i % 100, UUID.randomUUID().toString());
+ }
+
+ /** Creates the TEST table with columns id, ref_key and name. */
+ private void createSchema() {
+ execSql("create table TEST (id int primary key, ref_key int, name varchar)");
+ }
+
+ /**
+ * @param sql SQL query
+ * @param args Query parameters.
+ */
+ private void execSql(String sql, Object... args) {
+ grid(0).context().query().querySqlFields(
+ new SqlFieldsQuery(sql).setArgs(args), false).getAll();
+ }
+
+ /**
+ * @param sql SQL query; the {@code lazy} argument is passed to {@code setLazy}.
+ * @return Cursor over the query results (page size 100, join order enforced).
+ */
+ FieldsQueryCursor<List<?>> query(String sql, boolean lazy) {
+ return grid(0).context().query().querySqlFields(
+ new SqlFieldsQueryEx(sql, null)
+ .setLazy(lazy)
+ .setEnforceJoinOrder(true)
+ .setPageSize(100), false);
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
index 6676e47..5e1e560 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.query.IgniteSqlCreateTableTemplateT
import org.apache.ignite.internal.processors.query.LazyOnDmlTest;
import org.apache.ignite.internal.processors.query.LocalQueryLazyTest;
import org.apache.ignite.internal.processors.query.LongRunningQueryTest;
+import org.apache.ignite.internal.processors.query.ReducerRowsBufferTest;
import org.apache.ignite.internal.processors.query.SqlIndexConsistencyAfterInterruptAtomicCacheOperationTest;
import org.apache.ignite.internal.processors.query.SqlIndexConsistencyAfterInterruptTxCacheOperationTest;
import org.apache.ignite.internal.processors.query.SqlLocalQueryConnectionAndStatementTest;
@@ -93,6 +94,8 @@ import org.junit.runners.Suite;
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
+ ReducerRowsBufferTest.class,
+
LazyOnDmlTest.class,
DefaultQueryTimeoutTestSuite.class,