You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2019/02/18 18:53:47 UTC

[ignite] branch ignite-invokeAll updated: invokeAll

This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-invokeAll
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-invokeAll by this push:
     new aaf280f  invokeAll
aaf280f is described below

commit aaf280fa8da9704e4d8cda5954c1c64581674364
Author: sboikov <sb...@apache.org>
AuthorDate: Mon Feb 18 21:53:36 2019 +0300

    invokeAll
---
 .../processors/cache/IgniteCacheOffheapManager.java    | 16 ++++++++++++++++
 .../distributed/dht/atomic/GridDhtAtomicCache.java     | 18 +++++++++++-------
 2 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 4f72e81..7822319 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -886,10 +886,26 @@ public interface IgniteCacheOffheapManager {
          */
         public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException;
 
+        /**
+         * @param cctx Cache context.
+         * @param key Key.
+         * @return Cache search row.
+         * @throws IgniteCheckedException If failed.
+         */
         public CacheSearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException;
 
+        /**
+         * @return Rows comparator.
+         * @throws IgniteCheckedException If failed.
+         */
         public Comparator<CacheSearchRow> rowsComparator() throws IgniteCheckedException;
 
+        /**
+         * @param cctx Cache context.
+         * @param rows Rows sorted according to cache tree sort order.
+         * @param map Update closures map.
+         * @throws IgniteCheckedException If failed.
+         */
         public void invokeAll(GridCacheContext cctx,
             Collection<? extends CacheSearchRow> rows,
             Map<? extends CacheSearchRow, ? extends OffheapInvokeClosure> map)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index fd31b16..9c8ae3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2551,13 +2551,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         int cnt = req.size() - dhtUpdRes.processedEntriesCount();
 
-        Map<UUID, CacheContinuousQueryListener> lsnrs = ctx.continuousQueries().updateListeners(!ctx.userCache(), false);
-
         boolean retval = sndPrevVal || req.returnValue();
 
         GridDrType drType = replicate ? DR_PRIMARY : DR_NONE;
 
-        if (cnt > 1) {
+        if (cnt >= 1) {
+            Map<UUID, CacheContinuousQueryListener> lsnrs = ctx.continuousQueries().updateListeners(!ctx.userCache(), false);
+
+            boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM ||
+                !F.isEmptyOrNulls(req.filter());
+
             Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure>> byPart = new HashMap<>();
 
             for (int i = dhtUpdRes.processedEntriesCount(); i < req.size(); i++) {
@@ -2582,6 +2585,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
                     Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
 
+                    // Possibly read value from store.
+                    boolean readFromStore = !req.skipStore() && needVal && (ctx.readThrough() &&
+                        (op == GridCacheOperation.TRANSFORM || ctx.loadPreviousValue()));
+
                     AtomicCacheBatchUpdateClosure c = new AtomicCacheBatchUpdateClosure(
                         i,
                         entry,
@@ -2590,7 +2597,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         req.operation(),
                         writeVal,
                         req.invokeArguments(),
-                        !req.skipStore(),
+                        readFromStore,
                         writeThrough() && !req.skipStore(),
                         req.keepBinary(),
                         expiry,
@@ -2622,9 +2629,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     for (Map.Entry<CacheSearchRow, AtomicCacheBatchUpdateClosure> e : map.entrySet()) {
                         AtomicCacheBatchUpdateClosure c = e.getValue();
 
-                        boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM
-                            || !F.isEmptyOrNulls(req.filter());
-
                         Object writeVal = op == TRANSFORM ? req.entryProcessor(c.reqIdx) : req.writeValue(c.reqIdx);
 
                         GridCacheUpdateAtomicResult updRes = c.entry().finishInnerUpdate(