Posted to commits@ignite.apache.org by vo...@apache.org on 2017/08/17 08:24:18 UTC
ignite git commit: IGNITE-6080: DML batches are now grouped by affinity. This closes #2454.
Repository: ignite
Updated Branches:
refs/heads/master 5283e19fe -> 37e58bade
IGNITE-6080: DML batches are now grouped by affinity. This closes #2454.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/37e58bad
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/37e58bad
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/37e58bad
Branch: refs/heads/master
Commit: 37e58badefcf36536644344023dc62828f41d0f9
Parents: 5283e19
Author: devozerov <vo...@gridgain.com>
Authored: Thu Aug 17 11:24:06 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Aug 17 11:24:06 2017 +0300
----------------------------------------------------------------------
.../query/h2/DmlStatementsProcessor.java | 274 +++++++++++--------
1 file changed, 163 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
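Before the diff, a minimal standalone sketch of the batching scheme this commit introduces: instead of filling one shared page of DML entries and flushing it whenever it reaches pageSize, entries are bucketed by the node that owns the key, and each per-node bucket is flushed independently. The nodeOf function below is a hypothetical stand-in for Ignite's affinity lookup (cctx.affinity().primaryByKey(...) in the new BatchSender class); the sketch only illustrates the bucketing and flush behavior, not the real API.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class AffinityBatchingSketch {
    public static void main(String[] args) {
        int pageSize = 2;

        // Hypothetical 3-node cluster: key -> owning node id.
        Function<Integer, String> nodeOf = k -> "node-" + (k % 3);

        // One pending batch per primary node, mirroring BatchSender.batches.
        Map<String, List<Integer>> batches = new HashMap<>();

        for (int key = 0; key < 10; key++) {
            List<Integer> batch = batches.computeIfAbsent(nodeOf.apply(key), id -> new ArrayList<>());

            batch.add(key);

            // Flush a node's batch as soon as it fills a page, as BatchSender.add() does.
            if (batch.size() >= pageSize) {
                send(nodeOf.apply(key), batch);

                batch.clear();
            }
        }

        // Flush whatever is left per node, mirroring BatchSender.flush().
        batches.forEach((node, batch) -> {
            if (!batch.isEmpty())
                send(node, batch);
        });
    }

    private static void send(String nodeId, List<Integer> batch) {
        System.out.println("send " + batch + " -> " + nodeId);
    }
}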
http://git-wip-us.apache.org/repos/asf/ignite/blob/37e58bad/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 4f7c288..0ff9cfe 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -32,6 +32,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import javax.cache.processor.EntryProcessor;
@@ -46,7 +47,9 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -83,7 +86,6 @@ import org.h2.command.dml.Update;
import org.h2.table.Column;
import org.h2.util.DateTimeUtils;
import org.h2.util.LocalDateTimeUtils;
-import org.h2.value.DataType;
import org.h2.value.Value;
import org.h2.value.ValueDate;
import org.h2.value.ValueTime;
@@ -365,7 +367,7 @@ public class DmlStatementsProcessor {
QueryCursorImpl<List<?>> cur;
// Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
- // subquery and not some dummy stuff like "select 1, 2, 3;"
+ // sub-query and not some dummy stuff like "select 1, 2, 3;"
if (!loc && !plan.isLocSubqry) {
SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQry, fieldsQry.isCollocated())
.setArgs(fieldsQry.getArgs())
@@ -493,52 +495,28 @@ public class DmlStatementsProcessor {
@SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
private UpdateResult doDelete(GridCacheContext cctx, Iterable<List<?>> cursor, int pageSize)
throws IgniteCheckedException {
- // With DELETE, we have only two columns - key and value.
- long res = 0;
+ BatchSender sender = new BatchSender(cctx, pageSize);
- // Keys that failed to DELETE due to concurrent updates.
- List<Object> failedKeys = new ArrayList<>();
+ for (List<?> row : cursor) {
+ if (row.size() != 2) {
+ U.warn(log, "Invalid row size on DELETE - expected 2, got " + row.size());
- SQLException resEx = null;
-
-
- Iterator<List<?>> it = cursor.iterator();
- Map<Object, EntryProcessor<Object, Object, Boolean>> rows = new LinkedHashMap<>();
-
- while (it.hasNext()) {
- List<?> e = it.next();
- if (e.size() != 2) {
- U.warn(log, "Invalid row size on DELETE - expected 2, got " + e.size());
continue;
}
- rows.put(e.get(0), new ModifyingEntryProcessor(e.get(1), RMV));
-
- if ((pageSize > 0 && rows.size() == pageSize) || (!it.hasNext())) {
- PageProcessingResult pageRes = processPage(cctx, rows);
-
- res += pageRes.cnt;
-
- failedKeys.addAll(F.asList(pageRes.errKeys));
+ sender.add(row.get(0), new ModifyingEntryProcessor(row.get(1), RMV));
+ }
- if (pageRes.ex != null) {
- if (resEx == null)
- resEx = pageRes.ex;
- else
- resEx.setNextException(pageRes.ex);
- }
+ sender.flush();
- if (it.hasNext())
- rows.clear(); // No need to clear after the last batch.
- }
- }
+ SQLException resEx = sender.error();
if (resEx != null) {
- if (!F.isEmpty(failedKeys)) {
+ if (!F.isEmpty(sender.failedKeys())) {
// Don't go for a re-run if processing of some keys yielded exceptions and report keys that
// had been modified concurrently right away.
String msg = "Failed to DELETE some keys because they had been modified concurrently " +
- "[keys=" + failedKeys + ']';
+ "[keys=" + sender.failedKeys() + ']';
SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
@@ -550,7 +528,7 @@ public class DmlStatementsProcessor {
throw new IgniteSQLException(resEx);
}
- return new UpdateResult(res, failedKeys.toArray());
+ return new UpdateResult(sender.updateCount(), sender.failedKeys().toArray());
}
/**
@@ -579,20 +557,10 @@ public class DmlStatementsProcessor {
// or if its list of updated columns includes only _val, i.e. is single element.
boolean hasProps = !hasNewVal || updatedColNames.length > 1;
- long res = 0;
-
- Map<Object, EntryProcessor<Object, Object, Boolean>> rows = new LinkedHashMap<>();
-
- // Keys that failed to UPDATE due to concurrent updates.
- List<Object> failedKeys = new ArrayList<>();
-
- SQLException resEx = null;
-
- Iterator<List<?>> it = cursor.iterator();
+ BatchSender sender = new BatchSender(cctx, pageSize);
- while (it.hasNext()) {
- List<?> e = it.next();
- Object key = e.get(0);
+ for (List<?> row : cursor) {
+ Object key = row.get(0);
Object newVal;
@@ -606,10 +574,10 @@ public class DmlStatementsProcessor {
assert prop != null;
- newColVals.put(plan.colNames[i], convert(e.get(i + 2), desc, prop.type(), plan.colTypes[i]));
+ newColVals.put(plan.colNames[i], convert(row.get(i + 2), desc, prop.type(), plan.colTypes[i]));
}
- newVal = plan.valSupplier.apply(e);
+ newVal = plan.valSupplier.apply(row);
if (newVal == null)
throw new IgniteSQLException("New value for UPDATE must not be null", IgniteQueryErrorCode.NULL_VALUE);
@@ -643,38 +611,24 @@ public class DmlStatementsProcessor {
newVal = ((BinaryObjectBuilder) newVal).build();
}
- Object srcVal = e.get(1);
+ Object srcVal = row.get(1);
if (bin && !(srcVal instanceof BinaryObject))
srcVal = cctx.grid().binary().toBinary(srcVal);
- rows.put(key, new ModifyingEntryProcessor(srcVal, new EntryValueUpdater(newVal)));
-
- if ((pageSize > 0 && rows.size() == pageSize) || (!it.hasNext())) {
- PageProcessingResult pageRes = processPage(cctx, rows);
-
- res += pageRes.cnt;
-
- failedKeys.addAll(F.asList(pageRes.errKeys));
+ sender.add(key, new ModifyingEntryProcessor(srcVal, new EntryValueUpdater(newVal)));
+ }
- if (pageRes.ex != null) {
- if (resEx == null)
- resEx = pageRes.ex;
- else
- resEx.setNextException(pageRes.ex);
- }
+ sender.flush();
- if (it.hasNext())
- rows.clear(); // No need to clear after the last batch.
- }
- }
+ SQLException resEx = sender.error();
if (resEx != null) {
- if (!F.isEmpty(failedKeys)) {
+ if (!F.isEmpty(sender.failedKeys())) {
// Don't go for a re-run if processing of some keys yielded exceptions and report keys that
// had been modified concurrently right away.
String msg = "Failed to UPDATE some keys because they had been modified concurrently " +
- "[keys=" + failedKeys + ']';
+ "[keys=" + sender.failedKeys() + ']';
SQLException dupEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
@@ -686,7 +640,7 @@ public class DmlStatementsProcessor {
throw new IgniteSQLException(resEx);
}
- return new UpdateResult(res, failedKeys.toArray());
+ return new UpdateResult(sender.updateCount(), sender.failedKeys().toArray());
}
/**
@@ -864,47 +818,22 @@ public class DmlStatementsProcessor {
IgniteQueryErrorCode.DUPLICATE_KEY);
}
else {
- Map<Object, EntryProcessor<Object, Object, Boolean>> rows = plan.isLocSubqry ?
- new LinkedHashMap<Object, EntryProcessor<Object, Object, Boolean>>(plan.rowsNum) :
- new LinkedHashMap<Object, EntryProcessor<Object, Object, Boolean>>();
-
// Keys that failed to INSERT due to duplication.
- List<Object> duplicateKeys = new ArrayList<>();
-
- int resCnt = 0;
-
- SQLException resEx = null;
-
- Iterator<List<?>> it = cursor.iterator();
-
- while (it.hasNext()) {
- List<?> row = it.next();
-
- final IgniteBiTuple t = rowToKeyValue(cctx, row, plan);
-
- rows.put(t.getKey(), new InsertEntryProcessor(t.getValue()));
-
- if (!it.hasNext() || (pageSize > 0 && rows.size() == pageSize)) {
- PageProcessingResult pageRes = processPage(cctx, rows);
+ BatchSender sender = new BatchSender(cctx, pageSize);
- resCnt += pageRes.cnt;
+ for (List<?> row : cursor) {
+ final IgniteBiTuple keyValPair = rowToKeyValue(cctx, row, plan);
- duplicateKeys.addAll(F.asList(pageRes.errKeys));
+ sender.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue()));
+ }
- if (pageRes.ex != null) {
- if (resEx == null)
- resEx = pageRes.ex;
- else
- resEx.setNextException(pageRes.ex);
- }
+ sender.flush();
- rows.clear();
- }
- }
+ SQLException resEx = sender.error();
- if (!F.isEmpty(duplicateKeys)) {
+ if (!F.isEmpty(sender.failedKeys())) {
String msg = "Failed to INSERT some keys because they are already in cache " +
- "[keys=" + duplicateKeys + ']';
+ "[keys=" + sender.failedKeys() + ']';
SQLException dupEx = new SQLException(msg, null, IgniteQueryErrorCode.DUPLICATE_KEY);
@@ -917,7 +846,7 @@ public class DmlStatementsProcessor {
if (resEx != null)
throw new IgniteSQLException(resEx);
- return resCnt;
+ return sender.updateCount();
}
}
@@ -1133,7 +1062,7 @@ public class DmlStatementsProcessor {
/** Number of processed items. */
final long cnt;
- /** Keys that failed to be UPDATEd or DELETEd due to concurrent modification of values. */
+ /** Keys that failed to be updated or deleted due to concurrent modification of values. */
@NotNull
final Object[] errKeys;
@@ -1150,7 +1079,7 @@ public class DmlStatementsProcessor {
/** Number of successfully processed items. */
final long cnt;
- /** Keys that failed to be UPDATEd or DELETEd due to concurrent modification of values. */
+ /** Keys that failed to be updated or deleted due to concurrent modification of values. */
@NotNull
final Object[] errKeys;
@@ -1193,4 +1122,127 @@ public class DmlStatementsProcessor {
this.ex = ex;
}
}
+
+ /**
+ * Batch sender class.
+ */
+ private static class BatchSender {
+ /** Cache context. */
+ private final GridCacheContext cctx;
+
+ /** Batch size. */
+ private final int size;
+
+ /** Batches. */
+ private final Map<UUID, Map<Object, EntryProcessor<Object, Object, Boolean>>> batches = new HashMap<>();
+
+ /** Result count. */
+ private long updateCnt;
+
+ /** Failed keys. */
+ private List<Object> failedKeys;
+
+ /** Exception. */
+ private SQLException err;
+
+ /**
+ * Constructor.
+ *
+ * @param cctx Cache context.
+ * @param size Batch size.
+ */
+ public BatchSender(GridCacheContext cctx, int size) {
+ this.cctx = cctx;
+ this.size = size;
+ }
+
+ /**
+ * Add entry to batch.
+ *
+ * @param key Key.
+ * @param proc Processor.
+ */
+ public void add(Object key, EntryProcessor<Object, Object, Boolean> proc) throws IgniteCheckedException {
+ ClusterNode node = cctx.affinity().primaryByKey(key, AffinityTopologyVersion.NONE);
+
+ if (node == null)
+ throw new IgniteCheckedException("Failed to map key to node.");
+
+ UUID nodeId = node.id();
+
+ Map<Object, EntryProcessor<Object, Object, Boolean>> batch = batches.get(nodeId);
+
+ if (batch == null) {
+ batch = new HashMap<>();
+
+ batches.put(nodeId, batch);
+ }
+
+ batch.put(key, proc);
+
+ if (batch.size() >= size) {
+ sendBatch(batch);
+
+ batch.clear();
+ }
+ }
+
+ /**
+ * Flush any remaining entries.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void flush() throws IgniteCheckedException {
+ for (Map<Object, EntryProcessor<Object, Object, Boolean>> batch : batches.values()) {
+ if (!batch.isEmpty())
+ sendBatch(batch);
+ }
+ }
+
+ /**
+ * @return Update count.
+ */
+ public long updateCount() {
+ return updateCnt;
+ }
+
+ /**
+ * @return Failed keys.
+ */
+ public List<Object> failedKeys() {
+ return failedKeys != null ? failedKeys : Collections.emptyList();
+ }
+
+ /**
+ * @return Error.
+ */
+ public SQLException error() {
+ return err;
+ }
+
+ /**
+ * Send the batch.
+ *
+ * @param batch Batch.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void sendBatch(Map<Object, EntryProcessor<Object, Object, Boolean>> batch)
+ throws IgniteCheckedException {
+ PageProcessingResult pageRes = processPage(cctx, batch);
+
+ updateCnt += pageRes.cnt;
+
+ if (failedKeys == null)
+ failedKeys = new ArrayList<>();
+
+ failedKeys.addAll(F.asList(pageRes.errKeys));
+
+ if (pageRes.ex != null) {
+ if (err == null)
+ err = pageRes.ex;
+ else
+ err.setNextException(pageRes.ex);
+ }
+ }
+ }
}
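The diff consolidates three hand-rolled paging loops (DELETE, UPDATE, INSERT) into the single BatchSender above, which also chains per-batch SQLExceptions via setNextException() so each caller can rethrow one exception that still carries every per-batch failure. A small runnable sketch of that chaining behavior, independent of any Ignite types:

import java.sql.SQLException;

public class ErrorChainSketch {
    public static void main(String[] args) {
        SQLException err = null;

        // Pretend three batches failed on different nodes, as in BatchSender.sendBatch():
        // the first SQLException becomes the head of the chain, later ones are appended.
        for (int i = 1; i <= 3; i++) {
            SQLException batchEx = new SQLException("batch " + i + " failed");

            if (err == null)
                err = batchEx;
            else
                err.setNextException(batchEx);
        }

        // Walk the chain the way a JDBC caller would.
        for (SQLException e = err; e != null; e = e.getNextException())
            System.out.println(e.getMessage());
    }
}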