You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2022/09/28 17:35:18 UTC

[ignite] branch master updated: IGNITE-16510 SQL Calcite: Keep binary flag support - Fixes #10272.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5149af3c33e IGNITE-16510 SQL Calcite: Keep binary flag support - Fixes #10272.
5149af3c33e is described below

commit 5149af3c33ea6640063774b95771f0c4a4d70212
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Wed Sep 28 20:33:42 2022 +0300

    IGNITE-16510 SQL Calcite: Keep binary flag support - Fixes #10272.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../query/calcite/exec/ExecutionServiceImpl.java   |  26 ++++-
 .../calcite/util/ConvertingClosableIterator.java   |  16 ++-
 .../query/calcite/util/ListFieldsQueryCursor.java  |   4 +-
 .../integration/KeepBinaryIntegrationTest.java     | 125 +++++++++++++++++++++
 .../ignite/testsuites/IntegrationTestSuite.java    |   2 +
 .../processors/query/GridQueryProcessor.java       |   6 +-
 .../internal/processors/query/QueryProperties.java |  36 ++++++
 7 files changed, 207 insertions(+), 8 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 490c2644cf8..5aa192c73d8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.exec;
 
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -39,11 +40,14 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryProperties;
 import org.apache.ignite.internal.processors.query.QueryState;
 import org.apache.ignite.internal.processors.query.RunningQuery;
 import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
@@ -84,6 +88,7 @@ import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolder;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.util.ConvertingClosableIterator;
 import org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryCursor;
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 import org.apache.ignite.internal.util.typedef.F;
@@ -112,6 +117,9 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     /** */
     private GridCachePartitionExchangeManager<?, ?> exchangeMgr;
 
+    /** */
+    private CacheObjectValueContext objValCtx;
+
     /** */
     private QueryPlanCache qryPlanCache;
 
@@ -332,6 +340,13 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
         this.exchangeMgr = exchangeMgr;
     }
 
+    /**
+     * @param objValCtx Cache object value context.
+     */
+    public void cacheObjectValueContext(CacheObjectValueContext objValCtx) {
+        this.objValCtx = objValCtx;
+    }
+
     /**
      * @return Exchange manager.
      */
@@ -362,6 +377,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     @Override public void onStart(GridKernalContext ctx) {
         localNodeId(ctx.localNodeId());
         exchangeManager(ctx.cache().context().exchange());
+        cacheObjectValueContext(ctx.query().objectContext());
         eventManager(ctx.event());
         iteratorsHolder(new ClosableIteratorsHolder(log));
 
@@ -592,7 +608,15 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
             }
         }
 
-        return new ListFieldsQueryCursor<>(plan, iteratorsHolder().iterator(qry.iterator()), ectx);
+        QueryProperties qryProps = qry.context().unwrap(QueryProperties.class);
+
+        Function<Object, Object> fieldConverter = (qryProps == null || qryProps.keepBinary()) ? null :
+            o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o, false, true, null);
+
+        Iterator<List<?>> it = new ConvertingClosableIterator<>(iteratorsHolder().iterator(qry.iterator()), ectx,
+            fieldConverter);
+
+        return new ListFieldsQueryCursor<>(plan, it, ectx);
     }
 
     /** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
index ca3d3b3bc4c..e3d19e18f6b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
@@ -20,14 +20,16 @@ package org.apache.ignite.internal.processors.query.calcite.util;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Function;
 
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
-class ConvertingClosableIterator<Row> implements Iterator<List<?>>, AutoCloseable {
+public class ConvertingClosableIterator<Row> implements Iterator<List<?>>, AutoCloseable {
     /** */
     private final Iterator<Row> it;
 
@@ -35,9 +37,17 @@ class ConvertingClosableIterator<Row> implements Iterator<List<?>>, AutoCloseabl
     private final RowHandler<Row> rowHnd;
 
     /** */
-    public ConvertingClosableIterator(Iterator<Row> it, ExecutionContext<Row> ectx) {
+    @Nullable private final Function<Object, Object> fieldConverter;
+
+    /** */
+    public ConvertingClosableIterator(
+        Iterator<Row> it,
+        ExecutionContext<Row> ectx,
+        @Nullable Function<Object, Object> fieldConverter
+    ) {
         this.it = it;
         rowHnd = ectx.rowHandler();
+        this.fieldConverter = fieldConverter;
     }
 
     /**
@@ -58,7 +68,7 @@ class ConvertingClosableIterator<Row> implements Iterator<List<?>>, AutoCloseabl
         List<Object> res = new ArrayList<>(rowSize);
 
         for (int i = 0; i < rowSize; i++)
-            res.add(rowHnd.get(i, next));
+            res.add(fieldConverter == null ? rowHnd.get(i, next) : fieldConverter.apply(rowHnd.get(i, next)));
 
         return res;
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java
index 95ed48a307e..a3885a6f9b9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ListFieldsQueryCursor.java
@@ -49,13 +49,13 @@ public class ListFieldsQueryCursor<Row> implements FieldsQueryCursor<List<?>>, Q
      * @param it Iterator.
      * @param ectx Row converter.
      */
-    public ListFieldsQueryCursor(MultiStepPlan plan, Iterator<Row> it, ExecutionContext<Row> ectx) {
+    public ListFieldsQueryCursor(MultiStepPlan plan, Iterator<List<?>> it, ExecutionContext<Row> ectx) {
         FieldsMetadata metadata0 = plan.fieldsMetadata();
         assert metadata0 != null;
         fieldsMeta = metadata0.queryFieldsMetadata(ectx.getTypeFactory());
         isQry = plan.type() == QueryPlan.Type.QUERY;
 
-        this.it = new ConvertingClosableIterator<>(it, ectx);
+        this.it = it;
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KeepBinaryIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KeepBinaryIntegrationTest.java
new file mode 100644
index 00000000000..f3aff6c99c2
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KeepBinaryIntegrationTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Test;
+
+/**
+ * Test "keep binary" in cache queries.
+ */
+public class KeepBinaryIntegrationTest extends AbstractBasicIntegrationTest {
+    /** */
+    private static final String CACHE_NAME = "cache";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+        cfg.getSqlConfiguration().setQueryEnginesConfiguration(new CalciteQueryEngineConfiguration());
+        cfg.setCacheConfiguration(new CacheConfiguration<>(CACHE_NAME).setIndexedTypes(Integer.class, Person.class));
+
+        return cfg;
+    }
+
+    /** */
+    @Test
+    public void testKeepBinary() {
+        IgniteCache<Integer, Person> cache = client.cache(CACHE_NAME);
+
+        Person p0 = new Person(0, "name0", null);
+        Person p1 = new Person(1, "name1", new Person(2, "name2",
+            new Person(3, "name3", null)));
+
+        Person p2 = new Person(2, "name2", F.asList(new Person(3, "name3", null),
+            new Person(4, "name4", null)));
+
+        cache.put(0, p0);
+        cache.put(1, p1);
+        cache.put(2, p2);
+
+        List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT _VAL, obj, name FROM Person ORDER BY id")).getAll();
+
+        assertEquals(3, res.size());
+        assertEquals(p0, res.get(0).get(0));
+        assertEquals(p1, res.get(1).get(0));
+        assertEquals(p2, res.get(2).get(0));
+        assertEquals(p0.obj, res.get(0).get(1));
+        assertEquals(p1.obj, res.get(1).get(1));
+        assertEquals(p2.obj, res.get(2).get(1));
+        assertEquals(p0.name, res.get(0).get(2));
+        assertEquals(p1.name, res.get(1).get(2));
+        assertEquals(p2.name, res.get(2).get(2));
+
+        res = cache.withKeepBinary().query(new SqlFieldsQuery("SELECT _VAL, obj, name FROM Person ORDER BY id")).getAll();
+
+        assertEquals(3, res.size());
+        assertTrue(res.get(0).get(0) instanceof BinaryObject);
+        assertTrue(res.get(1).get(0) instanceof BinaryObject);
+        assertTrue(res.get(1).get(0) instanceof BinaryObject);
+        assertNull(res.get(0).get(1));
+        assertTrue(res.get(1).get(1) instanceof BinaryObject);
+        assertTrue(res.get(1).get(1) instanceof BinaryObject);
+        assertEquals(p0.name, res.get(0).get(2));
+        assertEquals(p1.name, res.get(1).get(2));
+        assertEquals(p2.name, res.get(2).get(2));
+    }
+
+    /** */
+    private static class Person {
+        /** */
+        @QuerySqlField
+        private final int id;
+
+        /** */
+        @QuerySqlField
+        private final String name;
+
+        /** */
+        @QuerySqlField
+        private final Object obj;
+
+        /** */
+        private Person(int id, String name, Object obj) {
+            this.id = id;
+            this.name = name;
+            this.obj = obj;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            Person person = (Person)o;
+            return id == person.id && name.equals(person.name) && Objects.equals(obj, person.obj);
+        }
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index 712589eb52a..ded1524ad90 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.query.calcite.integration.IndexScan
 import org.apache.ignite.internal.processors.query.calcite.integration.IndexSpoolIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.IntervalTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.JoinIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.KeepBinaryIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.KillCommandDdlIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.KillQueryCommandDdlIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.MetadataIntegrationTest;
@@ -105,6 +106,7 @@ import org.junit.runners.Suite;
     IndexRebuildIntegrationTest.class,
     QueryEngineConfigurationIntegrationTest.class,
     SearchSargOnIndexIntegrationTest.class,
+    KeepBinaryIntegrationTest.class,
 })
 public class IntegrationTestSuite {
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 35aa38929b5..fd01dc62aeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -3080,9 +3080,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                         QueryEngine qryEngine = engineForQuery(cliCtx, qry);
 
                         if (qryEngine != null) {
+                            QueryProperties qryProps = new QueryProperties(keepBinary);
+
                             if (qry instanceof SqlFieldsQueryEx && ((SqlFieldsQueryEx)qry).isBatched()) {
                                 res = qryEngine.queryBatched(
-                                    QueryContext.of(qry, cliCtx, cancel),
+                                    QueryContext.of(qry, cliCtx, cancel, qryProps),
                                     schemaName,
                                     qry.getSql(),
                                     ((SqlFieldsQueryEx)qry).batchedArguments()
@@ -3090,7 +3092,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                             }
                             else {
                                 res = qryEngine.query(
-                                    QueryContext.of(qry, cliCtx, cancel),
+                                    QueryContext.of(qry, cliCtx, cancel, qryProps),
                                     schemaName,
                                     qry.getSql(),
                                     qry.getArgs() != null ? qry.getArgs() : X.EMPTY_OBJECT_ARRAY
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java
new file mode 100644
index 00000000000..fa41ffb196c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+/**
+ * Additional properties to execute the query (Stored in {@link QueryContext}).
+ */
+public final class QueryProperties {
+    /** */
+    private final boolean keepBinary;
+
+    /** */
+    public QueryProperties(boolean keepBinary) {
+        this.keepBinary = keepBinary;
+    }
+
+    /** */
+    public boolean keepBinary() {
+        return keepBinary;
+    }
+}