You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/03 16:53:31 UTC
[3/4] incubator-ignite git commit: # ignite-51-filters
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index bcf0f0c..eb31100 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -147,11 +147,11 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Grid cache. */
private GridCacheAdapter<K, V> cache;
- /** No-value filter array. */
- private IgnitePredicate<Cache.Entry<K, V>>[] noValArr;
+ /** No value filter array. */
+ private CacheEntryPredicate[] noValArr0;
- /** Has-value filter array. */
- private IgnitePredicate<Cache.Entry<K, V>>[] hasValArr;
+ /** Has value filter array. */
+ private CacheEntryPredicate[] hasValArr0;
/** No-peek-value filter array. */
private IgnitePredicate<Cache.Entry<Object, Object>>[] noPeekArr;
@@ -159,9 +159,6 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Has-peek-value filter array. */
private IgnitePredicate<Cache.Entry<K, V>>[] hasPeekArr;
- /** No-op filter array. */
- private IgnitePredicate<Cache.Entry<K, V>>[] trueArr;
-
/** Cached local rich node. */
private ClusterNode locNode;
@@ -290,11 +287,11 @@ public class GridCacheContext<K, V> implements Externalizable {
log = ctx.log(getClass());
- noValArr = new IgnitePredicate[]{F.cacheNoGetValue()};
- hasValArr = new IgnitePredicate[]{F.cacheHasGetValue()};
noPeekArr = new IgnitePredicate[]{F.cacheNoPeekValue()};
hasPeekArr = new IgnitePredicate[]{F.cacheHasPeekValue()};
- trueArr = new IgnitePredicate[]{F.alwaysTrue()};
+
+ noValArr0 = new CacheEntryPredicate[]{new CacheEntryPredicateNoValue()};
+ hasValArr0 = new CacheEntryPredicate[]{new CacheEntryPredicateHasValue()};
cacheObjCtx = new CacheObjectContext(ctx);
@@ -964,33 +961,34 @@ public class GridCacheContext<K, V> implements Externalizable {
public CacheJtaManagerAdapter<K, V> jta() {
return jtaMgr;
}
+
/**
* @return No get-value filter.
*/
- public IgnitePredicate<Cache.Entry<K, V>>[] noGetArray() {
- return noValArr;
+ @SuppressWarnings("unchecked")
+ public <K, V> IgnitePredicate<Cache.Entry<K, V>>[] noPeekArray() {
+ return (IgnitePredicate<Cache.Entry<K, V>>[])((IgnitePredicate[])noPeekArr);
}
/**
* @return Has get-value filer.
*/
- public IgnitePredicate<Cache.Entry<K, V>>[] hasGetArray() {
- return hasValArr;
+ public IgnitePredicate<Cache.Entry<K, V>>[] hasPeekArray() {
+ return hasPeekArr;
}
/**
- * @return No get-value filter.
+ * @return No value filter.
*/
- @SuppressWarnings("unchecked")
- public <K, V> IgnitePredicate<Cache.Entry<K, V>>[] noPeekArray() {
- return (IgnitePredicate<Cache.Entry<K, V>>[])((IgnitePredicate[])noPeekArr);
+ public CacheEntryPredicate[] noValArray() {
+ return noValArr0;
}
/**
- * @return Has get-value filer.
+ * @return Has value filter.
*/
- public IgnitePredicate<Cache.Entry<K, V>>[] hasPeekArray() {
- return hasPeekArr;
+ public CacheEntryPredicate[] hasValArray() {
+ return noValArr0;
}
/**
@@ -1005,17 +1003,11 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
- * @return Empty filter.
- */
- public IgnitePredicate<Cache.Entry<K, V>> truex() {
- return F.alwaysTrue();
- }
-
- /**
- * @return No-op array.
+ * @param val Value to check.
+ * @return Predicate array that checks for value.
*/
- public IgnitePredicate<Cache.Entry<K, V>>[] trueArray() {
- return trueArr;
+ public CacheEntryPredicate[] equalsValArray(V val) {
+ return new CacheEntryPredicate[]{new CacheEntryPredicateContainsValue(toCacheObject(val))};
}
/**
@@ -1098,6 +1090,29 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * @param e Entry.
+ * @param p Predicates.
+ * @return {@code True} if predicates passed.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean isAll(GridCacheEntryEx e, CacheEntryPredicate[] p) throws IgniteCheckedException {
+ if (p == null || p.length == 0)
+ return true;
+
+ try {
+ for (CacheEntryPredicate p0 : p) {
+ if (p0 != null && !p0.apply(e))
+ return false;
+ }
+ }
+ catch (RuntimeException ex) {
+ throw U.cast(ex);
+ }
+
+ return true;
+ }
+
+ /**
* Forces LOCAL flag.
*
* @return Previously forced flags.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
index f04ef90..b365b45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
@@ -240,61 +240,16 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
private void onUndeploy0(final ClassLoader ldr, final GridCacheContext<K, V> cacheCtx) {
GridCacheAdapter<K, V> cache = cacheCtx.cache();
- Set<K> keySet = cache.keySet(cacheCtx.vararg(
- new P1<Cache.Entry<K, V>>() {
- @Override public boolean apply(Cache.Entry<K, V> e) {
- return cacheCtx.isNear() ? undeploy(e, cacheCtx.near()) || undeploy(e, cacheCtx.near().dht()) :
- undeploy(e, cacheCtx.cache());
- }
-
- /**
- * @param e Entry.
- * @param cache Cache.
- * @return {@code True} if entry should be undeployed.
- */
- private boolean undeploy(Cache.Entry<K, V> e, GridCacheAdapter<K, V> cache) {
- // TODO IGNITE-51.
- K k = e.getKey();
-
- GridCacheEntryEx entry = cache.peekEx(cacheCtx.toCacheKeyObject(e.getKey()));
-
- if (entry == null)
- return false;
-
- CacheObject v;
-
- try {
- v = entry.peek(GridCachePeekMode.GLOBAL, CU.<K, V>empty());
- }
- catch (GridCacheEntryRemovedException ignore) {
- return false;
- }
- catch (IgniteException ignore) {
- // Peek can throw runtime exception if unmarshalling failed.
- return true;
- }
+ Collection<KeyCacheObject> keys = new ArrayList<>();
- assert k != null : "Key cannot be null for cache entry: " + e;
+ for (GridCacheEntryEx e : cache.entries()) {
+ boolean undeploy = cacheCtx.isNear() ?
+ undeploy(ldr, e, cacheCtx.near()) || undeploy(ldr, e, cacheCtx.near().dht()) :
+ undeploy(ldr, e, cacheCtx.cache());
- ClassLoader keyLdr = U.detectObjectClassLoader(k);
- ClassLoader valLdr = U.detectObjectClassLoader(v);
-
- boolean res = F.eq(ldr, keyLdr) || F.eq(ldr, valLdr);
-
- if (log.isDebugEnabled())
- log.debug("Finished examining entry [entryCls=" + e.getClass() +
- ", key=" + k + ", keyCls=" + k.getClass() +
- ", valCls=" + (v != null ? v.getClass() : "null") +
- ", keyLdr=" + keyLdr + ", valLdr=" + valLdr + ", res=" + res + ']');
-
- return res;
- }
- }));
-
- Collection<K> keys = new ArrayList<>();
-
- for (K k : keySet)
- keys.add(k);
+ if (undeploy)
+ keys.add(e.key());
+ }
if (log.isDebugEnabled())
log.debug("Finished searching keys for undeploy [keysCnt=" + keys.size() + ']');
@@ -333,6 +288,52 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
}
/**
+ * @param ldr Class loader.
+ * @param e Entry.
+ * @param cache Cache.
+ * @return {@code True} if need to undeploy.
+ */
+ private boolean undeploy(ClassLoader ldr, GridCacheEntryEx e, GridCacheAdapter cache) {
+ KeyCacheObject key = e.key();
+
+ GridCacheEntryEx entry = cache.peekEx(key);
+
+ if (entry == null)
+ return false;
+
+ CacheObject v;
+
+ try {
+ v = entry.peek(GridCachePeekMode.GLOBAL, CU.empty0());
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ return false;
+ }
+ catch (IgniteException ignore) {
+ // Peek can throw runtime exception if unmarshalling failed.
+ return true;
+ }
+
+ assert key != null : "Key cannot be null for cache entry: " + e;
+
+ Object key0 = key.value(cache.context(), false);
+ Object val0 = CU.value(v, cache.context(), false);
+
+ ClassLoader keyLdr = U.detectObjectClassLoader(key0);
+ ClassLoader valLdr = U.detectObjectClassLoader(val0);
+
+ boolean res = F.eq(ldr, keyLdr) || F.eq(ldr, valLdr);
+
+ if (log.isDebugEnabled())
+ log.debug("Finished examining entry [entryCls=" + e.getClass() +
+ ", key=" + key0 + ", keyCls=" + key0.getClass() +
+ ", valCls=" + (val0 != null ? val0.getClass() : "null") +
+ ", keyLdr=" + keyLdr + ", valLdr=" + valLdr + ", res=" + res + ']');
+
+ return res;
+ }
+
+ /**
* @param sndId Sender node ID.
* @param ldrId Loader ID.
* @param userVer User version.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 5196965..5fb1346 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -212,7 +212,7 @@ public interface GridCacheEntryEx {
* @throws IgniteCheckedException If swap could not be released.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- public <K, V> boolean invalidate(@Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ public <K, V> boolean invalidate(@Nullable CacheEntryPredicate[] filter)
throws GridCacheEntryRemovedException, IgniteCheckedException;
/**
@@ -223,7 +223,7 @@ public interface GridCacheEntryEx {
* @throws IgniteCheckedException If operation failed.
* @return {@code true} if entry was not being used and could be removed.
*/
- public <K, V> boolean compact(@Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ public <K, V> boolean compact(@Nullable CacheEntryPredicate[] filter)
throws GridCacheEntryRemovedException, IgniteCheckedException;
/**
@@ -234,7 +234,7 @@ public interface GridCacheEntryEx {
* @throws IgniteCheckedException In case of error.
*/
public <K, V> boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException;
+ @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException;
/**
* Evicts entry when batch evict is performed. When called, does not write entry data to swap, but instead
@@ -360,7 +360,7 @@ public interface GridCacheEntryEx {
boolean evt,
boolean metrics,
long topVer,
- IgnitePredicate<Cache.Entry<Object, Object>>[] filter,
+ CacheEntryPredicate[] filter,
GridDrType drType,
long drExpireTime,
@Nullable GridCacheVersion explicitVer,
@@ -396,7 +396,7 @@ public interface GridCacheEntryEx {
boolean evt,
boolean metrics,
long topVer,
- IgnitePredicate<Cache.Entry<Object, Object>>[] filter,
+ CacheEntryPredicate[] filter,
GridDrType drType,
@Nullable GridCacheVersion explicitVer,
@Nullable UUID subjId,
@@ -449,7 +449,7 @@ public interface GridCacheEntryEx {
boolean metrics,
boolean primary,
boolean checkVer,
- @Nullable IgnitePredicate<Cache.Entry<Object, Object>>[] filter,
+ @Nullable CacheEntryPredicate[] filter,
GridDrType drType,
long conflictTtl,
long conflictExpireTime,
@@ -490,7 +490,7 @@ public interface GridCacheEntryEx {
@Nullable ExpiryPolicy expiryPlc,
boolean evt,
boolean metrics,
- @Nullable IgnitePredicate<Cache.Entry<Object, Object>>[] filter,
+ @Nullable CacheEntryPredicate[] filter,
boolean intercept,
@Nullable UUID subjId,
String taskName
@@ -507,7 +507,7 @@ public interface GridCacheEntryEx {
* @return {@code True} if entry was not being used, passed the filter and could be removed.
*/
public <K, V> boolean clear(GridCacheVersion ver, boolean readers,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException;
+ @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException;
/**
* This locks is called by transaction manager during prepare step
@@ -578,7 +578,7 @@ public interface GridCacheEntryEx {
* @return Value.
* @throws GridCacheEntryRemovedException If entry has been removed.
*/
- @Nullable public <K, V> CacheObject peek(GridCachePeekMode mode, IgnitePredicate<Cache.Entry<K, V>>... filter)
+ @Nullable public <K, V> CacheObject peek(GridCachePeekMode mode, CacheEntryPredicate... filter)
throws GridCacheEntryRemovedException;
/**
@@ -609,7 +609,7 @@ public interface GridCacheEntryEx {
* @throws GridCacheEntryRemovedException If entry has been removed.
*/
@Nullable public <K, V> CacheObject peek(Collection<GridCachePeekMode> modes,
- IgnitePredicate<Cache.Entry<K, V>>... filter) throws GridCacheEntryRemovedException;
+ CacheEntryPredicate... filter) throws GridCacheEntryRemovedException;
/**
* @param failFast Fail-fast flag.
@@ -624,7 +624,7 @@ public interface GridCacheEntryEx {
@SuppressWarnings({"RedundantTypeArguments"})
@Nullable public <K, V> GridTuple<CacheObject> peek0(boolean failFast,
GridCachePeekMode mode,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter,
+ @Nullable CacheEntryPredicate[] filter,
@Nullable IgniteInternalTx tx)
throws GridCacheEntryRemovedException, GridCacheFilterFailedException, IgniteCheckedException;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index e34cb66..5a40751 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -654,7 +654,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
GridCacheAdapter<K, V> cache,
GridCacheEntryEx entry,
GridCacheVersion obsoleteVer,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter,
+ @Nullable CacheEntryPredicate[] filter,
boolean explicit
) throws IgniteCheckedException {
assert cache != null;
@@ -855,7 +855,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
* @throws IgniteCheckedException In case of error.
*/
public boolean evict(@Nullable GridCacheEntryEx entry, @Nullable GridCacheVersion obsoleteVer,
- boolean explicit, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ boolean explicit, @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
if (entry == null)
return true;
@@ -995,7 +995,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
* @param filter Filter.
* @throws GridCacheEntryRemovedException If entry got removed.
*/
- private void enqueue(GridCacheEntryEx entry, IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ private void enqueue(GridCacheEntryEx entry, CacheEntryPredicate[] filter)
throws GridCacheEntryRemovedException {
Node<EvictionInfo> node = entry.meta(meta);
@@ -1240,16 +1240,21 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
* @param info Eviction info.
* @return Version aware filter.
*/
- private IgnitePredicate<Cache.Entry<K, V>>[] versionFilter(final EvictionInfo info) {
+ private CacheEntryPredicate[] versionFilter(final EvictionInfo info) {
// If version has changed since we started the whole process
// then we should not evict entry.
- return cctx.vararg(new P1<Cache.Entry<K, V>>() {
- @Override public boolean apply(Cache.Entry<K, V> e) {
- GridCacheVersion ver = (GridCacheVersion)((CacheVersionedEntryImpl)e).version();
+ return new CacheEntryPredicate[]{new CacheEntryPredicateAdapter() {
+ @Override public boolean apply(GridCacheEntryEx e) {
+ try {
+ GridCacheVersion ver = e.version();
- return info.version().equals(ver) && F.isAll(info.filter());
+ return info.version().equals(ver) && F.isAll(info.filter());
+ }
+ catch (GridCacheEntryRemovedException err) {
+ return false;
+ }
}
- });
+ }};
}
/**
@@ -1465,7 +1470,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
private GridCacheVersion ver;
/** Filter to pass before entry will be evicted. */
- private IgnitePredicate<Cache.Entry<K, V>>[] filter;
+ private CacheEntryPredicate[] filter;
/**
* @param entry Entry.
@@ -1473,7 +1478,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
* @param filter Filter.
*/
EvictionInfo(GridCacheEntryEx entry, GridCacheVersion ver,
- IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ CacheEntryPredicate[] filter) {
assert entry != null;
assert ver != null;
@@ -1499,7 +1504,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V
/**
* @return Filter.
*/
- IgnitePredicate<Cache.Entry<K, V>>[] filter() {
+ CacheEntryPredicate[] filter() {
return filter;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java
index df1bc8e..ef6e2cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java
@@ -69,9 +69,7 @@ public class GridCacheKeySet<K, V> extends GridSerializableSet<K> {
/** {@inheritDoc} */
@Override public void clear() {
- ctx.cache().clearLocally0(F.viewReadOnly(map.values(), F.<K, V>cacheEntry2Key(), filter), CU.<K, V>empty());
-
- map.clear();
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 6dd4cae..518df69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -983,7 +983,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
boolean evt,
boolean metrics,
long topVer,
- IgnitePredicate<Cache.Entry<Object, Object>>[] filter,
+ CacheEntryPredicate[] filter,
GridDrType drType,
long drExpireTime,
@Nullable GridCacheVersion explicitVer,
@@ -1133,7 +1133,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
boolean evt,
boolean metrics,
long topVer,
- IgnitePredicate<Cache.Entry<Object, Object>>[] filter,
+ CacheEntryPredicate[] filter,
GridDrType drType,
@Nullable GridCacheVersion explicitVer,
@Nullable UUID subjId,
@@ -1324,7 +1324,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
@Nullable ExpiryPolicy expiryPlc,
boolean evt,
boolean metrics,
- @Nullable IgnitePredicate<Cache.Entry<Object, Object>>[] filter,
+ @Nullable CacheEntryPredicate[] filter,
boolean intercept,
@Nullable UUID subjId,
String taskName
@@ -1391,16 +1391,16 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
// Apply metrics.
if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) {
// PutIfAbsent methods mustn't update hit/miss statistics
- if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || filter != cctx.noPeekArray())
+ if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || filter != cctx.noValArray())
cctx.cache().metrics0().onRead(old != null);
}
// Check filter inside of synchronization.
if (!F.isEmpty(filter)) {
- boolean pass = cctx.isAll(wrapFilterLocked(), filter);
+ boolean pass = cctx.isAll(this, filter);
if (!pass) {
- if (expiryPlc != null && !readThrough && filter != cctx.noPeekArray() && hasValueUnlocked())
+ if (expiryPlc != null && !readThrough && filter != cctx.noValArray() && hasValueUnlocked())
updateTtl(expiryPlc);
return new T3<>(false, retval ? CU.value(old, cctx, false) : null, null);
@@ -1617,7 +1617,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
boolean metrics,
boolean primary,
boolean verCheck,
- @Nullable IgnitePredicate<Cache.Entry<Object, Object>>[] filter,
+ @Nullable CacheEntryPredicate[] filter,
GridDrType drType,
long explicitTtl,
long explicitExpireTime,
@@ -1834,16 +1834,16 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
// Apply metrics.
if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) {
// PutIfAbsent methods mustn't update hit/miss statistics
- if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || filter != cctx.noPeekArray())
+ if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || filter != cctx.noValArray())
cctx.cache().metrics0().onRead(oldVal != null);
}
// Check filter inside of synchronization.
if (!F.isEmptyOrNulls(filter)) {
- boolean pass = cctx.isAll(wrapFilterLocked(), filter);
+ boolean pass = cctx.isAll(this, filter);
if (!pass) {
- if (expiryPlc != null && !readThrough && hasValueUnlocked() && filter != cctx.noPeekArray())
+ if (expiryPlc != null && !readThrough && hasValueUnlocked() && filter != cctx.noValArray())
updateTtl(expiryPlc);
return new GridCacheUpdateAtomicResult(false,
@@ -2314,7 +2314,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
/** {@inheritDoc} */
@Override public <K, V> boolean clear(GridCacheVersion ver, boolean readers,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
cctx.denyOnFlag(READ);
boolean ret;
@@ -2559,7 +2559,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
}
/** {@inheritDoc} */
- @Override public <K, V> boolean invalidate(@Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ @Override public <K, V> boolean invalidate(@Nullable CacheEntryPredicate[] filter)
throws GridCacheEntryRemovedException, IgniteCheckedException {
if (F.isEmptyOrNulls(filter)) {
synchronized (this) {
@@ -2599,7 +2599,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
}
/** {@inheritDoc} */
- @Override public <K, V> boolean compact(@Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ @Override public <K, V> boolean compact(@Nullable CacheEntryPredicate[] filter)
throws GridCacheEntryRemovedException, IgniteCheckedException {
// For optimistic checking.
GridCacheVersion startVer;
@@ -2799,7 +2799,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
/** {@inheritDoc} */
@Nullable @Override public <K, V> CacheObject peek(GridCachePeekMode mode,
- IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ CacheEntryPredicate[] filter)
throws GridCacheEntryRemovedException {
try {
GridTuple<CacheObject> peek = peek0(false, mode, filter, cctx.tm().localTxx());
@@ -2851,7 +2851,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
/** {@inheritDoc} */
@Override public <K, V> CacheObject peek(Collection<GridCachePeekMode> modes,
- IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ CacheEntryPredicate[] filter)
throws GridCacheEntryRemovedException {
assert modes != null;
@@ -2887,7 +2887,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
*/
@SuppressWarnings({"RedundantTypeArguments"})
@Nullable @Override public <K, V> GridTuple<CacheObject> peek0(boolean failFast, GridCachePeekMode mode,
- IgnitePredicate<Cache.Entry<K, V>>[] filter, @Nullable IgniteInternalTx tx)
+ CacheEntryPredicate[] filter, @Nullable IgniteInternalTx tx)
throws GridCacheEntryRemovedException, GridCacheFilterFailedException, IgniteCheckedException {
assert tx == null || tx.local();
@@ -2999,7 +2999,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
* @throws IgniteCheckedException If unexpected cache failure occurred.
*/
@Nullable private <K, V> GridTuple<CacheObject> peekTxThenGlobal(boolean failFast,
- IgnitePredicate<Cache.Entry<K, V>>[] filter,
+ CacheEntryPredicate[] filter,
IgniteInternalTx tx)
throws GridCacheFilterFailedException, GridCacheEntryRemovedException, IgniteCheckedException
{
@@ -3022,7 +3022,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
* @throws GridCacheFilterFailedException If filter failed.
*/
@Nullable private <K, V> GridTuple<CacheObject> peekTx(boolean failFast,
- IgnitePredicate<Cache.Entry<K, V>>[] filter,
+ CacheEntryPredicate[] filter,
@Nullable IgniteInternalTx tx) throws GridCacheFilterFailedException {
return tx == null ? null : tx.peek(cctx, failFast, key, filter);
}
@@ -3040,7 +3040,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
@SuppressWarnings({"RedundantTypeArguments"})
@Nullable private <K, V> GridTuple<CacheObject> peekGlobal(boolean failFast,
long topVer,
- IgnitePredicate<Cache.Entry<K, V>>[] filter,
+ CacheEntryPredicate[] filter,
@Nullable IgniteCacheExpiryPolicy expiryPlc
)
throws GridCacheEntryRemovedException, GridCacheFilterFailedException, IgniteCheckedException {
@@ -3070,7 +3070,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
updateTtl(expiryPlc);
}
- if (!cctx.isAll(this.<K, V>wrap(), filter))
+ if (!cctx.isAll(this, filter))
return F.t(CU.<CacheObject>failed(failFast));
if (F.isEmptyOrNulls(filter) || ver.equals(version()))
@@ -3095,11 +3095,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
*/
@SuppressWarnings({"unchecked"})
@Nullable private <K, V> GridTuple<CacheObject> peekSwap(boolean failFast,
- IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ CacheEntryPredicate[] filter)
throws IgniteCheckedException, GridCacheFilterFailedException
{
- if (!cctx.isAll(this.<K, V>wrap(), filter))
- return F.t((CacheObject)CU.failed(failFast));
+ if (!cctx.isAll(this, filter))
+ return F.t(CU.failed(failFast));
synchronized (this) {
if (checkExpired())
@@ -3119,9 +3119,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
* @throws GridCacheFilterFailedException If filter failed.
*/
@SuppressWarnings({"unchecked"})
- @Nullable private <K, V> CacheObject peekDb(boolean failFast, IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ @Nullable private <K, V> CacheObject peekDb(boolean failFast, CacheEntryPredicate[] filter)
throws IgniteCheckedException, GridCacheFilterFailedException {
- if (!cctx.isAll(this.<K, V>wrap(), filter))
+ if (!cctx.isAll(this, filter))
return CU.failed(failFast);
synchronized (this) {
@@ -3812,7 +3812,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
/** {@inheritDoc} */
@Override public <K, V> boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
boolean marked = false;
try {
@@ -3853,7 +3853,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
v = ver;
}
- if (!cctx.isAll(/*version needed for sync evicts*/this.<K, V>wrapVersioned(), filter))
+ if (!cctx.isAll(/*version needed for sync evicts*/this, filter))
return false;
synchronized (this) {
@@ -3949,10 +3949,10 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
* @param filter Entry filter.
* @return {@code True} if entry is visitable.
*/
- public <K, V> boolean visitable(IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ public <K, V> boolean visitable(CacheEntryPredicate[] filter) {
try {
- if (obsoleteOrDeleted() || (filter != CU.<K, V>empty() &&
- !cctx.isAll(this.<K, V> wrapLazyValue(), filter)))
+ if (obsoleteOrDeleted() || (filter != CU.empty0() &&
+ !cctx.isAll(this, filter)))
return false;
}
catch (IgniteCheckedException e) {
@@ -4416,7 +4416,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
return null;
try {
- return CU.value(e.peek(GridCachePeekMode.GLOBAL, CU.<K, V>empty()), cctx, false);
+ return CU.value(e.peek(GridCachePeekMode.GLOBAL, CU.empty0()), cctx, false);
}
catch (GridCacheEntryRemovedException ignored) {
// No-op.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 2810e20..0e72011 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -346,10 +346,16 @@ public abstract class GridCacheMessage implements Message {
for (IgniteTxEntry e : txEntries) {
e.marshal(ctx, transferExpiry);
+ if (e.filters() != null) {
+ GridCacheContext cctx = ctx.cacheContext(e.cacheId());
+
+ for (CacheEntryPredicate p : e.filters())
+ p.prepareMarshal(cctx);
+ }
+
if (ctx.deploymentEnabled()) {
prepareObject(e.key(), ctx);
prepareObject(e.value(), ctx);
- prepareFilter(e.filters(), ctx);
}
}
}
@@ -376,8 +382,16 @@ public abstract class GridCacheMessage implements Message {
assert ctx != null;
if (txEntries != null) {
- for (IgniteTxEntry e : txEntries)
+ for (IgniteTxEntry e : txEntries) {
e.unmarshal(ctx, near, ldr);
+
+ if (e.filters() != null) {
+ GridCacheContext cctx = ctx.cacheContext(e.cacheId());
+
+ for (CacheEntryPredicate p : e.filters())
+ p.finishUnmarshal(cctx, ldr);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
index fe72bef..08a3fb1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
@@ -49,7 +49,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
*
* @return Filter on which this projection is based on.
*/
- @Nullable public IgnitePredicate<Cache.Entry<K, V>> predicate();
+ @Nullable public CacheEntryPredicate predicate();
/**
* Internal method that is called from {@link GridCacheEntryImpl}.
@@ -63,7 +63,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
* @throws IgniteCheckedException If failed.
*/
@Nullable public V put(K key, V val, @Nullable GridCacheEntryEx entry, long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException;
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException;
/**
* Internal method that is called from {@link GridCacheEntryImpl}.
@@ -76,7 +76,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
* @return Put operation future.
*/
public IgniteInternalFuture<V> putAsync(K key, V val, @Nullable GridCacheEntryEx entry, long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter);
+ @Nullable CacheEntryPredicate... filter);
/**
* Internal method that is called from {@link GridCacheEntryImpl}.
@@ -90,7 +90,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
* @throws IgniteCheckedException If failed.
*/
public boolean putx(K key, V val, @Nullable GridCacheEntryEx entry, long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException;
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException;
/**
* Internal method that is called from {@link GridCacheEntryImpl}.
@@ -103,7 +103,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
* @return Putx operation future.
*/
public IgniteInternalFuture<Boolean> putxAsync(K key, V val, @Nullable GridCacheEntryEx entry, long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter);
+ @Nullable CacheEntryPredicate... filter);
/**
* Store DR data.
@@ -134,7 +134,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
* @throws IgniteCheckedException If failed.
*/
@Nullable public V remove(K key, @Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException;
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException;
/**
* Internal method that is called from {@link GridCacheEntryImpl}.
@@ -145,7 +145,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
* @return Put operation future.
*/
public IgniteInternalFuture<V> removeAsync(K key, @Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter);
+ @Nullable CacheEntryPredicate... filter);
/**
* Removes DR data.
@@ -176,7 +176,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
* @throws IgniteCheckedException If failed.
*/
public boolean removex(K key, @Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException;
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException;
/**
* Internal method that is called from {@link GridCacheEntryImpl}.
@@ -187,7 +187,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
* @return Putx operation future.
*/
public IgniteInternalFuture<Boolean> removexAsync(K key, @Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter);
+ @Nullable CacheEntryPredicate... filter);
/**
* Asynchronously stores given key-value pair in cache only if only if the previous value is equal to the
@@ -293,7 +293,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
* @throws IgniteCheckedException If failed.
*/
@Nullable public V get(K key, @Nullable GridCacheEntryEx entry, boolean deserializePortable,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException;
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException;
/**
* Gets value from cache. Will go to primary node even if this is a backup.
@@ -372,7 +372,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
* @param filter Filter.
* @return Entry set.
*/
- public Set<Cache.Entry<K, V>> entrySetx(IgnitePredicate<Cache.Entry<K, V>>... filter);
+ public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter);
/**
* Gets set of primary entries containing internal entries.
@@ -380,7 +380,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
* @param filter Optional filter.
* @return Primary entry set.
*/
- public Set<Cache.Entry<K, V>> primaryEntrySetx(IgnitePredicate<Cache.Entry<K, V>>... filter);
+ public Set<Cache.Entry<K, V>> primaryEntrySetx(CacheEntryPredicate... filter);
/**
* @return {@link ExpiryPolicy} associated with this projection.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6febd89a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
index 3e8930b..7a27cff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
@@ -51,21 +51,9 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** */
private static final long serialVersionUID = 0L;
- /** Key-value filter taking null values. */
+ /** Entry filter. */
@GridToStringExclude
- private KeyValueFilter<K, V> withNullKvFilter;
-
- /** Key-value filter not allowing null values. */
- @GridToStringExclude
- private KeyValueFilter<K, V> noNullKvFilter;
-
- /** Entry filter built with {@link #withNullKvFilter}. */
- @GridToStringExclude
- private FullFilter<K, V> withNullEntryFilter;
-
- /** Entry filter built with {@link #noNullKvFilter}. */
- @GridToStringExclude
- private FullFilter<K, V> noNullEntryFilter;
+ private CacheEntryPredicate filter;
/** Base cache. */
private GridCacheAdapter<K, V> cache;
@@ -99,7 +87,6 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/**
* @param parent Parent projection.
* @param cctx Cache context.
- * @param kvFilter Key-value filter.
* @param entryFilter Entry filter.
* @param flags Flags for new projection
*/
@@ -107,8 +94,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
public GridCacheProjectionImpl(
CacheProjection<K, V> parent,
GridCacheContext<K, V> cctx,
- @Nullable IgniteBiPredicate<K, V> kvFilter,
- @Nullable IgnitePredicate<? super Cache.Entry<K, V>> entryFilter,
+ @Nullable CacheEntryPredicate entryFilter,
@Nullable Set<CacheFlag> flags,
@Nullable UUID subjId,
boolean keepPortable,
@@ -127,13 +113,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
this.flags = Collections.unmodifiableSet(f);
- withNullKvFilter = new KeyValueFilter<>(kvFilter, false);
-
- noNullKvFilter = new KeyValueFilter<>(kvFilter, true);
-
- withNullEntryFilter = new FullFilter<>(withNullKvFilter, entryFilter);
-
- noNullEntryFilter = new FullFilter<>(noNullKvFilter, entryFilter);
+ this.filter = entryFilter;
this.subjId = subjId;
@@ -156,22 +136,6 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
}
/**
- * @param noNulls Flag indicating whether filter should accept nulls or not.
- * @return Entry filter for the flag.
- */
- IgnitePredicate<Cache.Entry<K, V>> entryFilter(boolean noNulls) {
- return noNulls ? noNullEntryFilter : withNullEntryFilter;
- }
-
- /**
- * @param noNulls Flag indicating whether filter should accept nulls or not.
- * @return Key-value filter for the flag.
- */
- IgniteBiPredicate<K, V> kvFilter(boolean noNulls) {
- return noNulls ? noNullKvFilter : withNullKvFilter;
- }
-
- /**
* @return Keep portable flag.
*/
public boolean isKeepPortable() {
@@ -189,102 +153,43 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
* {@code Ands} passed in filter with projection filter.
*
* @param filter filter to {@code and}.
- * @param noNulls Flag indicating whether filter should accept nulls or not.
* @return {@code Anded} filter array.
*/
- IgnitePredicate<Cache.Entry<K, V>> and(
- IgnitePredicate<Cache.Entry<K, V>> filter, boolean noNulls) {
- IgnitePredicate<Cache.Entry<K, V>> entryFilter = entryFilter(noNulls);
+ CacheEntryPredicate and(CacheEntryPredicate filter) {
+ CacheEntryPredicate entryFilter = this.filter;
if (filter == null)
return entryFilter;
- return F0.and(entryFilter, filter);
- }
-
- /**
- * {@code Ands} passed in filter with projection filter.
- *
- * @param filter filter to {@code and}.
- * @param noNulls Flag indicating whether filter should accept nulls or not.
- * @return {@code Anded} filter array.
- */
- @SuppressWarnings({"unchecked"})
- IgniteBiPredicate<K, V> and(final IgniteBiPredicate<K, V> filter, boolean noNulls) {
- final IgniteBiPredicate<K, V> kvFilter = kvFilter(noNulls);
-
- if (filter == null)
- return kvFilter;
-
- return new P2<K, V>() {
- @Override public boolean apply(K k, V v) {
- return F.isAll2(k, v, kvFilter) && filter.apply(k, v);
- }
- };
- }
-
- /**
- * {@code Ands} passed in filter with projection filter.
- *
- * @param filter filter to {@code and}.
- * @param noNulls Flag indicating whether filter should accept nulls or not.
- * @return {@code Anded} filter array.
- */
- @SuppressWarnings({"unchecked"})
- IgniteBiPredicate<K, V> and(final IgniteBiPredicate<K, V>[] filter, boolean noNulls) {
- final IgniteBiPredicate<K, V> kvFilter = kvFilter(noNulls);
-
- if (filter == null)
- return kvFilter;
-
- return new P2<K, V>() {
- @Override public boolean apply(K k, V v) {
- return F.isAll2(k, v, kvFilter) && F.isAll2(k, v, filter);
- }
- };
+ return F0.and0(entryFilter, filter);
}
/**
* {@code Ands} two passed in filters.
*
* @param f1 First filter.
- * @param nonNulls Flag indicating whether nulls should be included.
* @return {@code Anded} filter.
*/
- private IgnitePredicate<Cache.Entry<K, V>> and(@Nullable final IgnitePredicate<Cache.Entry<K, V>>[] f1,
- boolean nonNulls) {
- IgnitePredicate<Cache.Entry<K, V>> entryFilter = entryFilter(nonNulls);
+ private CacheEntryPredicate and(@Nullable final CacheEntryPredicate[] f1) {
+ CacheEntryPredicate entryFilter = filter;
if (F.isEmpty(f1))
return entryFilter;
- return F0.and(entryFilter, f1);
- }
-
- /**
- * @param e Entry to verify.
- * @param noNulls Flag indicating whether filter should accept nulls or not.
- * @return {@code True} if filter passed.
- */
- boolean isAll(Cache.Entry<K, V> e, boolean noNulls) {
- CacheFlag[] f = cctx.forceLocalRead();
-
- try {
- return F.isAll(e, entryFilter(noNulls));
- }
- finally {
- cctx.forceFlags(f);
- }
+ return F0.and0(entryFilter, f1);
}
/**
* @param k Key.
* @param v Value.
- * @param noNulls Flag indicating whether filter should accept nulls or not.
* @return {@code True} if filter passed.
*/
- boolean isAll(K k, V v, boolean noNulls) {
- IgniteBiPredicate<K, V> p = kvFilter(noNulls);
+ boolean isAll(K k, V v) {
+ if (k == null || v == null)
+ return false;
+
+ // TODO IGNITE-51.
+ IgniteBiPredicate<K, V> p = null;
if (p != null) {
CacheFlag[] f = cctx.forceLocalRead();
@@ -303,10 +208,9 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/**
* @param map Map.
- * @param noNulls Flag indicating whether filter should accept nulls or not.
* @return {@code True} if filter passed.
*/
- Map<? extends K, ? extends V> isAll(Map<? extends K, ? extends V> map, boolean noNulls) {
+ Map<? extends K, ? extends V> isAll(Map<? extends K, ? extends V> map) {
if (F.isEmpty(map))
return Collections.<K, V>emptyMap();
@@ -317,7 +221,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
K k = e.getKey();
V v = e.getValue();
- if (!isAll(k, v, noNulls)) {
+ if (!isAll(k, v)) {
failed = true;
break;
@@ -333,44 +237,13 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
K k = e.getKey();
V v = e.getValue();
- if (isAll(k, v, noNulls))
+ if (isAll(k, v))
cp.put(k, v);
}
return cp;
}
- /**
- * Entry projection-filter-aware visitor.
- *
- * @param vis Visitor.
- * @return Projection-filter-aware visitor.
- */
- private IgniteInClosure<Cache.Entry<K, V>> visitor(final IgniteInClosure<Cache.Entry<K, V>> vis) {
- return new CI1<Cache.Entry<K, V>>() {
- @Override public void apply(Cache.Entry<K, V> e) {
- if (isAll(e, true))
- vis.apply(e);
- }
- };
- }
-
- /**
- * Entry projection-filter-aware visitor.
- *
- * @param vis Visitor.
- * @return Projection-filter-aware visitor.
- */
- private IgnitePredicate<Cache.Entry<K, V>> visitor(final IgnitePredicate<Cache.Entry<K, V>> vis) {
- return new P1<Cache.Entry<K, V>>() {
- @Override public boolean apply(Cache.Entry<K, V> e) {
- // If projection filter didn't pass, go to the next element.
- // Otherwise, delegate to the visitor.
- return !isAll(e, true) || vis.apply(e);
- }
- };
- }
-
/** {@inheritDoc} */
@SuppressWarnings( {"unchecked", "RedundantCast"})
@Override public <K1, V1> GridCache<K1, V1> cache() {
@@ -388,8 +261,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
cctx,
- noNullKvFilter.kvFilter,
- noNullEntryFilter.entryFilter,
+ filter,
flags,
subjId,
keepPortable,
@@ -427,8 +299,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
GridCacheProjectionImpl<K1, V1> prj = new GridCacheProjectionImpl<>(
(CacheProjection<K1, V1>)this,
(GridCacheContext<K1, V1>)cctx,
- CU.<K1, V1>typeFilter(keyType, valType),
- (IgnitePredicate<Cache.Entry>)noNullEntryFilter.entryFilter,
+ CU.typeFilter0(keyType, valType),
flags,
subjId,
keepPortable,
@@ -438,44 +309,13 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
}
/** {@inheritDoc} */
- @Override public CacheProjection<K, V> projection(IgniteBiPredicate<K, V> p) {
- if (p == null)
- return new GridCacheProxyImpl<>(cctx, this, this);
-
- IgniteBiPredicate<K, V> kvFilter = p;
-
- if (noNullKvFilter.kvFilter != null)
- kvFilter = and(p, true);
-
- if (cctx.deploymentEnabled()) {
- try {
- cctx.deploy().registerClasses(p);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
-
- GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
- cctx,
- kvFilter,
- noNullEntryFilter.entryFilter,
- flags,
- subjId,
- keepPortable,
- expiryPlc);
-
- return new GridCacheProxyImpl<>(cctx, prj, prj);
- }
-
- /** {@inheritDoc} */
@SuppressWarnings({"unchecked"})
- @Override public CacheProjection<K, V> projection(IgnitePredicate<Cache.Entry<K, V>> filter) {
+ @Override public CacheProjection<K, V> projection(CacheEntryPredicate filter) {
if (filter == null)
return new GridCacheProxyImpl<>(cctx, this, this);
- if (noNullEntryFilter.entryFilter != null)
- filter = and(filter, true);
+ if (this.filter != null)
+ filter = and(filter);
if (cctx.deploymentEnabled()) {
try {
@@ -488,7 +328,6 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
cctx,
- noNullKvFilter.kvFilter,
filter,
flags,
subjId,
@@ -513,8 +352,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
cctx,
- noNullKvFilter.kvFilter,
- noNullEntryFilter.entryFilter,
+ filter,
res,
subjId,
keepPortable,
@@ -537,8 +375,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
cctx,
- noNullKvFilter.kvFilter,
- noNullEntryFilter.entryFilter,
+ filter,
res,
subjId,
keepPortable,
@@ -553,8 +390,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
GridCacheProjectionImpl<K1, V1> prj = new GridCacheProjectionImpl<>(
(CacheProjection<K1, V1>)this,
(GridCacheContext<K1, V1>)cctx,
- (IgniteBiPredicate<K1, V1>)noNullKvFilter.kvFilter,
- (IgnitePredicate<Cache.Entry>)noNullEntryFilter.entryFilter,
+ filter,
flags,
subjId,
true,
@@ -602,7 +438,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public int nearSize() {
return cctx.config().getCacheMode() == PARTITIONED && isNearEnabled(cctx) ?
- cctx.near().nearKeySet(entryFilter(true)).size() : 0;
+ cctx.near().nearKeySet(filter).size() : 0;
}
/** {@inheritDoc} */
@@ -641,16 +477,6 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
}
/** {@inheritDoc} */
- @Override public void forEach(IgniteInClosure<Cache.Entry<K, V>> vis) {
- cache.forEach(visitor(vis));
- }
-
- /** {@inheritDoc} */
- @Override public boolean forAll(IgnitePredicate<Cache.Entry<K, V>> vis) {
- return cache.forAll(visitor(vis));
- }
-
- /** {@inheritDoc} */
@Override public V reload(K key) throws IgniteCheckedException {
return cache.reload(key);
}
@@ -687,8 +513,8 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public V get(K key, @Nullable GridCacheEntryEx entry, boolean deserializePortable,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
- return cache.get(key, entry, deserializePortable, and(filter, false));
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
+ return cache.get(key, entry, deserializePortable, and(filter));
}
/** {@inheritDoc} */
@@ -752,44 +578,44 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
}
/** {@inheritDoc} */
- @Override public V put(K key, V val, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter)
+ @Override public V put(K key, V val, @Nullable CacheEntryPredicate[] filter)
throws IgniteCheckedException {
return putAsync(key, val, filter).get();
}
/** {@inheritDoc} */
@Override public V put(K key, V val, @Nullable GridCacheEntryEx entry, long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
return cache.put(key, val, entry, ttl, filter);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<V> putAsync(K key, V val,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Nullable CacheEntryPredicate[] filter) {
return putAsync(key, val, null, -1, filter);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<V> putAsync(K key, V val, @Nullable GridCacheEntryEx entry, long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Nullable CacheEntryPredicate[] filter) {
A.notNull(key, "key", val, "val");
// Check k-v predicate first.
- if (!isAll(key, val, true))
+ if (!isAll(key, val))
return new GridFinishedFuture<>(cctx.kernalContext());
- return cache.putAsync(key, val, entry, ttl, and(filter, false));
+ return cache.putAsync(key, val, entry, ttl, and(filter));
}
/** {@inheritDoc} */
@Override public boolean putx(K key, V val, @Nullable GridCacheEntryEx entry, long ttl,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
return cache.putx(key, val, entry, ttl, filter);
}
/** {@inheritDoc} */
@Override public boolean putx(K key, V val,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
return putxAsync(key, val, filter).get();
}
@@ -847,20 +673,20 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> putxAsync(K key, V val,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Nullable CacheEntryPredicate[] filter) {
return putxAsync(key, val, null, -1, filter);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> putxAsync(K key, V val, @Nullable GridCacheEntryEx entry,
- long ttl, @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ long ttl, @Nullable CacheEntryPredicate[] filter) {
A.notNull(key, "key", val, "val");
// Check k-v predicate first.
- if (!isAll(key, val, true))
+ if (!isAll(key, val))
return new GridFinishedFuture<>(cctx.kernalContext(), false);
- return cache.putxAsync(key, val, entry, ttl, and(filter, false));
+ return cache.putxAsync(key, val, entry, ttl, and(filter));
}
/** {@inheritDoc} */
@@ -870,7 +696,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public IgniteInternalFuture<V> putIfAbsentAsync(K key, V val) {
- return putAsync(key, val, cctx.<K, V>noPeekArray());
+ return putAsync(key, val, cctx.noValArray());
}
/** {@inheritDoc} */
@@ -880,7 +706,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> putxIfAbsentAsync(K key, V val) {
- return putxAsync(key, val, cctx.<K, V>noPeekArray());
+ return putxAsync(key, val, cctx.noValArray());
}
/** {@inheritDoc} */
@@ -890,7 +716,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public IgniteInternalFuture<V> replaceAsync(K key, V val) {
- return putAsync(key, val, cctx.hasPeekArray());
+ return putAsync(key, val, cctx.hasValArray());
}
/** {@inheritDoc} */
@@ -900,7 +726,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> replacexAsync(K key, V val) {
- return putxAsync(key, val, cctx.hasPeekArray());
+ return putxAsync(key, val, cctx.hasValArray());
}
/** {@inheritDoc} */
@@ -910,66 +736,66 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) {
- IgnitePredicate<Cache.Entry<K, V>> fltr = and(F.<K, V>cacheContainsPeek(oldVal), false);
+ CacheEntryPredicate fltr = and(cctx.equalsValArray(oldVal));
return cache.putxAsync(key, newVal, fltr);
}
/** {@inheritDoc} */
@Override public void putAll(Map<? extends K, ? extends V> m,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
putAllAsync(m, filter).get();
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> putAllAsync(Map<? extends K, ? extends V> m,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
- m = isAll(m, true);
+ @Nullable CacheEntryPredicate[] filter) {
+ m = isAll(m);
if (F.isEmpty(m))
return new GridFinishedFuture<>(cctx.kernalContext());
- return cache.putAllAsync(m, and(filter, false));
+ return cache.putAllAsync(m, and(filter));
}
/** {@inheritDoc} */
@Override public Set<K> keySet() {
- return cache.keySet(entryFilter(true));
+ return cache.keySet(filter);
}
/** {@inheritDoc} */
- @Override public Set<K> keySet(@Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
+ @Override public Set<K> keySet(@Nullable CacheEntryPredicate... filter) {
return cache.keySet(filter);
}
/** {@inheritDoc} */
@Override public Set<K> primaryKeySet() {
- return cache.primaryKeySet(entryFilter(true));
+ return cache.primaryKeySet(filter);
}
/** {@inheritDoc} */
@Override public Collection<V> values() {
- return cache.values(entryFilter(true));
+ return cache.values(filter);
}
/** {@inheritDoc} */
@Override public Collection<V> primaryValues() {
- return cache.primaryValues(entryFilter(true));
+ return cache.primaryValues(filter);
}
/** {@inheritDoc} */
@Override public Set<Cache.Entry<K, V>> entrySet() {
- return cache.entrySet(entryFilter(true));
+ return cache.entrySet(filter);
}
/** {@inheritDoc} */
- @Override public Set<Cache.Entry<K, V>> entrySetx(IgnitePredicate<Cache.Entry<K, V>>... filter) {
- return cache.entrySetx(F.and(filter, entryFilter(true)));
+ @Override public Set<Cache.Entry<K, V>> entrySetx(CacheEntryPredicate... filter) {
+ return cache.entrySetx(F0.and0(filter, this.filter));
}
/** {@inheritDoc} */
- @Override public Set<Cache.Entry<K, V>> primaryEntrySetx(IgnitePredicate<Cache.Entry<K, V>>... filter) {
- return cache.primaryEntrySetx(F.and(filter, entryFilter(true)));
+ @Override public Set<Cache.Entry<K, V>> primaryEntrySetx(CacheEntryPredicate... filter) {
+ return cache.primaryEntrySetx(F0.and0(filter, this.filter));
}
/** {@inheritDoc} */
@@ -980,7 +806,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public Set<Cache.Entry<K, V>> primaryEntrySet() {
- return cache.primaryEntrySet(entryFilter(true));
+ return cache.primaryEntrySet(filter);
}
/** {@inheritDoc} */
@@ -1000,8 +826,8 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
}
/** {@inheritDoc} */
- @Override public IgnitePredicate<Cache.Entry<K, V>> predicate() {
- return withNullEntryFilter.hasFilter() ? withNullEntryFilter : null;
+ @Override public CacheEntryPredicate predicate() {
+ return filter;
}
/** {@inheritDoc} */
@@ -1016,7 +842,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public V peek(K key) {
- return cache.peek(key, entryFilter(true));
+ return cache.peek(key, filter);
}
/** {@inheritDoc} */
@@ -1037,14 +863,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
@Override public V peek(K key, @Nullable Collection<GridCachePeekMode> modes) throws IgniteCheckedException {
V val = cache.peek(key, modes);
- return isAll(key, val, true) ? val : null;
+ return isAll(key, val) ? val : null;
}
/** {@inheritDoc} */
@Nullable @Override public Cache.Entry<K, V> entry(K key) {
V val = peek(key);
- if (!isAll(key, val, false))
+ if (!isAll(key, val))
return null;
return cache.entry(key);
@@ -1053,7 +879,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public boolean evict(K key) {
if (predicate() != null)
- return cache.evict(key, entryFilter(true));
+ return cache.evict(key, filter);
else
return cache.evict(key);
}
@@ -1061,7 +887,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public void evictAll(@Nullable Collection<? extends K> keys) {
if (predicate() != null)
- cache.evictAll(keys, entryFilter(true));
+ cache.evictAll(keys, filter);
else
cache.evictAll(keys);
}
@@ -1093,12 +919,12 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public boolean clearLocally(K key) {
- return cache.clearLocally0(key, entryFilter(true));
+ return cache.clearLocally0(key, filter);
}
/** {@inheritDoc} */
@Override public boolean compact(K key) throws IgniteCheckedException {
- return cache.compact(key, entryFilter(false));
+ return cache.compact(key, filter);
}
/** {@inheritDoc} */
@@ -1108,30 +934,30 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public V remove(K key,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
return removeAsync(key, filter).get();
}
/** {@inheritDoc} */
@Override public V remove(K key, @Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
return removeAsync(key, entry, filter).get();
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<V> removeAsync(K key, IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Override public IgniteInternalFuture<V> removeAsync(K key, CacheEntryPredicate[] filter) {
return removeAsync(key, null, filter);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<V> removeAsync(K key, @Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
- return cache.removeAsync(key, entry, and(filter, false));
+ @Nullable CacheEntryPredicate... filter) {
+ return cache.removeAsync(key, entry, and(filter));
}
/** {@inheritDoc} */
@Override public boolean removex(K key,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
return removexAsync(key, filter).get();
}
@@ -1147,20 +973,20 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public boolean removex(K key, @Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
return removexAsync(key, entry, filter).get();
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> removexAsync(K key,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
+ @Nullable CacheEntryPredicate[] filter) {
return removexAsync(key, null, filter);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> removexAsync(K key, @Nullable GridCacheEntryEx entry,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) {
- return cache.removexAsync(key, entry, and(filter, false));
+ @Nullable CacheEntryPredicate... filter) {
+ return cache.removexAsync(key, entry, and(filter));
}
/** {@inheritDoc} */
@@ -1168,7 +994,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal");
// Check k-v predicate first.
- if (!isAll(key, newVal, true))
+ if (!isAll(key, newVal))
return new GridFinishedFuture<>(cctx.kernalContext(), new GridCacheReturn<V>(false));
return cache.replacexAsync(key, oldVal, newVal);
@@ -1186,7 +1012,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) {
- return !isAll(key, val, true) ? new GridFinishedFuture<>(cctx.kernalContext(),
+ return !isAll(key, val) ? new GridFinishedFuture<>(cctx.kernalContext(),
new GridCacheReturn<V>(false)) : cache.removexAsync(key, val);
}
@@ -1197,20 +1023,20 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> removeAsync(K key, V val) {
- return !isAll(key, val, true) ? new GridFinishedFuture<>(cctx.kernalContext(), false) :
+ return !isAll(key, val) ? new GridFinishedFuture<>(cctx.kernalContext(), false) :
cache.removeAsync(key, val);
}
/** {@inheritDoc} */
@Override public void removeAll(@Nullable Collection<? extends K> keys,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
- cache.removeAll(keys, and(filter, false));
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
+ cache.removeAll(keys, and(filter));
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> removeAllAsync(@Nullable Collection<? extends K> keys,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
- return cache.removeAllAsync(keys, and(filter, false));
+ @Nullable CacheEntryPredicate[] filter) {
+ return cache.removeAllAsync(keys, and(filter));
}
/** {@inheritDoc} */
@@ -1233,37 +1059,37 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public boolean lock(K key, long timeout,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) throws IgniteCheckedException {
- return cache.lock(key, timeout, and(filter, false));
+ @Nullable CacheEntryPredicate... filter) throws IgniteCheckedException {
+ return cache.lock(key, timeout, and(filter));
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> lockAsync(K key, long timeout,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
- return cache.lockAsync(key, timeout, and(filter, false));
+ @Nullable CacheEntryPredicate[] filter) {
+ return cache.lockAsync(key, timeout, and(filter));
}
/** {@inheritDoc} */
@Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
- return cache.lockAll(keys, timeout, and(filter, false));
+ @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
+ return cache.lockAll(keys, timeout, and(filter));
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> lockAllAsync(@Nullable Collection<? extends K> keys, long timeout,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) {
- return cache.lockAllAsync(keys, timeout, and(filter, false));
+ @Nullable CacheEntryPredicate[] filter) {
+ return cache.lockAllAsync(keys, timeout, and(filter));
}
/** {@inheritDoc} */
- @Override public void unlock(K key, IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
- cache.unlock(key, and(filter, false));
+ @Override public void unlock(K key, CacheEntryPredicate[] filter) throws IgniteCheckedException {
+ cache.unlock(key, and(filter));
}
/** {@inheritDoc} */
@Override public void unlockAll(@Nullable Collection<? extends K> keys,
- @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
- cache.unlockAll(keys, and(filter, false));
+ @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException {
+ cache.unlockAll(keys, and(filter));
}
/** {@inheritDoc} */
@@ -1319,7 +1145,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
/** {@inheritDoc} */
@Override public Iterator<Cache.Entry<K, V>> iterator() {
- return cache.entrySet(entryFilter(true)).iterator();
+ return cache.entrySet(filter).iterator();
}
/** {@inheritDoc} */
@@ -1343,8 +1169,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
return new GridCacheProjectionImpl<>(
this,
cctx,
- noNullKvFilter.kvFilter,
- noNullEntryFilter.entryFilter,
+ filter,
flags,
subjId,
true,
@@ -1355,11 +1180,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(cctx);
- out.writeObject(noNullEntryFilter);
- out.writeObject(withNullEntryFilter);
-
- out.writeObject(noNullKvFilter);
- out.writeObject(withNullKvFilter);
+ out.writeObject(filter);
U.writeCollection(out, flags);
@@ -1371,11 +1192,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
cctx = (GridCacheContext<K, V>)in.readObject();
- noNullEntryFilter = (FullFilter<K, V>)in.readObject();
- withNullEntryFilter = (FullFilter<K, V>)in.readObject();
-
- noNullKvFilter = (KeyValueFilter<K, V>)in.readObject();
- withNullKvFilter = (KeyValueFilter<K, V>)in.readObject();
+ filter = (CacheEntryPredicate)in.readObject();
flags = U.readSet(in);
@@ -1390,106 +1207,4 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
@Override public String toString() {
return S.toString(GridCacheProjectionImpl.class, this);
}
-
- /**
- * @param <K> Key type.
- * @param <V> Value type.
- */
- public static class FullFilter<K, V> implements IgnitePredicate<Cache.Entry<K, V>> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Key filter. */
- private KeyValueFilter<K, V> kvFilter;
-
- /** Entry filter. */
- private IgnitePredicate<? super Cache.Entry<K, V>> entryFilter;
-
- /**
- * @param kvFilter Key-value filter.
- * @param entryFilter Entry filter.
- */
- private FullFilter(KeyValueFilter<K, V> kvFilter, IgnitePredicate<? super Cache.Entry<K, V>> entryFilter) {
- this.kvFilter = kvFilter;
- this.entryFilter = entryFilter;
- }
-
- /**
- * @return {@code True} if has non-null key value or entry filter.
- */
- boolean hasFilter() {
- return (kvFilter != null && kvFilter.filter() != null) || entryFilter != null;
- }
-
- /**
- * @return Key-value filter.
- */
- public KeyValueFilter<K, V> keyValueFilter() {
- return kvFilter;
- }
-
- /**
- * @return Entry filter.
- */
- public IgnitePredicate<? super Cache.Entry<K, V>> entryFilter() {
- return entryFilter;
- }
-
- /** {@inheritDoc} */
- @Override public boolean apply(Cache.Entry<K, V> e) {
- if (kvFilter != null) {
- if (!kvFilter.apply(e.getKey(), e.getValue()))
- return false;
- }
-
- return F.isAll(e, entryFilter);
- }
- }
-
- /**
- * @param <K> Key type.
- * @param <V> Value type.
- */
- public static class KeyValueFilter<K, V> implements IgniteBiPredicate<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Key filter. */
- private IgniteBiPredicate<K, V> kvFilter;
-
- /** No nulls flag. */
- private boolean noNulls;
-
- /**
- * @param kvFilter Key-value filter.
- * @param noNulls Filter without null-values.
- */
- private KeyValueFilter(IgniteBiPredicate<K, V> kvFilter, boolean noNulls) {
- this.kvFilter = kvFilter;
- this.noNulls = noNulls;
- }
-
- /**
- * @return Key-value filter.
- */
- public IgniteBiPredicate<K, V> filter() {
- return kvFilter;
- }
-
- /** {@inheritDoc} */
- @Override public boolean apply(K k, V v) {
- if (k == null) // Should never happen, but just in case.
- return false;
-
- if (v == null)
- return !noNulls;
-
- if (kvFilter != null) {
- if (!kvFilter.apply(k, v))
- return false;
- }
-
- return true;
- }
- }
}