You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/08/25 09:22:59 UTC
[2/2] ignite git commit: IGNITE-6101 Try to improve local scans
performance
IGNITE-6101 Try to improve local scans performance
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1c9d80a5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1c9d80a5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1c9d80a5
Branch: refs/heads/master
Commit: 1c9d80a540cbce0a9d9a65e3fac2e06f53b73f43
Parents: 79d47f8
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Fri Aug 25 12:22:44 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Aug 25 12:22:44 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/binary/BinaryUtils.java | 26 +
.../processors/cache/CacheObjectUtils.java | 65 ++-
.../processors/cache/GridCacheAdapter.java | 6 +-
.../processors/cache/GridCacheEntryEx.java | 10 +
.../processors/cache/GridCacheMapEntry.java | 27 +-
.../processors/cache/IgniteCacheProxyImpl.java | 26 +-
.../colocated/GridDhtDetachedCacheEntry.java | 4 +-
.../distributed/near/GridNearCacheEntry.java | 4 +-
.../processors/cache/query/CacheQueryEntry.java | 58 +++
.../query/GridCacheDistributedQueryManager.java | 16 +-
.../cache/query/GridCacheQueryAdapter.java | 53 ++-
.../cache/query/GridCacheQueryManager.java | 470 +++++++++----------
.../IgniteCacheObjectProcessorImpl.java | 164 -------
.../UserCacheObjectByteArrayImpl.java | 59 +++
.../cacheobject/UserCacheObjectImpl.java | 82 ++++
.../cacheobject/UserKeyCacheObjectImpl.java | 101 ++++
.../service/GridServiceProcessor.java | 6 +-
.../resources/META-INF/classnames.properties | 88 ++--
.../processors/cache/GridCacheTestEntryEx.java | 6 +
.../GridCacheQueryTransformerSelfTest.java | 41 ++
20 files changed, 788 insertions(+), 524 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
index 74d1730..8970a4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
@@ -60,6 +60,12 @@ import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.binary.Binarylizable;
import org.apache.ignite.internal.binary.builder.BinaryLazyValue;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectByteArrayImpl;
+import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectImpl;
+import org.apache.ignite.internal.processors.cacheobject.UserKeyCacheObjectImpl;
import org.apache.ignite.internal.util.MutableSingletonList;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -707,6 +713,26 @@ public class BinaryUtils {
}
/**
+ * @param obj Object to check.
+ * @return True if this is an object of a known type.
+ */
+ public static boolean knownCacheObject(Object obj) {
+ if (obj == null)
+ return false;
+
+ Class<?> cls= obj.getClass();
+
+ return cls == KeyCacheObjectImpl.class ||
+ cls == BinaryObjectImpl.class ||
+ cls == CacheObjectImpl.class ||
+ cls == CacheObjectByteArrayImpl.class ||
+ cls == BinaryEnumObjectImpl.class ||
+ cls == UserKeyCacheObjectImpl.class ||
+ cls == UserCacheObjectImpl.class ||
+ cls == UserCacheObjectByteArrayImpl.class;
+ }
+
+ /**
* @param arr Array to check.
* @return {@code true} if this array is of a known type.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
index 5afa751..aeca79e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
@@ -17,15 +17,14 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.util.MutableSingletonList;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-
/**
* Cache object utility methods.
*/
@@ -36,10 +35,35 @@ public class CacheObjectUtils {
* @param cpy Copy value flag.
* @return Unwrapped object.
*/
+ public static Object unwrapBinaryIfNeeded(CacheObjectValueContext ctx, CacheObject o, boolean keepBinary, boolean cpy) {
+ return unwrapBinary(ctx, o, keepBinary, cpy);
+ }
+
+ /**
+ * @param o Object to unwrap.
+ * @param keepBinary Keep binary flag.
+ * @param cpy Copy value flag.
+ * @return Unwrapped object.
+ */
public static Object unwrapBinaryIfNeeded(CacheObjectValueContext ctx, Object o, boolean keepBinary, boolean cpy) {
if (o == null)
return null;
+ // TODO has to be overloaded
+ if (o instanceof Map.Entry) {
+ Map.Entry entry = (Map.Entry)o;
+
+ Object key = entry.getKey();
+
+ Object uKey = unwrapBinary(ctx, key, keepBinary, cpy);
+
+ Object val = entry.getValue();
+
+ Object uVal = unwrapBinary(ctx, val, keepBinary, cpy);
+
+ return (key != uKey || val != uVal) ? F.t(uKey, uVal) : o;
+ }
+
return unwrapBinary(ctx, o, keepBinary, cpy);
}
@@ -86,7 +110,10 @@ public class CacheObjectUtils {
Map<Object, Object> map0 = BinaryUtils.newMap(map);
for (Map.Entry<Object, Object> e : map.entrySet())
- map0.put(unwrapBinary(ctx, e.getKey(), false, cpy), unwrapBinary(ctx, e.getValue(), false, cpy));
+ // TODO why don't we use keepBinary parameter here?
+ map0.put(
+ unwrapBinary(ctx, e.getKey(), false, cpy),
+ unwrapBinary(ctx, e.getValue(), false, cpy));
return map0;
}
@@ -105,7 +132,7 @@ public class CacheObjectUtils {
col0 = new ArrayList<>(col.size());
for (Object obj : col)
- col0.add(unwrapBinary(ctx, obj, keepBinary, cpy));
+ col0.add(unwrapBinaryIfNeeded(ctx, obj, keepBinary, cpy));
return col0;
}
@@ -137,31 +164,25 @@ public class CacheObjectUtils {
*/
@SuppressWarnings("unchecked")
private static Object unwrapBinary(CacheObjectValueContext ctx, Object o, boolean keepBinary, boolean cpy) {
- if (o instanceof Map.Entry) {
- Map.Entry entry = (Map.Entry)o;
-
- Object key = entry.getKey();
-
- Object uKey = unwrapBinary(ctx, key, keepBinary, cpy);
+ if (o == null)
+ return o;
- Object val = entry.getValue();
+ while (BinaryUtils.knownCacheObject(o)) {
+ CacheObject co = (CacheObject)o;
- Object uVal = unwrapBinary(ctx, val, keepBinary, cpy);
+ if (!co.isPlatformType() && keepBinary)
+ return o;
- return (key != uKey || val != uVal) ? F.t(uKey, uVal) : o;
+ // It may be a collection of binaries
+ o = co.value(ctx, cpy);
}
- else if (BinaryUtils.knownCollection(o))
+
+ if (BinaryUtils.knownCollection(o))
return unwrapKnownCollection(ctx, (Collection<Object>)o, keepBinary, cpy);
else if (BinaryUtils.knownMap(o))
return unwrapBinariesIfNeeded(ctx, (Map<Object, Object>)o, keepBinary, cpy);
else if (o instanceof Object[])
return unwrapBinariesInArrayIfNeeded(ctx, (Object[])o, keepBinary, cpy);
- else if (o instanceof CacheObject) {
- CacheObject co = (CacheObject)o;
-
- if (!keepBinary || co.isPlatformType())
- return unwrapBinary(ctx, co.value(ctx, cpy), keepBinary, cpy);
- }
return o;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index fed716c..8e346ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -83,13 +83,13 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
import org.apache.ignite.internal.cluster.IgniteClusterEx;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -3916,7 +3916,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return ctx.itHolder().iterator(iter, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() {
@Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) {
- return new CacheEntryImpl<>(e.getKey(), e.getValue());
+ // Actually Scan Query returns Iterator<CacheQueryEntry> by default,
+ // CacheQueryEntry implements both Map.Entry and Cache.Entry interfaces.
+ return (Cache.Entry<K, V>) e;
}
@Override protected void remove(Cache.Entry<K, V> item) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index e2bc7ff..b2cabac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -934,6 +935,15 @@ public interface GridCacheEntryEx {
throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
+ * @param row Already extracted value.
+ * @return Value.
+ * @throws IgniteCheckedException If failed to read from swap storage.
+ * @throws GridCacheEntryRemovedException If entry was removed.
+ */
+ @Nullable public CacheObject unswap(CacheDataRow row)
+ throws IgniteCheckedException, GridCacheEntryRemovedException;
+
+ /**
* Unswap ignoring flags.
*
* @param needVal If {@code false} then do not need to deserialize value during unswap.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index d991c86..61f6fb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -342,9 +342,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/** {@inheritDoc} */
+ @Override public final CacheObject unswap(CacheDataRow row) throws IgniteCheckedException, GridCacheEntryRemovedException {
+ row = unswap(row, true);
+
+ return row != null ? row.value() : null;
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public final CacheObject unswap(boolean needVal)
throws IgniteCheckedException, GridCacheEntryRemovedException {
- CacheDataRow row = unswap(needVal, true);
+ CacheDataRow row = unswap(null, true);
return row != null ? row.value() : null;
}
@@ -352,13 +359,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/**
* Unswaps an entry.
*
- * @param needVal If {@code false} then do not to deserialize value during unswap.
+ * @param row Already extracted cache data.
* @param checkExpire If {@code true} checks for expiration, as result entry can be obsoleted or marked deleted.
* @return Value.
* @throws IgniteCheckedException If failed.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- @Nullable protected CacheDataRow unswap(boolean needVal, boolean checkExpire)
+ @Nullable protected CacheDataRow unswap(@Nullable CacheDataRow row, boolean checkExpire)
throws IgniteCheckedException, GridCacheEntryRemovedException {
boolean obsolete = false;
boolean deferred = false;
@@ -368,7 +375,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
checkObsolete();
if (isStartVersion() && ((flags & IS_UNSWAPPED_MASK) == 0)) {
- CacheDataRow read = cctx.offheap().read(this);
+ assert row == null || row.key() == key: "Unexpected row key";
+
+ CacheDataRow read = row == null ? cctx.offheap().read(this) : row;
flags |= IS_UNSWAPPED_MASK;
@@ -572,7 +581,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (val == null) {
if (isStartVersion()) {
- unswap(true, false);
+ unswap(null, false);
val = this.val;
}
@@ -1322,7 +1331,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// Load and remove from swap if it is new.
if (isNew())
- oldRow = unswap(retval, false);
+ oldRow = unswap(null, false);
old = val;
@@ -2408,7 +2417,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return null;
if (val == null && offheap)
- unswap(true, false);
+ unswap(null, false);
if (checkExpired()) {
if (cctx.deferredDelete()) {
@@ -2645,7 +2654,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean isNew = isStartVersion();
if (isNew)
- unswap(true, false);
+ unswap(null, false);
CacheObject val = this.val;
@@ -2949,7 +2958,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
checkObsolete();
if (isStartVersion())
- unswap(true, false);
+ unswap(null, false);
long expireTime = expireTimeExtras();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 54fcafa..bc486e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -375,31 +375,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
final GridCloseableIterator<R> iter = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SCAN,
ctx.name(), ctx, new IgniteOutClosureX<GridCloseableIterator<R>>() {
@Override public GridCloseableIterator<R> applyx() throws IgniteCheckedException {
- final GridCloseableIterator iter0 = qry.executeScanQuery();
-
- final boolean needToConvert = transformer == null;
-
- return new GridCloseableIteratorAdapter<R>() {
- @Override protected R onNext() throws IgniteCheckedException {
- Object next = iter0.nextX();
-
- if (needToConvert) {
- Map.Entry<K, V> entry = (Map.Entry<K, V>)next;
-
- return (R)new CacheEntryImpl<>(entry.getKey(), entry.getValue());
- }
-
- return (R)next;
- }
-
- @Override protected boolean onHasNext() throws IgniteCheckedException {
- return iter0.hasNextX();
- }
-
- @Override protected void onClose() throws IgniteCheckedException {
- iter0.close();
- }
- };
+ return qry.executeScanQuery();
}
}, true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index 7da3d4f..5566bb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -22,8 +22,8 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
@@ -53,7 +53,7 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Nullable @Override public CacheDataRow unswap(boolean needVal, boolean checkExpire) throws IgniteCheckedException {
+ @Nullable @Override public CacheDataRow unswap(CacheDataRow row, boolean checkExpire) throws IgniteCheckedException {
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 6e606bf..ce728b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -30,9 +30,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvcc;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.F;
@@ -443,7 +443,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Nullable @Override public CacheDataRow unswap(boolean needVal, boolean checkExpire) {
+ @Nullable @Override public CacheDataRow unswap(CacheDataRow row, boolean checkExpire) {
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryEntry.java
new file mode 100644
index 0000000..4787464
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryEntry.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import javax.cache.Cache;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.internal.processors.cache.CacheEntryImplEx;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+final class CacheQueryEntry<K,V> extends IgniteBiTuple<K,V> implements Cache.Entry<K,V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ public CacheQueryEntry() {
+ // No-op.
+ }
+
+ /**
+ * @param key Key.
+ * @param val Value.
+ */
+ CacheQueryEntry(@Nullable K key, @Nullable V val) {
+ super(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T unwrap(Class<T> cls) {
+ if (cls != null && cls.isAssignableFrom(getClass()))
+ return cls.cast(this);
+
+ if (cls.isAssignableFrom(CacheEntryImpl.class))
+ return (T)new CacheEntryImpl<>(getKey(), getValue());
+
+ if (cls.isAssignableFrom(CacheEntry.class))
+ return (T)new CacheEntryImplEx<>(getKey(), getValue(), null);
+
+ throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 7f859a2..b860f02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -42,7 +42,6 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -632,7 +631,20 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
if (locIter != null && locIter.hasNextX())
cur = locIter.nextX();
- return cur != null || (cur = fut.next()) != null;
+ return cur != null || (cur = convert(fut.next())) != null;
+ }
+
+ /**
+ * @param obj Entry to convert.
+ * @return Cache entry
+ */
+ private Object convert(Object obj) {
+ if(qry.transform() != null)
+ return obj;
+
+ Map.Entry e = (Map.Entry)obj;
+
+ return e == null ? null : new CacheQueryEntry(e.getKey(), e.getValue());
}
@Override protected void onClose() throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 023c03c..c4eae8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.query;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
@@ -517,7 +518,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
@Override public GridCloseableIterator executeScanQuery() throws IgniteCheckedException {
assert type == SCAN : "Wrong processing of qyery: " + type;
- Collection<ClusterNode> nodes = nodes();
+ // Affinity nodes snapshot.
+ Collection<ClusterNode> nodes = new ArrayList<>(nodes());
cctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -537,13 +539,15 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
final GridCacheQueryManager qryMgr = cctx.queries();
- if (part != null && !cctx.isLocal())
- return new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx);
- else {
- boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId());
+ boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId());
- return loc ? qryMgr.scanQueryLocal(this, true) : qryMgr.scanQueryDistributed(this, nodes);
- }
+ if (loc)
+ return qryMgr.scanQueryLocal(this, true);
+
+ if (part != null)
+ return new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx);
+ else
+ return qryMgr.scanQueryDistributed(this, nodes);
}
/**
@@ -621,12 +625,12 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/**
* Wrapper for queries with fallback.
*/
- private static class ScanQueryFallbackClosableIterator extends GridCloseableIteratorAdapter<Map.Entry> {
+ private static class ScanQueryFallbackClosableIterator extends GridCloseableIteratorAdapter {
/** */
private static final long serialVersionUID = 0L;
/** Query future. */
- private volatile T2<GridCloseableIterator<Map.Entry>, GridCacheQueryFutureAdapter> tuple;
+ private volatile T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter> tuple;
/** Backups. */
private volatile Queue<ClusterNode> nodes;
@@ -653,7 +657,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
private boolean firstItemReturned;
/** */
- private Map.Entry cur;
+ private Object cur;
/**
* @param part Partition.
@@ -726,7 +730,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
}
}
else {
- final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, null, null);
+ final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, qry.transform, null);
GridCacheQueryFutureAdapter fut =
(GridCacheQueryFutureAdapter)qryMgr.queryDistributed(bean, Collections.singleton(node));
@@ -736,13 +740,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
}
/** {@inheritDoc} */
- @Override protected Map.Entry onNext() throws IgniteCheckedException {
+ @Override protected Object onNext() throws IgniteCheckedException {
if (!onHasNext())
throw new NoSuchElementException();
assert cur != null;
- Map.Entry e = cur;
+ Object e = cur;
cur = null;
@@ -755,9 +759,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
if (cur != null)
return true;
- T2<GridCloseableIterator<Map.Entry>, GridCacheQueryFutureAdapter> t = tuple;
+ T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter> t = tuple;
- GridCloseableIterator<Map.Entry> iter = t.get1();
+ GridCloseableIterator<Object> iter = t.get1();
if (iter != null) {
boolean hasNext = iter.hasNext();
@@ -773,14 +777,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
assert fut != null;
if (firstItemReturned)
- return (cur = (Map.Entry)fut.next()) != null;
+ return (cur = convert(fut.next())) != null;
try {
fut.awaitFirstPage();
firstItemReturned = true;
- return (cur = (Map.Entry)fut.next()) != null;
+ return (cur = convert(fut.next())) != null;
}
catch (IgniteClientDisconnectedCheckedException e) {
throw CU.convertToCacheException(e);
@@ -793,6 +797,19 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
}
/**
+ * @param obj Entry to convert.
+ * @return Cache entry
+ */
+ private Object convert(Object obj) {
+ if(qry.transform() != null)
+ return obj;
+
+ Map.Entry e = (Map.Entry)obj;
+
+ return e == null ? null : new CacheQueryEntry(e.getKey(), e.getValue());
+ }
+
+ /**
* @param e Exception for query run.
*/
private void retryIfPossible(IgniteCheckedException e) {
@@ -847,7 +864,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
@Override protected void onClose() throws IgniteCheckedException {
super.onClose();
- T2<GridCloseableIterator<Map.Entry>, GridCacheQueryFutureAdapter> t = tuple;
+ T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter> t = tuple;
if (t != null && t.get1() != null)
t.get1().close();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 3e772cd..3e27720 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -40,10 +40,10 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import javax.cache.Cache;
-import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.QueryIndexType;
import org.apache.ignite.cache.query.QueryMetrics;
@@ -63,7 +63,9 @@ import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
@@ -71,10 +73,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
import org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicate;
@@ -821,22 +823,22 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @throws IgniteCheckedException If failed to get iterator.
*/
@SuppressWarnings({"unchecked"})
- private GridCloseableIterator<IgniteBiTuple<K, V>> scanIterator(final GridCacheQueryAdapter<?> qry, boolean locNode)
+ private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?> qry, boolean locNode)
throws IgniteCheckedException {
final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
try {
injectResources(keyValFilter);
- Integer part = qry.partition();
-
- if (cctx.isLocal())
- part = null;
+ Integer part = cctx.isLocal() ? null : qry.partition();
if (part != null && (part < 0 || part >= cctx.affinity().partitions()))
- return new GridEmptyCloseableIterator<>();
-
- final ExpiryPolicy plc = cctx.expiry();
+ return new GridEmptyCloseableIterator() {
+ @Override public void close() throws IgniteCheckedException {
+ closeScanFilter(keyValFilter);
+ super.close();
+ }
+ };
AffinityTopologyVersion topVer = GridQueryProcessor.getRequestAffinityTopologyVersion();
@@ -858,13 +860,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(),
"Partition can not be reserved");
- if (locPart0.state() != OWNING) {
- locPart0.release();
-
- throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(),
- "Partition can not be reserved");
- }
-
locPart = locPart0;
it = cctx.offheap().cachePartitionIterator(cctx.cacheId(), part);
@@ -872,19 +867,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
else {
locPart = null;
+ // TODO shouldn't we reserve all involved partitions?
it = cctx.offheap().cacheIterator(cctx.cacheId(), true, backups, topVer);
}
- return new PeekValueExpiryAwareIterator(it, plc, topVer, keyValFilter, qry.keepBinary(), locNode) {
- @Override protected void onClose() {
- super.onClose();
-
- if (locPart != null)
- locPart.release();
-
- closeScanFilter(keyValFilter);
- }
- };
+ return new ScanQueryIterator(it, qry, topVer, locPart, keyValFilter, locNode, cctx, log);
}
catch (IgniteCheckedException | RuntimeException e) {
closeScanFilter(keyValFilter);
@@ -1189,9 +1176,16 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
- while (!Thread.currentThread().isInterrupted() && iter.hasNext()) {
+ CacheObjectContext objCtx = cctx.cacheObjectContext();
+
+ while (!Thread.currentThread().isInterrupted()) {
long start = statsEnabled ? System.nanoTime() : 0L;
+ // Need to call it after gathering start time because
+ // actual row extracting may happen inside this method.
+ if(!iter.hasNext())
+ break;
+
IgniteBiTuple<K, V> row = iter.next();
// Query is cancelled.
@@ -1249,8 +1243,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
V val0 = null;
if (readEvt && cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ)) {
- key0 = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary());
- val0 = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary());
+ key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, qry.keepBinary(), false);
+ val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, qry.keepBinary(), false);
switch (type) {
case SQL:
@@ -1320,9 +1314,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
if (rdc != null || trans != null) {
if (key0 == null)
- key0 = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary());
+ key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, qry.keepBinary(), false);
if (val0 == null)
- val0 = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary());
+ val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, qry.keepBinary(), false);
Cache.Entry<K, V> entry = new CacheEntryImpl(key0, val0);
@@ -1422,22 +1416,24 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* Process local scan query.
*
* @param qry Query.
- * @param updStatisticsIfNeeded Update statistics flag.
+ * @param updateStatistics Update statistics flag.
*/
@SuppressWarnings({"unchecked", "serial"})
protected GridCloseableIterator scanQueryLocal(final GridCacheQueryAdapter qry,
- final boolean updStatisticsIfNeeded) throws IgniteCheckedException {
+ boolean updateStatistics) throws IgniteCheckedException {
if (!enterBusy())
throw new IllegalStateException("Failed to process query request (grid is stopping).");
final boolean statsEnabled = cctx.config().isStatisticsEnabled();
- boolean needUpdStatistics = updStatisticsIfNeeded && statsEnabled;
+ updateStatistics &= statsEnabled;
long startTime = U.currentTimeMillis();
final String namex = cctx.name();
+ final IgniteBiPredicate<K, V> scanFilter = qry.scanFilter();
+
try {
assert qry.type() == SCAN;
@@ -1445,7 +1441,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
log.debug("Running local SCAN query: " + qry);
final String taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash());
- final IgniteBiPredicate filter = qry.scanFilter();
final ClusterNode locNode = cctx.localNode();
final UUID subjId = qry.subjectId();
@@ -1458,80 +1453,23 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
namex,
null,
null,
- filter,
+ scanFilter,
null,
null,
subjId,
taskName));
}
- final GridCloseableIterator<IgniteBiTuple<K, V>> iter = scanIterator(qry, true);
+ GridCloseableIterator it = scanIterator(qry, true);
- if (updStatisticsIfNeeded)
- needUpdStatistics = false;
-
- final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
-
- return new GridCloseableIteratorAdapter<Object>() {
- @Override protected Object onNext() throws IgniteCheckedException {
- long start = statsEnabled ? System.nanoTime() : 0L;
+ updateStatistics = false;
- IgniteBiTuple<K, V> next = iter.nextX();
-
- if (statsEnabled) {
- CacheMetricsImpl metrics = cctx.cache().metrics0();
-
- metrics.onRead(true);
-
- metrics.addGetTimeNanos(System.nanoTime() - start);
- }
-
- if (readEvt && cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ)) {
- cctx.gridEvents().record(new CacheQueryReadEvent<>(
- cctx.localNode(),
- "Scan query entry read.",
- EVT_CACHE_QUERY_OBJECT_READ,
- CacheQueryType.SCAN.name(),
- namex,
- null,
- null,
- filter,
- null,
- null,
- subjId,
- taskName,
- next.getKey(),
- next.getValue(),
- null,
- null));
- }
-
- IgniteClosure transform = qry.transform();
-
- if (transform == null)
- return next;
-
- Cache.Entry<K, V> entry;
-
- if (qry.keepBinary())
- entry = cctx.cache().keepBinary().getEntry(next.getKey());
- else
- entry = cctx.cache().getEntry(next.getKey());
-
- return transform.apply(entry);
- }
-
- @Override protected boolean onHasNext() throws IgniteCheckedException {
- return iter.hasNextX();
- }
-
- @Override protected void onClose() throws IgniteCheckedException {
- iter.close();
- }
- };
+ return it;
}
catch (Exception e) {
- if (needUpdStatistics)
+ closeScanFilter(scanFilter);
+
+ if (updateStatistics)
cctx.queries().collectMetrics(GridCacheQueryType.SCAN, namex, startTime,
U.currentTimeMillis() - startTime, true);
@@ -2047,8 +1985,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
private static final long serialVersionUID = 0L;
/**
- * Number of fields to report when no fields defined.
- * Includes _key and _val columns.
+ * Number of fields to report when no fields defined. Includes _key and _val columns.
*/
private static final int NO_FIELDS_COLUMNS_COUNT = 2;
@@ -2862,14 +2799,68 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
- *
+ * The map prevents put to the map in case the specified request has been removed previously.
*/
- private class PeekValueExpiryAwareIterator extends GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
+ private class RequestFutureMap extends LinkedHashMap<Long, GridFutureAdapter<QueryResult<K, V>>> {
/** */
private static final long serialVersionUID = 0L;
+ /** Count of canceled keys */
+ private static final int CANCELED_COUNT = 128;
+
+ /**
+ * The ID of the canceled request is stored to the set in case remove(reqId) is called before put(reqId,
+ * future).
+ */
+ private Set<Long> canceled;
+
+ /** {@inheritDoc} */
+ @Override public GridFutureAdapter<QueryResult<K, V>> remove(Object key) {
+ if (containsKey(key))
+ return super.remove(key);
+ else {
+ if (canceled == null) {
+ canceled = Collections.newSetFromMap(
+ new LinkedHashMap<Long, Boolean>() {
+ @Override protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
+ return size() > CANCELED_COUNT;
+ }
+ });
+ }
+
+ canceled.add((Long)key);
+
+ return null;
+ }
+ }
+
+ /**
+ * @return true if the key is canceled
+ */
+ boolean isCanceled(Long key) {
+ return canceled != null && canceled.contains(key);
+ }
+ }
+
+ /** */
+ private static final class ScanQueryIterator<K, V> extends GridCloseableIteratorAdapter<Object> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final GridDhtCacheAdapter dht;
+
+ /** */
+ private final GridDhtLocalPartition locPart;
+
/** */
- private final ExpiryPolicy plc;
+ private final IgniteBiPredicate<K, V> scanFilter;
+
+ /** */
+ private final boolean statsEnabled;
+
+ /** */
+ private final GridIterator<CacheDataRow> it;
/** */
private final GridCacheAdapter cache;
@@ -2878,73 +2869,94 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
private final AffinityTopologyVersion topVer;
/** */
- private final GridDhtCacheAdapter dht;
+ private final boolean keepBinary;
/** */
- private final IgniteBiPredicate<K, V> keyValFilter;
+ private final boolean readEvt;
/** */
- private boolean locNode;
+ private final String cacheName;
/** */
- private final boolean keepBinary;
+ private final UUID subjId;
/** */
- private IgniteBiTuple<K, V> next;
+ private final String taskName;
/** */
- private IgniteCacheExpiryPolicy expiryPlc;
+ private final IgniteClosure transform;
+
+ /** */
+ private final CacheObjectContext objCtx;
+
+ /** */
+ private final GridCacheContext cctx;
+
+ /** */
+ private final IgniteLogger log;
/** */
- private GridIterator<CacheDataRow> it;
+ private Object next;
- /** Need advance. */
+ /** */
private boolean needAdvance;
+ /** */
+ private IgniteCacheExpiryPolicy expiryPlc;
+
/**
* @param it Iterator.
- * @param plc Expiry policy.
+ * @param qry Query.
* @param topVer Topology version.
- * @param keyValFilter Key-value filter.
- * @param keepBinary Keep binary flag from the query.
- * @param locNode Local node.
+ * @param locPart Local partition.
+ * @param scanFilter Scan filter.
+ * @param locNode Local node flag.
+ * @param cctx Cache context.
+ * @param log Logger.
*/
- private PeekValueExpiryAwareIterator(
+ ScanQueryIterator(
GridIterator<CacheDataRow> it,
- ExpiryPolicy plc,
+ GridCacheQueryAdapter qry,
AffinityTopologyVersion topVer,
- IgniteBiPredicate<K, V> keyValFilter,
- boolean keepBinary,
- boolean locNode
- ) {
+ GridDhtLocalPartition locPart,
+ IgniteBiPredicate<K, V> scanFilter,
+ boolean locNode,
+ GridCacheContext cctx,
+ IgniteLogger log) {
this.it = it;
- this.plc = plc;
this.topVer = topVer;
- this.keyValFilter = keyValFilter;
- this.locNode = locNode;
+ this.locPart = locPart;
+ this.scanFilter = scanFilter;
+ this.cctx = cctx;
+ this.log = log;
- dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
- cache = dht != null ? dht : cctx.cache();
+ statsEnabled = locNode && cctx.config().isStatisticsEnabled();
- this.keepBinary = keepBinary;
- expiryPlc = cctx.cache().expiryPolicy(plc);
+ readEvt = locNode && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
- needAdvance = true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onHasNext() {
- if (needAdvance) {
- advance();
-
- needAdvance = false;
+ if(readEvt){
+ taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash());
+ subjId = qry.subjectId();
+ }
+ else {
+ taskName = null;
+ subjId = null;
}
- return next != null;
+ // keep binary for remote scans if possible
+ keepBinary = (!locNode && scanFilter == null) || qry.keepBinary();
+ transform = qry.transform();
+ dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
+ cache = dht != null ? dht : cctx.cache();
+ objCtx = cctx.cacheObjectContext();
+ cacheName = cctx.name();
+
+ needAdvance = true;
+ expiryPlc = this.cctx.cache().expiryPolicy(null);
}
/** {@inheritDoc} */
- @Override public IgniteBiTuple<K, V> onNext() {
+ @Override protected Object onNext() {
if (needAdvance)
advance();
else
@@ -2957,26 +2969,64 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/** {@inheritDoc} */
+ @Override protected boolean onHasNext() {
+ if (needAdvance) {
+ advance();
+
+ needAdvance = false;
+ }
+
+ return next != null;
+ }
+
+ /** {@inheritDoc} */
@Override protected void onClose() {
- sendTtlUpdate();
+ if (expiryPlc != null && dht != null) {
+ dht.sendTtlUpdateRequest(expiryPlc);
+
+ expiryPlc = null;
+ }
+
+ if (locPart != null)
+ locPart.release();
+
+ closeScanFilter(scanFilter);
}
/**
* Moves the iterator to the next cache entry.
*/
private void advance() {
- IgniteBiTuple<K, V> next0 = null;
+ long start = statsEnabled ? System.nanoTime() : 0L;
+
+ Object next = null;
while (it.hasNext()) {
CacheDataRow row = it.next();
KeyCacheObject key = row.key();
-
CacheObject val;
if (expiryPlc != null) {
try {
- val = value(key);
+ CacheDataRow tmp = row;
+
+ while (true) {
+ try {
+ GridCacheEntryEx entry = cache.entryEx(key);
+
+ entry.unswap(tmp);
+
+ val = entry.peek(true, true, topVer, expiryPlc);
+
+ cctx.evicts().touch(entry, topVer);
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ tmp = null;
+ }
+ }
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
@@ -2985,126 +3035,58 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
val = null;
}
- if (dht != null && expiryPlc.readyToFlush(100)) {
+ if (dht != null && expiryPlc.readyToFlush(100))
dht.sendTtlUpdateRequest(expiryPlc);
-
- expiryPlc = cctx.cache().expiryPolicy(plc);
- }
}
else
val = row.value();
if (val != null) {
- boolean keepBinary0 = !locNode || keepBinary;
+ K key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, keepBinary, false);
+ V val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, keepBinary, false);
- next0 = F.t(
- (K)cctx.unwrapBinaryIfNeeded(key, keepBinary0),
- (V)cctx.unwrapBinaryIfNeeded(val, keepBinary0));
+ if (statsEnabled) {
+ CacheMetricsImpl metrics = cctx.cache().metrics0();
- boolean passPred = true;
+ metrics.onRead(true);
- if (keyValFilter != null) {
- Object key0 = next0.getKey();
- Object val0 = next0.getValue();
+ metrics.addGetTimeNanos(System.nanoTime() - start);
+ }
- if (keepBinary0 && !keepBinary) {
- key0 = (K)cctx.unwrapBinaryIfNeeded(key0, keepBinary);
- val0 = (V)cctx.unwrapBinaryIfNeeded(val0, keepBinary);
+ if (scanFilter == null || scanFilter.apply(key0, val0)) {
+ if (readEvt && cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ)) {
+ cctx.gridEvents().record(new CacheQueryReadEvent<>(
+ cctx.localNode(),
+ "Scan query entry read.",
+ EVT_CACHE_QUERY_OBJECT_READ,
+ CacheQueryType.SCAN.name(),
+ cacheName,
+ null,
+ null,
+ scanFilter,
+ null,
+ null,
+ subjId,
+ taskName,
+ key0,
+ val0,
+ null,
+ null));
}
- passPred = keyValFilter.apply((K)key0, (V)val0);
- }
+ next = transform == null ? new CacheQueryEntry<>(key0, val0)
+ : transform.apply(new CacheQueryEntry<>(key0, val0));
- if (passPred)
break;
- else
- next0 = null;
+ }
}
}
- next = next0;
-
- if (next == null)
- sendTtlUpdate();
- }
-
- /**
- * Sends TTL update.
- */
- private void sendTtlUpdate() {
- if (dht != null && expiryPlc != null) {
+ if ((this.next = next) == null && expiryPlc != null && dht != null) {
dht.sendTtlUpdateRequest(expiryPlc);
expiryPlc = null;
}
}
-
- /**
- * @param key Key.
- * @return Value.
- * @throws IgniteCheckedException If failed to peek value.
- */
- private CacheObject value(KeyCacheObject key) throws IgniteCheckedException {
- while (true) {
- try {
- GridCacheEntryEx entry = cache.entryEx(key);
-
- entry.unswap();
-
- CacheObject cacheObj = entry.peek(true, true, topVer, expiryPlc);
-
- cctx.evicts().touch(entry, topVer);
-
- return cacheObj;
- }
- catch (GridCacheEntryRemovedException ignore) {
- // No-op.
- }
- }
- }
- }
-
- /**
- * The map prevents put to the map in case the specified request has been removed previously.
- */
- private class RequestFutureMap extends LinkedHashMap<Long, GridFutureAdapter<QueryResult<K, V>>> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Count of canceled keys */
- private static final int CANCELED_COUNT = 128;
-
- /**
- * The ID of the canceled request is stored to the set in case
- * remove(reqId) is called before put(reqId, future).
- */
- private Set<Long> canceled;
-
- /** {@inheritDoc} */
- @Override public GridFutureAdapter<QueryResult<K, V>> remove(Object key) {
- if (containsKey(key))
- return super.remove(key);
- else {
- if (canceled == null) {
- canceled = Collections.newSetFromMap(
- new LinkedHashMap<Long, Boolean>() {
- @Override protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
- return size() > CANCELED_COUNT;
- }
- });
- }
-
- canceled.add((Long)key);
-
- return null;
- }
- }
-
- /**
- * @return true if the key is canceled
- */
- boolean isCanceled(Long key) {
- return canceled != null && canceled.contains(key);
- }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 70711e5..17be90f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cacheobject;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.UUID;
@@ -40,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -341,166 +339,4 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
return false;
}
- /**
- * Wraps key provided by user, must be serialized before stored in cache.
- */
- private static class UserKeyCacheObjectImpl extends KeyCacheObjectImpl {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- *
- */
- public UserKeyCacheObjectImpl() {
- //No-op.
- }
-
- /**
- * @param key Key.
- * @param part Partition.
- */
- UserKeyCacheObjectImpl(Object key, int part) {
- super(key, null, part);
- }
-
- /**
- * @param key Key.
- * @param valBytes Marshalled key.
- * @param part Partition.
- */
- UserKeyCacheObjectImpl(Object key, byte[] valBytes, int part) {
- super(key, valBytes, part);
- }
-
- /** {@inheritDoc} */
- @Override public KeyCacheObject copy(int part) {
- if (this.partition() == part)
- return this;
-
- return new UserKeyCacheObjectImpl(val, valBytes, part);
- }
-
- /** {@inheritDoc} */
- @Override public CacheObject prepareForCache(CacheObjectContext ctx) {
- try {
- IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects();
-
- if (!proc.immutable(val)) {
- if (valBytes == null)
- valBytes = proc.marshal(ctx, val);
-
- boolean p2pEnabled = ctx.kernalContext().config().isPeerClassLoadingEnabled();
-
- ClassLoader ldr = p2pEnabled ?
- IgniteUtils.detectClassLoader(IgniteUtils.detectClass(this.val)) : U.gridClassLoader();
-
- Object val = proc.unmarshal(ctx, valBytes, ldr);
-
- KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition());
-
- key.partition(partition());
-
- return key;
- }
-
- KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition());
-
- key.partition(partition());
-
- return key;
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException("Failed to marshal object: " + val, e);
- }
- }
- }
-
- /**
- * Wraps value provided by user, must be serialized before stored in cache.
- */
- private static class UserCacheObjectImpl extends CacheObjectImpl {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- *
- */
- public UserCacheObjectImpl() {
- //No-op.
- }
-
- /**
- * @param val Value.
- * @param valBytes Value bytes.
- */
- public UserCacheObjectImpl(Object val, byte[] valBytes) {
- super(val, valBytes);
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
- return super.value(ctx, false); // Do not need copy since user value is not in cache.
- }
-
- /** {@inheritDoc} */
- @Override public CacheObject prepareForCache(CacheObjectContext ctx) {
- try {
- IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects();
-
- if (valBytes == null)
- valBytes = proc.marshal(ctx, val);
-
- if (ctx.storeValue()) {
- boolean p2pEnabled = ctx.kernalContext().config().isPeerClassLoadingEnabled();
-
- ClassLoader ldr = p2pEnabled ?
- IgniteUtils.detectClass(this.val).getClassLoader() : val.getClass().getClassLoader();
-
- Object val = this.val != null && proc.immutable(this.val) ? this.val :
- proc.unmarshal(ctx, valBytes, ldr);
-
- return new CacheObjectImpl(val, valBytes);
- }
-
- return new CacheObjectImpl(null, valBytes);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException("Failed to marshal object: " + val, e);
- }
- }
- }
-
- /**
- * Wraps value provided by user, must be copied before stored in cache.
- */
- private static class UserCacheObjectByteArrayImpl extends CacheObjectByteArrayImpl {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- *
- */
- public UserCacheObjectByteArrayImpl() {
- // No-op.
- }
-
- /**
- * @param val Value.
- */
- public UserCacheObjectByteArrayImpl(byte[] val) {
- super(val);
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
- return super.value(ctx, false); // Do not need copy since user value is not in cache.
- }
-
- /** {@inheritDoc} */
- @Override public CacheObject prepareForCache(CacheObjectContext ctx) {
- byte[] valCpy = Arrays.copyOf(val, val.length);
-
- return new CacheObjectByteArrayImpl(valCpy);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectByteArrayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectByteArrayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectByteArrayImpl.java
new file mode 100644
index 0000000..aa4d5f5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectByteArrayImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cacheobject;
+
+import java.util.Arrays;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wraps value provided by user, must be copied before stored in cache.
+ */
+public class UserCacheObjectByteArrayImpl extends CacheObjectByteArrayImpl {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ *
+ */
+ public UserCacheObjectByteArrayImpl() {
+ // No-op.
+ }
+
+ /**
+ * @param val Value.
+ */
+ public UserCacheObjectByteArrayImpl(byte[] val) {
+ super(val);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
+ return super.value(ctx, false); // Do not need copy since user value is not in cache.
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheObject prepareForCache(CacheObjectContext ctx) {
+ byte[] valCpy = Arrays.copyOf(val, val.length);
+
+ return new CacheObjectByteArrayImpl(valCpy);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectImpl.java
new file mode 100644
index 0000000..241c12b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cacheobject;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wraps value provided by user, must be serialized before stored in cache.
+ */
+public class UserCacheObjectImpl extends CacheObjectImpl {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ *
+ */
+ public UserCacheObjectImpl() {
+ //No-op.
+ }
+
+ /**
+ * @param val Value.
+ * @param valBytes Value bytes.
+ */
+ public UserCacheObjectImpl(Object val, byte[] valBytes) {
+ super(val, valBytes);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
+ return super.value(ctx, false); // Do not need copy since user value is not in cache.
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheObject prepareForCache(CacheObjectContext ctx) {
+ try {
+ IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects();
+
+ if (valBytes == null)
+ valBytes = proc.marshal(ctx, val);
+
+ if (ctx.storeValue()) {
+ boolean p2pEnabled = ctx.kernalContext().config().isPeerClassLoadingEnabled();
+
+ ClassLoader ldr = p2pEnabled ?
+ IgniteUtils.detectClass(this.val).getClassLoader() : val.getClass().getClassLoader();
+
+ Object val = this.val != null && proc.immutable(this.val) ? this.val :
+ proc.unmarshal(ctx, valBytes, ldr);
+
+ return new CacheObjectImpl(val, valBytes);
+ }
+
+ return new CacheObjectImpl(null, valBytes);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to marshal object: " + val, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java
new file mode 100644
index 0000000..de57667
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cacheobject;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Wraps key provided by user, must be serialized before stored in cache.
+ */
+public class UserKeyCacheObjectImpl extends KeyCacheObjectImpl {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ *
+ */
+ public UserKeyCacheObjectImpl() {
+ //No-op.
+ }
+
+ /**
+ * @param key Key.
+ * @param part Partition.
+ */
+ UserKeyCacheObjectImpl(Object key, int part) {
+ super(key, null, part);
+ }
+
+ /**
+ * @param key Key.
+ * @param valBytes Marshalled key.
+ * @param part Partition.
+ */
+ UserKeyCacheObjectImpl(Object key, byte[] valBytes, int part) {
+ super(key, valBytes, part);
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyCacheObject copy(int part) {
+ if (this.partition() == part)
+ return this;
+
+ return new UserKeyCacheObjectImpl(val, valBytes, part);
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheObject prepareForCache(CacheObjectContext ctx) {
+ try {
+ IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects();
+
+ if (!proc.immutable(val)) {
+ if (valBytes == null)
+ valBytes = proc.marshal(ctx, val);
+
+ boolean p2pEnabled = ctx.kernalContext().config().isPeerClassLoadingEnabled();
+
+ ClassLoader ldr = p2pEnabled ?
+ IgniteUtils.detectClassLoader(IgniteUtils.detectClass(this.val)) : U.gridClassLoader();
+
+ Object val = proc.unmarshal(ctx, valBytes, ldr);
+
+ KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition());
+
+ key.partition(partition());
+
+ return key;
+ }
+
+ KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition());
+
+ key.partition(partition());
+
+ return key;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to marshal object: " + val, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 46fcfea..1d8720c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -26,7 +26,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -61,7 +60,6 @@ import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
-import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -1309,7 +1307,9 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
return cache.context().itHolder().iterator(iter,
new CacheIteratorConverter<Cache.Entry<Object, Object>, Map.Entry<Object, Object>>() {
@Override protected Cache.Entry<Object, Object> convert(Map.Entry<Object, Object> e) {
- return new CacheEntryImpl<>(e.getKey(), e.getValue());
+ // Actually Scan Query returns Iterator<CacheQueryEntry> by default,
+ // CacheQueryEntry implements both Map.Entry and Cache.Entry interfaces.
+ return (Cache.Entry<Object, Object>)e;
}
@Override protected void remove(Cache.Entry<Object, Object> item) {