You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/12/05 12:38:37 UTC
[33/52] ignite git commit: IGNITE-4287 DmlStatementsProcessor logic
fix (always force keepBinary for SELECTs inside DML logic). This closes
#1280.
IGNITE-4287 DmlStatementsProcessor logic fix (always force keepBinary for SELECTs inside DML logic). This closes #1280.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e7c0d453
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e7c0d453
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e7c0d453
Branch: refs/heads/master
Commit: e7c0d453e53821d11f92f102ccfc06e2426d2dce
Parents: dda4fc9
Author: Alexander Paschenko <al...@gmail.com>
Authored: Fri Nov 25 19:07:50 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Nov 25 19:07:50 2016 +0300
----------------------------------------------------------------------
.../query/h2/DmlStatementsProcessor.java | 209 +++++++++----------
.../processors/query/h2/sql/DmlAstUtils.java | 19 +-
.../IgniteCacheAbstractSqlDmlQuerySelfTest.java | 46 ++--
.../IgniteCacheDeleteSqlQuerySelfTest.java | 25 +++
4 files changed, 162 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7c0d453/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 7634965..469e36c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -46,7 +46,6 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
@@ -130,9 +129,34 @@ public class DmlStatementsProcessor {
UpdatePlan plan = getPlanForStatement(spaceName, stmt, null);
+ GridCacheContext<?, ?> cctx = plan.tbl.rowDescriptor().context();
+
for (int i = 0; i < DFLT_DML_RERUN_ATTEMPTS; i++) {
- UpdateResult r = executeUpdateStatement(plan.tbl.rowDescriptor().context(), stmt, fieldsQry, loc, filters,
- cancel, errKeys);
+ CacheOperationContext opCtx = cctx.operationContextPerCall();
+
+ // Force keepBinary for operation context to avoid binary deserialization inside entry processor
+ if (cctx.binaryMarshaller()) {
+ CacheOperationContext newOpCtx = null;
+
+ if (opCtx == null)
+ // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
+ newOpCtx = new CacheOperationContext(false, null, true, null, false, null);
+ else if (!opCtx.isKeepBinary())
+ newOpCtx = opCtx.keepBinary();
+
+ if (newOpCtx != null)
+ cctx.operationContextPerCall(newOpCtx);
+ }
+
+ UpdateResult r;
+
+ try {
+ r = executeUpdateStatement(cctx, stmt, fieldsQry, loc, filters,
+ cancel, errKeys);
+ }
+ finally {
+ cctx.operationContextPerCall(opCtx);
+ }
if (F.isEmpty(r.errKeys))
return r.cnt + items;
@@ -195,12 +219,16 @@ public class DmlStatementsProcessor {
throws IgniteCheckedException {
Integer errKeysPos = null;
- if (!F.isEmpty(failedKeys))
- errKeysPos = F.isEmpty(fieldsQry.getArgs()) ? 1 : fieldsQry.getArgs().length + 1;
+ Object[] params = fieldsQry.getArgs();
- UpdatePlan plan = getPlanForStatement(cctx.name(), prepStmt, errKeysPos);
+ if (!F.isEmpty(failedKeys)) {
+ int paramsCnt = F.isEmpty(params) ? 0 : params.length;
+ params = Arrays.copyOf(U.firstNotNull(params, X.EMPTY_OBJECT_ARRAY), paramsCnt + 1);
+ params[paramsCnt] = failedKeys;
+ errKeysPos = paramsCnt; // Last position
+ }
- Object[] params = fieldsQry.getArgs();
+ UpdatePlan plan = getPlanForStatement(cctx.name(), prepStmt, errKeysPos);
if (plan.fastUpdateArgs != null) {
assert F.isEmpty(failedKeys) && errKeysPos == null;
@@ -351,78 +379,58 @@ public class DmlStatementsProcessor {
// With DELETE, we have only two columns - key and value.
long res = 0;
- CacheOperationContext opCtx = cctx.operationContextPerCall();
-
- // Force keepBinary for operation context to avoid binary deserialization inside entry processor
- if (cctx.binaryMarshaller()) {
- CacheOperationContext newOpCtx = null;
-
- if (opCtx == null)
- // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
- newOpCtx = new CacheOperationContext(false, null, true, null, false, null);
- else if (!opCtx.isKeepBinary())
- newOpCtx = opCtx.keepBinary();
-
- if (newOpCtx != null)
- cctx.operationContextPerCall(newOpCtx);
- }
-
// Keys that failed to DELETE due to concurrent updates.
List<Object> failedKeys = new ArrayList<>();
SQLException resEx = null;
- try {
- Iterator<List<?>> it = cursor.iterator();
- Map<Object, EntryProcessor<Object, Object, Boolean>> rows = new LinkedHashMap<>();
- while (it.hasNext()) {
- List<?> e = it.next();
- if (e.size() != 2) {
- U.warn(indexing.getLogger(), "Invalid row size on DELETE - expected 2, got " + e.size());
- continue;
- }
+ Iterator<List<?>> it = cursor.iterator();
+ Map<Object, EntryProcessor<Object, Object, Boolean>> rows = new LinkedHashMap<>();
- rows.put(e.get(0), new ModifyingEntryProcessor(e.get(1), RMV));
+ while (it.hasNext()) {
+ List<?> e = it.next();
+ if (e.size() != 2) {
+ U.warn(indexing.getLogger(), "Invalid row size on DELETE - expected 2, got " + e.size());
+ continue;
+ }
- if ((pageSize > 0 && rows.size() == pageSize) || (!it.hasNext())) {
- PageProcessingResult pageRes = processPage(cctx, rows);
+ rows.put(e.get(0), new ModifyingEntryProcessor(e.get(1), RMV));
- res += pageRes.cnt;
+ if ((pageSize > 0 && rows.size() == pageSize) || (!it.hasNext())) {
+ PageProcessingResult pageRes = processPage(cctx, rows);
- failedKeys.addAll(F.asList(pageRes.errKeys));
+ res += pageRes.cnt;
- if (pageRes.ex != null) {
- if (resEx == null)
- resEx = pageRes.ex;
- else
- resEx.setNextException(pageRes.ex);
- }
+ failedKeys.addAll(F.asList(pageRes.errKeys));
- if (it.hasNext())
- rows.clear(); // No need to clear after the last batch.
+ if (pageRes.ex != null) {
+ if (resEx == null)
+ resEx = pageRes.ex;
+ else
+ resEx.setNextException(pageRes.ex);
}
- }
- if (resEx != null) {
- if (!F.isEmpty(failedKeys)) {
- // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
- // had been modified concurrently right away.
- String msg = "Failed to DELETE some keys because they had been modified concurrently " +
- "[keys=" + failedKeys + ']';
+ if (it.hasNext())
+ rows.clear(); // No need to clear after the last batch.
+ }
+ }
- SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
+ if (resEx != null) {
+ if (!F.isEmpty(failedKeys)) {
+ // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
+ // had been modified concurrently right away.
+ String msg = "Failed to DELETE some keys because they had been modified concurrently " +
+ "[keys=" + failedKeys + ']';
- conEx.setNextException(resEx);
+ SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
- resEx = conEx;
- }
+ conEx.setNextException(resEx);
- throw new IgniteSQLException(resEx);
+ resEx = conEx;
}
- }
- finally {
- cctx.operationContextPerCall(opCtx);
+
+ throw new IgniteSQLException(resEx);
}
return new UpdateResult(res, failedKeys.toArray());
@@ -689,22 +697,6 @@ public class DmlStatementsProcessor {
IgniteQueryErrorCode.DUPLICATE_KEY);
}
else {
- CacheOperationContext opCtx = cctx.operationContextPerCall();
-
- // Force keepBinary for operation context to avoid binary deserialization inside entry processor
- if (cctx.binaryMarshaller()) {
- CacheOperationContext newOpCtx = null;
-
- if (opCtx == null)
- // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
- newOpCtx = new CacheOperationContext(false, null, true, null, false, null);
- else if (!opCtx.isKeepBinary())
- newOpCtx = opCtx.keepBinary();
-
- if (newOpCtx != null)
- cctx.operationContextPerCall(newOpCtx);
- }
-
Map<Object, EntryProcessor<Object, Object, Boolean>> rows = plan.isLocSubqry ?
new LinkedHashMap<Object, EntryProcessor<Object, Object, Boolean>>(plan.rowsNum) :
new LinkedHashMap<Object, EntryProcessor<Object, Object, Boolean>>();
@@ -716,55 +708,50 @@ public class DmlStatementsProcessor {
SQLException resEx = null;
- try {
- Iterator<List<?>> it = cursor.iterator();
-
- while (it.hasNext()) {
- List<?> row = it.next();
+ Iterator<List<?>> it = cursor.iterator();
- final IgniteBiTuple t = rowToKeyValue(cctx, row.toArray(), plan.colNames, plan.keySupplier,
- plan.valSupplier, plan.keyColIdx, plan.valColIdx, desc.type());
+ while (it.hasNext()) {
+ List<?> row = it.next();
- rows.put(t.getKey(), new InsertEntryProcessor(t.getValue()));
+ final IgniteBiTuple t = rowToKeyValue(cctx, row.toArray(), plan.colNames, plan.keySupplier,
+ plan.valSupplier, plan.keyColIdx, plan.valColIdx, desc.type());
- if (!it.hasNext() || (pageSize > 0 && rows.size() == pageSize)) {
- PageProcessingResult pageRes = processPage(cctx, rows);
+ rows.put(t.getKey(), new InsertEntryProcessor(t.getValue()));
- resCnt += pageRes.cnt;
+ if (!it.hasNext() || (pageSize > 0 && rows.size() == pageSize)) {
+ PageProcessingResult pageRes = processPage(cctx, rows);
- duplicateKeys.addAll(F.asList(pageRes.errKeys));
+ resCnt += pageRes.cnt;
- if (pageRes.ex != null) {
- if (resEx == null)
- resEx = pageRes.ex;
- else
- resEx.setNextException(pageRes.ex);
- }
+ duplicateKeys.addAll(F.asList(pageRes.errKeys));
- rows.clear();
+ if (pageRes.ex != null) {
+ if (resEx == null)
+ resEx = pageRes.ex;
+ else
+ resEx.setNextException(pageRes.ex);
}
+
+ rows.clear();
}
+ }
- if (!F.isEmpty(duplicateKeys)) {
- String msg = "Failed to INSERT some keys because they are already in cache " +
- "[keys=" + duplicateKeys + ']';
+ if (!F.isEmpty(duplicateKeys)) {
+ String msg = "Failed to INSERT some keys because they are already in cache " +
+ "[keys=" + duplicateKeys + ']';
- SQLException dupEx = new SQLException(msg, null, IgniteQueryErrorCode.DUPLICATE_KEY);
+ SQLException dupEx = new SQLException(msg, null, IgniteQueryErrorCode.DUPLICATE_KEY);
- if (resEx == null)
- resEx = dupEx;
- else
- resEx.setNextException(dupEx);
- }
+ if (resEx == null)
+ resEx = dupEx;
+ else
+ resEx.setNextException(dupEx);
+ }
- if (resEx != null)
- throw new IgniteSQLException(resEx);
+ if (resEx != null)
+ throw new IgniteSQLException(resEx);
- return resCnt;
- }
- finally {
- cctx.operationContextPerCall(opCtx);
- }
+ return resCnt;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7c0d453/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/DmlAstUtils.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/DmlAstUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/DmlAstUtils.java
index 39b1b74..5ff715e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/DmlAstUtils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/DmlAstUtils.java
@@ -417,7 +417,24 @@ public final class DmlAstUtils {
* @return New condition.
*/
private static GridSqlElement injectKeysFilterParam(GridSqlElement where, GridSqlColumn keyCol, int paramIdx) {
- GridSqlElement e = new GridSqlOperation(GridSqlOperationType.IN, keyCol, new GridSqlParameter(paramIdx));
+ // Yes, we need a subquery for "WHERE _key IN ?" to work with param being an array without dirty query rewriting.
+ GridSqlSelect sel = new GridSqlSelect();
+
+ GridSqlFunction from = new GridSqlFunction(GridSqlFunctionType.TABLE);
+
+ sel.from(from);
+
+ GridSqlColumn col = new GridSqlColumn(null, from, "_IGNITE_ERR_KEYS", "TABLE._IGNITE_ERR_KEYS");
+
+ sel.addColumn(col, true);
+
+ GridSqlAlias alias = new GridSqlAlias("_IGNITE_ERR_KEYS", new GridSqlParameter(paramIdx));
+
+ alias.resultType(keyCol.resultType());
+
+ from.addChild(alias);
+
+ GridSqlElement e = new GridSqlOperation(GridSqlOperationType.IN, keyCol, new GridSqlSubquery(sel));
if (where == null)
return e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7c0d453/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractSqlDmlQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractSqlDmlQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractSqlDmlQuerySelfTest.java
index 2dbf1b4..22116a9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractSqlDmlQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractSqlDmlQuerySelfTest.java
@@ -88,45 +88,41 @@ public abstract class IgniteCacheAbstractSqlDmlQuerySelfTest extends GridCommonA
@Override protected void beforeTestsStarted() throws Exception {
startGridsMultiThreaded(3, true);
- ignite(0).createCache(cacheConfig());
+ ignite(0).createCache(cacheConfig("S2P", true, false).setIndexedTypes(String.class, Person.class));
+ ignite(0).createCache(createBinCacheConfig());
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
- ignite(0).cache("S2P").put("FirstKey", createPerson(1, "John", "White"));
- ignite(0).cache("S2P").put("SecondKey", createPerson(2, "Joe", "Black"));
- ignite(0).cache("S2P").put("k3", createPerson(3, "Sylvia", "Green"));
- ignite(0).cache("S2P").put("f0u4thk3y", createPerson(4, "Jane", "Silver"));
+ ignite(0).cache("S2P").put("FirstKey", new Person(1, "John", "White"));
+ ignite(0).cache("S2P").put("SecondKey", new Person(2, "Joe", "Black"));
+ ignite(0).cache("S2P").put("k3", new Person(3, "Sylvia", "Green"));
+ ignite(0).cache("S2P").put("f0u4thk3y", new Person(4, "Jane", "Silver"));
+
+ ignite(0).cache("S2P-bin").put("FirstKey", createPerson(1, "John", "White"));
+ ignite(0).cache("S2P-bin").put("SecondKey", createPerson(2, "Joe", "Black"));
+ ignite(0).cache("S2P-bin").put("k3", createPerson(3, "Sylvia", "Green"));
+ ignite(0).cache("S2P-bin").put("f0u4thk3y", createPerson(4, "Jane", "Silver"));
}
/** */
Object createPerson(int id, String name, String secondName) {
- if (!isBinaryMarshaller())
- return new Person(id, name, secondName);
- else {
- BinaryObjectBuilder bldr = ignite(0).binary().builder("Person");
-
- bldr.setField("id", id);
- bldr.setField("name", name);
- bldr.setField("secondName", secondName);
-
- return bldr.build();
- }
+ BinaryObjectBuilder bldr = ignite(0).binary().builder("Person");
- }
+ bldr.setField("id", id);
+ bldr.setField("name", name);
+ bldr.setField("secondName", secondName);
- /** */
- protected IgniteCache<?, ?> cache() {
- return ignite(0).cache("S2P").withKeepBinary();
+ return bldr.build();
}
/** */
- protected CacheConfiguration cacheConfig() {
+ protected IgniteCache cache() {
if (!isBinaryMarshaller())
- return cacheConfig("S2P", true, false).setIndexedTypes(String.class, Person.class);
+ return ignite(0).cache("S2P");
else
- return createBinCacheConfig();
+ return ignite(0).cache("S2P-bin").withKeepBinary();
}
/** {@inheritDoc} */
@@ -153,7 +149,7 @@ public abstract class IgniteCacheAbstractSqlDmlQuerySelfTest extends GridCommonA
*
*/
private static CacheConfiguration createBinCacheConfig() {
- CacheConfiguration ccfg = cacheConfig("S2P", true, false);
+ CacheConfiguration ccfg = cacheConfig("S2P-bin", true, false);
QueryEntity e = new QueryEntity(String.class.getName(), "Person");
@@ -177,7 +173,7 @@ public abstract class IgniteCacheAbstractSqlDmlQuerySelfTest extends GridCommonA
/**
*
*/
- private static class Person implements Serializable {
+ static class Person implements Serializable {
/** */
public Person(int id, String name, String secondName) {
this.id = id;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7c0d453/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDeleteSqlQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDeleteSqlQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDeleteSqlQuerySelfTest.java
index 8b0a033..12662db 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDeleteSqlQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDeleteSqlQuerySelfTest.java
@@ -78,4 +78,29 @@ public class IgniteCacheDeleteSqlQuerySelfTest extends IgniteCacheAbstractSqlDml
assertEqualsCollections(Arrays.asList("f0u4thk3y", createPerson(4, "Jane", "Silver"), 4, "Jane", "Silver"),
leftovers.get(2));
}
+
+ /**
+ * In binary mode, this test checks that inner forcing of keepBinary works - without it, EntryProcessors
+ * inside DML engine would compare binary and non-binary objects with the same keys and thus fail.
+ */
+ public void testDeleteSimpleWithoutKeepBinary() {
+ IgniteCache p = ignite(0).cache("S2P");
+
+ QueryCursor<List<?>> c = p.query(new SqlFieldsQuery("delete from Person p where length(p._key) = 2 " +
+ "or p.secondName like '%ite'"));
+
+ c.iterator();
+
+ c = p.query(new SqlFieldsQuery("select * from Person order by id"));
+
+ List<List<?>> leftovers = c.getAll();
+
+ assertEquals(2, leftovers.size());
+
+ assertEqualsCollections(Arrays.asList("SecondKey", new Person(2, "Joe", "Black"), 2, "Joe", "Black"),
+ leftovers.get(0));
+
+ assertEqualsCollections(Arrays.asList("f0u4thk3y", new Person(4, "Jane", "Silver"), 4, "Jane", "Silver"),
+ leftovers.get(1));
+ }
}