You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/25 15:29:39 UTC
[3/3] incubator-ignite git commit: # ignite-44
# ignite-44
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bcb30d10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bcb30d10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bcb30d10
Branch: refs/heads/ignite-44
Commit: bcb30d10471e2764b8b6e2928cdb50f04971b618
Parents: 928aa3d
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 25 10:33:52 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 25 17:29:09 2014 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteCache.java | 176 +++++++++++++
.../cache/CachePartialUpdateException.java | 36 +++
.../processors/cache/IgniteCacheProxy.java | 255 ++++++++++++++++---
.../grid/cache/GridCacheInterceptor.java | 8 +-
.../cache/GridCachePartialUpdateException.java | 1 +
.../processors/cache/GridCacheAdapter.java | 83 +++++-
.../processors/cache/GridCacheEntryEx.java | 6 +-
.../processors/cache/GridCacheMapEntry.java | 38 ++-
.../processors/cache/GridCacheProcessor.java | 16 +-
.../processors/cache/GridCacheProjectionEx.java | 26 +-
.../cache/GridCacheProjectionImpl.java | 21 +-
.../processors/cache/GridCacheProxyImpl.java | 36 ++-
.../processors/cache/GridCacheStoreManager.java | 7 +-
.../dht/atomic/GridDhtAtomicCache.java | 56 ++--
.../distributed/near/GridNearAtomicCache.java | 28 ++
.../local/atomic/GridLocalAtomicCache.java | 229 +++++++++++++++--
.../cache/transactions/IgniteTxAdapter.java | 2 +-
.../transactions/IgniteTxLocalAdapter.java | 49 +++-
.../cache/transactions/IgniteTxLocalEx.java | 2 +-
.../processors/ggfs/GridGgfsDataManager.java | 26 +-
.../processors/ggfs/GridGgfsMetaManager.java | 72 ++++--
.../cache/IgniteCacheAtomicInvokeTest.java | 47 ++++
.../cache/IgniteCacheAtomicLocalInvokeTest.java | 41 +++
...niteCacheAtomicLocalWithStoreInvokeTest.java | 22 ++
.../IgniteCacheAtomicNearEnabledInvokeTest.java | 24 ++
.../cache/IgniteCacheInvokeAbstractTest.java | 21 +-
.../cache/IgniteCacheTxLocalInvokeTest.java | 41 +++
.../IgniteCacheTxNearEnabledInvokeTest.java | 24 ++
.../IgniteCacheExpiryPolicyAbstractTest.java | 7 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 171 ++++++-------
...idCacheGetAndTransformStoreAbstractTest.java | 21 +-
.../cache/GridCacheIncrementTransformTest.java | 19 +-
.../GridCacheInterceptorAbstractSelfTest.java | 66 +++--
...ridCacheMultinodeUpdateAbstractSelfTest.java | 28 +-
...HeapMultiThreadedUpdateAbstractSelfTest.java | 21 +-
...CacheOffHeapMultiThreadedUpdateSelfTest.java | 9 +-
.../GridCacheOffHeapTieredAbstractSelfTest.java | 45 ++--
...heOffHeapTieredEvictionAbstractSelfTest.java | 32 ++-
.../GridCacheReturnValueTransferSelfTest.java | 33 ++-
.../processors/cache/GridCacheTestEntryEx.java | 3 +-
.../IgniteTxExceptionAbstractSelfTest.java | 16 +-
.../IgniteTxStoreExceptionAbstractSelfTest.java | 25 +-
.../GridCacheTransformEventSelfTest.java | 68 +++--
.../IgniteTxPreloadAbstractTest.java | 44 +++-
...heAbstractTransformWriteThroughSelfTest.java | 37 ++-
.../dht/GridCacheAtomicNearCacheSelfTest.java | 72 +++---
...GridCacheValueConsistencyAtomicSelfTest.java | 23 +-
.../testframework/junits/GridAbstractTest.java | 3 +-
.../junits/common/GridCommonAbstractTest.java | 50 +++-
.../bamboo/GridDataGridTestSuite.java | 15 +-
.../hadoop/jobtracker/GridHadoopJobTracker.java | 124 +++++----
51 files changed, 1801 insertions(+), 524 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index a59573e..f51c237 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -13,6 +13,7 @@ import org.apache.ignite.cache.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
import org.jetbrains.annotations.*;
import javax.cache.*;
@@ -298,4 +299,179 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* @return Cache size on this node.
*/
public int localSize(CachePeekMode... peekModes);
+
+ /**
+ * Stores given key-value pair in cache. If filters are provided, then entries will
+ * be stored in cache only if they pass the filter. Note that filter check is atomic,
+ * so value stored in cache is guaranteed to be consistent with the filters. If cache
+ * previously contained value for the given key, then this value is returned.
+ * In case of {@link GridCacheMode#PARTITIONED} or {@link GridCacheMode#REPLICATED} caches,
+ * the value will be loaded from the primary node, which in its turn may load the value
+ * from the swap storage, and subsequently, if it's not in swap,
+ * from the underlying persistent storage. If value has to be loaded from persistent
+ * storage, {@link org.gridgain.grid.cache.store.GridCacheStore#load(IgniteTx, Object)} method will be used.
+ * <p>
+ * If the returned value is not needed, method {@link #putIf(Object, Object, IgnitePredicate)} should
+ * always be used instead of this one to avoid the overhead associated with returning of the previous value.
+ * <p>
+ * If write-through is enabled, the stored value will be persisted to {@link org.gridgain.grid.cache.store.GridCacheStore}
+ * via {@link org.gridgain.grid.cache.store.GridCacheStore#put(IgniteTx, Object, Object)} method.
+ * <h2 class="header">Transactions</h2>
+ * This method is transactional and will enlist the entry into ongoing transaction
+ * if there is one.
+ * <h2 class="header">Cache Flags</h2>
+ * This method is not available if any of the following flags are set on projection:
+ * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}.
+ *
+ * @param key Key to store in cache.
+ * @param val Value to be associated with the given key.
+ * @param filter Optional filter to check prior to putting value in cache. Note
+ * that filter check is atomic with put operation.
+ * @return Previous value associated with specified key, or {@code null}
+ * if entry did not pass the filter, or if there was no mapping for the key in swap
+ * or in persistent storage.
+ * @throws NullPointerException If either key or value are {@code null}.
+ * @throws GridCacheFlagException If projection flags validation failed.
+ */
+ // TODO IGNITE-1 fix entry type.
+ @Nullable public V getAndPutIf(K key, V val, @Nullable IgnitePredicate<GridCacheEntry<K, V>> filter);
+
+ /**
+ * Stores given key-value pair in cache. If filters are provided, then entries will
+ * be stored in cache only if they pass the filter. Note that filter check is atomic,
+ * so value stored in cache is guaranteed to be consistent with the filters.
+ * <p>
+ * This method will return {@code true} if value is stored in cache and {@code false} otherwise.
+ * Unlike {@link #getAndPutIf(Object, Object, IgnitePredicate)} method, it does not return previous
+ * value and, therefore, does not have any overhead associated with returning a value. It
+ * should be used whenever return value is not required.
+ * <p>
+ * If write-through is enabled, the stored value will be persisted to {@link org.gridgain.grid.cache.store.GridCacheStore}
+ * via {@link org.gridgain.grid.cache.store.GridCacheStore#put(IgniteTx, Object, Object)} method.
+ * <h2 class="header">Transactions</h2>
+ * This method is transactional and will enlist the entry into ongoing transaction
+ * if there is one.
+ * <h2 class="header">Cache Flags</h2>
+ * This method is not available if any of the following flags are set on projection:
+ * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}.
+ *
+ * @param key Key to store in cache.
+ * @param val Value to be associated with the given key.
+ * @param filter Optional filter to check prior to putting value in cache. Note
+ * that filter check is atomic with put operation.
+ * @return {@code True} if optional filter passed and value was stored in cache,
+ * {@code false} otherwise. Note that this method will return {@code true} if filter is not
+ * specified.
+ * @throws NullPointerException If either key or value are {@code null}.
+ * @throws GridCacheFlagException If projection flags validation failed.
+ */
+ // TODO IGNITE-1 fix entry type.
+ public boolean putIf(K key, V val, IgnitePredicate<GridCacheEntry<K, V>> filter);
+
+ /**
+ * Removes given key mapping from cache. If cache previously contained value for the given key,
+ * then this value is returned. In case of {@link GridCacheMode#PARTITIONED} or {@link GridCacheMode#REPLICATED}
+ * caches, the value will be loaded from the primary node, which in its turn may load the value
+ * from the disk-based swap storage, and subsequently, if it's not in swap,
+ * from the underlying persistent storage. If value has to be loaded from persistent
+ * storage, {@link org.gridgain.grid.cache.store.GridCacheStore#load(IgniteTx, Object)} method will be used.
+ * <p>
+ * If the returned value is not needed, method {@link #removeIf(Object, IgnitePredicate)} should
+ * always be used instead of this one to avoid the overhead associated with returning of the
+ * previous value.
+ * <p>
+ * If write-through is enabled, the value will be removed from {@link org.gridgain.grid.cache.store.GridCacheStore}
+ * via {@link org.gridgain.grid.cache.store.GridCacheStore#remove(IgniteTx, Object)} method.
+ * <h2 class="header">Transactions</h2>
+ * This method is transactional and will enlist the entry into ongoing transaction
+ * if there is one.
+ * <h2 class="header">Cache Flags</h2>
+ * This method is not available if any of the following flags are set on projection:
+ * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}.
+ *
+ * @param key Key whose mapping is to be removed from cache.
+ * @param filter Optional filter to check prior to removing value from cache. Note
+ * that filter is checked atomically together with remove operation.
+ * @return Previous value associated with specified key, or {@code null}
+ * if there was no value for this key.
+ * @throws NullPointerException If key is {@code null}.
+ * @throws GridCacheFlagException If projection flags validation failed.
+ */
+ // TODO IGNITE-1 fix entry type.
+ public V getAndRemoveIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter);
+
+ /**
+ * Removes given key mapping from cache.
+ * <p>
+ * This method will return {@code true} if remove did occur, which means that all optionally
+ * provided filters have passed and there was something to remove, {@code false} otherwise.
+ * <p>
+ * If write-through is enabled, the value will be removed from {@link org.gridgain.grid.cache.store.GridCacheStore}
+ * via {@link org.gridgain.grid.cache.store.GridCacheStore#remove(IgniteTx, Object)} method.
+ * <h2 class="header">Transactions</h2>
+ * This method is transactional and will enlist the entry into ongoing transaction
+ * if there is one.
+ * <h2 class="header">Cache Flags</h2>
+ * This method is not available if any of the following flags are set on projection:
+ * {@link GridCacheFlag#LOCAL}, {@link GridCacheFlag#READ}.
+ *
+ * @param key Key whose mapping is to be removed from cache.
+ * @param filter Optional filter to check prior to removing value from cache. Note
+ * that filter is checked atomically together with remove operation.
+ * @return {@code True} if filter passed validation and entry was removed, {@code false} otherwise.
+ * Note that if filter is not specified, this method will return {@code true}.
+ * @throws NullPointerException if the key is {@code null}.
+ * @throws GridCacheFlagException If projection flags validation failed.
+ */
+ // TODO IGNITE-1 fix entry type.
+ public boolean removeIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter);
+
+ /**
+ * Creates projection that will operate with portable objects.
+ * <p>
+ * Projection returned by this method will force cache not to deserialize portable objects,
+ * so keys and values will be returned from cache API methods without changes. Therefore,
+ * signature of the projection can contain only following types:
+ * <ul>
+ * <li>{@link org.apache.ignite.portables.PortableObject} for portable classes</li>
+ * <li>All primitives (byte, int, ...) and their boxed versions (Byte, Integer, ...)</li>
+ * <li>Arrays of primitives (byte[], int[], ...)</li>
+ * <li>{@link String} and array of {@link String}s</li>
+ * <li>{@link UUID} and array of {@link UUID}s</li>
+ * <li>{@link Date} and array of {@link Date}s</li>
+ * <li>{@link java.sql.Timestamp} and array of {@link java.sql.Timestamp}s</li>
+ * <li>Enums and array of enums</li>
+ * <li>
+ * Maps, collections and array of objects (but objects inside
+ * them will still be converted if they are portable)
+ * </li>
+ * </ul>
+ * <p>
+ * For example, if you use {@link Integer} as a key and {@code Value} class as a value
+ * (which will be stored in portable format), you should acquire following projection
+ * to avoid deserialization:
+ * <pre>
+ * IgniteCache&lt;Integer, PortableObject&gt; prj = cache.keepPortable();
+ *
+ * // Value is not deserialized and is returned in portable format.
+ * PortableObject po = prj.get(1);
+ * </pre>
+ * <p>
+ * Note that this method makes sense only if cache is working in portable mode
+ * ({@link org.gridgain.grid.cache.GridCacheConfiguration#isPortableEnabled()} returns {@code true}). If not,
+ * this method is no-op and will return current projection.
+ *
+ * @return Projection for portable objects.
+ */
+ public <K1, V1> IgniteCache<K1, V1> keepPortable();
+
+ /**
+ * Gets cache projection based on this one, but with the specified flags turned on.
+ * <h1 class="header">Cache Flags</h1>
+ * The resulting projection will inherit all the flags from this projection.
+ *
+ * @param flags Flags to turn on (if empty, then no-op).
+ * @return New projection based on this one, but with the specified flags turned on.
+ */
+ public IgniteCache<K, V> flagsOn(@Nullable GridCacheFlag... flags);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/apache/ignite/cache/CachePartialUpdateException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CachePartialUpdateException.java b/modules/core/src/main/java/org/apache/ignite/cache/CachePartialUpdateException.java
new file mode 100644
index 0000000..08ce72e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CachePartialUpdateException.java
@@ -0,0 +1,36 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.cache;
+
+import org.gridgain.grid.cache.*;
+
+import javax.cache.*;
+import java.util.*;
+
+/**
+ * Exception thrown from non-transactional cache in case when update succeeded only partially.
+ * One can get list of keys for which update failed with method {@link #failedKeys()}.
+ */
+public class CachePartialUpdateException extends CacheException {
+ /**
+ * @param e Cause.
+ */
+ public CachePartialUpdateException(GridCachePartialUpdateException e) {
+ super(e.getMessage(), e);
+ }
+
+ /**
+ * Gets collection of failed keys.
+ * @return Collection of failed keys.
+ */
+ public <K> Collection<K> failedKeys() {
+ return ((GridCachePartialUpdateException)getCause()).failedKeys();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 410fb9a..f7c157f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -13,8 +13,10 @@ import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.lang.*;
+import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
@@ -30,7 +32,7 @@ import java.util.concurrent.locks.*;
/**
* Cache proxy.
*/
-public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable {
+public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements IgniteCache<K, V>, Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -51,10 +53,14 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
* @param ctx Context.
* @param delegate Delegate.
* @param prj Projection.
+ * @param async Async support flag.
*/
public IgniteCacheProxy(GridCacheContext<K, V> ctx,
GridCacheProjectionEx<K, V> delegate,
- @Nullable GridCacheProjectionImpl<K, V> prj) {
+ @Nullable GridCacheProjectionImpl<K, V> prj,
+ boolean async) {
+ super(async);
+
assert ctx != null;
assert delegate != null;
@@ -84,7 +90,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
try {
GridCacheProjectionEx<K, V> prj0 = prj != null ? prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc);
- return new IgniteCacheProxy<>(ctx, prj0, (GridCacheProjectionImpl<K, V>)prj0);
+ return new IgniteCacheProxy<>(ctx, prj0, (GridCacheProjectionImpl<K, V>)prj0, isAsync());
}
finally {
gate.leave(prev);
@@ -98,12 +104,81 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
/** {@inheritDoc} */
- @Override public void localLoadCache(@Nullable IgniteBiPredicate p, @Nullable Object... args) throws CacheException {
+ @Override public void localLoadCache(@Nullable IgniteBiPredicate p, @Nullable Object... args)
+ throws CacheException {
// TODO IGNITE-1.
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
+ @Nullable @Override public V getAndPutIf(K key, V val, IgnitePredicate<GridCacheEntry<K, V>> filter) {
+ try {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.put(key, val, filter);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean putIf(K key, V val, IgnitePredicate<GridCacheEntry<K, V>> filter) {
+ try {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.putx(key, val, filter);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public V getAndRemoveIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter) {
+ try {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.remove(key, filter);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removeIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter) {
+ try {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.removex(key, filter);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException {
try {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
@@ -116,7 +191,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -215,7 +290,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -257,7 +332,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -274,7 +349,44 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
+ }
+ }
+
+ /**
+ * @param keys Keys.
+ * @return Values map.
+ */
+ public Map<K, V> getAll(Collection<? extends K> keys) {
+ try {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.getAll(keys);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /**
+ * Gets entry set containing internal entries.
+ *
+ * @param filter Filter.
+ * @return Entry set.
+ */
+ public Set<GridCacheEntry<K, V>> entrySetx(IgnitePredicate<GridCacheEntry<K, V>> filter) {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.entrySetx(filter);
+ }
+ finally {
+ gate.leave(prev);
}
}
@@ -305,7 +417,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -322,7 +434,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -339,7 +451,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -356,7 +468,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -373,7 +485,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -390,7 +502,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -407,7 +519,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -424,7 +536,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -441,7 +553,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -458,7 +570,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -475,7 +587,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -498,16 +610,34 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
- EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args).get();
+ if (isAsync()) {
+ IgniteFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args);
+
+ IgniteFuture<T> fut0 = fut.chain(new CX1<IgniteFuture<EntryProcessorResult<T>>, T>() {
+ @Override public T applyx(IgniteFuture<EntryProcessorResult<T>> fut)
+ throws IgniteCheckedException {
+ EntryProcessorResult<T> res = fut.get();
+
+ return res.get();
+ }
+ });
+
+ curFut.set(fut0);
- return res.get();
+ return null;
+ }
+ else {
+ EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args);
+
+ return res.get();
+ }
}
finally {
gate.leave(prev);
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -519,14 +649,14 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
- return delegate.invokeAll(keys, entryProcessor, args).get();
+ return saveOrGet(delegate.invokeAllAsync(keys, entryProcessor, args));
}
finally {
gate.leave(prev);
}
}
catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ throw cacheException(e);
}
}
@@ -618,20 +748,81 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
/** {@inheritDoc} */
@Override public IgniteCache<K, V> enableAsync() {
- // TODO IGNITE-1.
- throw new UnsupportedOperationException();
+ if (isAsync())
+ return this;
+
+ return new IgniteCacheProxy<>(ctx, delegate, prj, true);
+ }
+
+ /**
+ * @param e Checked exception.
+ * @return Cache exception.
+ */
+ private CacheException cacheException(IgniteCheckedException e) {
+ if (e instanceof GridCachePartialUpdateException)
+ return new CachePartialUpdateException((GridCachePartialUpdateException)e);
+
+ return new CacheException(e);
}
/** {@inheritDoc} */
- @Override public boolean isAsync() {
- // TODO IGNITE-1.
- throw new UnsupportedOperationException();
+ @SuppressWarnings("unchecked")
+ @Override public <K1, V1> IgniteCache<K1, V1> keepPortable() {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ GridCacheProjectionImpl<K1, V1> prj0 = new GridCacheProjectionImpl<>(
+ (GridCacheProjection<K1, V1>)(prj != null ? prj : delegate),
+ (GridCacheContext<K1, V1>)ctx,
+ null,
+ null,
+ prj != null ? prj.flags() : null,
+ prj != null ? prj.subjectId() : null,
+ true,
+ prj != null ? prj.expiry() : null);
+
+ return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx,
+ prj0,
+ prj0,
+ isAsync());
+ }
+ finally {
+ gate.leave(prev);
+ }
}
/** {@inheritDoc} */
- @Override public <R> IgniteFuture<R> future() {
- // TODO IGNITE-1.
- throw new UnsupportedOperationException();
+ @Override public IgniteCache<K, V> flagsOn(@Nullable GridCacheFlag... flags) {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ // Documented contract: a null or empty flags array is a no-op, so hand back this proxy unchanged.
+ // (The previous EnumSet.copyOf(...) call threw IllegalArgumentException for an empty collection.)
+ if (flags == null || flags.length == 0)
+ return this;
+
+ Set<GridCacheFlag> res = EnumSet.noneOf(GridCacheFlag.class);
+
+ // Inherit the flags already set on the current projection, if there is one.
+ Set<GridCacheFlag> flags0 = prj !=null ? prj.flags() : null;
+
+ if (flags0 != null && !flags0.isEmpty())
+ res.addAll(flags0);
+
+ // Turn on the requested flags on top of the inherited ones.
+ for (GridCacheFlag flag : flags)
+ res.add(flag);
+
+ // NOTE(review): the 7th constructor argument is the same literal 'true' that keepPortable() passes;
+ // confirm it should not instead carry over the current projection's setting.
+ GridCacheProjectionImpl<K, V> prj0 = new GridCacheProjectionImpl<>(
+ (prj != null ? prj : delegate),
+ ctx,
+ null,
+ null,
+ res,
+ prj != null ? prj.subjectId() : null,
+ true,
+ prj != null ? prj.expiry() : null);
+
+ return new IgniteCacheProxy<>(ctx,
+ prj0,
+ prj0,
+ isAsync());
+ }
+ finally {
+ gate.leave(prev);
+ }
 }
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheInterceptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheInterceptor.java b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheInterceptor.java
index cb0192c..b1030bd 100644
--- a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheInterceptor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheInterceptor.java
@@ -42,7 +42,7 @@ public interface GridCacheInterceptor<K, V> {
@Nullable public V onGet(K key, @Nullable V val);
/**
- * This method is called within {@link GridCacheProjection#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])}
+ * This method is called within {@link GridCacheProjection#put(Object, Object, IgnitePredicate[])}
* and similar operations before new value is stored in cache.
* <p>
* Implementations should not execute any complex logic,
@@ -56,7 +56,7 @@ public interface GridCacheInterceptor<K, V> {
* @param oldVal Old value.
* @param newVal New value.
* @return Value to be put to cache. Returning {@code null} cancels the update.
- * @see GridCacheProjection#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])
+ * @see GridCacheProjection#put(Object, Object, IgnitePredicate[])
*/
@Nullable public V onBeforePut(K key, @Nullable V oldVal, V newVal);
@@ -76,7 +76,7 @@ public interface GridCacheInterceptor<K, V> {
public void onAfterPut(K key, V val);
/**
- * This method is called within {@link GridCacheProjection#remove(Object, org.apache.ignite.lang.IgnitePredicate[])}
+ * This method is called within {@link GridCacheProjection#remove(Object, IgnitePredicate[])}
* and similar operations to provide control over returned value.
* <p>
* Implementations should not execute any complex logic,
@@ -91,7 +91,7 @@ public interface GridCacheInterceptor<K, V> {
* @return Tuple. The first value is the flag whether remove should be cancelled or not.
* The second is the value to be returned as result of {@code remove()} operation,
* may be {@code null}.
- * @see GridCacheProjection#remove(Object, org.apache.ignite.lang.IgnitePredicate[])
+ * @see GridCacheProjection#remove(Object, IgnitePredicate[])
*/
@Nullable public IgniteBiTuple<Boolean, V> onBeforeRemove(K key, @Nullable V val);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/cache/GridCachePartialUpdateException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/GridCachePartialUpdateException.java b/modules/core/src/main/java/org/gridgain/grid/cache/GridCachePartialUpdateException.java
index dd41e55..015a5a7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/cache/GridCachePartialUpdateException.java
+++ b/modules/core/src/main/java/org/gridgain/grid/cache/GridCachePartialUpdateException.java
@@ -49,6 +49,7 @@ public class GridCachePartialUpdateException extends IgniteCheckedException {
addSuppressed(err);
}
+ /** {@inheritDoc} */
@Override public String getMessage() {
return super.getMessage() + ": " + failedKeys;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index 62daeb9..d4028be 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -1420,12 +1420,18 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
* @param reload Reload flag.
* @param tx Transaction.
* @param filter Filter.
+ * @param subjId Subject ID.
+ * @param taskName Task name.
* @param vis Visitor.
* @return Future.
*/
- public IgniteFuture<Object> readThroughAllAsync(final Collection<? extends K> keys, boolean reload,
- @Nullable final IgniteTxEx<K, V> tx, IgnitePredicate<GridCacheEntry<K, V>>[] filter, @Nullable UUID subjId,
- String taskName, final IgniteBiInClosure<K, V> vis) {
+ public IgniteFuture<Object> readThroughAllAsync(final Collection<? extends K> keys,
+ boolean reload,
+ @Nullable final IgniteTxEx<K, V> tx,
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+ @Nullable UUID subjId,
+ String taskName,
+ final IgniteBiInClosure<K, V> vis) {
return ctx.closures().callLocalSafe(new GPC<Object>() {
@Nullable @Override public Object call() {
try {
@@ -2194,7 +2200,66 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
}
/** {@inheritDoc} */
- @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(
+ @Override public <T> EntryProcessorResult<T> invoke(final K key,
+ final EntryProcessor<K, V, T> entryProcessor,
+ final Object... args)
+ throws IgniteCheckedException {
+ A.notNull(key, "key", entryProcessor, "entryProcessor");
+
+ if (keyCheck)
+ validateCacheKey(key);
+
+ ctx.denyOnLocalRead();
+
+ return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
+ @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter<K, V> tx)
+ throws IgniteCheckedException {
+ Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
+ Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor);
+
+ IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut =
+ tx.invokeAsync(ctx, false, invokeMap, args);
+
+ Map<K, EntryProcessorResult<T>> resMap = fut.get().value();
+
+ assert resMap != null;
+ assert resMap.size() == 1 : resMap.size();
+
+ return resMap.values().iterator().next();
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(final Set<? extends K> keys,
+ final EntryProcessor<K, V, T> entryProcessor,
+ final Object... args) throws IgniteCheckedException {
+ A.notNull(keys, "keys", entryProcessor, "entryProcessor");
+
+ if (keyCheck)
+ validateCacheKeys(keys);
+
+ ctx.denyOnLocalRead();
+
+ return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) {
+ @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
+ throws IgniteCheckedException {
+ Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() {
+ @Override public EntryProcessor apply(K k) {
+ return entryProcessor;
+ }
+ });
+
+ IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> fut =
+ tx.invokeAsync(ctx, false, invokeMap, args);
+
+ return fut.get().value();
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(
final K key,
final EntryProcessor<K, V, T> entryProcessor,
final Object... args)
@@ -2208,8 +2273,8 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
IgniteFuture<?> fut = asyncOp(new AsyncInOp(key) {
@Override public IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> inOp(IgniteTxLocalAdapter<K, V> tx) {
- Map<? extends K, EntryProcessor> invokeMap =
- Collections.singletonMap(key, (EntryProcessor)entryProcessor);
+ Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
+ Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor);
return tx.invokeAsync(ctx, false, invokeMap, args);
}
@@ -2238,11 +2303,11 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
}
/** {@inheritDoc} */
- @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(
final Set<? extends K> keys,
final EntryProcessor<K, V, T> entryProcessor,
final Object... args) {
- A.notNull(entryProcessor, "entryProcessor");
+ A.notNull(keys, "keys", entryProcessor, "entryProcessor");
if (keyCheck)
validateCacheKeys(keys);
@@ -2251,7 +2316,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
IgniteFuture<?> fut = asyncOp(new AsyncInOp(keys) {
@Override public IgniteFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> inOp(IgniteTxLocalAdapter<K, V> tx) {
- Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() {
+ Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() {
@Override public EntryProcessor apply(K k) {
return entryProcessor;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
index 1b71eec..101427f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
@@ -450,6 +450,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
* @param ver Cache version.
* @param op Operation.
* @param writeObj Value. Type depends on operation.
+ * @param invokeArgs Optional arguments for EntryProcessor.
* @param writeThrough Write through flag.
* @param retval Return value flag.
* @param expiryPlc Expiry policy.
@@ -459,14 +460,15 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
* @param intercept If {@code true} then calls cache interceptor.
* @param subjId Subject ID initiated this update.
* @param taskName Task name.
- * @return Tuple containing success flag and old value.
+ * @return Tuple containing success flag and operation result.
* @throws IgniteCheckedException If update failed.
* @throws GridCacheEntryRemovedException If entry is obsolete.
*/
- public IgniteBiTuple<Boolean, V> innerUpdateLocal(
+ public IgniteBiTuple<Boolean, Object> innerUpdateLocal(
GridCacheVersion ver,
GridCacheOperation op,
@Nullable Object writeObj,
+ @Nullable Object[] invokeArgs,
boolean writeThrough,
boolean retval,
@Nullable ExpiryPolicy expiryPlc,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index f14bba5..1b9ea20 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -1365,10 +1365,11 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public IgniteBiTuple<Boolean, V> innerUpdateLocal(
+ @Override public IgniteBiTuple<Boolean, Object> innerUpdateLocal(
GridCacheVersion ver,
GridCacheOperation op,
@Nullable Object writeObj,
+ @Nullable Object[] invokeArgs,
boolean writeThrough,
boolean retval,
@Nullable ExpiryPolicy expiryPlc,
@@ -1381,7 +1382,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert cctx.isLocal() && cctx.atomic();
- V old;
+ Object opRes;
boolean res = true;
@@ -1397,7 +1398,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
unswap(true, retval);
// Possibly get old value from store.
- old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
+ V old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
+
+ opRes = old;
GridCacheValueBytes oldBytes = valueBytesUnlocked();
@@ -1428,7 +1431,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
updateTtl(ttl);
}
- return new IgniteBiTuple<>(false, retval ? old : null);
+ return new IgniteBiTuple<>(false, (Object)(retval ? old : null));
}
}
@@ -1444,11 +1447,24 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
if (op == GridCacheOperation.TRANSFORM) {
transformCloClsName = writeObj.getClass().getName();
- IgniteClosure<V, V> transform = (IgniteClosure<V, V>)writeObj;
+ EntryProcessor<K, V, ?> entryProcessor = (EntryProcessor<K, V, ?>)writeObj;
- assert transform != null;
+ assert entryProcessor != null;
- updated = cctx.unwrapTemporary(transform.apply(old));
+ CacheInvokeEntry<K, V> entry = new CacheInvokeEntry<>(key, old);
+
+ try {
+ Object computed = entryProcessor.process(entry, invokeArgs);
+
+ updated = cctx.unwrapTemporary(entry.getValue());
+
+ opRes = new CacheInvokeResult<>(cctx.unwrapTemporary(computed));
+ }
+ catch (Exception e) {
+ updated = old;
+
+ opRes = new CacheInvokeResult<>(e);
+ }
}
else
updated = (V)writeObj;
@@ -1460,13 +1476,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
updated = (V)cctx.config().getInterceptor().onBeforePut(key, old, updated);
if (updated == null)
- return new IgniteBiTuple<>(false, cctx.<V>unwrapTemporary(old));
+ return new IgniteBiTuple<>(false, (Object)cctx.<V>unwrapTemporary(old));
}
else {
interceptorRes = cctx.config().getInterceptor().onBeforeRemove(key, old);
if (cctx.cancelRemove(interceptorRes))
- return new IgniteBiTuple<>(false, cctx.<V>unwrapTemporary(interceptorRes.get2()));
+ return new IgniteBiTuple<>(false,
+ (Object)cctx.<V>unwrapTemporary(interceptorRes.get2()));
}
}
@@ -1576,7 +1593,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
}
}
- return new IgniteBiTuple<>(res, cctx.<V>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : old));
+ return new IgniteBiTuple<>(res,
+ (Object)(cctx.<V>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : opRes)));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
index 0724a58..70798ff 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
@@ -1599,7 +1599,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (cache == null)
throw new IllegalArgumentException("Cache is not configured: " + name);
- return new IgniteCacheProxy<>(cache.context(), cache, null);
+ return new IgniteCacheProxy<>(cache.context(), cache, null, false);
+ }
+
+ /**
+ * @param name Cache name.
+ * @return Cache instance for given name.
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V> IgniteCacheProxy<K, V> jcache(@Nullable String name) {
+ GridCacheAdapter<K, V> cache = (GridCacheAdapter<K, V>)caches.get(name);
+
+ if (cache == null)
+ throw new IllegalArgumentException("Cache is not configured: " + name);
+
+ return new IgniteCacheProxy<>(cache.context(), cache, null, false);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
index 1a98192..8df7d10 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
@@ -399,9 +399,31 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> {
* @param key Key.
* @param entryProcessor Entry processor.
* @param args Arguments.
+ * @return Invoke result.
+ * @throws IgniteCheckedException If failed.
+ */
+ public <T> EntryProcessorResult<T> invoke(K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException;
+
+ /**
+ * @param keys Keys.
+ * @param entryProcessor Entry processor.
+ * @param args Arguments.
+ * @return Invoke results.
+ * @throws IgniteCheckedException If failed.
+ */
+ public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException;
+
+ /**
+ * @param key Key.
+ * @param entryProcessor Entry processor.
+ * @param args Arguments.
* @return Future.
*/
- public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key,
+ public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key,
EntryProcessor<K, V, T> entryProcessor,
Object... args);
@@ -411,7 +433,7 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> {
* @param args Arguments.
* @return Future.
*/
- public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+ public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
EntryProcessor<K, V, T> entryProcessor,
Object... args);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
index ad5cde3..62b6b72 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
@@ -789,17 +789,30 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
}
/** {@inheritDoc} */
- @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key,
+ @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
+ throws IgniteCheckedException {
+ return cache.invoke(key, entryProcessor, args);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException {
+ return cache.invokeAll(keys, entryProcessor, args);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key,
EntryProcessor<K, V, T> entryProcessor,
Object... args) {
- return cache.invoke(key, entryProcessor, args);
+ return cache.invokeAsync(key, entryProcessor, args);
}
/** {@inheritDoc} */
- @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
EntryProcessor<K, V, T> entryProcessor,
Object... args) {
- return cache.invokeAll(keys, entryProcessor, args);
+ return cache.invokeAllAsync(keys, entryProcessor, args);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
index 90aeb0b..66f8626 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
@@ -738,9 +738,9 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key,
+ @Override public <T> EntryProcessorResult<T> invoke(K key,
EntryProcessor<K, V, T> entryProcessor,
- Object... args) {
+ Object... args) throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -752,9 +752,9 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
EntryProcessor<K, V, T> entryProcessor,
- Object... args) {
+ Object... args) throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -766,6 +766,34 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.invokeAsync(key, entryProcessor, args);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
+ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+ try {
+ return delegate.invokeAllAsync(keys, entryProcessor, args);
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteFuture<Boolean> putxAsync(K key, V val,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
index a7e47b0..b6fe4be 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
@@ -180,7 +180,8 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> {
* @throws IgniteCheckedException If data loading failed.
*/
@SuppressWarnings({"unchecked"})
- public boolean loadAllFromStore(@Nullable IgniteTx tx, Collection<? extends K> keys,
+ public boolean loadAllFromStore(@Nullable IgniteTx tx,
+ Collection<? extends K> keys,
final IgniteBiInClosure<K, V> vis) throws IgniteCheckedException {
if (store != null) {
if (!keys.isEmpty()) {
@@ -230,6 +231,10 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> {
return true;
}
+ else {
+ for (K key : keys)
+ vis.apply(key, null);
+ }
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 78d92f8..0644821 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -455,7 +455,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public void transform(K key, IgniteClosure<V, V> transformer) throws IgniteCheckedException {
- transformAsync(key, transformer).get();
+ //transformAsync(key, transformer).get();
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@@ -482,8 +484,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public void transformAll(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m) throws IgniteCheckedException {
- transformAllAsync(m).get();
+ @Override public void transformAll(@Nullable Map<? extends K, ? extends IgniteClosure<V, V>> m)
+ throws IgniteCheckedException {
+ //transformAllAsync(m).get();
+ // TODO IGNITE-44.
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@@ -632,8 +637,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
+ @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
+ throws IgniteCheckedException {
+ return invokeAsync(key, entryProcessor, args).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args)
+ throws IgniteCheckedException {
+ return invokeAllAsync(keys, entryProcessor, args).get();
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public <T> IgniteFuture<EntryProcessorResult<T>> invoke(K key,
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key,
EntryProcessor<K, V, T> entryProcessor,
Object... args) {
A.notNull(key, "key", entryProcessor, "entryProcessor");
@@ -671,7 +690,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAll(Set<? extends K> keys,
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
final EntryProcessor<K, V, T> entryProcessor,
Object... args) {
A.notNull(keys, "keys", entryProcessor, "entryProcessor");
@@ -701,8 +720,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* Entry point for all public API put/transform methods.
*
- * @param map Put map. Either {@code map}, {@code transformMap} or {@code drMap} should be passed.
- * @param transformMap Transform map. Either {@code map}, {@code transformMap} or {@code drMap} should be passed.
+ * @param map Put map. Either {@code map}, {@code invokeMap} or {@code drMap} should be passed.
+ * @param invokeMap Invoke map. Either {@code map}, {@code invokeMap} or {@code drMap} should be passed.
* @param invokeArgs Optional arguments for EntryProcessor.
* @param drPutMap DR put map.
* @param drRmvMap DR remove map.
@@ -714,7 +733,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private IgniteFuture updateAllAsync0(
@Nullable final Map<? extends K, ? extends V> map,
- @Nullable final Map<? extends K, EntryProcessor> transformMap,
+ @Nullable final Map<? extends K, EntryProcessor> invokeMap,
@Nullable Object[] invokeArgs,
@Nullable final Map<? extends K, GridCacheDrInfo<V>> drPutMap,
@Nullable final Map<? extends K, GridCacheVersion> drRmvMap,
@@ -738,10 +757,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx,
this,
ctx.config().getWriteSynchronizationMode(),
- transformMap != null ? TRANSFORM : UPDATE,
- map != null ? map.keySet() : transformMap != null ? transformMap.keySet() : drPutMap != null ?
+ invokeMap != null ? TRANSFORM : UPDATE,
+ map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : drPutMap != null ?
drPutMap.keySet() : drRmvMap.keySet(),
- map != null ? map.values() : transformMap != null ? transformMap.values() : null,
+ map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
invokeArgs,
drPutMap != null ? drPutMap.values() : null,
drRmvMap != null ? drRmvMap.values() : null,
@@ -1213,12 +1232,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
int size = req.keys().size();
Map<K, V> putMap = null;
+
Map<K, EntryProcessor<K, V, ?>> entryProcessorMap = null;
+
Collection<K> rmvKeys = null;
+
UpdateBatchResult<K, V> updRes = new UpdateBatchResult<>();
+
List<GridDhtCacheEntry<K, V>> filtered = new ArrayList<>(size);
+
GridCacheOperation op = req.operation();
- Map<Object, Object> invokeResMap = op == TRANSFORM ? U.newHashMap(size) : null;
+
+ Map<K, EntryProcessorResult> invokeResMap =
+ op == TRANSFORM ? U.<K, EntryProcessorResult>newHashMap(size) : null;
int firstEntryIdx = 0;
@@ -2644,7 +2670,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private boolean readersOnly;
/** */
- private Map<Object, Object> invokeRes;
+ private Map<K, EntryProcessorResult> invokeRes;
/**
* @param entry Entry.
@@ -2679,14 +2705,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* @param invokeRes Result for invoke operation.
*/
- private void invokeResult(Map<Object, Object> invokeRes) {
+ private void invokeResult(Map<K, EntryProcessorResult> invokeRes) {
this.invokeRes = invokeRes;
}
/**
* @return Result for invoke operation.
*/
- Map<Object, Object> invokeResults() {
+ Map<K, EntryProcessorResult> invokeResults() {
return invokeRes;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcb30d10/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
index 07e9785..5b3055a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -555,6 +555,34 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
}
/** {@inheritDoc} */
+ @Override public <T> EntryProcessorResult<T> invoke(K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException {
+ return dht.invoke(key, entryProcessor, args);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws IgniteCheckedException {
+ return dht.invokeAll(keys, entryProcessor, args);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) throws EntryProcessorException {
+ return dht.invokeAsync(key, entryProcessor, args);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProcessor,
+ Object... args) {
+ return dht.invokeAllAsync(keys, entryProcessor, args);
+ }
+
+ /** {@inheritDoc} */
@Override public V remove(K key,
@Nullable GridCacheEntryEx<K, V> entry,
@Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter) throws IgniteCheckedException {