You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2019/02/18 18:53:47 UTC
[ignite] branch ignite-invokeAll updated: invokeAll
This is an automated email from the ASF dual-hosted git repository.
sboikov pushed a commit to branch ignite-invokeAll
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-invokeAll by this push:
new aaf280f invokeAll
aaf280f is described below
commit aaf280fa8da9704e4d8cda5954c1c64581674364
Author: sboikov <sb...@apache.org>
AuthorDate: Mon Feb 18 21:53:36 2019 +0300
invokeAll
---
.../processors/cache/IgniteCacheOffheapManager.java | 16 ++++++++++++++++
.../distributed/dht/atomic/GridDhtAtomicCache.java | 18 +++++++++++-------
2 files changed, 27 insertions(+), 7 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 4f72e81..7822319 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -886,10 +886,26 @@ public interface IgniteCacheOffheapManager {
*/
public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException;
+ /**
+ * @param cctx Cache context.
+ * @param key Key.
+ * @return Cache search row.
+ * @throws IgniteCheckedException If failed.
+ */
public CacheSearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException;
+ /**
+ * @return Rows comparator.
+ * @throws IgniteCheckedException If failed.
+ */
public Comparator<CacheSearchRow> rowsComparator() throws IgniteCheckedException;
+ /**
+ * @param cctx Cache context.
+ * @param rows Rows sorted according to cache tree sort order.
+ * @param map Update closures map.
+ * @throws IgniteCheckedException If failed.
+ */
public void invokeAll(GridCacheContext cctx,
Collection<? extends CacheSearchRow> rows,
Map<? extends CacheSearchRow, ? extends OffheapInvokeClosure> map)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index fd31b16..9c8ae3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2551,13 +2551,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
int cnt = req.size() - dhtUpdRes.processedEntriesCount();
- Map<UUID, CacheContinuousQueryListener> lsnrs = ctx.continuousQueries().updateListeners(!ctx.userCache(), false);
-
boolean retval = sndPrevVal || req.returnValue();
GridDrType drType = replicate ? DR_PRIMARY : DR_NONE;
- if (cnt > 1) {
+ if (cnt >= 1) {
+ Map<UUID, CacheContinuousQueryListener> lsnrs = ctx.continuousQueries().updateListeners(!ctx.userCache(), false);
+
+ boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM ||
+ !F.isEmptyOrNulls(req.filter());
+
Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheBatchUpdateClosure>> byPart = new HashMap<>();
for (int i = dhtUpdRes.processedEntriesCount(); i < req.size(); i++) {
@@ -2582,6 +2585,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
+ // Possibly read value from store.
+ boolean readFromStore = !req.skipStore() && needVal && (ctx.readThrough() &&
+ (op == GridCacheOperation.TRANSFORM || ctx.loadPreviousValue()));
+
AtomicCacheBatchUpdateClosure c = new AtomicCacheBatchUpdateClosure(
i,
entry,
@@ -2590,7 +2597,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.operation(),
writeVal,
req.invokeArguments(),
- !req.skipStore(),
+ readFromStore,
writeThrough() && !req.skipStore(),
req.keepBinary(),
expiry,
@@ -2622,9 +2629,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
for (Map.Entry<CacheSearchRow, AtomicCacheBatchUpdateClosure> e : map.entrySet()) {
AtomicCacheBatchUpdateClosure c = e.getValue();
- boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM
- || !F.isEmptyOrNulls(req.filter());
-
Object writeVal = op == TRANSFORM ? req.entryProcessor(c.reqIdx) : req.writeValue(c.reqIdx);
GridCacheUpdateAtomicResult updRes = c.entry().finishInnerUpdate(