You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/08/17 08:24:18 UTC

ignite git commit: IGNITE-6080: DML batches are now grouped by affinity. This closes #2454.

Repository: ignite
Updated Branches:
  refs/heads/master 5283e19fe -> 37e58bade


IGNITE-6080: DML batches are now grouped by affinity. This closes #2454.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/37e58bad
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/37e58bad
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/37e58bad

Branch: refs/heads/master
Commit: 37e58badefcf36536644344023dc62828f41d0f9
Parents: 5283e19
Author: devozerov <vo...@gridgain.com>
Authored: Thu Aug 17 11:24:06 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Aug 17 11:24:06 2017 +0300

----------------------------------------------------------------------
 .../query/h2/DmlStatementsProcessor.java        | 274 +++++++++++--------
 1 file changed, 163 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/37e58bad/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 4f7c288..0ff9cfe 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -32,6 +32,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import javax.cache.processor.EntryProcessor;
@@ -46,7 +47,9 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectBuilder;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -83,7 +86,6 @@ import org.h2.command.dml.Update;
 import org.h2.table.Column;
 import org.h2.util.DateTimeUtils;
 import org.h2.util.LocalDateTimeUtils;
-import org.h2.value.DataType;
 import org.h2.value.Value;
 import org.h2.value.ValueDate;
 import org.h2.value.ValueTime;
@@ -365,7 +367,7 @@ public class DmlStatementsProcessor {
         QueryCursorImpl<List<?>> cur;
 
         // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
-        // subquery and not some dummy stuff like "select 1, 2, 3;"
+        // sub-query and not some dummy stuff like "select 1, 2, 3;"
         if (!loc && !plan.isLocSubqry) {
             SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQry, fieldsQry.isCollocated())
                 .setArgs(fieldsQry.getArgs())
@@ -493,52 +495,28 @@ public class DmlStatementsProcessor {
     @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
     private UpdateResult doDelete(GridCacheContext cctx, Iterable<List<?>> cursor, int pageSize)
         throws IgniteCheckedException {
-        // With DELETE, we have only two columns - key and value.
-        long res = 0;
+        BatchSender sender = new BatchSender(cctx, pageSize);
 
-        // Keys that failed to DELETE due to concurrent updates.
-        List<Object> failedKeys = new ArrayList<>();
+        for (List<?> row : cursor) {
+            if (row.size() != 2) {
+                U.warn(log, "Invalid row size on DELETE - expected 2, got " + row.size());
 
-        SQLException resEx = null;
-
-
-        Iterator<List<?>> it = cursor.iterator();
-        Map<Object, EntryProcessor<Object, Object, Boolean>> rows = new LinkedHashMap<>();
-
-        while (it.hasNext()) {
-            List<?> e = it.next();
-            if (e.size() != 2) {
-                U.warn(log, "Invalid row size on DELETE - expected 2, got " + e.size());
                 continue;
             }
 
-            rows.put(e.get(0), new ModifyingEntryProcessor(e.get(1), RMV));
-
-            if ((pageSize > 0 && rows.size() == pageSize) || (!it.hasNext())) {
-                PageProcessingResult pageRes = processPage(cctx, rows);
-
-                res += pageRes.cnt;
-
-                failedKeys.addAll(F.asList(pageRes.errKeys));
+            sender.add(row.get(0), new ModifyingEntryProcessor(row.get(1), RMV));
+        }
 
-                if (pageRes.ex != null) {
-                    if (resEx == null)
-                        resEx = pageRes.ex;
-                    else
-                        resEx.setNextException(pageRes.ex);
-                }
+        sender.flush();
 
-                if (it.hasNext())
-                    rows.clear(); // No need to clear after the last batch.
-            }
-        }
+        SQLException resEx = sender.error();
 
         if (resEx != null) {
-            if (!F.isEmpty(failedKeys)) {
+            if (!F.isEmpty(sender.failedKeys())) {
                 // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
                 // had been modified concurrently right away.
                 String msg = "Failed to DELETE some keys because they had been modified concurrently " +
-                    "[keys=" + failedKeys + ']';
+                    "[keys=" + sender.failedKeys() + ']';
 
                 SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
 
@@ -550,7 +528,7 @@ public class DmlStatementsProcessor {
             throw new IgniteSQLException(resEx);
         }
 
-        return new UpdateResult(res, failedKeys.toArray());
+        return new UpdateResult(sender.updateCount(), sender.failedKeys().toArray());
     }
 
     /**
@@ -579,20 +557,10 @@ public class DmlStatementsProcessor {
         // or if its list of updated columns includes only _val, i.e. is single element.
         boolean hasProps = !hasNewVal || updatedColNames.length > 1;
 
-        long res = 0;
-
-        Map<Object, EntryProcessor<Object, Object, Boolean>> rows = new LinkedHashMap<>();
-
-        // Keys that failed to UPDATE due to concurrent updates.
-        List<Object> failedKeys = new ArrayList<>();
-
-        SQLException resEx = null;
-
-        Iterator<List<?>> it = cursor.iterator();
+        BatchSender sender = new BatchSender(cctx, pageSize);
 
-        while (it.hasNext()) {
-            List<?> e = it.next();
-            Object key = e.get(0);
+        for (List<?> row : cursor) {
+            Object key = row.get(0);
 
             Object newVal;
 
@@ -606,10 +574,10 @@ public class DmlStatementsProcessor {
 
                 assert prop != null;
 
-                newColVals.put(plan.colNames[i], convert(e.get(i + 2), desc, prop.type(), plan.colTypes[i]));
+                newColVals.put(plan.colNames[i], convert(row.get(i + 2), desc, prop.type(), plan.colTypes[i]));
             }
 
-            newVal = plan.valSupplier.apply(e);
+            newVal = plan.valSupplier.apply(row);
 
             if (newVal == null)
                 throw new IgniteSQLException("New value for UPDATE must not be null", IgniteQueryErrorCode.NULL_VALUE);
@@ -643,38 +611,24 @@ public class DmlStatementsProcessor {
                 newVal = ((BinaryObjectBuilder) newVal).build();
             }
 
-            Object srcVal = e.get(1);
+            Object srcVal = row.get(1);
 
             if (bin && !(srcVal instanceof BinaryObject))
                 srcVal = cctx.grid().binary().toBinary(srcVal);
 
-            rows.put(key, new ModifyingEntryProcessor(srcVal, new EntryValueUpdater(newVal)));
-
-            if ((pageSize > 0 && rows.size() == pageSize) || (!it.hasNext())) {
-                PageProcessingResult pageRes = processPage(cctx, rows);
-
-                res += pageRes.cnt;
-
-                failedKeys.addAll(F.asList(pageRes.errKeys));
+            sender.add(key, new ModifyingEntryProcessor(srcVal, new EntryValueUpdater(newVal)));
+        }
 
-                if (pageRes.ex != null) {
-                    if (resEx == null)
-                        resEx = pageRes.ex;
-                    else
-                        resEx.setNextException(pageRes.ex);
-                }
+        sender.flush();
 
-                if (it.hasNext())
-                    rows.clear(); // No need to clear after the last batch.
-            }
-        }
+        SQLException resEx = sender.error();
 
         if (resEx != null) {
-            if (!F.isEmpty(failedKeys)) {
+            if (!F.isEmpty(sender.failedKeys())) {
                 // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
                 // had been modified concurrently right away.
                 String msg = "Failed to UPDATE some keys because they had been modified concurrently " +
-                    "[keys=" + failedKeys + ']';
+                    "[keys=" + sender.failedKeys() + ']';
 
                 SQLException dupEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
 
@@ -686,7 +640,7 @@ public class DmlStatementsProcessor {
             throw new IgniteSQLException(resEx);
         }
 
-        return new UpdateResult(res, failedKeys.toArray());
+        return new UpdateResult(sender.updateCount(), sender.failedKeys().toArray());
     }
 
     /**
@@ -864,47 +818,22 @@ public class DmlStatementsProcessor {
                     IgniteQueryErrorCode.DUPLICATE_KEY);
         }
         else {
-            Map<Object, EntryProcessor<Object, Object, Boolean>> rows = plan.isLocSubqry ?
-                new LinkedHashMap<Object, EntryProcessor<Object, Object, Boolean>>(plan.rowsNum) :
-                new LinkedHashMap<Object, EntryProcessor<Object, Object, Boolean>>();
-
             // Keys that failed to INSERT due to duplication.
-            List<Object> duplicateKeys = new ArrayList<>();
-
-            int resCnt = 0;
-
-            SQLException resEx = null;
-
-            Iterator<List<?>> it = cursor.iterator();
-
-            while (it.hasNext()) {
-                List<?> row = it.next();
-
-                final IgniteBiTuple t = rowToKeyValue(cctx, row, plan);
-
-                rows.put(t.getKey(), new InsertEntryProcessor(t.getValue()));
-
-                if (!it.hasNext() || (pageSize > 0 && rows.size() == pageSize)) {
-                    PageProcessingResult pageRes = processPage(cctx, rows);
+            BatchSender sender = new BatchSender(cctx, pageSize);
 
-                    resCnt += pageRes.cnt;
+            for (List<?> row : cursor) {
+                final IgniteBiTuple keyValPair = rowToKeyValue(cctx, row, plan);
 
-                    duplicateKeys.addAll(F.asList(pageRes.errKeys));
+                sender.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue()));
+            }
 
-                    if (pageRes.ex != null) {
-                        if (resEx == null)
-                            resEx = pageRes.ex;
-                        else
-                            resEx.setNextException(pageRes.ex);
-                    }
+            sender.flush();
 
-                    rows.clear();
-                }
-            }
+            SQLException resEx = sender.error();
 
-            if (!F.isEmpty(duplicateKeys)) {
+            if (!F.isEmpty(sender.failedKeys())) {
                 String msg = "Failed to INSERT some keys because they are already in cache " +
-                    "[keys=" + duplicateKeys + ']';
+                    "[keys=" + sender.failedKeys() + ']';
 
                 SQLException dupEx = new SQLException(msg, null, IgniteQueryErrorCode.DUPLICATE_KEY);
 
@@ -917,7 +846,7 @@ public class DmlStatementsProcessor {
             if (resEx != null)
                 throw new IgniteSQLException(resEx);
 
-            return resCnt;
+            return sender.updateCount();
         }
     }
 
@@ -1133,7 +1062,7 @@ public class DmlStatementsProcessor {
         /** Number of processed items. */
         final long cnt;
 
-        /** Keys that failed to be UPDATEd or DELETEd due to concurrent modification of values. */
+        /** Keys that failed to be updated or deleted due to concurrent modification of values. */
         @NotNull
         final Object[] errKeys;
 
@@ -1150,7 +1079,7 @@ public class DmlStatementsProcessor {
         /** Number of successfully processed items. */
         final long cnt;
 
-        /** Keys that failed to be UPDATEd or DELETEd due to concurrent modification of values. */
+        /** Keys that failed to be updated or deleted due to concurrent modification of values. */
         @NotNull
         final Object[] errKeys;
 
@@ -1193,4 +1122,127 @@ public class DmlStatementsProcessor {
             this.ex = ex;
         }
     }
+
+    /**
+     * Groups entry processors by the primary node of their keys and sends each group as a separate batch.
+     */
+    private static class BatchSender {
+        /** Cache context. */
+        private final GridCacheContext cctx;
+
+        /** Batch size; a non-positive value disables size-triggered sends. */
+        private final int size;
+
+        /** Pending batches keyed by primary node ID. */
+        private final Map<UUID, Map<Object, EntryProcessor<Object, Object, Boolean>>> batches = new HashMap<>();
+
+        /** Update count accumulated over all sent batches. */
+        private long updateCnt;
+
+        /** Failed keys reported by sent batches; created lazily on the first send. */
+        private List<Object> failedKeys;
+
+        /** First batch error; errors of subsequent batches are chained to it. */
+        private SQLException err;
+
+        /**
+         * Constructor.
+         *
+         * @param cctx Cache context.
+         * @param size Batch size; if not positive, entries are sent only on {@link #flush()}.
+         */
+        public BatchSender(GridCacheContext cctx, int size) {
+            this.cctx = cctx;
+            this.size = size;
+        }
+
+        /**
+         * Add entry to the batch of the key's primary node and send that batch once it reaches the batch size.
+         * @param key Key.
+         * @param proc Processor.
+         * @throws IgniteCheckedException If the key cannot be mapped to a node or the batch send fails.
+         */
+        public void add(Object key, EntryProcessor<Object, Object, Boolean> proc) throws IgniteCheckedException {
+            ClusterNode node = cctx.affinity().primaryByKey(key, AffinityTopologyVersion.NONE);
+
+            if (node == null)
+                throw new IgniteCheckedException("Failed to map key to node.");
+
+            UUID nodeId = node.id();
+
+            Map<Object, EntryProcessor<Object, Object, Boolean>> batch = batches.get(nodeId);
+
+            if (batch == null) {
+                batch = new HashMap<>();
+
+                batches.put(nodeId, batch);
+            }
+
+            batch.put(key, proc);
+            // Non-positive size means "no paging" (as before this class existed): accumulate until flush().
+            if (size > 0 && batch.size() >= size) {
+                sendBatch(batch);
+
+                batch.clear();
+            }
+        }
+
+        /**
+         * Send every pending batch that still contains entries.
+         *
+         * @throws IgniteCheckedException If failed.
+         */
+        public void flush() throws IgniteCheckedException {
+            for (Map<Object, EntryProcessor<Object, Object, Boolean>> batch : batches.values()) {
+                if (!batch.isEmpty())
+                    sendBatch(batch);
+            }
+        }
+
+        /**
+         * @return Update count.
+         */
+        public long updateCount() {
+            return updateCnt;
+        }
+
+        /**
+         * @return Failed keys, or an empty list if no batch has been sent yet.
+         */
+        public List<Object> failedKeys() {
+            return failedKeys != null ? failedKeys : Collections.emptyList();
+        }
+
+        /**
+         * @return First batch error with later errors chained to it, or {@code null} if there were none.
+         */
+        public SQLException error() {
+            return err;
+        }
+
+        /**
+         * Send the batch and accumulate its update count, failed keys and error.
+         *
+         * @param batch Batch.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void sendBatch(Map<Object, EntryProcessor<Object, Object, Boolean>> batch)
+            throws IgniteCheckedException {
+            PageProcessingResult pageRes = processPage(cctx, batch);
+
+            updateCnt += pageRes.cnt;
+
+            if (failedKeys == null)
+                failedKeys = new ArrayList<>();
+
+            failedKeys.addAll(F.asList(pageRes.errKeys));
+
+            if (pageRes.ex != null) {
+                if (err == null)
+                    err = pageRes.ex;
+                else
+                    err.setNextException(pageRes.ex);
+            }
+        }
+    }
 }