You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/04 09:20:41 UTC
[2/8] incubator-ignite git commit: # ignite-51-filters
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index bc317de..88200bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -285,7 +285,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Override public IgnitePredicate<Cache.Entry<K, V>> predicate() {
+ @Override public CacheEntryPredicate predicate() {
return delegate.predicate();
}
@@ -303,13 +303,8 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Override public CacheProjection<K, V> projection(@Nullable IgniteBiPredicate<K, V> p) {
- return delegate.projection(p);
- }
-
- /** {@inheritDoc} */
@Override public CacheProjection<K, V> projection(
- @Nullable IgnitePredicate<Cache.Entry<K, V>> filter) {
+ @Nullable CacheEntryPredicate filter) {
return delegate.projection(filter);
}
@@ -394,30 +389,6 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Override public void forEach(IgniteInClosure<Cache.Entry<K, V>> vis) {
- GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
- try {
- delegate.forEach(vis);
- }
- finally {
- gate.leave(prev);
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean forAll(IgnitePredicate<Cache.Entry<K, V>> vis) {
- GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
- try {
- return delegate.forAll(vis);
- }
- finally {
- gate.leave(prev);
- }
- }
-
- /** {@inheritDoc} */
@Nullable @Override public V reload(K key)
throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
@@ -504,7 +475,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public V get(K key, @Nullable GridCacheEntryEx entry, boolean deserializePortable,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -660,7 +631,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Nullable @Override public V put(K key, V val, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ @Nullable @Override public V put(K key, V val, @Nullable CacheEntryPredicate[] filter)
throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
@@ -674,7 +645,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public V put(K key, V val, @Nullable GridCacheEntryEx entry, long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -687,7 +658,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public IgniteInternalFuture<V> putAsync(K key, V val,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Nullable CacheEntryPredicate[] filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -700,7 +671,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public IgniteInternalFuture<V> putAsync(K key, V val, @Nullable GridCacheEntryEx entry, long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Nullable CacheEntryPredicate... filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -713,7 +684,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public boolean putx(K key, V val, @Nullable GridCacheEntryEx entry, long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -725,7 +696,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Override public boolean putx(K key, V val, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ @Override public boolean putx(K key, V val, @Nullable CacheEntryPredicate[] filter)
throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
@@ -849,7 +820,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> putxAsync(K key, V val,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Nullable CacheEntryPredicate[] filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -865,7 +836,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
V val,
@Nullable GridCacheEntryEx entry,
long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Nullable CacheEntryPredicate... filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -998,7 +969,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public void putAll(@Nullable Map<? extends K, ? extends V> m,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1011,7 +982,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> putAllAsync(@Nullable Map<? extends K, ? extends V> m,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Nullable CacheEntryPredicate[] filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1035,7 +1006,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Override public Set<K> keySet(@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Override public Set<K> keySet(@Nullable CacheEntryPredicate... filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1107,7 +1078,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Override public Set<Cache.Entry<K, V>> entrySetx(IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Override public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1119,7 +1090,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Override public Set<Cache.Entry<K, V>> primaryEntrySetx(IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Override public Set<Cache.Entry<K, V>> primaryEntrySetx(CacheEntryPredicate... filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1390,7 +1361,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Nullable @Override public V remove(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ @Nullable @Override public V remove(K key, @Nullable CacheEntryPredicate[] filter)
throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
@@ -1404,7 +1375,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public V remove(K key, @Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1416,7 +1387,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<V> removeAsync(K key, IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Override public IgniteInternalFuture<V> removeAsync(K key, CacheEntryPredicate[] filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1429,7 +1400,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public IgniteInternalFuture<V> removeAsync(K key, @Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Nullable CacheEntryPredicate... filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1441,7 +1412,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Override public boolean removex(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ @Override public boolean removex(K key, @Nullable CacheEntryPredicate[] filter)
throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
@@ -1479,7 +1450,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public boolean removex(K key, @Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1492,7 +1463,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> removexAsync(K key,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Nullable CacheEntryPredicate[] filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1505,7 +1476,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> removexAsync(K key, @Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Nullable CacheEntryPredicate... filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1590,7 +1561,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public void removeAll(@Nullable Collection<? extends K> keys,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1603,7 +1574,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> removeAllAsync(@Nullable Collection<? extends K> keys,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Nullable CacheEntryPredicate[] filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1652,7 +1623,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Override public boolean lock(K key, long timeout, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ @Override public boolean lock(K key, long timeout, @Nullable CacheEntryPredicate[] filter)
throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
@@ -1666,7 +1637,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> lockAsync(K key, long timeout,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Nullable CacheEntryPredicate[] filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1679,7 +1650,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1692,7 +1663,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> lockAllAsync(@Nullable Collection<? extends K> keys, long timeout,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Nullable CacheEntryPredicate[] filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1704,7 +1675,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
}
/** {@inheritDoc} */
- @Override public void unlock(K key, IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ @Override public void unlock(K key, CacheEntryPredicate[] filter) throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1717,7 +1688,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
/** {@inheritDoc} */
@Override public void unlockAll(@Nullable Collection<? extends K> keys,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index a60feb6..01ccddd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -129,6 +129,9 @@ public class GridCacheUtils {
/** Empty predicate array. */
private static final IgnitePredicate[] EMPTY_FILTER = new IgnitePredicate[0];
+ /** Empty predicate array. */
+ private static final CacheEntryPredicate[] EMPTY_FILTER0 = new CacheEntryPredicate[0];
+
/** Always false predicat array. */
private static final IgnitePredicate[] ALWAYS_FALSE = new IgnitePredicate[] {
new P1() {
@@ -138,6 +141,24 @@ public class GridCacheUtils {
}
};
+ /** */
+ private static final CacheEntryPredicate[] ALWAYS_FALSE0 = new CacheEntryPredicate[] {
+ new CacheEntryPredicateAdapter() {
+ @Override public boolean apply(GridCacheEntryEx e) {
+ return false;
+ }
+ }
+ };
+
+ /** */
+ private static final CacheEntryPredicate[] ALWAYS_TRUE0 = new CacheEntryPredicate[] {
+ new CacheEntryPredicateAdapter() {
+ @Override public boolean apply(GridCacheEntryEx e) {
+ return true;
+ }
+ }
+ };
+
/** Read filter. */
private static final IgnitePredicate READ_FILTER = new P1<Object>() {
@Override public boolean apply(Object e) {
@@ -749,6 +770,14 @@ public class GridCacheUtils {
}
/**
+ * @return Empty filter.
+ */
+ @SuppressWarnings({"unchecked"})
+ public static CacheEntryPredicate[] empty0() {
+ return EMPTY_FILTER0;
+ }
+
+ /**
* @return Always false filter.
*/
@SuppressWarnings({"unchecked"})
@@ -757,6 +786,13 @@ public class GridCacheUtils {
}
/**
+ * @return Always false filter.
+ */
+ public static CacheEntryPredicate[] alwaysFalse0() {
+ return ALWAYS_FALSE0;
+ }
+
+ /**
* @return Closure that converts tx entry to key.
*/
@SuppressWarnings({"unchecked"})
@@ -842,6 +878,28 @@ public class GridCacheUtils {
}
/**
+ * @param keyType Key type.
+ * @param valType Value type.
+ * @return Type filter.
+ */
+ public static CacheEntryPredicate typeFilter0(final Class<?> keyType, final Class<?> valType) {
+ return new CacheEntrySerializablePredicate(new CacheEntryPredicateAdapter() {
+ @Override public boolean apply(GridCacheEntryEx e) {
+ try {
+ Object val = CU.value(e.rawGetOrUnmarshal(true), e.context(), false);
+
+ return val != null &&
+ valType.isAssignableFrom(val.getClass()) &&
+ keyType.isAssignableFrom(e.key().value(e.context(), false).getClass());
+ }
+ catch (IgniteCheckedException err) {
+ throw new IgniteException(err);
+ }
+ }
+ });
+ }
+
+ /**
* @return Boolean reducer.
*/
public static IgniteReducer<Boolean, Boolean> boolReducer() {
@@ -1724,17 +1782,17 @@ public class GridCacheUtils {
* @throws ClassNotFoundException If class not found.
*/
@SuppressWarnings("unchecked")
- @Nullable public static <K, V> IgnitePredicate<Cache.Entry<K, V>>[] readEntryFilterArray(ObjectInput in)
+ @Nullable public static <K, V> CacheEntryPredicate[] readEntryFilterArray(ObjectInput in)
throws IOException, ClassNotFoundException {
int len = in.readInt();
- IgnitePredicate<Cache.Entry<K, V>>[] arr = null;
+ CacheEntryPredicate[] arr = null;
if (len > 0) {
- arr = new IgnitePredicate[len];
+ arr = new CacheEntryPredicate[len];
for (int i = 0; i < len; i++)
- arr[i] = (IgnitePredicate<Cache.Entry<K, V>>)in.readObject();
+ arr[i] = (CacheEntryPredicate)in.readObject();
}
return arr;
@@ -1745,7 +1803,23 @@ public class GridCacheUtils {
* @param n Node.
* @return Predicate that evaulates to {@code true} if entry is primary for node.
*/
- public static <K, V> IgnitePredicate<Cache.Entry<K, V>> cachePrimary(
+ public static CacheEntryPredicate cachePrimary(
+ final CacheAffinity aff,
+ final ClusterNode n
+ ) {
+ return new CacheEntryPredicateAdapter() {
+ @Override public boolean apply(GridCacheEntryEx e) {
+ return aff.isPrimary(n, e.key());
+ }
+ };
+ }
+
+ /**
+ * @param aff Affinity.
+ * @param n Node.
+ * @return Predicate that evaulates to {@code true} if entry is primary for node.
+ */
+ public static <K, V> IgnitePredicate<Cache.Entry<K, V>> cachePrimary0(
final CacheAffinity<K> aff,
final ClusterNode n
) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheValueCollection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheValueCollection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheValueCollection.java
index 9f9dcce..ddf4918 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheValueCollection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheValueCollection.java
@@ -118,9 +118,7 @@ public class GridCacheValueCollection<K, V> extends GridSerializableCollection<V
/** {@inheritDoc} */
@Override public void clear() {
- ctx.cache().clearLocally0(F.viewReadOnly(map.values(), F.<K, V>cacheEntry2Key(), filter), CU.<K, V>empty());
-
- map.clear();
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index b77b8db..af23dd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -702,7 +702,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
* @param filter Filter.
* @return Entry set.
*/
- public Set<Entry<K, V>> entrySetx(IgnitePredicate<Entry<K, V>>... filter) {
+ public Set<Entry<K, V>> entrySetx(CacheEntryPredicate... filter) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1347,7 +1347,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
(CacheProjection<K1, V1>)(prj != null ? prj : delegate),
(GridCacheContext<K1, V1>)ctx,
null,
- null,
prj != null ? prj.flags() : null,
prj != null ? prj.subjectId() : null,
true,
@@ -1390,7 +1389,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
(prj != null ? prj : delegate),
ctx,
null,
- null,
res,
prj != null ? prj.subjectId() : null,
true,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 42c3fea..c9f72f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -82,7 +82,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
TransactionIsolation isolation,
boolean isInvalidate,
long accessTtl,
- IgnitePredicate<Cache.Entry<K, V>>[] filter
+ CacheEntryPredicate[] filter
) {
assert tx != null;
@@ -91,7 +91,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout,
- IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ CacheEntryPredicate... filter) {
IgniteTxLocalEx tx = ctx.tm().userTxx();
// Return value flag is true because we choose to bring values for explicit locks.
@@ -126,7 +126,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
boolean retval,
@Nullable TransactionIsolation isolation,
long accessTtl,
- IgnitePredicate<Cache.Entry<K, V>>[] filter);
+ CacheEntryPredicate[] filter);
/**
* @param key Key to remove.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index c787261..a867c2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -205,7 +205,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
@Override public <K, V> GridTuple<CacheObject> peek(GridCacheContext cacheCtx,
boolean failFast,
KeyCacheObject key,
- IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ CacheEntryPredicate[] filter)
throws GridCacheFilterFailedException
{
assert false : "Method peek can only be called on user transaction: " + this;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index bb7d308..dfdda3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -803,7 +803,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
/** {@inheritDoc} */
@Override public void unlockAll(Collection<? extends K> keys,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ CacheEntryPredicate[] filter) {
assert false;
}
@@ -1059,7 +1059,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
while (partIt.hasNext()) {
GridDhtCacheEntry next = partIt.next();
- if (next.isInternal() || !next.visitable(CU.<K, V>empty()))
+ if (next.isInternal() || !next.visitable(CU.empty0()))
continue;
entry = next.wrapLazyValue();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 6fcc7f6..2d049cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -107,7 +107,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
private IgniteLogger log;
/** Filter. */
- private IgnitePredicate<Cache.Entry<K, V>>[] filter;
+ private CacheEntryPredicate[] filter;
/** Transaction. */
private GridDhtTxLocalAdapter tx;
@@ -161,7 +161,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
GridDhtTxLocalAdapter tx,
long threadId,
long accessTtl,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ CacheEntryPredicate[] filter) {
super(cctx.kernalContext(), CU.boolReducer());
assert nearNodeId != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 449702b..b394d49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -556,7 +556,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
boolean retval,
TransactionIsolation isolation,
long accessTtl,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ CacheEntryPredicate[] filter) {
return lockAllAsyncInternal(
keys,
timeout,
@@ -591,7 +591,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
boolean retval,
TransactionIsolation isolation,
long accessTtl,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ CacheEntryPredicate[] filter) {
if (keys == null || keys.isEmpty())
return new GridDhtFinishedFuture<>(ctx.kernalContext(), true);
@@ -665,7 +665,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
final GridCacheContext<K, V> cacheCtx,
final ClusterNode nearNode,
final GridNearLockRequest req,
- @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter0) {
+ @Nullable final CacheEntryPredicate[] filter0) {
final List<KeyCacheObject> keys = req.keys();
IgniteInternalFuture<Object> keyFut = null;
@@ -691,7 +691,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (exx != null)
return new GridDhtFinishedFuture<>(ctx.kernalContext(), exx);
- IgnitePredicate<Cache.Entry<K, V>>[] filter = filter0;
+ CacheEntryPredicate[] filter = filter0;
// Set message into thread context.
GridDhtTxLocal tx = null;
@@ -710,7 +710,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
// Unmarshal filter first.
if (filter == null)
- filter = (IgnitePredicate[])req.filter();
+ filter = req.filter();
GridDhtLockFuture<K, V> fut = null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 060b02c..bd11c0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -553,7 +553,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
null,
cached,
null,
- CU.empty(),
+ CU.empty0(),
false,
-1L,
-1L,
@@ -613,7 +613,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
final boolean read,
final Set<KeyCacheObject> skipped,
final long accessTtl,
- @Nullable final IgnitePredicate<Cache.Entry<Object, Object>>[] filter) {
+ @Nullable final CacheEntryPredicate[] filter) {
if (log.isDebugEnabled())
log.debug("Before acquiring transaction lock on keys [passedKeys=" + passedKeys + ", skipped=" +
skipped + ']');
@@ -631,7 +631,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/*retval*/false,
isolation,
accessTtl,
- (IgnitePredicate[])CU.empty());
+ CU.empty0());
return new GridEmbeddedFuture<>(
fut,
@@ -648,7 +648,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/*retval*/false,
/*read*/read,
accessTtl,
- filter == null ? CU.empty() : filter,
+ filter == null ? CU.empty0() : filter,
/**computeInvoke*/false);
return ret;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e4212de..16cc223 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -308,26 +308,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public V put(K key, V val, @Nullable GridCacheEntryEx cached, long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
return putAsync(key, val, cached, ttl, filter).get();
}
/** {@inheritDoc} */
@Override public boolean putx(K key, V val, @Nullable GridCacheEntryEx cached,
- long ttl, @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ long ttl, @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
return putxAsync(key, val, cached, ttl, filter).get();
}
/** {@inheritDoc} */
@Override public boolean putx(K key, V val,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ CacheEntryPredicate[] filter) throws IgniteCheckedException {
return putxAsync(key, val, filter).get();
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<V> putAsync(K key, V val, @Nullable GridCacheEntryEx entry,
- long ttl, @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ long ttl, @Nullable CacheEntryPredicate... filter) {
A.notNull(key, "key");
return updateAllAsync0(F0.asMap(key, val),
@@ -344,7 +344,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<Boolean> putxAsync(K key, V val, @Nullable GridCacheEntryEx entry, long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Nullable CacheEntryPredicate... filter) {
A.notNull(key, "key");
return updateAllAsync0(F0.asMap(key, val),
@@ -367,7 +367,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public IgniteInternalFuture<V> putIfAbsentAsync(K key, V val) {
A.notNull(key, "key", val, "val");
- return putAsync(key, val, ctx.<K, V>noPeekArray());
+ return putAsync(key, val, ctx.noValArray());
}
/** {@inheritDoc} */
@@ -379,7 +379,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public IgniteInternalFuture<Boolean> putxIfAbsentAsync(K key, V val) {
A.notNull(key, "key", val, "val");
- return putxAsync(key, val, ctx.<K, V>noPeekArray());
+ return putxAsync(key, val, ctx.noValArray());
}
/** {@inheritDoc} */
@@ -391,7 +391,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public IgniteInternalFuture<V> replaceAsync(K key, V val) {
A.notNull(key, "key", val, "val");
- return putAsync(key, val, ctx.hasPeekArray());
+ return putAsync(key, val, ctx.hasValArray());
}
/** {@inheritDoc} */
@@ -403,7 +403,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public IgniteInternalFuture<Boolean> replacexAsync(K key, V val) {
A.notNull(key, "key", val, "val");
- return putxAsync(key, val, ctx.hasPeekArray());
+ return putxAsync(key, val, ctx.hasValArray());
}
/** {@inheritDoc} */
@@ -415,10 +415,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public IgniteInternalFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) {
A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal");
- if (ctx.portableEnabled())
- oldVal = (V)ctx.marshalToPortable(oldVal);
- return putxAsync(key, newVal, ctx.equalsPeekArray(oldVal));
+ return putxAsync(key, newVal, ctx.equalsValArray(oldVal));
}
/** {@inheritDoc} */
@@ -436,18 +434,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) {
A.notNull(key, "key", val, "val");
- if (ctx.portableEnabled())
- val = (V)ctx.marshalToPortable(val);
-
- return removeAllAsync0(F.asList(key), null, null, true, true, ctx.equalsPeekArray(val));
+ return removeAllAsync0(F.asList(key), null, null, true, true, ctx.equalsValArray(val));
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) {
- if (ctx.portableEnabled())
- oldVal = (V)ctx.marshalToPortable(oldVal);
-
return updateAllAsync0(F.asMap(key, newVal),
null,
null,
@@ -456,18 +448,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
true,
true,
null,
- ctx.equalsPeekArray(oldVal));
+ ctx.equalsValArray(oldVal));
}
/** {@inheritDoc} */
@Override public void putAll(Map<? extends K, ? extends V> m,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ CacheEntryPredicate[] filter) throws IgniteCheckedException {
putAllAsync(m, filter).get();
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> putAllAsync(Map<? extends K, ? extends V> m,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Nullable CacheEntryPredicate[] filter) {
return updateAllAsync0(m,
null,
null,
@@ -502,14 +494,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public V remove(K key, @Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
return removeAsync(key, entry, filter).get();
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<V> removeAsync(K key, @Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Nullable CacheEntryPredicate... filter) {
A.notNull(key, "key");
return removeAllAsync0(Collections.singletonList(key), null, entry, true, false, filter);
@@ -517,13 +509,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public void removeAll(Collection<? extends K> keys,
- IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ CacheEntryPredicate... filter) throws IgniteCheckedException {
removeAllAsync(keys, filter).get();
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> removeAllAsync(Collection<? extends K> keys,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ CacheEntryPredicate[] filter) {
A.notNull(keys, "keys");
return removeAllAsync0(keys, null, null, false, false, filter);
@@ -531,14 +523,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public boolean removex(K key, @Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
return removexAsync(key, entry, filter).get();
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<Boolean> removexAsync(K key, @Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Nullable CacheEntryPredicate... filter) {
A.notNull(key, "key");
return removeAllAsync0(Collections.singletonList(key), null, entry, false, false, filter);
@@ -553,10 +545,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public IgniteInternalFuture<Boolean> removeAsync(K key, V val) {
A.notNull(key, "key", val, "val");
- if (ctx.portableEnabled())
- val = (V)ctx.marshalToPortable(val);
-
- return removexAsync(key, ctx.equalsPeekArray(val));
+ return removexAsync(key, ctx.equalsValArray(val));
}
/** {@inheritDoc} */
@@ -565,7 +554,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> localRemoveAll(IgnitePredicate<Cache.Entry<K, V>> filter) {
+ @Override public IgniteInternalFuture<?> localRemoveAll(CacheEntryPredicate filter) {
return removeAllAsync(keySet(filter), null);
}
@@ -640,7 +629,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean retval,
@Nullable TransactionIsolation isolation,
long accessTtl,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ CacheEntryPredicate[] filter) {
return new FinishedLockFuture(new UnsupportedOperationException("Locks are not supported for " +
"CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)"));
}
@@ -785,7 +774,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final boolean retval,
final boolean rawRetval,
@Nullable GridCacheEntryEx cached,
- @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter
+ @Nullable final CacheEntryPredicate[] filter
) {
if (map != null && keyCheck)
validateCacheKeys(map.keySet());
@@ -812,7 +801,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
retval,
rawRetval,
prj != null ? prj.expiry() : null,
- (IgnitePredicate[])filter,
+ filter,
subjId,
taskNameHash);
@@ -842,7 +831,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable GridCacheEntryEx cached,
final boolean retval,
boolean rawRetval,
- @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter
+ @Nullable final CacheEntryPredicate[] filter
) {
final boolean statsEnabled = ctx.config().isStatisticsEnabled();
@@ -874,7 +863,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
retval,
rawRetval,
(filter != null && prj != null) ? prj.expiry() : null,
- (IgnitePredicate[])filter,
+ filter,
subjId,
taskNameHash);
@@ -2241,7 +2230,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequest req,
GridNearAtomicUpdateResponse res) {
try {
- return ctx.isAll(entry.wrapFilterLocked(), req.filter());
+ return ctx.isAll(entry, req.filter());
}
catch (IgniteCheckedException e) {
res.addFailedKey(entry.key(), e);
@@ -2465,7 +2454,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/*metrics*/true,
/*primary*/false,
/*check version*/!req.forceTransformBackups(),
- CU.empty(),
+ null,
replicate ? DR_BACKUP : DR_NONE,
ttl,
expireTime,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 567bf67..9756544 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -112,7 +112,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
private long topVer;
/** Optional filter. */
- private final IgnitePredicate<Cache.Entry<Object, Object>>[] filter;
+ private final CacheEntryPredicate[] filter;
/** Write synchronization mode. */
private final CacheWriteSynchronizationMode syncMode;
@@ -192,7 +192,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
final boolean retval,
final boolean rawRetval,
@Nullable ExpiryPolicy expiryPlc,
- final IgnitePredicate<Cache.Entry<Object, Object>>[] filter,
+ final CacheEntryPredicate[] filter,
UUID subjId,
int taskNameHash
) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index b0d356c..1ab0fa0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -116,11 +116,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
private byte[] expiryPlcBytes;
/** Filter. */
- @GridDirectTransient
- private IgnitePredicate<Cache.Entry<Object, Object>>[] filter;
-
- /** Filter bytes. */
- private byte[][] filterBytes;
+ private CacheEntryPredicate[] filter;
/** Flag indicating whether request contains primary keys. */
private boolean hasPrimary;
@@ -173,7 +169,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
boolean forceTransformBackups,
@Nullable ExpiryPolicy expiryPlc,
@Nullable Object[] invokeArgs,
- @Nullable IgnitePredicate<Cache.Entry<Object, Object>>[] filter,
+ @Nullable CacheEntryPredicate[] filter,
@Nullable UUID subjId,
int taskNameHash
) {
@@ -282,7 +278,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
/**
* @return Filter.
*/
- @Nullable public IgnitePredicate<Cache.Entry<Object, Object>>[] filter() {
+ @Nullable public CacheEntryPredicate[] filter() {
return filter;
}
@@ -511,7 +507,11 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
else
prepareMarshalCacheObjects(vals, cctx);
- filterBytes = marshalFilter(filter, ctx);
+ if (filter != null) {
+ for (CacheEntryPredicate p : filter)
+ p.prepareMarshal(cctx);
+ }
+
invokeArgsBytes = marshalInvokeArguments(invokeArgs, ctx);
if (expiryPlc != null)
@@ -531,7 +531,11 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
else
finishUnmarshalCacheObjects(vals, cctx, ldr);
- filter = unmarshalFilter(filterBytes, ctx, ldr);
+ if (filter != null) {
+ for (CacheEntryPredicate p : filter)
+ p.finishUnmarshal(cctx, ldr);
+ }
+
invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
if (expiryPlcBytes != null)
@@ -590,7 +594,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
writer.incrementState();
case 9:
- if (!writer.writeObjectArray("filterBytes", filterBytes, MessageCollectionItemType.BYTE_ARR))
+ if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
@@ -626,7 +630,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
writer.incrementState();
case 15:
- if (!writer.writeByte("op", op != null ? (byte) op.ordinal() : -1))
+ if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
return false;
writer.incrementState();
@@ -644,7 +648,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
writer.incrementState();
case 18:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte) syncMode.ordinal() : -1))
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
@@ -738,7 +742,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 9:
- filterBytes = reader.readObjectArray("filterBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+ filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index d0dc2e0..87c0591 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -378,7 +378,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
boolean retval,
@Nullable TransactionIsolation isolation,
long accessTtl,
- IgnitePredicate<Cache.Entry<K, V>>[] filter
+ CacheEntryPredicate[] filter
) {
assert tx == null || tx instanceof GridNearTxLocal;
@@ -413,7 +413,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
/** {@inheritDoc} */
@Override public void unlockAll(Collection<? extends K> keys,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ CacheEntryPredicate[] filter) {
if (keys.isEmpty())
return;
@@ -431,9 +431,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
GridDistributedCacheEntry entry = peekExx(cacheKey);
- Cache.Entry<K, V> Entry = entry == null ? entry(key) : entry.<K, V>wrapLazyValue();
-
- if (!ctx.isAll(Entry, filter))
+ if (entry == null || !ctx.isAll(entry, filter))
break; // While.
GridCacheMvccCandidate lock =
@@ -617,7 +615,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
final boolean txRead,
final long timeout,
final long accessTtl,
- @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter
+ @Nullable final CacheEntryPredicate[] filter
) {
assert keys != null;
@@ -690,7 +688,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
final boolean txRead,
final long timeout,
final long accessTtl,
- @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Nullable final CacheEntryPredicate[] filter) {
int cnt = keys.size();
if (tx == null) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 441c2fd..3c1b2fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -95,7 +95,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
private IgniteLogger log;
/** Filter. */
- private IgnitePredicate<Cache.Entry<K, V>>[] filter;
+ private CacheEntryPredicate[] filter;
/** Transaction. */
@GridToStringExclude
@@ -139,7 +139,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
boolean retval,
long timeout,
long accessTtl,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ CacheEntryPredicate[] filter) {
super(cctx.kernalContext(), CU.boolReducer());
assert keys != null;
@@ -609,7 +609,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
final ClusterNode node = map.node();
if (filter != null && filter.length != 0)
- req.filter((IgnitePredicate[])filter, cctx);
+ req.filter(filter, cctx);
if (node.isLocal())
lockLocally(mappedKeys, req.topologyVersion(), mappings);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 2358013..9aa407f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -227,7 +227,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
/*metrics*/true,
/*primary*/false,
/*check version*/true,
- CU.empty(),
+ CU.empty0(),
DR_NONE,
ttl,
expireTime,
@@ -323,7 +323,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
/*metrics*/true,
/*primary*/false,
/*check version*/!req.forceTransformBackups(),
- CU.empty(),
+ CU.empty0(),
DR_NONE,
ttl,
expireTime,
@@ -391,7 +391,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
V val,
@Nullable GridCacheEntryEx cached,
long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter
+ @Nullable CacheEntryPredicate[] filter
) throws IgniteCheckedException {
return dht.put(key, val, cached, ttl, filter);
}
@@ -401,14 +401,14 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
V val,
@Nullable GridCacheEntryEx cached,
long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
return dht.putx(key, val, cached, ttl, filter);
}
/** {@inheritDoc} */
@Override public boolean putx(K key,
V val,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ CacheEntryPredicate[] filter) throws IgniteCheckedException {
return dht.putx(key, val, filter);
}
@@ -418,7 +418,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
V val,
@Nullable GridCacheEntryEx entry,
long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Nullable CacheEntryPredicate... filter) {
return dht.putAsync(key, val, entry, ttl, filter);
}
@@ -428,7 +428,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
V val,
@Nullable GridCacheEntryEx entry,
long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Nullable CacheEntryPredicate... filter) {
return dht.putxAsync(key, val, entry, ttl, filter);
}
@@ -505,14 +505,14 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public void putAll(Map<? extends K, ? extends V> m, IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ @Override public void putAll(Map<? extends K, ? extends V> m, CacheEntryPredicate[] filter)
throws IgniteCheckedException {
dht.putAll(m, filter);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> putAllAsync(Map<? extends K, ? extends V> m,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Nullable CacheEntryPredicate[] filter) {
return dht.putAllAsync(m, filter);
}
@@ -571,7 +571,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public V remove(K key,
@Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
return dht.remove(key, entry, filter);
}
@@ -579,26 +579,26 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<V> removeAsync(K key,
@Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Nullable CacheEntryPredicate... filter) {
return dht.removeAsync(key, entry, filter);
}
/** {@inheritDoc} */
- @Override public void removeAll(Collection<? extends K> keys, IgnitePredicate<Cache.Entry<K, V>>... filter)
+ @Override public void removeAll(Collection<? extends K> keys, CacheEntryPredicate... filter)
throws IgniteCheckedException {
dht.removeAll(keys, filter);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> removeAllAsync(Collection<? extends K> keys,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ CacheEntryPredicate[] filter) {
return dht.removeAllAsync(keys, filter);
}
/** {@inheritDoc} */
@Override public boolean removex(K key,
@Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
return dht.removex(key, entry, filter);
}
@@ -606,7 +606,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<Boolean> removexAsync(K key,
@Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Nullable CacheEntryPredicate... filter) {
return dht.removexAsync(key, entry, filter);
}
@@ -636,7 +636,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> localRemoveAll(IgnitePredicate<Cache.Entry<K, V>> filter) {
+ @Override public IgniteInternalFuture<?> localRemoveAll(CacheEntryPredicate filter) {
return dht.localRemoveAll(filter);
}
@@ -659,13 +659,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
boolean retval,
@Nullable TransactionIsolation isolation,
long accessTtl,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ CacheEntryPredicate[] filter) {
return dht.lockAllAsync(null, timeout, filter);
}
/** {@inheritDoc} */
@Override public void unlockAll(@Nullable Collection<? extends K> keys,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
dht.unlockAll(keys, filter);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 3f1ae46..4d79f18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -359,12 +359,12 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
* @return Near entries.
*/
public Set<Cache.Entry<K, V>> nearEntries() {
- return super.entrySet(CU.<K, V>empty());
+ return super.entrySet(CU.empty0());
}
/** {@inheritDoc} */
@Override public Set<Cache.Entry<K, V>> entrySet(
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Nullable CacheEntryPredicate... filter) {
return new EntrySet(super.entrySet(filter), dht().entrySet(filter));
}
@@ -374,41 +374,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
}
/** {@inheritDoc} */
- @Override public Set<Cache.Entry<K, V>> primaryEntrySet(
- @Nullable final IgnitePredicate<Cache.Entry<K, V>>... filter) {
- final long topVer = ctx.affinity().affinityTopologyVersion();
-
- Collection<Cache.Entry<K, V>> entries =
- F.flatCollections(
- F.viewReadOnly(
- dht().topology().currentLocalPartitions(),
- new C1<GridDhtLocalPartition, Collection<Cache.Entry<K, V>>>() {
- @Override public Collection<Cache.Entry<K, V>> apply(GridDhtLocalPartition p) {
- return F.viewReadOnly(
- p.entries(),
- new C1<GridDhtCacheEntry, Cache.Entry<K, V>>() {
- @Override public Cache.Entry<K, V> apply(GridDhtCacheEntry e) {
- return e.wrapLazyValue();
- }
- },
- new P1<GridDhtCacheEntry>() {
- @Override public boolean apply(GridDhtCacheEntry e) {
- return !e.obsoleteOrDeleted();
- }
- });
- }
- },
- new P1<GridDhtLocalPartition>() {
- @Override public boolean apply(GridDhtLocalPartition p) {
- return p.primary(topVer);
- }
- }));
-
- return new GridCacheEntrySet<>(ctx, entries, filter);
- }
-
- /** {@inheritDoc} */
- @Override public Set<K> keySet(@Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Override public Set<K> keySet(@Nullable CacheEntryPredicate[] filter) {
return new GridCacheKeySet<>(ctx, entrySet(filter), null);
}
@@ -416,31 +382,31 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
* @param filter Entry filter.
* @return Keys for near cache only.
*/
- public Set<K> nearKeySet(@Nullable IgnitePredicate<Cache.Entry<K, V>> filter) {
+ public Set<K> nearKeySet(@Nullable CacheEntryPredicate filter) {
return super.keySet(filter);
}
/** {@inheritDoc} */
- @Override public Set<K> primaryKeySet(@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Override public Set<K> primaryKeySet(@Nullable CacheEntryPredicate... filter) {
return new GridCacheKeySet<>(ctx, primaryEntrySet(filter), null);
}
/** {@inheritDoc} */
- @Override public Collection<V> values(IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Override public Collection<V> values(CacheEntryPredicate... filter) {
return new GridCacheValueCollection<>(ctx, entrySet(filter), ctx.vararg(F.<K, V>cacheHasPeekValue()));
}
/** {@inheritDoc} */
- @Override public Collection<V> primaryValues(@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Override public Collection<V> primaryValues(@Nullable CacheEntryPredicate... filter) {
return new GridCacheValueCollection<>(
ctx,
entrySet(filter),
ctx.vararg(
- CU.<K, V>cachePrimary(ctx.grid().<K>affinity(ctx.name()), ctx.localNode())));
+ CU.<K, V>cachePrimary0(ctx.grid().<K>affinity(ctx.name()), ctx.localNode())));
}
/** {@inheritDoc} */
- @Override public boolean evict(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Override public boolean evict(K key, @Nullable CacheEntryPredicate[] filter) {
// Use unary 'and' to make sure that both sides execute.
return super.evict(key, filter) & dht().evict(key, filter);
}
@@ -450,13 +416,13 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
* @param filter Optional filter.
* @return {@code True} if evicted.
*/
- public boolean evictNearOnly(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ public boolean evictNearOnly(K key, @Nullable CacheEntryPredicate[] filter) {
return super.evict(key, filter);
}
/** {@inheritDoc} */
@Override public void evictAll(Collection<? extends K> keys,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Nullable CacheEntryPredicate[] filter) {
super.evictAll(keys, filter);
dht().evictAll(keys, filter);
@@ -464,7 +430,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
/** {@inheritDoc} */
@Override public boolean compact(K key,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
return super.compact(key, filter) | dht().compact(key, filter);
}
@@ -483,7 +449,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
*/
@Nullable public V peekNearOnly(K key) {
try {
- GridTuple<V> peek = peek0(true, key, SMART, CU.<K, V>empty());
+ GridTuple<V> peek = peek0(true, key, SMART, CU.empty0());
return peek != null ? peek.get() : null;
}
@@ -496,7 +462,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
}
/** {@inheritDoc} */
- @Override public V peek(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>> filter) {
+ @Override public V peek(K key, @Nullable CacheEntryPredicate filter) {
try {
GridTuple<V> res = peek0(false, key, SMART, filter);
@@ -535,13 +501,13 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
}
/** {@inheritDoc} */
- @Override public boolean clearLocally0(K key, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Override public boolean clearLocally0(K key, @Nullable CacheEntryPredicate[] filter) {
return super.clearLocally0(key, filter) | dht().clearLocally0(key, filter);
}
/** {@inheritDoc} */
@Override public void clearLocally0(Collection<? extends K> keys,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Nullable CacheEntryPredicate[] filter) {
super.clearLocally0(keys, filter);
dht().clearLocally0(keys, filter);
@@ -759,7 +725,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
currIter.remove();
try {
- GridNearCacheAdapter.this.remove(currEntry.getKey(), CU.<K, V>empty());
+ GridNearCacheAdapter.this.remove(currEntry.getKey(), CU.empty0());
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index c855b47..67001aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -97,7 +97,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
private IgniteLogger log;
/** Filter. */
- private IgnitePredicate<Cache.Entry<K, V>>[] filter;
+ private CacheEntryPredicate[] filter;
/** Transaction. */
@GridToStringExclude
@@ -149,7 +149,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
boolean retval,
long timeout,
long accessTtl,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ CacheEntryPredicate[] filter) {
super(cctx.kernalContext(), CU.boolReducer());
assert keys != null;
@@ -795,7 +795,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
try {
entry = cctx.near().entryExx(key, topVer);
- if (!cctx.isAll(entry.<K, V>wrapLazyValue(), filter)) {
+ if (!cctx.isAll(entry, filter)) {
if (log.isDebugEnabled())
log.debug("Entry being locked did not pass filter (will not lock): " + entry);
@@ -953,7 +953,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
final ClusterNode node = map.node();
if (filter != null && filter.length != 0)
- req.filter((IgnitePredicate[])filter, cctx);
+ req.filter(filter, cctx);
if (node.isLocal()) {
req.miniId(IgniteUuid.randomUuid());
@@ -1035,7 +1035,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
if (inTx() && implicitTx() && tx.onePhaseCommit()) {
boolean pass = res.filterResult(i);
- tx.entry(cctx.txKey(k)).filters(pass ? CU.empty() : CU.alwaysFalse());
+ tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0());
}
if (record) {
@@ -1390,7 +1390,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
if (inTx() && implicitTx() && tx.onePhaseCommit()) {
boolean pass = res.filterResult(i);
- tx.entry(cctx.txKey(k)).filters(pass ? CU.empty() : CU.alwaysFalse());
+ tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0());
}
entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index e521efb..d34f5a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -49,11 +49,8 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
private IgniteUuid miniId;
/** Filter. */
- private byte[][] filterBytes;
-
- /** Filter. */
@GridDirectTransient
- private IgnitePredicate<Cache.Entry<Object, Object>>[] filter;
+ private CacheEntryPredicate[] filter;
/** Implicit flag. */
private boolean implicitTx;
@@ -225,7 +222,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
/**
* @return Filter.
*/
- public IgnitePredicate<Cache.Entry<Object, Object>>[] filter() {
+ public CacheEntryPredicate[] filter() {
return filter;
}
@@ -234,7 +231,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
* @param ctx Context.
* @throws IgniteCheckedException If failed.
*/
- public void filter(IgnitePredicate<Cache.Entry<Object, Object>>[] filter, GridCacheContext ctx)
+ public void filter(CacheEntryPredicate[] filter, GridCacheContext ctx)
throws IgniteCheckedException {
this.filter = filter;
}
@@ -307,16 +304,24 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (filterBytes == null)
- filterBytes = marshalFilter(filter, ctx);
+ if (filter != null) {
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ for (CacheEntryPredicate p : filter)
+ p.prepareMarshal(cctx);
+ }
}
/** {@inheritDoc} */
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (filter == null && filterBytes != null)
- filter = unmarshalFilter(filterBytes, ctx, ldr);
+ if (filter != null) {
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ for (CacheEntryPredicate p : filter)
+ p.finishUnmarshal(cctx, ldr);
+ }
}
/** {@inheritDoc} */
@@ -346,12 +351,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
writer.incrementState();
- case 24:
- if (!writer.writeObjectArray("filterBytes", filterBytes, MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
case 25:
if (!writer.writeBoolean("hasTransforms", hasTransforms))
return false;
@@ -438,14 +437,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
- case 24:
- filterBytes = reader.readObjectArray("filterBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 25:
hasTransforms = reader.readBoolean("hasTransforms");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 9b15b40..ee80a00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -416,7 +416,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
boolean retval,
TransactionIsolation isolation,
long accessTtl,
- IgnitePredicate<Cache.Entry<K, V>>[] filter
+ CacheEntryPredicate[] filter
) {
GridNearLockFuture<K, V> fut = new GridNearLockFuture<>(ctx,
keys,
@@ -467,7 +467,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
}
/** {@inheritDoc} */
- @Override public void unlockAll(Collection<? extends K> keys, IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Override public void unlockAll(Collection<? extends K> keys, CacheEntryPredicate[] filter) {
if (keys.isEmpty())
return;
@@ -486,7 +486,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
GridDistributedCacheEntry entry = peekExx(cacheKey);
- if (entry == null || !ctx.isAll(entry.<K, V>wrapLazyValue(), filter))
+ if (entry == null || !ctx.isAll(entry, filter))
break; // While.
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index ef2899a..6b16aa4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1096,7 +1096,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/*retval*/false,
isolation,
accessTtl,
- CU.empty());
+ CU.empty0());
return new GridEmbeddedFuture<>(
fut,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index ccd4c8c..097590a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -292,7 +292,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
cached.unswap();
try {
- if (cached.peek(GLOBAL, CU.empty()) == null && cached.evictInternal(false, xidVer, null)) {
+ if (cached.peek(GLOBAL, CU.empty0()) == null && cached.evictInternal(false, xidVer, null)) {
evicted.add(entry.txKey());
return false;
@@ -346,7 +346,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
else {
cached.unswap();
- if (cached.peek(GLOBAL, CU.empty()) == null && cached.evictInternal(false, xidVer, null)) {
+ if (cached.peek(GLOBAL, CU.empty0()) == null && cached.evictInternal(false, xidVer, null)) {
cached.context().cache().removeIfObsolete(key.key());
evicted.add(key);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 4c59437..5eb4284 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -112,13 +112,13 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
TransactionIsolation isolation,
boolean invalidate,
long accessTtl,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ CacheEntryPredicate[] filter) {
return lockAllAsync(keys, timeout, tx, filter);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ CacheEntryPredicate[] filter) {
IgniteTxLocalEx tx = ctx.tm().localTx();
return lockAllAsync(ctx.cacheKeysView(keys), timeout, tx, filter);
@@ -134,7 +134,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
public IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys,
long timeout,
@Nullable IgniteTxLocalEx tx,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ CacheEntryPredicate[] filter) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(ctx.kernalContext(), true);
@@ -186,7 +186,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public void unlockAll(Collection<? extends K> keys,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ CacheEntryPredicate[] filter) throws IgniteCheckedException {
long topVer = ctx.affinity().affinityTopologyVersion();
for (K key : keys) {