You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/12/05 12:38:37 UTC

[33/52] ignite git commit: IGNITE-4287 DmlStatementsProcessor logic fix (always force keepBinary for SELECTs inside DML logic). This closes #1280.

IGNITE-4287 DmlStatementsProcessor logic fix (always force keepBinary for SELECTs inside DML logic). This closes #1280.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e7c0d453
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e7c0d453
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e7c0d453

Branch: refs/heads/master
Commit: e7c0d453e53821d11f92f102ccfc06e2426d2dce
Parents: dda4fc9
Author: Alexander Paschenko <al...@gmail.com>
Authored: Fri Nov 25 19:07:50 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Nov 25 19:07:50 2016 +0300

----------------------------------------------------------------------
 .../query/h2/DmlStatementsProcessor.java        | 209 +++++++++----------
 .../processors/query/h2/sql/DmlAstUtils.java    |  19 +-
 .../IgniteCacheAbstractSqlDmlQuerySelfTest.java |  46 ++--
 .../IgniteCacheDeleteSqlQuerySelfTest.java      |  25 +++
 4 files changed, 162 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e7c0d453/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 7634965..469e36c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -46,7 +46,6 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
@@ -130,9 +129,34 @@ public class DmlStatementsProcessor {
 
         UpdatePlan plan = getPlanForStatement(spaceName, stmt, null);
 
+        GridCacheContext<?, ?> cctx = plan.tbl.rowDescriptor().context();
+
         for (int i = 0; i < DFLT_DML_RERUN_ATTEMPTS; i++) {
-            UpdateResult r = executeUpdateStatement(plan.tbl.rowDescriptor().context(), stmt, fieldsQry, loc, filters,
-                cancel, errKeys);
+            CacheOperationContext opCtx = cctx.operationContextPerCall();
+
+            // Force keepBinary for operation context to avoid binary deserialization inside entry processor
+            if (cctx.binaryMarshaller()) {
+                CacheOperationContext newOpCtx = null;
+
+                if (opCtx == null)
+                    // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
+                    newOpCtx = new CacheOperationContext(false, null, true, null, false, null);
+                else if (!opCtx.isKeepBinary())
+                    newOpCtx = opCtx.keepBinary();
+
+                if (newOpCtx != null)
+                    cctx.operationContextPerCall(newOpCtx);
+            }
+
+            UpdateResult r;
+
+            try {
+                r = executeUpdateStatement(cctx, stmt, fieldsQry, loc, filters,
+                    cancel, errKeys);
+            }
+            finally {
+                cctx.operationContextPerCall(opCtx);
+            }
 
             if (F.isEmpty(r.errKeys))
                 return r.cnt + items;
@@ -195,12 +219,16 @@ public class DmlStatementsProcessor {
         throws IgniteCheckedException {
         Integer errKeysPos = null;
 
-        if (!F.isEmpty(failedKeys))
-            errKeysPos = F.isEmpty(fieldsQry.getArgs()) ? 1 : fieldsQry.getArgs().length + 1;
+        Object[] params = fieldsQry.getArgs();
 
-        UpdatePlan plan = getPlanForStatement(cctx.name(), prepStmt, errKeysPos);
+        if (!F.isEmpty(failedKeys)) {
+            int paramsCnt = F.isEmpty(params) ? 0 : params.length;
+            params = Arrays.copyOf(U.firstNotNull(params, X.EMPTY_OBJECT_ARRAY), paramsCnt + 1);
+            params[paramsCnt] = failedKeys;
+            errKeysPos = paramsCnt; // Last position
+        }
 
-        Object[] params = fieldsQry.getArgs();
+        UpdatePlan plan = getPlanForStatement(cctx.name(), prepStmt, errKeysPos);
 
         if (plan.fastUpdateArgs != null) {
             assert F.isEmpty(failedKeys) && errKeysPos == null;
@@ -351,78 +379,58 @@ public class DmlStatementsProcessor {
         // With DELETE, we have only two columns - key and value.
         long res = 0;
 
-        CacheOperationContext opCtx = cctx.operationContextPerCall();
-
-        // Force keepBinary for operation context to avoid binary deserialization inside entry processor
-        if (cctx.binaryMarshaller()) {
-            CacheOperationContext newOpCtx = null;
-
-            if (opCtx == null)
-                // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
-                newOpCtx = new CacheOperationContext(false, null, true, null, false, null);
-            else if (!opCtx.isKeepBinary())
-                newOpCtx = opCtx.keepBinary();
-
-            if (newOpCtx != null)
-                cctx.operationContextPerCall(newOpCtx);
-        }
-
         // Keys that failed to DELETE due to concurrent updates.
         List<Object> failedKeys = new ArrayList<>();
 
         SQLException resEx = null;
 
-        try {
-            Iterator<List<?>> it = cursor.iterator();
-            Map<Object, EntryProcessor<Object, Object, Boolean>> rows = new LinkedHashMap<>();
 
-            while (it.hasNext()) {
-                List<?> e = it.next();
-                if (e.size() != 2) {
-                    U.warn(indexing.getLogger(), "Invalid row size on DELETE - expected 2, got " + e.size());
-                    continue;
-                }
+        Iterator<List<?>> it = cursor.iterator();
+        Map<Object, EntryProcessor<Object, Object, Boolean>> rows = new LinkedHashMap<>();
 
-                rows.put(e.get(0), new ModifyingEntryProcessor(e.get(1), RMV));
+        while (it.hasNext()) {
+            List<?> e = it.next();
+            if (e.size() != 2) {
+                U.warn(indexing.getLogger(), "Invalid row size on DELETE - expected 2, got " + e.size());
+                continue;
+            }
 
-                if ((pageSize > 0 && rows.size() == pageSize) || (!it.hasNext())) {
-                    PageProcessingResult pageRes = processPage(cctx, rows);
+            rows.put(e.get(0), new ModifyingEntryProcessor(e.get(1), RMV));
 
-                    res += pageRes.cnt;
+            if ((pageSize > 0 && rows.size() == pageSize) || (!it.hasNext())) {
+                PageProcessingResult pageRes = processPage(cctx, rows);
 
-                    failedKeys.addAll(F.asList(pageRes.errKeys));
+                res += pageRes.cnt;
 
-                    if (pageRes.ex != null) {
-                        if (resEx == null)
-                            resEx = pageRes.ex;
-                        else
-                            resEx.setNextException(pageRes.ex);
-                    }
+                failedKeys.addAll(F.asList(pageRes.errKeys));
 
-                    if (it.hasNext())
-                        rows.clear(); // No need to clear after the last batch.
+                if (pageRes.ex != null) {
+                    if (resEx == null)
+                        resEx = pageRes.ex;
+                    else
+                        resEx.setNextException(pageRes.ex);
                 }
-            }
 
-            if (resEx != null) {
-                if (!F.isEmpty(failedKeys)) {
-                    // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
-                    // had been modified concurrently right away.
-                    String msg = "Failed to DELETE some keys because they had been modified concurrently " +
-                        "[keys=" + failedKeys + ']';
+                if (it.hasNext())
+                    rows.clear(); // No need to clear after the last batch.
+            }
+        }
 
-                    SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
+        if (resEx != null) {
+            if (!F.isEmpty(failedKeys)) {
+                // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
+                // had been modified concurrently right away.
+                String msg = "Failed to DELETE some keys because they had been modified concurrently " +
+                    "[keys=" + failedKeys + ']';
 
-                    conEx.setNextException(resEx);
+                SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
 
-                    resEx = conEx;
-                }
+                conEx.setNextException(resEx);
 
-                throw new IgniteSQLException(resEx);
+                resEx = conEx;
             }
-        }
-        finally {
-            cctx.operationContextPerCall(opCtx);
+
+            throw new IgniteSQLException(resEx);
         }
 
         return new UpdateResult(res, failedKeys.toArray());
@@ -689,22 +697,6 @@ public class DmlStatementsProcessor {
                     IgniteQueryErrorCode.DUPLICATE_KEY);
         }
         else {
-            CacheOperationContext opCtx = cctx.operationContextPerCall();
-
-            // Force keepBinary for operation context to avoid binary deserialization inside entry processor
-            if (cctx.binaryMarshaller()) {
-                CacheOperationContext newOpCtx = null;
-
-                if (opCtx == null)
-                    // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
-                    newOpCtx = new CacheOperationContext(false, null, true, null, false, null);
-                else if (!opCtx.isKeepBinary())
-                    newOpCtx = opCtx.keepBinary();
-
-                if (newOpCtx != null)
-                    cctx.operationContextPerCall(newOpCtx);
-            }
-
             Map<Object, EntryProcessor<Object, Object, Boolean>> rows = plan.isLocSubqry ?
                 new LinkedHashMap<Object, EntryProcessor<Object, Object, Boolean>>(plan.rowsNum) :
                 new LinkedHashMap<Object, EntryProcessor<Object, Object, Boolean>>();
@@ -716,55 +708,50 @@ public class DmlStatementsProcessor {
 
             SQLException resEx = null;
 
-            try {
-                Iterator<List<?>> it = cursor.iterator();
-
-                while (it.hasNext()) {
-                    List<?> row = it.next();
+            Iterator<List<?>> it = cursor.iterator();
 
-                    final IgniteBiTuple t = rowToKeyValue(cctx, row.toArray(), plan.colNames, plan.keySupplier,
-                        plan.valSupplier, plan.keyColIdx, plan.valColIdx, desc.type());
+            while (it.hasNext()) {
+                List<?> row = it.next();
 
-                    rows.put(t.getKey(), new InsertEntryProcessor(t.getValue()));
+                final IgniteBiTuple t = rowToKeyValue(cctx, row.toArray(), plan.colNames, plan.keySupplier,
+                    plan.valSupplier, plan.keyColIdx, plan.valColIdx, desc.type());
 
-                    if (!it.hasNext() || (pageSize > 0 && rows.size() == pageSize)) {
-                        PageProcessingResult pageRes = processPage(cctx, rows);
+                rows.put(t.getKey(), new InsertEntryProcessor(t.getValue()));
 
-                        resCnt += pageRes.cnt;
+                if (!it.hasNext() || (pageSize > 0 && rows.size() == pageSize)) {
+                    PageProcessingResult pageRes = processPage(cctx, rows);
 
-                        duplicateKeys.addAll(F.asList(pageRes.errKeys));
+                    resCnt += pageRes.cnt;
 
-                        if (pageRes.ex != null) {
-                            if (resEx == null)
-                                resEx = pageRes.ex;
-                            else
-                                resEx.setNextException(pageRes.ex);
-                        }
+                    duplicateKeys.addAll(F.asList(pageRes.errKeys));
 
-                        rows.clear();
+                    if (pageRes.ex != null) {
+                        if (resEx == null)
+                            resEx = pageRes.ex;
+                        else
+                            resEx.setNextException(pageRes.ex);
                     }
+
+                    rows.clear();
                 }
+            }
 
-                if (!F.isEmpty(duplicateKeys)) {
-                    String msg = "Failed to INSERT some keys because they are already in cache " +
-                        "[keys=" + duplicateKeys + ']';
+            if (!F.isEmpty(duplicateKeys)) {
+                String msg = "Failed to INSERT some keys because they are already in cache " +
+                    "[keys=" + duplicateKeys + ']';
 
-                    SQLException dupEx = new SQLException(msg, null, IgniteQueryErrorCode.DUPLICATE_KEY);
+                SQLException dupEx = new SQLException(msg, null, IgniteQueryErrorCode.DUPLICATE_KEY);
 
-                    if (resEx == null)
-                        resEx = dupEx;
-                    else
-                        resEx.setNextException(dupEx);
-                }
+                if (resEx == null)
+                    resEx = dupEx;
+                else
+                    resEx.setNextException(dupEx);
+            }
 
-                if (resEx != null)
-                    throw new IgniteSQLException(resEx);
+            if (resEx != null)
+                throw new IgniteSQLException(resEx);
 
-                return resCnt;
-            }
-            finally {
-                cctx.operationContextPerCall(opCtx);
-            }
+            return resCnt;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7c0d453/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/DmlAstUtils.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/DmlAstUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/DmlAstUtils.java
index 39b1b74..5ff715e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/DmlAstUtils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/DmlAstUtils.java
@@ -417,7 +417,24 @@ public final class DmlAstUtils {
      * @return New condition.
      */
     private static GridSqlElement injectKeysFilterParam(GridSqlElement where, GridSqlColumn keyCol, int paramIdx) {
-        GridSqlElement e = new GridSqlOperation(GridSqlOperationType.IN, keyCol, new GridSqlParameter(paramIdx));
+        // Yes, we need a subquery for "WHERE _key IN ?" to work with param being an array without dirty query rewriting.
+        GridSqlSelect sel = new GridSqlSelect();
+
+        GridSqlFunction from = new GridSqlFunction(GridSqlFunctionType.TABLE);
+
+        sel.from(from);
+
+        GridSqlColumn col = new GridSqlColumn(null, from, "_IGNITE_ERR_KEYS", "TABLE._IGNITE_ERR_KEYS");
+
+        sel.addColumn(col, true);
+
+        GridSqlAlias alias = new GridSqlAlias("_IGNITE_ERR_KEYS", new GridSqlParameter(paramIdx));
+
+        alias.resultType(keyCol.resultType());
+
+        from.addChild(alias);
+
+        GridSqlElement e = new GridSqlOperation(GridSqlOperationType.IN, keyCol, new GridSqlSubquery(sel));
 
         if (where == null)
             return e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7c0d453/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractSqlDmlQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractSqlDmlQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractSqlDmlQuerySelfTest.java
index 2dbf1b4..22116a9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractSqlDmlQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractSqlDmlQuerySelfTest.java
@@ -88,45 +88,41 @@ public abstract class IgniteCacheAbstractSqlDmlQuerySelfTest extends GridCommonA
     @Override protected void beforeTestsStarted() throws Exception {
         startGridsMultiThreaded(3, true);
 
-        ignite(0).createCache(cacheConfig());
+        ignite(0).createCache(cacheConfig("S2P", true, false).setIndexedTypes(String.class, Person.class));
+        ignite(0).createCache(createBinCacheConfig());
     }
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
-        ignite(0).cache("S2P").put("FirstKey", createPerson(1, "John", "White"));
-        ignite(0).cache("S2P").put("SecondKey", createPerson(2, "Joe", "Black"));
-        ignite(0).cache("S2P").put("k3", createPerson(3, "Sylvia", "Green"));
-        ignite(0).cache("S2P").put("f0u4thk3y", createPerson(4, "Jane", "Silver"));
+        ignite(0).cache("S2P").put("FirstKey", new Person(1, "John", "White"));
+        ignite(0).cache("S2P").put("SecondKey", new Person(2, "Joe", "Black"));
+        ignite(0).cache("S2P").put("k3", new Person(3, "Sylvia", "Green"));
+        ignite(0).cache("S2P").put("f0u4thk3y", new Person(4, "Jane", "Silver"));
+
+        ignite(0).cache("S2P-bin").put("FirstKey", createPerson(1, "John", "White"));
+        ignite(0).cache("S2P-bin").put("SecondKey", createPerson(2, "Joe", "Black"));
+        ignite(0).cache("S2P-bin").put("k3", createPerson(3, "Sylvia", "Green"));
+        ignite(0).cache("S2P-bin").put("f0u4thk3y", createPerson(4, "Jane", "Silver"));
     }
 
     /** */
     Object createPerson(int id, String name, String secondName) {
-        if (!isBinaryMarshaller())
-            return new Person(id, name, secondName);
-        else {
-            BinaryObjectBuilder bldr = ignite(0).binary().builder("Person");
-
-            bldr.setField("id", id);
-            bldr.setField("name", name);
-            bldr.setField("secondName", secondName);
-
-            return bldr.build();
-        }
+        BinaryObjectBuilder bldr = ignite(0).binary().builder("Person");
 
-    }
+        bldr.setField("id", id);
+        bldr.setField("name", name);
+        bldr.setField("secondName", secondName);
 
-    /** */
-    protected IgniteCache<?, ?> cache() {
-        return ignite(0).cache("S2P").withKeepBinary();
+        return bldr.build();
     }
 
     /** */
-    protected CacheConfiguration cacheConfig() {
+    protected IgniteCache cache() {
         if (!isBinaryMarshaller())
-            return cacheConfig("S2P", true, false).setIndexedTypes(String.class, Person.class);
+            return ignite(0).cache("S2P");
         else
-            return createBinCacheConfig();
+            return ignite(0).cache("S2P-bin").withKeepBinary();
     }
 
     /** {@inheritDoc} */
@@ -153,7 +149,7 @@ public abstract class IgniteCacheAbstractSqlDmlQuerySelfTest extends GridCommonA
      *
      */
     private static CacheConfiguration createBinCacheConfig() {
-        CacheConfiguration ccfg = cacheConfig("S2P", true, false);
+        CacheConfiguration ccfg = cacheConfig("S2P-bin", true, false);
 
         QueryEntity e = new QueryEntity(String.class.getName(), "Person");
 
@@ -177,7 +173,7 @@ public abstract class IgniteCacheAbstractSqlDmlQuerySelfTest extends GridCommonA
     /**
      *
      */
-    private static class Person implements Serializable {
+    static class Person implements Serializable {
         /** */
         public Person(int id, String name, String secondName) {
             this.id = id;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7c0d453/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDeleteSqlQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDeleteSqlQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDeleteSqlQuerySelfTest.java
index 8b0a033..12662db 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDeleteSqlQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDeleteSqlQuerySelfTest.java
@@ -78,4 +78,29 @@ public class IgniteCacheDeleteSqlQuerySelfTest extends IgniteCacheAbstractSqlDml
         assertEqualsCollections(Arrays.asList("f0u4thk3y", createPerson(4, "Jane", "Silver"), 4, "Jane", "Silver"),
             leftovers.get(2));
     }
+
+    /**
+     * In binary mode, this test checks that inner forcing of keepBinary works - without it, EntryProcessors
+     * inside DML engine would compare binary and non-binary objects with the same keys and thus fail.
+     */
+    public void testDeleteSimpleWithoutKeepBinary() {
+        IgniteCache p = ignite(0).cache("S2P");
+
+        QueryCursor<List<?>> c = p.query(new SqlFieldsQuery("delete from Person p where length(p._key) = 2 " +
+            "or p.secondName like '%ite'"));
+
+        c.iterator();
+
+        c = p.query(new SqlFieldsQuery("select * from Person order by id"));
+
+        List<List<?>> leftovers = c.getAll();
+
+        assertEquals(2, leftovers.size());
+
+        assertEqualsCollections(Arrays.asList("SecondKey", new Person(2, "Joe", "Black"), 2, "Joe", "Black"),
+            leftovers.get(0));
+
+        assertEqualsCollections(Arrays.asList("f0u4thk3y", new Person(4, "Jane", "Silver"), 4, "Jane", "Silver"),
+            leftovers.get(1));
+    }
 }