You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/30 13:11:47 UTC
[02/18] incubator-ignite git commit: # ignite-44
# ignite-44
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/71ee2ee1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/71ee2ee1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/71ee2ee1
Branch: refs/heads/ignite-1
Commit: 71ee2ee194aea90f81a7b4299bb9e7163a59f143
Parents: 982d441
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 23 17:05:07 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Dec 23 17:30:04 2014 +0300
----------------------------------------------------------------------
.../processors/cache/IgniteCacheProxy.java | 32 +-
.../processors/cache/CacheInvokeEntry.java | 72 ++++
.../processors/cache/CacheInvokeResult.java | 95 +++++
.../processors/cache/GridCacheAdapter.java | 18 +-
.../processors/cache/GridCacheEntryEx.java | 2 +
.../processors/cache/GridCacheMapEntry.java | 58 ++-
.../processors/cache/GridCacheMessage.java | 54 +++
.../processors/cache/GridCacheProjectionEx.java | 21 ++
.../cache/GridCacheProjectionImpl.java | 15 +
.../processors/cache/GridCacheProxyImpl.java | 29 ++
.../cache/GridCacheUpdateAtomicResult.java | 18 +-
.../dht/atomic/GridDhtAtomicCache.java | 279 +++++++++++---
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 19 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 121 +++---
.../dht/atomic/GridNearAtomicUpdateFuture.java | 36 +-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 185 +++++++---
.../distributed/near/GridNearAtomicCache.java | 9 +-
...eCacheAtomicPrimaryWriteOrderInvokeTest.java | 47 +++
...micPrimaryWriteOrderWithStoreInvokeTest.java | 23 ++
.../cache/IgniteCacheInvokeAbstractTest.java | 367 +++++++++++++++++++
.../processors/cache/GridCacheTestEntryEx.java | 11 +-
.../junits/common/GridCommonAbstractTest.java | 43 +++
22 files changed, 1383 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 3945234..410fb9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -494,16 +494,40 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
throws EntryProcessorException {
- // TODO IGNITE-1.
- throw new UnsupportedOperationException();
+ try {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args).get();
+
+ return res.get();
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheException(e);
+ }
}
/** {@inheritDoc} */
@Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
EntryProcessor<K, V, T> entryProcessor,
Object... args) {
- // TODO IGNITE-1.
- throw new UnsupportedOperationException();
+ try {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.invokeAll(keys, entryProcessor, args).get();
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheException(e);
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java
new file mode 100644
index 0000000..1f3900d
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java
@@ -0,0 +1,72 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import javax.cache.processor.*;
+
+/**
+ * {@link MutableEntry} handed to {@link EntryProcessor#process(MutableEntry, Object...)}; a {@code null} value means "no entry".
+ */
+public class CacheInvokeEntry<K, V> implements MutableEntry<K, V> {
+ /** Entry key, fixed at construction. */
+ @GridToStringInclude
+ private final K key;
+
+ /** Current value; {@code null} after {@link #remove()} or when constructed without a value. */
+ @GridToStringInclude
+ private V val;
+
+ /**
+ * @param key Key.
+ * @param val Initial value, {@code null} if the entry does not exist.
+ */
+ public CacheInvokeEntry(K key, V val) {
+ this.key = key;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean exists() {
+ return val != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ val = null; // After this call exists() returns false.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setValue(V val) {
+ this.val = val; // A null argument is not rejected here; it makes exists() return false.
+ }
+
+ /** {@inheritDoc} */
+ @Override public K getKey() {
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V getValue() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T unwrap(Class<T> clazz) {
+ throw new IllegalArgumentException("Unwrapping to class is not supported: " + clazz); // No class can be unwrapped.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheInvokeEntry.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
new file mode 100644
index 0000000..50af119
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java
@@ -0,0 +1,95 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.apache.ignite.marshaller.optimized.*;
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+
+/**
+ * Implementation of {@link EntryProcessorResult} holding either a computed value or the exception raised while computing it.
+ */
+public class CacheInvokeResult<T> implements EntryProcessorResult<T>, Externalizable, IgniteOptimizedMarshallable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Value returned by {@link #ggClassId()}; never assigned in this class (presumably set externally - confirm). */
+ @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"})
+ private static Object GG_CLASS_ID;
+
+ /** Computed result; may be {@code null}. */
+ @GridToStringInclude
+ private T res;
+
+ /** Exception thrown by the entry processor; {@code null} when a result was produced. */
+ private Exception err;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public CacheInvokeResult() {
+ // No-op.
+ }
+
+ /**
+ * @param res Computed result.
+ */
+ public CacheInvokeResult(@Nullable T res) {
+ this.res = res;
+ }
+
+ /**
+ * @param err Exception thrown by {@link EntryProcessor#process(MutableEntry, Object...)}.
+ */
+ public CacheInvokeResult(Exception err) {
+ this.err = err;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object ggClassId() {
+ return GG_CLASS_ID;
+ }
+
+ /** {@inheritDoc} */
+ @Override public T get() throws EntryProcessorException {
+ if (err != null) {
+ if (err instanceof EntryProcessorException)
+ throw (EntryProcessorException)err; // Rethrown as is, without extra wrapping.
+
+ throw new EntryProcessorException(err); // Any other exception becomes the cause.
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(res); // Write order must match the read order in readExternal().
+
+ out.writeObject(err);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ res = (T)in.readObject(); // Result first, then error - same order as writeExternal().
+
+ err = (Exception)in.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheInvokeResult.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index f55ee0e..b3e567c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -43,6 +43,7 @@ import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
import javax.cache.expiry.*;
+import javax.cache.processor.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -2193,6 +2194,21 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
}
/** {@inheritDoc} */
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
+ throws EntryProcessorException {
+ // TODO IGNITE-44: not implemented here yet; fail fast instead of returning a null future that callers dereference.
+ throw new UnsupportedOperationException("invoke is not supported yet (IGNITE-44).");
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
+ // TODO IGNITE-44: not implemented here yet; fail fast instead of returning a null future that callers dereference.
+ throw new UnsupportedOperationException("invokeAll is not supported yet (IGNITE-44).");
+ }
+
+ /** {@inheritDoc} */
@Override public void transform(final K key, final IgniteClosure<V, V> transformer) throws IgniteCheckedException {
A.notNull(key, "key", transformer, "valTransform");
@@ -4516,7 +4532,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
* @param key Cache key.
* @throws IllegalArgumentException If validation fails.
*/
- private void validateCacheKey(Object key) {
+ protected void validateCacheKey(Object key) {
if (keyCheck) {
CU.validateCacheKey(log, key);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
index 5fb0b95..1b71eec 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
@@ -392,6 +392,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
* @param op Update operation.
* @param val Value. Type depends on operation.
* @param valBytes Value bytes. Can be non-null only if operation is UPDATE.
+ * @param invokeArgs Optional arguments for entry processor.
* @param writeThrough Write through flag.
* @param retval Return value flag.
* @param expiryPlc Expiry policy.
@@ -424,6 +425,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
GridCacheOperation op,
@Nullable Object val,
@Nullable byte[] valBytes,
+ @Nullable Object[] invokeArgs,
boolean writeThrough,
boolean retval,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index 0fd64de..f14bba5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -29,6 +29,7 @@ import org.jetbrains.annotations.*;
import sun.misc.*;
import javax.cache.expiry.*;
+import javax.cache.processor.*;
import java.io.*;
import java.nio.*;
import java.util.*;
@@ -1586,6 +1587,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
GridCacheOperation op,
@Nullable Object writeObj,
@Nullable byte[] valBytes,
+ @Nullable Object[] invokeArgs,
boolean writeThrough,
boolean retval,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
@@ -1615,6 +1617,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
GridDrResolveResult<V> drRes = null;
+ EntryProcessorResult<?> invokeRes = null;
+
long newTtl = -1L;
long newExpireTime = 0L;
long newDrExpireTime = -1L; // Explicit DR expire time which possibly will be sent to DHT node.
@@ -1644,7 +1648,15 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
if (drRes.isUseOld()) {
old = retval ? rawGetOrUnmarshalUnlocked(false) : val;
- return new GridCacheUpdateAtomicResult<>(false, old, null, -1L, -1L, null, null, false);
+ return new GridCacheUpdateAtomicResult<>(false,
+ old,
+ null,
+ invokeRes,
+ -1L,
+ -1L,
+ null,
+ null,
+ false);
}
newTtl = drRes.newTtl();
@@ -1692,7 +1704,15 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
old = retval ? rawGetOrUnmarshalUnlocked(false) : val;
- return new GridCacheUpdateAtomicResult<>(false, old, null, -1L, -1L, null, null, false);
+ return new GridCacheUpdateAtomicResult<>(false,
+ old,
+ null,
+ invokeRes,
+ -1L,
+ -1L,
+ null,
+ null,
+ false);
}
}
else
@@ -1744,6 +1764,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
return new GridCacheUpdateAtomicResult<>(false,
retval ? old : null,
null,
+ invokeRes,
-1L,
-1L,
null,
@@ -1760,11 +1781,26 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
if (op == GridCacheOperation.TRANSFORM) {
transformClo = writeObj;
- IgniteClosure<V, V> transform = (IgniteClosure<V, V>)writeObj;
+ EntryProcessor<K, V, ?> entryProcessor = (EntryProcessor<K, V, ?>)writeObj;
- updated = cctx.unwrapTemporary(transform.apply(old));
+ CacheInvokeEntry<K, V> entry = new CacheInvokeEntry<>(key, old);
+
+ try {
+ Object computed = entryProcessor.process(entry, invokeArgs);
+
+ updated = cctx.unwrapTemporary(entry.getValue());
+
+ invokeRes = new CacheInvokeResult<>(cctx.unwrapTemporary(computed));
- valBytes = null;
+ valBytes = null;
+ }
+ catch (Exception e) {
+ invokeRes = new CacheInvokeResult<>(e);
+
+ updated = old;
+
+ valBytes = oldBytes.getIfMarshaled();
+ }
}
else
updated = (V)writeObj;
@@ -1794,6 +1830,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
return new GridCacheUpdateAtomicResult<>(false,
retval ? old : null,
null,
+ invokeRes,
-1L,
-1L,
null,
@@ -1899,6 +1936,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
return new GridCacheUpdateAtomicResult<>(false,
cctx.<V>unwrapTemporary(interceptRes.get2()),
null,
+ invokeRes,
-1L,
-1L,
null,
@@ -2001,7 +2039,15 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
if (log.isDebugEnabled())
log.debug("Updated cache entry [val=" + val + ", old=" + old + ", entry=" + this + ']');
- return new GridCacheUpdateAtomicResult<>(res, old, updated, newTtl, newDrExpireTime, enqueueVer, drRes, true);
+ return new GridCacheUpdateAtomicResult<>(res,
+ old,
+ updated,
+ invokeRes,
+ newTtl,
+ newDrExpireTime,
+ enqueueVer,
+ drRes,
+ true);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java
index ab98dcb..45eda5f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMessage.java
@@ -372,6 +372,60 @@ public abstract class GridCacheMessage<K, V> extends GridTcpCommunicationMessage
}
/**
+ * @param args Entry processor arguments to marshal; the array and its elements may be {@code null}.
+ * @param ctx Shared cache context, must not be {@code null}.
+ * @return Marshalled arguments ({@code null} elements stay {@code null}), or {@code null} if there are no arguments.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable protected final byte[][] marshalInvokeArguments(@Nullable Object[] args,
+ GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ assert ctx != null;
+
+ if (args == null || args.length == 0)
+ return null;
+
+ byte[][] argsBytes = new byte[args.length][];
+
+ for (int i = 0; i < args.length; i++) {
+ Object arg = args[i];
+
+ if (ctx.deploymentEnabled())
+ prepareObject(arg, ctx); // Invoked for null arguments as well.
+
+ argsBytes[i] = arg == null ? null : CU.marshal(ctx, arg);
+ }
+
+ return argsBytes;
+ }
+
+
+ /**
+ * @param byteCol Marshalled arguments to unmarshal; the array and its elements may be {@code null}.
+ * @param ctx Shared cache context, must not be {@code null}.
+ * @param ldr Class loader handed to the marshaller, must not be {@code null}.
+ * @return Unmarshalled arguments ({@code null} elements stay {@code null}), or {@code null} if {@code byteCol} is {@code null}.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable protected final Object[] unmarshalInvokeArguments(@Nullable byte[][] byteCol,
+ GridCacheSharedContext<K, V> ctx,
+ ClassLoader ldr) throws IgniteCheckedException {
+ assert ldr != null;
+ assert ctx != null;
+
+ if (byteCol == null)
+ return null;
+
+ Object[] args = new Object[byteCol.length]; // A zero-length input yields a zero-length array, not null.
+
+ IgniteMarshaller marsh = ctx.marshaller();
+
+ for (int i = 0; i < byteCol.length; i++)
+ args[i] = byteCol[i] == null ? null : marsh.unmarshal(byteCol[i], ldr);
+
+ return args;
+ }
+
+ /**
* @param filter Collection to marshal.
* @param ctx Context.
* @return Marshalled collection.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
index 2362f57..1a98192 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
@@ -18,6 +18,7 @@ import org.gridgain.grid.kernal.processors.cache.dr.*;
import org.jetbrains.annotations.*;
import javax.cache.expiry.*;
+import javax.cache.processor.*;
import java.util.*;
/**
@@ -393,4 +394,24 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> {
* @return New projection based on this one, but with the specified expiry policy.
*/
public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc);
+
+ /**
+ * @param key Key of the entry the processor is invoked on.
+ * @param entryProcessor Entry processor.
+ * @param args Arguments passed to the entry processor.
+ * @return Future for the entry processor result.
+ */
+ public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args);
+
+ /**
+ * @param keys Keys of the entries the processor is invoked on.
+ * @param entryProcessor Entry processor, the same one for every key.
+ * @param args Arguments passed to the entry processor.
+ * @return Future for the map of entry processor results by key.
+ */
+ public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
index 5bd973c..ad5cde3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
@@ -26,6 +26,7 @@ import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
import javax.cache.expiry.*;
+import javax.cache.processor.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -788,6 +789,20 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
}
/** {@inheritDoc} */
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
+ return cache.invoke(key, entryProcessor, args); // Straight delegation; arguments are passed through unchanged.
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
+ return cache.invokeAll(keys, entryProcessor, args); // Straight delegation; arguments are passed through unchanged.
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteFuture<Boolean> putxAsync(K key, V val,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
return putxAsync(key, val, null, -1, filter);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
index 136e078..90aeb0b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
@@ -26,6 +26,7 @@ import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
import javax.cache.expiry.*;
+import javax.cache.processor.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -737,6 +738,34 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj); // Value returned by enter() is handed back to leave() below.
+
+ try {
+ return delegate.invoke(key, entryProcessor, args);
+ }
+ finally {
+ gate.leave(prev); // Runs once the future is returned (or the call throws), not when the future completes.
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj); // Value returned by enter() is handed back to leave() below.
+
+ try {
+ return delegate.invokeAll(keys, entryProcessor, args);
+ }
+ finally {
+ gate.leave(prev); // Runs once the future is returned (or the call throws), not when the future completes.
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteFuture<Boolean> putxAsync(K key, V val,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java
index 43ca819..34dbe52 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -9,11 +9,12 @@
package org.gridgain.grid.kernal.processors.cache;
-import org.gridgain.grid.kernal.processors.dr.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.grid.util.tostring.*;
import org.jetbrains.annotations.*;
+import javax.cache.processor.*;
+
/**
* Cache entry atomic update result.
*/
@@ -46,14 +47,18 @@ public class GridCacheUpdateAtomicResult<K, V> {
/** Whether update should be propagated to DHT node. */
private final boolean sndToDht;
+ /** Value computed by entry processor. */
+ private EntryProcessorResult<?> res;
+
/**
* Constructor.
*
* @param success Success flag.
* @param oldVal Old value.
* @param newVal New value.
+ * @param res Value computed by the {@link EntryProcessor}.
* @param newTtl New TTL.
- * @param drExpireTime Explict DR expire time (if any).
+ * @param drExpireTime Explicit DR expire time (if any).
* @param rmvVer Version for deferred delete.
* @param drRes DR resolution result.
* @param sndToDht Whether update should be propagated to DHT node.
@@ -61,6 +66,7 @@ public class GridCacheUpdateAtomicResult<K, V> {
public GridCacheUpdateAtomicResult(boolean success,
@Nullable V oldVal,
@Nullable V newVal,
+ @Nullable EntryProcessorResult<?> res,
long newTtl,
long drExpireTime,
@Nullable GridCacheVersion rmvVer,
@@ -69,6 +75,7 @@ public class GridCacheUpdateAtomicResult<K, V> {
this.success = success;
this.oldVal = oldVal;
this.newVal = newVal;
+ this.res = res;
this.newTtl = newTtl;
this.drExpireTime = drExpireTime;
this.rmvVer = rmvVer;
@@ -77,6 +84,13 @@ public class GridCacheUpdateAtomicResult<K, V> {
}
/**
+ * @return Result produced by the {@link EntryProcessor} as passed to the constructor; may be {@code null}.
+ */
+ @Nullable public EntryProcessorResult<?> computedResult() {
+ return res;
+ }
+
+ /**
* @return Success flag.
*/
public boolean success() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 2c988e7..fec59b2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -34,6 +34,7 @@ import org.jetbrains.annotations.*;
import sun.misc.*;
import javax.cache.expiry.*;
+import javax.cache.processor.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -306,14 +307,32 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@SuppressWarnings("unchecked")
@Override public IgniteFuture<V> putAsync(K key, V val, @Nullable GridCacheEntryEx<K, V> entry,
long ttl, @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) {
- return updateAllAsync0(F0.asMap(key, val), null, null, null, true, false, entry, ttl, filter);
+ return updateAllAsync0(F0.asMap(key, val),
+ null,
+ null,
+ null,
+ null,
+ true,
+ false,
+ entry,
+ ttl,
+ filter);
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteFuture<Boolean> putxAsync(K key, V val, @Nullable GridCacheEntryEx<K, V> entry, long ttl,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) {
- return updateAllAsync0(F0.asMap(key, val), null, null, null, false, false, entry, ttl, filter);
+ return updateAllAsync0(F0.asMap(key, val),
+ null,
+ null,
+ null,
+ null,
+ false,
+ false,
+ entry,
+ ttl,
+ filter);
}
/** {@inheritDoc} */
@@ -385,7 +404,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) {
- return updateAllAsync0(F.asMap(key, newVal), null, null, null, true, true, null, 0,
+ return updateAllAsync0(F.asMap(key, newVal),
+ null,
+ null,
+ null,
+ null,
+ true,
+ true,
+ null,
+ 0,
ctx.equalsPeekArray(oldVal));
}
@@ -398,7 +425,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public IgniteFuture<?> putAllAsync(Map<? extends K, ? extends V> m,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
- return updateAllAsync0(m, null, null, null, false, false, null, 0, filter);
+ return updateAllAsync0(m,
+ null,
+ null,
+ null,
+ null,
+ false,
+ false,
+ null,
+ 0,
+ filter);
}
/** {@inheritDoc} */
@@ -410,7 +446,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public IgniteFuture<?> putAllDrAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) {
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
- return updateAllAsync0(null, null, drMap, null, false, false, null, 0, null);
+ return updateAllAsync0(null,
+ null,
+ null,
+ drMap,
+ null,
+ false,
+ false,
+ null,
+ 0,
+ null);
}
/** {@inheritDoc} */
@@ -421,16 +466,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public <R> R transformAndCompute(K key, IgniteClosure<V, IgniteBiTuple<V, R>> transformer)
throws IgniteCheckedException {
+ /*
return (R)updateAllAsync0(null,
Collections.singletonMap(key, new GridCacheTransformComputeClosure<>(transformer)), null, null, true,
false, null, 0, null).get();
+ */
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public IgniteFuture<?> transformAsync(K key, IgniteClosure<V, V> transformer,
@Nullable GridCacheEntryEx<K, V> entry, long ttl) {
+ /*
return updateAllAsync0(null, Collections.singletonMap(key, transformer), null, null, false, false, entry, ttl,
null);
+ */
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@@ -440,10 +493,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public IgniteFuture<?> transformAllAsync(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) {
+ /*
if (F.isEmpty(m))
return new GridFinishedFuture<Object>(ctx.kernalContext());
return updateAllAsync0(null, m, null, null, false, false, null, 0, null);
+ */
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@@ -579,11 +636,83 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
"GridCacheAtomicityMode.ATOMIC mode (use GridCacheAtomicityMode.TRANSACTIONAL instead)"));
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Passes {@code key} and {@code entryProcessor} through {@code A.notNull}, validates the key when
+ * {@code keyCheck} is set, and submits a single-entry transform map to {@code updateAllAsync0}
+ * with the return-value flag enabled. The resulting future is chained so that it completes with
+ * the only value of the result map, which is asserted to contain exactly one entry.
+ */
+ @SuppressWarnings("unchecked")
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
+ A.notNull(key, "key", entryProcessor, "entryProcessor");
+
+ if (keyCheck)
+ validateCacheKey(key);
+
+ ctx.denyOnLocalRead();
+
+ Map<? extends K, EntryProcessor> transformMap =
+ Collections.singletonMap(key, (EntryProcessor)entryProcessor);
+
+ IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null, // map
+ transformMap,
+ args, // invokeArgs
+ null, // drPutMap
+ null, // drRmvMap
+ true, // retval
+ false,
+ null,
+ -1L,
+ null);
+
+ return fut.chain(new CX1<IgniteFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
+ @Override public EntryProcessorResult<T> applyx(IgniteFuture<Map<K, EntryProcessorResult<T>>> fut)
+ throws IgniteCheckedException {
+ Map<K, EntryProcessorResult<T>> resMap = fut.get();
+
+ assert resMap != null;
+ assert resMap.size() == 1 : resMap.size();
+
+ return resMap.values().iterator().next(); // Single key was submitted, so take the only result.
+ }
+ });
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Passes {@code keys} and {@code entryProcessor} through {@code A.notNull}, validates the keys when
+ * {@code keyCheck} is set, and associates every key with the same {@code entryProcessor} instance
+ * via {@code F.viewAsMap}. The map is submitted to {@code updateAllAsync0} with the return-value
+ * flag enabled, and the future it returns is handed back to the caller unchanged.
+ */
+ @SuppressWarnings("unchecked")
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+ final EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
+ A.notNull(keys, "keys", entryProcessor, "entryProcessor");
+
+ if (keyCheck)
+ validateCacheKeys(keys);
+
+ ctx.denyOnLocalRead();
+
+ Map<? extends K, EntryProcessor> transformMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() {
+ @Override public EntryProcessor apply(K k) {
+ return entryProcessor; // Same processor for every key.
+ }
+ });
+
+ IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null, // map
+ transformMap,
+ args, // invokeArgs
+ null, // drPutMap
+ null, // drRmvMap
+ true, // retval
+ false,
+ null,
+ -1L,
+ null);
+
+ return fut;
+ }
+
/**
* Entry point for all public API put/transform methods.
*
* @param map Put map. Either {@code map}, {@code transformMap} or {@code drMap} should be passed.
* @param transformMap Transform map. Either {@code map}, {@code transformMap} or {@code drMap} should be passed.
+ * @param invokeArgs Optional arguments for EntryProcessor.
* @param drPutMap DR put map.
* @param drRmvMap DR remove map.
* @param retval Return value required flag.
@@ -595,7 +724,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private IgniteFuture updateAllAsync0(
@Nullable final Map<? extends K, ? extends V> map,
- @Nullable final Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
+ @Nullable final Map<? extends K, EntryProcessor> transformMap,
+ @Nullable Object[] invokeArgs,
@Nullable final Map<? extends K, GridCacheDrInfo<V>> drPutMap,
@Nullable final Map<? extends K, GridCacheVersion> drRmvMap,
final boolean retval,
@@ -623,6 +753,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
map != null ? map.keySet() : transformMap != null ? transformMap.keySet() : drPutMap != null ?
drPutMap.keySet() : drRmvMap.keySet(),
map != null ? map.values() : transformMap != null ? transformMap.values() : null,
+ invokeArgs,
drPutMap != null ? drPutMap.values() : null,
drRmvMap != null ? drRmvMap.values() : null,
retval,
@@ -682,6 +813,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
keys != null ? keys : drMap.keySet(),
null,
null,
+ null,
keys != null ? null : drMap.values(),
retval,
rawRetval,
@@ -692,8 +824,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
taskNameHash);
return asyncOp(new CO<IgniteFuture<Object>>() {
- @Override
- public IgniteFuture<Object> apply() {
+ @Override public IgniteFuture<Object> apply() {
updateFut.map();
return updateFut;
@@ -858,11 +989,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
IgniteFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
if (forceFut.isDone())
- updateAllAsyncInternal0(nodeId, req, cached, completionCb);
+ updateAllAsyncInternal0(nodeId, req, completionCb);
else {
forceFut.listenAsync(new CI1<IgniteFuture<Object>>() {
@Override public void apply(IgniteFuture<Object> t) {
- updateAllAsyncInternal0(nodeId, req, cached, completionCb);
+ updateAllAsyncInternal0(nodeId, req, completionCb);
}
});
}
@@ -873,13 +1004,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*
* @param nodeId Node ID.
* @param req Update request.
- * @param cached Cached entry if updating single local entry.
* @param completionCb Completion callback.
*/
public void updateAllAsyncInternal0(
UUID nodeId,
GridNearAtomicUpdateRequest<K, V> req,
- @Nullable GridCacheEntryEx<K, V> cached,
CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb
) {
GridNearAtomicUpdateResponse<K, V> res = new GridNearAtomicUpdateResponse<>(ctx.cacheId(), nodeId,
@@ -887,7 +1016,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
List<K> keys = req.keys();
- assert !req.returnValue() || keys.size() == 1;
+ assert !req.returnValue() || (req.operation() == TRANSFORM || keys.size() == 1);
GridDhtAtomicUpdateFuture<K, V> dhtFut = null;
@@ -966,6 +1095,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
deleted = updRes.deleted();
dhtFut = updRes.dhtFuture();
+
+ if (req.operation() == TRANSFORM)
+ retVal = new GridCacheReturn<>((Object)updRes.invokeResults(), true);
}
else {
UpdateSingleResult<K, V> updRes = updateSingle(node,
@@ -1075,8 +1207,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry
) throws GridCacheEntryRemovedException {
- // Cannot update in batches during DR due to possible conflicts.
- assert !req.returnValue(); // Should not request return values for putAll.
+ assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts.
+ assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll.
if (!F.isEmpty(req.filter())) {
try {
@@ -1092,11 +1224,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
int size = req.keys().size();
Map<K, V> putMap = null;
- Map<K, IgniteClosure<V, V>> transformMap = null;
+ Map<K, EntryProcessor<K, V, ?>> entryProcessorMap = null;
Collection<K> rmvKeys = null;
UpdateBatchResult<K, V> updRes = new UpdateBatchResult<>();
List<GridDhtCacheEntry<K, V>> filtered = new ArrayList<>(size);
GridCacheOperation op = req.operation();
+ Map<Object, Object> invokeResMap = op == TRANSFORM ? U.newHashMap(size) : null;
int firstEntryIdx = 0;
@@ -1136,7 +1269,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (op == TRANSFORM) {
- IgniteClosure<V, V> transform = req.transformClosure(i);
+ EntryProcessor<K, V, ?> entryProcessor = req.entryProcessor(i);
V old = entry.innerGet(
null,
@@ -1148,17 +1281,35 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/*event*/true,
/*temporary*/true,
req.subjectId(),
- transform,
+ entryProcessor,
taskName,
CU.<K, V>empty(),
null);
- if (transformMap == null)
- transformMap = new HashMap<>();
+ if (entryProcessorMap == null)
+ entryProcessorMap = new HashMap<>();
+
+ entryProcessorMap.put(entry.key(), entryProcessor);
+
+ CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(entry.key(), old);
- transformMap.put(entry.key(), transform);
+ V updated;
+ CacheInvokeResult invokeRes;
- V updated = transform.apply(old);
+ try {
+ Object computed = entryProcessor.process(invokeEntry, req.invokeArguments());
+
+ updated = ctx.unwrapTemporary(invokeEntry.getValue());
+
+ invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed));
+ }
+ catch (Exception e) {
+ invokeRes = new CacheInvokeResult<>(e);
+
+ updated = old;
+ }
+
+ invokeResMap.put(entry.key(), invokeRes);
if (updated == null) {
if (intercept) {
@@ -1179,7 +1330,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
node,
putMap,
null,
- transformMap,
+ entryProcessorMap,
dhtFut,
completionCb,
req,
@@ -1192,7 +1343,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
firstEntryIdx = i + 1;
putMap = null;
- transformMap = null;
+ entryProcessorMap = null;
filtered = new ArrayList<>();
}
@@ -1221,7 +1372,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
node,
null,
rmvKeys,
- transformMap,
+ entryProcessorMap,
dhtFut,
completionCb,
req,
@@ -1234,7 +1385,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
firstEntryIdx = i + 1;
rmvKeys = null;
- transformMap = null;
+ entryProcessorMap = null;
filtered = new ArrayList<>();
}
@@ -1331,7 +1482,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
node,
putMap,
rmvKeys,
- transformMap,
+ entryProcessorMap,
dhtFut,
completionCb,
req,
@@ -1346,6 +1497,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
updRes.dhtFuture(dhtFut);
+ updRes.invokeResult(invokeResMap);
+
return updRes;
}
@@ -1442,6 +1595,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean intercept = ctx.config().getInterceptor() != null;
+ Map<K, EntryProcessorResult<?>> computedMap = null;
+
// Avoid iterator creation.
for (int i = 0; i < keys.size(); i++) {
K k = keys.get(i);
@@ -1487,6 +1642,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
op,
writeVal,
newValBytes,
+ req.invokeArguments(),
primary && storeEnabled(),
req.returnValue(),
expiry,
@@ -1524,16 +1680,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
newValBytes = null; // Value has been changed.
}
- IgniteClosure<V, V> transformC = null;
+ EntryProcessor<K, V, ?> entryProcessor = null;
if (req.forceTransformBackups() && op == TRANSFORM)
- transformC = (IgniteClosure<V, V>)writeVal;
+ entryProcessor = (EntryProcessor<K, V, ?>)writeVal;
if (!readersOnly) {
dhtFut.addWriteEntry(entry,
updRes.newValue(),
newValBytes,
- transformC,
+ entryProcessor,
updRes.newTtl(),
expireTime,
newDrVer);
@@ -1544,7 +1700,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
entry,
updRes.newValue(),
newValBytes,
- transformC,
+ entryProcessor,
ttl,
expireTime);
}
@@ -1599,17 +1755,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
deleted.add(F.t(entry, updRes.removeVersion()));
}
- // Create only once.
- if (retVal == null) {
- Object ret = updRes.oldValue();
+ if (op == TRANSFORM) {
+ assert req.returnValue();
- if (op == TRANSFORM && writeVal instanceof GridCacheTransformComputeClosure) {
- assert req.returnValue();
+ if (retVal == null) {
+ computedMap = U.newHashMap(keys.size());
- ret = ((GridCacheTransformComputeClosure<V, ?>)writeVal).returnValue();
+ retVal = new GridCacheReturn<>((Object)computedMap, updRes.success());
}
- retVal = new GridCacheReturn<>(req.returnValue() ? ret : null, updRes.success());
+ computedMap.put(k, updRes.computedResult());
+ }
+ else {
+ // Create only once.
+ if (retVal == null) {
+ Object ret = updRes.oldValue();
+
+ retVal = new GridCacheReturn<>(req.returnValue() ? ret : null, updRes.success());
+ }
}
}
catch (IgniteCheckedException e) {
@@ -1628,7 +1791,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param node Originating node.
* @param putMap Values to put.
* @param rmvKeys Keys to remove.
- * @param transformMap Transform closures.
+ * @param entryProcessorMap Entry processors.
* @param dhtFut DHT update future if has backups.
* @param completionCb Completion callback to invoke when DHT future is completed.
* @param req Request.
@@ -1648,7 +1811,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ClusterNode node,
@Nullable Map<K, V> putMap,
@Nullable Collection<K> rmvKeys,
- @Nullable Map<K, IgniteClosure<V, V>> transformMap,
+ @Nullable Map<K, EntryProcessor<K, V, ?>> entryProcessorMap,
@Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut,
CI2<GridNearAtomicUpdateRequest<K, V>, GridNearAtomicUpdateResponse<K, V>> completionCb,
final GridNearAtomicUpdateRequest<K, V> req,
@@ -1740,6 +1903,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
op,
writeVal,
null,
+ null,
false,
false,
expiry,
@@ -1784,13 +1948,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
byte[] valBytes = valBytesTuple.getIfMarshaled();
- IgniteClosure<V, V> transformC = transformMap == null ? null : transformMap.get(entry.key());
+ EntryProcessor<K, V, ?> entryProcessor =
+ entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
if (!batchRes.readersOnly())
dhtFut.addWriteEntry(entry,
writeVal,
valBytes,
- transformC,
+ entryProcessor,
updRes.newTtl(),
-1,
null);
@@ -1800,7 +1965,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
entry,
writeVal,
valBytes,
- transformC,
+ entryProcessor,
updRes.newTtl(),
-1);
}
@@ -2086,6 +2251,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.operation(),
req.keys(),
vals,
+ req.invokeArguments(),
drPutVals,
drRmvVals,
req.returnValue(),
@@ -2229,9 +2395,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
V val = req.value(i);
byte[] valBytes = req.valueBytes(i);
- IgniteClosure<V, V> transform = req.transformClosure(i);
+ EntryProcessor<K, V, ?> entryProcessor = req.entryProcessor(i);
- GridCacheOperation op = transform != null ? TRANSFORM :
+ GridCacheOperation op = entryProcessor != null ? TRANSFORM :
(val != null || valBytes != null) ?
UPDATE :
DELETE;
@@ -2247,8 +2413,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
nodeId,
nodeId,
op,
- op == TRANSFORM ? transform : val,
+ op == TRANSFORM ? entryProcessor : val,
valBytes,
+ op == TRANSFORM ? req.invokeArguments() : null,
/*write-through*/false,
/*retval*/false,
/*expiry policy*/null,
@@ -2487,12 +2654,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** */
private boolean readersOnly;
+ /** */
+ private Map<Object, Object> invokeRes;
+
/**
* @param entry Entry.
* @param updRes Entry update result.
* @param entries All entries.
*/
- private void addDeleted(GridDhtCacheEntry<K, V> entry, GridCacheUpdateAtomicResult<K, V> updRes,
+ private void addDeleted(GridDhtCacheEntry<K, V> entry,
+ GridCacheUpdateAtomicResult<K, V> updRes,
Collection<GridDhtCacheEntry<K, V>> entries) {
if (updRes.removeVersion() != null) {
if (deleted == null)
@@ -2517,6 +2688,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/**
+ * @param invokeRes Per-key results of the invoke (TRANSFORM) operation, {@code null} for other operations.
+ */
+ private void invokeResult(Map<Object, Object> invokeRes) {
+ this.invokeRes = invokeRes;
+ }
+
+ /**
+ * @return Per-key results of the invoke operation as last stored by {@code invokeResult}, possibly {@code null}.
+ */
+ Map<Object, Object> invokeResults() {
+ return invokeRes;
+ }
+
+ /**
* @param dhtFut DHT future.
*/
private void dhtFuture(@Nullable GridDhtAtomicUpdateFuture<K, V> dhtFut) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 8602ae3..88cc8f6 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -22,6 +22,7 @@ import org.gridgain.grid.util.typedef.internal.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
+import javax.cache.processor.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
@@ -197,7 +198,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
* @param entry Entry to map.
* @param val Value to write.
* @param valBytes Value bytes.
- * @param transformC Transform closure.
+ * @param entryProcessor Entry processor.
* @param ttl TTL (optional).
* @param drExpireTime DR expire time (optional).
* @param drVer DR version (optional).
@@ -205,7 +206,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
public void addWriteEntry(GridDhtCacheEntry<K, V> entry,
@Nullable V val,
@Nullable byte[] valBytes,
- IgniteClosure<V, V> transformC,
+ EntryProcessor<K, V, ?> entryProcessor,
long ttl,
long drExpireTime,
@Nullable GridCacheVersion drVer) {
@@ -236,7 +237,8 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
topVer,
forceTransformBackups,
this.updateReq.subjectId(),
- this.updateReq.taskNameHash());
+ this.updateReq.taskNameHash(),
+ forceTransformBackups ? this.updateReq.invokeArguments() : null);
mappings.put(nodeId, updateReq);
}
@@ -245,7 +247,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
entry.keyBytes(),
val,
valBytes,
- transformC,
+ entryProcessor,
ttl,
drExpireTime,
drVer);
@@ -258,7 +260,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
* @param entry Entry.
* @param val Value.
* @param valBytes Value bytes.
- * @param transformC Transform closure.
+ * @param entryProcessor Entry processor.
* @param ttl TTL for near cache update (optional).
* @param expireTime Expire time for near cache update (optional).
*/
@@ -266,7 +268,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
GridDhtCacheEntry<K, V> entry,
@Nullable V val,
@Nullable byte[] valBytes,
- IgniteClosure<V, V> transformC,
+ EntryProcessor<K, V, ?> entryProcessor,
long ttl,
long expireTime) {
GridCacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
@@ -294,7 +296,8 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
topVer,
forceTransformBackups,
this.updateReq.subjectId(),
- this.updateReq.taskNameHash());
+ this.updateReq.taskNameHash(),
+ forceTransformBackups ? this.updateReq.invokeArguments() : null);
mappings.put(nodeId, updateReq);
}
@@ -308,7 +311,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
entry.keyBytes(),
val,
valBytes,
- transformC,
+ entryProcessor,
ttl,
expireTime);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 2b68734..9441bd3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -10,7 +10,6 @@
package org.gridgain.grid.kernal.processors.cache.distributed.dht.atomic;
import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.cache.*;
@@ -20,6 +19,7 @@ import org.gridgain.grid.util.tostring.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
+import javax.cache.processor.*;
import java.io.*;
import java.nio.*;
import java.util.*;
@@ -111,23 +111,30 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
@GridDirectVersion(2)
private boolean forceTransformBackups;
- /** Transform closures. */
+ /** Entry processors. */
@GridDirectTransient
- private List<IgniteClosure<V, V>> transformClos;
+ private List<EntryProcessor<K, V, ?>> entryProcessors;
- /** Transform closure bytes. */
+ /** Entry processors bytes. */
@GridDirectCollection(byte[].class)
@GridDirectVersion(2)
- private List<byte[]> transformClosBytes;
+ private List<byte[]> entryProcessorsBytes;
/** Near transform closures. */
@GridDirectTransient
- private List<IgniteClosure<V, V>> nearTransformClos;
+ private List<EntryProcessor<K, V, ?>> nearEntryProcessors;
/** Near transform closures bytes. */
@GridDirectCollection(byte[].class)
@GridDirectVersion(2)
- private List<byte[]> nearTransformClosBytes;
+ private List<byte[]> nearEntryProcessorsBytes;
+
+ /** Optional arguments for entry processor. */
+ @GridDirectTransient
+ private Object[] invokeArgs;
+
+ /** Marshalled bytes of the optional entry processor arguments. */
+ private byte[][] invokeArgsBytes;
/** Subject ID. */
@GridDirectVersion(3)
@@ -151,6 +158,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
* @param nodeId Node ID.
* @param futVer Future version.
* @param writeVer Write version for cache values.
+ * @param invokeArgs Optional arguments for entry processor.
* @param syncMode Cache write synchronization mode.
* @param topVer Topology version.
* @param forceTransformBackups Force transform backups flag.
@@ -166,8 +174,11 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
long topVer,
boolean forceTransformBackups,
UUID subjId,
- int taskNameHash
+ int taskNameHash,
+ Object[] invokeArgs
) {
+ assert invokeArgs == null || forceTransformBackups;
+
this.cacheId = cacheId;
this.nodeId = nodeId;
this.futVer = futVer;
@@ -177,13 +188,14 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
this.forceTransformBackups = forceTransformBackups;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
+ this.invokeArgs = invokeArgs;
keys = new ArrayList<>();
keyBytes = new ArrayList<>();
if (forceTransformBackups) {
- transformClos = new ArrayList<>();
- transformClosBytes = new ArrayList<>();
+ entryProcessors = new ArrayList<>();
+ entryProcessorsBytes = new ArrayList<>();
}
else {
vals = new ArrayList<>();
@@ -203,7 +215,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
* @param keyBytes Key bytes, if key was already serialized.
* @param val Value, {@code null} if should be removed.
* @param valBytes Value bytes, {@code null} if should be removed.
- * @param transformC Transform closure.
+ * @param entryProcessor Entry processor.
* @param ttl TTL (optional).
* @param drExpireTime DR expire time (optional).
* @param drVer DR version (optional).
@@ -212,15 +224,15 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
@Nullable byte[] keyBytes,
@Nullable V val,
@Nullable byte[] valBytes,
- IgniteClosure<V, V> transformC,
+ EntryProcessor<K, V, ?> entryProcessor,
long ttl,
long drExpireTime,
@Nullable GridCacheVersion drVer) {
keys.add(key);
this.keyBytes.add(keyBytes);
- if (forceTransformBackups && transformC != null)
- transformClos.add(transformC);
+ if (forceTransformBackups && entryProcessor != null)
+ entryProcessors.add(entryProcessor);
else {
vals.add(val);
this.valBytes.add(valBytes != null ? GridCacheValueBytes.marshaled(valBytes) : null);
@@ -270,7 +282,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
* @param keyBytes Key bytes, if key was already serialized.
* @param val Value, {@code null} if should be removed.
* @param valBytes Value bytes, {@code null} if should be removed.
- * @param transformC Transform closure.
+ * @param entryProcessor Entry processor.
* @param ttl TTL.
* @param expireTime Expire time.
*/
@@ -278,7 +290,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
@Nullable byte[] keyBytes,
@Nullable V val,
@Nullable byte[] valBytes,
- IgniteClosure<V, V> transformC,
+ EntryProcessor<K, V, ?> entryProcessor,
long ttl,
long expireTime)
{
@@ -287,8 +299,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
nearKeyBytes = new ArrayList<>();
if (forceTransformBackups) {
- nearTransformClos = new ArrayList<>();
- nearTransformClosBytes = new ArrayList<>();
+ nearEntryProcessors = new ArrayList<>();
+ nearEntryProcessorsBytes = new ArrayList<>();
}
else {
nearVals = new ArrayList<>();
@@ -300,9 +312,9 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
nearKeyBytes.add(keyBytes);
if (forceTransformBackups) {
- assert transformC != null;
+ assert entryProcessor != null;
- nearTransformClos.add(transformC);
+ nearEntryProcessors.add(entryProcessor);
}
else {
nearVals.add(val);
@@ -465,10 +477,10 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
/**
* @param idx Key index.
- * @return Transform closure.
+ * @return Entry processor.
*/
- @Nullable public IgniteClosure<V, V> transformClosure(int idx) {
- return transformClos == null ? null : transformClos.get(idx);
+ @Nullable public EntryProcessor<K, V, ?> entryProcessor(int idx) {
+ return entryProcessors == null ? null : entryProcessors.get(idx);
}
/**
@@ -497,8 +509,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
* @param idx Key index.
* @return Transform closure.
*/
- @Nullable public IgniteClosure<V, V> nearTransformClosure(int idx) {
- return nearTransformClos == null ? null : nearTransformClos.get(idx);
+ @Nullable public EntryProcessor<K, V, ?> nearEntryProcessor(int idx) {
+ return nearEntryProcessors == null ? null : nearEntryProcessors.get(idx);
}
/**
@@ -615,6 +627,13 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
return -1L;
}
+ /**
+ * @return Optional arguments for entry processor; the constructor asserts they are non-null only when force-transform-backups is set.
+ */
+ @Nullable public Object[] invokeArguments() {
+ return invokeArgs;
+ }
+
/** {@inheritDoc}
* @param ctx*/
@Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
@@ -623,14 +642,17 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
keyBytes = marshalCollection(keys, ctx);
valBytes = marshalValuesCollection(vals, ctx);
- if (forceTransformBackups)
- transformClosBytes = marshalCollection(transformClos, ctx);
+ if (forceTransformBackups) {
+ invokeArgsBytes = marshalInvokeArguments(invokeArgs, ctx);
+
+ entryProcessorsBytes = marshalCollection(entryProcessors, ctx);
+ }
nearKeyBytes = marshalCollection(nearKeys, ctx);
nearValBytes = marshalValuesCollection(nearVals, ctx);
if (forceTransformBackups)
- nearTransformClosBytes = marshalCollection(nearTransformClos, ctx);
+ nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, ctx);
}
/** {@inheritDoc} */
@@ -640,14 +662,17 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
keys = unmarshalCollection(keyBytes, ctx, ldr);
vals = unmarshalValueBytesCollection(valBytes, ctx, ldr);
- if (forceTransformBackups)
- transformClos = unmarshalCollection(transformClosBytes, ctx, ldr);
+ if (forceTransformBackups) {
+ entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+
+ invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
+ }
nearKeys = unmarshalCollection(nearKeyBytes, ctx, ldr);
nearVals = unmarshalValueBytesCollection(nearValBytes, ctx, ldr);
if (forceTransformBackups)
- nearTransformClos = unmarshalCollection(nearTransformClosBytes, ctx, ldr);
+ nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
}
/** {@inheritDoc} */
@@ -683,10 +708,10 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
_clone.nearVals = nearVals;
_clone.nearValBytes = nearValBytes;
_clone.forceTransformBackups = forceTransformBackups;
- _clone.transformClos = transformClos;
- _clone.transformClosBytes = transformClosBytes;
- _clone.nearTransformClos = nearTransformClos;
- _clone.nearTransformClosBytes = nearTransformClosBytes;
+ _clone.entryProcessors = entryProcessors;
+ _clone.entryProcessorsBytes = entryProcessorsBytes;
+ _clone.nearEntryProcessors = nearEntryProcessors;
+ _clone.nearEntryProcessorsBytes = nearEntryProcessorsBytes;
_clone.nearExpireTimes = nearExpireTimes;
_clone.nearTtls = nearTtls;
_clone.subjId = subjId;
@@ -872,7 +897,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
if (commState.cur == NULL)
commState.cur = commState.it.next();
- if (!commState.putValueBytes((GridCacheValueBytes)commState.cur))
+ if (!commState.putValueBytes((GridCacheValueBytes) commState.cur))
return false;
commState.cur = NULL;
@@ -893,12 +918,12 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
case 16:
- if (nearTransformClosBytes != null) {
+ if (nearEntryProcessorsBytes != null) {
if (commState.it == null) {
- if (!commState.putInt(nearTransformClosBytes.size()))
+ if (!commState.putInt(nearEntryProcessorsBytes.size()))
return false;
- commState.it = nearTransformClosBytes.iterator();
+ commState.it = nearEntryProcessorsBytes.iterator();
}
while (commState.it.hasNext() || commState.cur != NULL) {
@@ -920,12 +945,12 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
commState.idx++;
case 17:
- if (transformClosBytes != null) {
+ if (entryProcessorsBytes != null) {
if (commState.it == null) {
- if (!commState.putInt(transformClosBytes.size()))
+ if (!commState.putInt(entryProcessorsBytes.size()))
return false;
- commState.it = transformClosBytes.iterator();
+ commState.it = entryProcessorsBytes.iterator();
}
while (commState.it.hasNext() || commState.cur != NULL) {
@@ -1213,8 +1238,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
}
if (commState.readSize >= 0) {
- if (nearTransformClosBytes == null)
- nearTransformClosBytes = new ArrayList<>(commState.readSize);
+ if (nearEntryProcessorsBytes == null)
+ nearEntryProcessorsBytes = new ArrayList<>(commState.readSize);
for (int i = commState.readItems; i < commState.readSize; i++) {
byte[] _val = commState.getByteArray();
@@ -1222,7 +1247,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
if (_val == BYTE_ARR_NOT_READ)
return false;
- nearTransformClosBytes.add((byte[])_val);
+ nearEntryProcessorsBytes.add((byte[]) _val);
commState.readItems++;
}
@@ -1242,8 +1267,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
}
if (commState.readSize >= 0) {
- if (transformClosBytes == null)
- transformClosBytes = new ArrayList<>(commState.readSize);
+ if (entryProcessorsBytes == null)
+ entryProcessorsBytes = new ArrayList<>(commState.readSize);
for (int i = commState.readItems; i < commState.readSize; i++) {
byte[] _val = commState.getByteArray();
@@ -1251,7 +1276,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
if (_val == BYTE_ARR_NOT_READ)
return false;
- transformClosBytes.add((byte[])_val);
+ entryProcessorsBytes.add((byte[])_val);
commState.readItems++;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/71ee2ee1/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 6b233e7..1bc013a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -70,6 +70,9 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<?> vals;
+ /** Optional arguments for entry processor. */
+ private Object[] invokeArgs;
+
/** DR put values. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<GridCacheDrInfo<V>> drPutVals;
@@ -158,6 +161,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
* @param syncMode Write synchronization mode.
* @param keys Keys to update.
* @param vals Values or transform closure.
+ * @param invokeArgs Optional arguments for entry processor.
* @param drPutVals DR put values (optional).
* @param drRmvVals DR remove values (optional).
* @param retval Return value require flag.
@@ -175,6 +179,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
GridCacheOperation op,
Collection<? extends K> keys,
@Nullable Collection<?> vals,
+ @Nullable Object[] invokeArgs,
@Nullable Collection<GridCacheDrInfo<V>> drPutVals,
@Nullable Collection<GridCacheVersion> drRmvVals,
final boolean retval,
@@ -186,6 +191,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
int taskNameHash
) {
super(cctx.kernalContext());
+
this.rawRetval = rawRetval;
assert vals == null || vals.size() == keys.size();
@@ -200,6 +206,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
this.op = op;
this.keys = keys;
this.vals = vals;
+ this.invokeArgs = invokeArgs;
this.drPutVals = drPutVals;
this.drRmvVals = drRmvVals;
this.retval = retval;
@@ -366,7 +373,12 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
if (res.error() != null)
addFailedKeys(req.keys(), res.error());
else {
- if (req.fastMap() && req.hasPrimary())
+ if (op == TRANSFORM) {
+ assert !req.fastMap();
+
+ addInvokeResults(res.returnValue());
+ }
+ else if (req.fastMap() && req.hasPrimary())
opRes = res.returnValue();
}
@@ -464,7 +476,9 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
* @param remap Flag indicating if this is partial remap for this future.
* @param oldNodeId Old node ID if was remap.
*/
- private void map0(GridDiscoveryTopologySnapshot topSnapshot, Collection<? extends K> keys, boolean remap,
+ private void map0(GridDiscoveryTopologySnapshot topSnapshot,
+ Collection<? extends K> keys,
+ boolean remap,
@Nullable UUID oldNodeId) {
assert oldNodeId == null || remap;
@@ -560,6 +574,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
retval,
op == TRANSFORM && cctx.hasFlag(FORCE_TRANSFORM_BACKUP),
expiryPlc,
+ invokeArgs,
filter,
subjId,
taskNameHash);
@@ -666,6 +681,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
retval,
op == TRANSFORM && cctx.hasFlag(FORCE_TRANSFORM_BACKUP),
expiryPlc,
+ invokeArgs,
filter,
subjId,
taskNameHash);
@@ -820,6 +836,22 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
}
/**
+ * Merges the invoke result received from a single node into the accumulated
+ * operation result {@code opRes}. Declared {@code synchronized}, so concurrent
+ * calls on the same future are serialized.
+ * <p>
+ * The first result passed in is stored as is; for every later call the entries of
+ * the incoming map are added to the map held by the stored result.
+ *
+ * @param ret Result from single node; its value is asserted to be a {@link Map}.
+ */
+ private synchronized void addInvokeResults(GridCacheReturn<Object> ret) {
+ assert op == TRANSFORM : op;
+ assert ret.value() instanceof Map : ret.value();
+
+ if (opRes != null) {
+ // A result is already stored: merge the new entries into its map in place.
+ // NOTE(review): unchecked cast; presumably opRes was set by an earlier call of this method - confirm.
+ Map<Object, Object> map = (Map<Object, Object>)opRes.value();
+
+ map.putAll((Map<Object, Object>)ret.value());
+ }
+ else
+ // Nothing stored yet: keep this result as the accumulator.
+ opRes = ret;
+ }
+
+ /**
* @param failedKeys Failed keys.
* @param err Error cause.
* @return Root {@link GridCachePartialUpdateException}.