You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/08/31 06:33:18 UTC
[28/38] ignite git commit: IGNITE-2546 - Transformers for SCAN
queries. Fixes #949.
IGNITE-2546 - Transformers for SCAN queries. Fixes #949.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/407071e4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/407071e4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/407071e4
Branch: refs/heads/ignite-3443
Commit: 407071e466d1a4fcec86571d4791ace8bb206f53
Parents: 0465874
Author: Eduard Shangareev <es...@gridgain.com>
Authored: Mon Aug 29 17:32:31 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Mon Aug 29 17:32:31 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteCache.java | 15 +
.../processors/cache/IgniteCacheProxy.java | 112 +++-
.../processors/cache/query/CacheQuery.java | 11 +-
.../query/GridCacheDistributedQueryManager.java | 22 +-
.../cache/query/GridCacheLocalQueryManager.java | 3 +-
.../cache/query/GridCacheQueryAdapter.java | 69 ++-
.../cache/query/GridCacheQueryBean.java | 8 +-
.../cache/query/GridCacheQueryInfo.java | 8 +-
.../cache/query/GridCacheQueryManager.java | 125 ++--
.../cache/query/GridCacheQueryRequest.java | 6 +-
.../GridCacheQueryTransformerSelfTest.java | 570 +++++++++++++++++++
.../multijvm/IgniteCacheProcessProxy.java | 6 +
.../IgniteCacheQuerySelfTestSuite.java | 2 +
13 files changed, 832 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 40eedaf..2290fc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -56,6 +56,7 @@ import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteAsyncSupported;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.mxbean.CacheMetricsMXBean;
import org.apache.ignite.transactions.TransactionHeuristicException;
@@ -295,6 +296,20 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
public <R> QueryCursor<R> query(Query<R> qry);
/**
+ * Queries the cache transforming the entries on the server nodes. Can be used, for example,
+ * to avoid network overhead in case only one field out of the large is required by client.
+ * <p>
+ * Currently transformers are supported ONLY for {@link ScanQuery}. Passing any other
+ * subclass of {@link Query} interface to this method will end up with
+ * {@link UnsupportedOperationException}.
+ *
+ * @param qry Query.
+ * @param transformer Transformer.
+ * @return Cursor.
+ */
+ public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer);
+
+ /**
* Allows for iteration over local cache entries.
*
* @param peekModes Peek modes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 0d7bc6a..9b26c1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -82,6 +82,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.mxbean.CacheMetricsMXBean;
import org.apache.ignite.plugin.security.SecurityPermission;
@@ -466,50 +467,74 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/**
- * @param filter Filter.
+ * @param scanQry ScanQry.
+ * @param transformer Transformer
* @param grp Optional cluster group.
* @return Cursor.
*/
@SuppressWarnings("unchecked")
- private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable ClusterGroup grp)
+ private <T, R> QueryCursor<R> query(
+ final ScanQuery scanQry,
+ @Nullable final IgniteClosure<T, R> transformer,
+ @Nullable ClusterGroup grp)
throws IgniteCheckedException {
- final CacheQuery<Map.Entry<K, V>> qry;
+
+ final CacheQuery<R> qry;
boolean isKeepBinary = opCtx != null && opCtx.isKeepBinary();
- if (filter instanceof ScanQuery) {
- IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter();
+ IgniteBiPredicate<K, V> p = scanQry.getFilter();
- qry = ctx.queries().createScanQuery(p, ((ScanQuery)filter).getPartition(), isKeepBinary);
+ qry = ctx.queries().createScanQuery(p, transformer, scanQry.getPartition(), isKeepBinary);
- if (grp != null)
- qry.projection(grp);
+ if (grp != null)
+ qry.projection(grp);
- final GridCloseableIterator<Entry<K, V>> iter = ctx.kernalContext().query().executeQuery(ctx,
- new IgniteOutClosureX<GridCloseableIterator<Entry<K, V>>>() {
- @Override public GridCloseableIterator<Entry<K, V>> applyx() throws IgniteCheckedException {
- final GridCloseableIterator<Map.Entry> iter0 = qry.executeScanQuery();
+ final GridCloseableIterator<R> iter = ctx.kernalContext().query().executeQuery(ctx,
+ new IgniteOutClosureX<GridCloseableIterator<R>>() {
+ @Override public GridCloseableIterator<R> applyx() throws IgniteCheckedException {
+ final GridCloseableIterator iter0 = qry.executeScanQuery();
- return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() {
- @Override protected Cache.Entry<K, V> onNext() throws IgniteCheckedException {
- Map.Entry<K, V> next = iter0.nextX();
+ final boolean needToConvert = transformer == null;
- return new CacheEntryImpl<>(next.getKey(), next.getValue());
- }
+ return new GridCloseableIteratorAdapter<R>() {
+ @Override protected R onNext() throws IgniteCheckedException {
+ Object next = iter0.nextX();
- @Override protected boolean onHasNext() throws IgniteCheckedException {
- return iter0.hasNextX();
- }
+ if (needToConvert) {
+ Map.Entry<K, V> entry = (Map.Entry<K, V>)next;
- @Override protected void onClose() throws IgniteCheckedException {
- iter0.close();
+ return (R) new CacheEntryImpl<>(entry.getKey(), entry.getValue());
}
- };
- }
- }, false);
- return new QueryCursorImpl<>(iter);
- }
+ return (R)next;
+ }
+
+ @Override protected boolean onHasNext() throws IgniteCheckedException {
+ return iter0.hasNextX();
+ }
+
+ @Override protected void onClose() throws IgniteCheckedException {
+ iter0.close();
+ }
+ };
+ }
+ }, false);
+
+ return new QueryCursorImpl<>(iter);
+ }
+
+ /**
+ * @param filter Filter.
+ * @param grp Optional cluster group.
+ * @return Cursor.
+ */
+ @SuppressWarnings("unchecked")
+ private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable ClusterGroup grp)
+ throws IgniteCheckedException {
+ final CacheQuery<Map.Entry<K, V>> qry;
+
+ boolean isKeepBinary = opCtx != null && opCtx.isKeepBinary();
final CacheQueryFuture<Map.Entry<K, V>> fut;
@@ -692,6 +717,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p);
}
+ if (qry instanceof ScanQuery)
+ return query((ScanQuery)qry, null, projection(qry.isLocal()));
+
return (QueryCursor<R>)query(qry, projection(qry.isLocal()));
}
catch (Exception e) {
@@ -705,6 +733,36 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
}
+ /** {@inheritDoc} */
+ @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) {
+ A.notNull(qry, "qry");
+ A.notNull(transformer, "transformer");
+
+ if (!(qry instanceof ScanQuery))
+ throw new UnsupportedOperationException("Transformers are supported only for SCAN queries.");
+
+ GridCacheGateway<K, V> gate = this.gate;
+
+ CacheOperationContext prev = onEnter(gate, opCtx);
+
+ try {
+ ctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+ validate(qry);
+
+ return query((ScanQuery<K, V>)qry, transformer, projection(qry.isLocal()));
+ }
+ catch (Exception e) {
+ if (e instanceof CacheException)
+ throw (CacheException)e;
+
+ throw new CacheException(e);
+ }
+ finally {
+ onLeave(gate, prev);
+ }
+ }
+
/**
* @return {@code true} If this is a replicated cache and we are on a data node.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index 47c6e89..3fa041b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -273,15 +273,6 @@ public interface CacheQuery<T> {
public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, @Nullable Object... args);
/**
- * Executes the query the same way as {@link #execute(Object...)} method but transforms result remotely.
- *
- * @param rmtTransform Remote transformer.
- * @param args Optional arguments.
- * @return Future for the query result.
- */
- public <R> CacheQueryFuture<R> execute(IgniteClosure<T, R> rmtTransform, @Nullable Object... args);
-
- /**
* Gets metrics for this query.
*
* @return Query metrics.
@@ -296,5 +287,5 @@ public interface CacheQuery<T> {
/**
* @return Scan query iterator.
*/
- public <R extends Map.Entry> GridCloseableIterator<R> executeScanQuery() throws IgniteCheckedException;
+ public GridCloseableIterator executeScanQuery() throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 5f6cb8f..d34047e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -253,7 +253,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
*/
@Nullable private GridCacheQueryInfo distributedQueryInfo(UUID sndId, GridCacheQueryRequest req) {
IgniteReducer<Object, Object> rdc = req.reducer();
- IgniteClosure<Object, Object> trans = req.transformer();
+ IgniteClosure<Object, Object> trans = (IgniteClosure<Object, Object>)req.transformer();
ClusterNode sndNode = cctx.node(sndId);
@@ -578,16 +578,16 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
/** {@inheritDoc} */
@SuppressWarnings({"unchecked", "serial"})
- @Override public GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(final GridCacheQueryAdapter qry,
+ @Override public GridCloseableIterator scanQueryDistributed(final GridCacheQueryAdapter qry,
Collection<ClusterNode> nodes) throws IgniteCheckedException {
assert !cctx.isLocal() : cctx.name();
assert qry.type() == GridCacheQueryType.SCAN: qry;
- GridCloseableIterator<Map.Entry<K, V>> locIter0 = null;
+ GridCloseableIterator locIter0 = null;
for (ClusterNode node : nodes) {
if (node.isLocal()) {
- locIter0 = (GridCloseableIterator)scanQueryLocal(qry, false);
+ locIter0 = scanQueryLocal(qry, false);
Collection<ClusterNode> rmtNodes = new ArrayList<>(nodes.size() - 1);
@@ -603,21 +603,21 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
}
}
- final GridCloseableIterator<Map.Entry<K, V>> locIter = locIter0;
+ final GridCloseableIterator locIter = locIter0;
- final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, null, null);
+ final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, qry.<K, V>transform(), null);
- final CacheQueryFuture<Map.Entry<K, V>> fut = (CacheQueryFuture<Map.Entry<K, V>>)queryDistributed(bean, nodes);
+ final CacheQueryFuture fut = (CacheQueryFuture)queryDistributed(bean, nodes);
- return new GridCloseableIteratorAdapter<Map.Entry<K, V>>() {
+ return new GridCloseableIteratorAdapter() {
/** */
- private Map.Entry<K, V> cur;
+ private Object cur;
- @Override protected Map.Entry<K, V> onNext() throws IgniteCheckedException {
+ @Override protected Object onNext() throws IgniteCheckedException {
if (!onHasNext())
throw new NoSuchElementException();
- Map.Entry<K, V> e = cur;
+ Object e = cur;
cur = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
index 183abde..147725b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.query;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
@@ -90,7 +89,7 @@ public class GridCacheLocalQueryManager<K, V> extends GridCacheQueryManager<K, V
}
/** {@inheritDoc} */
- @Override public GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(GridCacheQueryAdapter qry,
+ @Override public GridCloseableIterator scanQueryDistributed(GridCacheQueryAdapter qry,
Collection<ClusterNode> nodes) throws IgniteCheckedException {
assert cctx.isLocal() : cctx.name();
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 90e14f4..f65b733 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -87,6 +87,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/** */
private final IgniteBiPredicate<Object, Object> filter;
+ /** Transformer. */
+ private IgniteClosure<?, ?> transform;
+
/** Partition. */
private Integer part;
@@ -126,6 +129,39 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/**
* @param cctx Context.
* @param type Query type.
+ * @param filter Scan filter.
+ * @param part Partition.
+ * @param keepBinary Keep binary flag.
+ */
+ public GridCacheQueryAdapter(GridCacheContext<?, ?> cctx,
+ GridCacheQueryType type,
+ @Nullable IgniteBiPredicate<Object, Object> filter,
+ @Nullable IgniteClosure<Map.Entry, Object> transform,
+ @Nullable Integer part,
+ boolean keepBinary) {
+ assert cctx != null;
+ assert type != null;
+ assert part == null || part >= 0;
+
+ this.cctx = cctx;
+ this.type = type;
+ this.filter = filter;
+ this.transform = transform;
+ this.part = part;
+ this.keepBinary = keepBinary;
+
+ log = cctx.logger(getClass());
+
+ metrics = new GridCacheQueryMetricsAdapter();
+
+ this.incMeta = false;
+ this.clsName = null;
+ this.clause = null;
+ }
+
+ /**
+ * @param cctx Context.
+ * @param type Query type.
* @param clsName Class name.
* @param clause Clause.
* @param filter Scan filter.
@@ -376,6 +412,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
}
/**
+ * @return Transformer.
+ */
+ @SuppressWarnings("unchecked")
+ @Nullable public <K, V> IgniteClosure<Map.Entry<K, V>, Object> transform() {
+ return (IgniteClosure<Map.Entry<K, V>, Object>) transform;
+ }
+
+ /**
* @return Partition.
*/
@Nullable public Integer partition() {
@@ -402,17 +446,12 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/** {@inheritDoc} */
@Override public CacheQueryFuture<T> execute(@Nullable Object... args) {
- return execute(null, null, args);
+ return execute0(null, args);
}
/** {@inheritDoc} */
@Override public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, @Nullable Object... args) {
- return execute(rmtReducer, null, args);
- }
-
- /** {@inheritDoc} */
- @Override public <R> CacheQueryFuture<R> execute(IgniteClosure<T, R> rmtTransform, @Nullable Object... args) {
- return execute(null, rmtTransform, args);
+ return execute0(rmtReducer, args);
}
/** {@inheritDoc} */
@@ -427,13 +466,11 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/**
* @param rmtReducer Optional reducer.
- * @param rmtTransform Optional transformer.
* @param args Arguments.
* @return Future.
*/
@SuppressWarnings({"IfMayBeConditional", "unchecked"})
- private <R> CacheQueryFuture<R> execute(@Nullable IgniteReducer<T, R> rmtReducer,
- @Nullable IgniteClosure<T, R> rmtTransform, @Nullable Object... args) {
+ private <R> CacheQueryFuture<R> execute0(@Nullable IgniteReducer<T, R> rmtReducer, @Nullable Object... args) {
assert type != SCAN : this;
Collection<ClusterNode> nodes;
@@ -455,7 +492,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
if (cctx.deploymentEnabled()) {
try {
- cctx.deploy().registerClasses(filter, rmtReducer, rmtTransform);
+ cctx.deploy().registerClasses(filter, rmtReducer);
cctx.deploy().registerClasses(args);
}
catch (IgniteCheckedException e) {
@@ -469,7 +506,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
taskHash = cctx.kernalContext().job().currentTaskNameHash();
final GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer<Object, Object>)rmtReducer,
- (IgniteClosure<Object, Object>)rmtTransform, args);
+ null, args);
final GridCacheQueryManager qryMgr = cctx.queries();
@@ -484,8 +521,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/** {@inheritDoc} */
@SuppressWarnings({"IfMayBeConditional", "unchecked"})
- @Override public <R extends Map.Entry> GridCloseableIterator<R> executeScanQuery() throws IgniteCheckedException {
- assert type == SCAN: "Wrong processing of qyery: " + type;
+ @Override public GridCloseableIterator executeScanQuery() throws IgniteCheckedException {
+ assert type == SCAN : "Wrong processing of qyery: " + type;
Collection<ClusterNode> nodes = nodes();
@@ -508,7 +545,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
final GridCacheQueryManager qryMgr = cctx.queries();
if (part != null && !cctx.isLocal())
- return (GridCloseableIterator<R>)new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx);
+ return new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx);
else {
boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId());
@@ -676,7 +713,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
try {
GridCloseableIterator it = qryMgr.scanQueryLocal(qry, true);
- tuple= new T2(it, null);
+ tuple = new T2(it, null);
}
catch (IgniteClientDisconnectedCheckedException e) {
throw CU.convertToCacheException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
index 5a4d693..286ddc2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
@@ -33,7 +33,7 @@ public class GridCacheQueryBean {
private final IgniteReducer<Object, Object> rdc;
/** */
- private final IgniteClosure<Object, Object> trans;
+ private final IgniteClosure<?, ?> trans;
/** */
private final Object[] args;
@@ -45,7 +45,7 @@ public class GridCacheQueryBean {
* @param args Optional arguments.
*/
public GridCacheQueryBean(GridCacheQueryAdapter<?> qry, @Nullable IgniteReducer<Object, Object> rdc,
- @Nullable IgniteClosure<Object, Object> trans, @Nullable Object[] args) {
+ @Nullable IgniteClosure<?, ?> trans, @Nullable Object[] args) {
assert qry != null;
this.qry = qry;
@@ -71,7 +71,7 @@ public class GridCacheQueryBean {
/**
* @return Transformer.
*/
- @Nullable public IgniteClosure<Object, Object> transform() {
+ @Nullable public IgniteClosure<?, ?> transform() {
return trans;
@@ -88,4 +88,4 @@ public class GridCacheQueryBean {
@Override public String toString() {
return S.toString(GridCacheQueryBean.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
index 8d2e67d..0a108d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
@@ -31,7 +31,7 @@ class GridCacheQueryInfo {
private boolean loc;
/** */
- private IgniteClosure<Object, Object> trans;
+ private IgniteClosure<?, ?> trans;
/** */
private IgniteReducer<Object, Object> rdc;
@@ -71,7 +71,7 @@ class GridCacheQueryInfo {
*/
GridCacheQueryInfo(
boolean loc,
- IgniteClosure<Object, Object> trans,
+ IgniteClosure<?, ?> trans,
IgniteReducer<Object, Object> rdc,
GridCacheQueryAdapter<?> qry,
GridCacheLocalQueryFuture<?, ?, ?> locFut,
@@ -117,7 +117,7 @@ class GridCacheQueryInfo {
/**
* @return Transformer.
*/
- IgniteClosure<?, Object> transformer() {
+ IgniteClosure<?, ?> transformer() {
return trans;
}
@@ -167,4 +167,4 @@ class GridCacheQueryInfo {
@Override public String toString() {
return S.toString(GridCacheQueryInfo.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 163bac5..454ce04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -506,7 +506,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @return Iterator.
* @throws IgniteCheckedException If failed.
*/
- public abstract GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(GridCacheQueryAdapter qry,
+ public abstract GridCloseableIterator scanQueryDistributed(GridCacheQueryAdapter qry,
Collection<ClusterNode> nodes) throws IgniteCheckedException;
/**
@@ -1067,13 +1067,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
final GridDhtLocalPartition locPart0 = locPart;
return new PeekValueExpiryAwareIterator(keyIter, plc, topVer, keyValFilter, qry.keepBinary(), locNode, true) {
- @Override protected void onClose() {
- super.onClose();
+ @Override protected void onClose() {
+ super.onClose();
- if (locPart0 != null)
- locPart0.release();
- }
- };
+ if (locPart0 != null)
+ locPart0.release();
+ }
+ };
}
/**
@@ -1163,9 +1163,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
key = (K)cctx.unwrapBinaryIfNeeded(key, keepBinary);
- if (filter != null || locNode) {
+ if (filter != null || locNode)
val = (V)cctx.unwrapBinaryIfNeeded(val, keepBinary);
- }
if (filter != null && !filter.apply(key, val))
continue;
@@ -1187,14 +1186,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param locNode Local node.
* @return Final key-value iterator.
*/
- private GridIterator<IgniteBiTuple<K,V>> scanExpiryIterator(
+ private GridIterator<IgniteBiTuple<K, V>> scanExpiryIterator(
final Iterator<Map.Entry<byte[], byte[]>> it,
AffinityTopologyVersion topVer,
@Nullable final IgniteBiPredicate<K, V> filter,
ExpiryPolicy expPlc,
final boolean keepBinary,
boolean locNode) {
- Iterator <K> keyIter = new Iterator<K>() {
+ Iterator<K> keyIter = new Iterator<K>() {
/** {@inheritDoc} */
@Override public boolean hasNext() {
return it.hasNext();
@@ -1267,10 +1266,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
try {
// Preparing query closures.
- IgniteClosure<Object, Object> trans = (IgniteClosure<Object, Object>)qryInfo.transformer();
IgniteReducer<Object, Object> rdc = (IgniteReducer<Object, Object>)qryInfo.reducer();
- injectResources(trans);
injectResources(rdc);
GridCacheQueryAdapter<?> qry = qryInfo.query();
@@ -1289,7 +1286,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
res = qryInfo.local() ?
executeFieldsQuery(qry, qryInfo.arguments(), qryInfo.local(), qry.subjectId(), taskName,
- recipient(qryInfo.senderId(), qryInfo.requestId())) :
+ recipient(qryInfo.senderId(), qryInfo.requestId())) :
fieldsQueryResult(qryInfo, taskName);
// If metadata needs to be returned to user and cleaned from internal fields - copy it.
@@ -1460,10 +1457,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
try {
// Preparing query closures.
- IgniteClosure<Map.Entry<K, V>, Object> trans =
- (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer();
+ IgniteClosure<Cache.Entry<K, V>, Object> trans =
+ (IgniteClosure<Cache.Entry<K, V>, Object>)qryInfo.transformer();
- IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer();
+ IgniteReducer<Cache.Entry<K, V>, Object> rdc = (IgniteReducer<Cache.Entry<K, V>, Object>)qryInfo.reducer();
injectResources(trans);
injectResources(rdc);
@@ -1481,13 +1478,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
res = loc ?
executeQuery(qry, qryInfo.arguments(), loc, qry.subjectId(), taskName,
- recipient(qryInfo.senderId(), qryInfo.requestId())) :
+ recipient(qryInfo.senderId(), qryInfo.requestId())) :
queryResult(qryInfo, taskName);
iter = res.iterator(recipient(qryInfo.senderId(), qryInfo.requestId()));
type = res.type();
- GridCacheAdapter<K, V> cache = cctx.cache();
+ final GridCacheAdapter<K, V> cache = cctx.cache();
if (log.isDebugEnabled())
log.debug("Received index iterator [iterHasNext=" + iter.hasNext() +
@@ -1518,7 +1515,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
break;
}
- K key = row.getKey();
+ final K key = row.getKey();
// Filter backups for SCAN queries, if it isn't partition scan.
// Other types are filtered in indexing manager.
@@ -1561,8 +1558,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
if (readEvt) {
- K key0 = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary());
- V val0 = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary());
+ K key0 = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary());
+ V val0 = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary());
switch (type) {
case SQL:
@@ -1630,27 +1627,33 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
}
- Map.Entry<K, V> entry = F.t(key, val);
+ if (rdc != null || trans != null) {
+ Cache.Entry<K, V> entry;
- // Unwrap entry for reducer or transformer only.
- if (rdc != null || trans != null)
- entry = (Map.Entry<K, V>)cctx.unwrapBinaryIfNeeded(entry, qry.keepBinary());
+ if (qry.keepBinary())
+ entry = cache.<K, V>keepBinary().getEntry(key);
+ else
+ entry = cache.<K, V>getEntry(key);
- // Reduce.
- if (rdc != null) {
- if (!rdc.collect(entry) || !iter.hasNext()) {
- onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null);
+ // Reduce.
+ if (rdc != null) {
+ if (!rdc.collect(entry) || !iter.hasNext()) {
+ onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null);
- pageSent = true;
+ pageSent = true;
- break;
+ break;
+ }
+ else
+ continue;
}
- else
- continue;
+
+ data.add(trans != null ? trans.apply(entry) :
+ !loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
}
+ else
+ data.add(!loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
- data.add(trans != null ? trans.apply(entry) :
- !loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
if (!loc) {
if (++cnt == pageSize || !iter.hasNext()) {
@@ -1720,7 +1723,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param updStatisticsIfNeeded Update statistics flag.
*/
@SuppressWarnings({"unchecked", "serial"})
- protected GridCloseableIterator<IgniteBiTuple<K, V>> scanQueryLocal(final GridCacheQueryAdapter qry,
+ protected GridCloseableIterator scanQueryLocal(final GridCacheQueryAdapter qry,
final boolean updStatisticsIfNeeded) throws IgniteCheckedException {
if (!enterBusy())
throw new IllegalStateException("Failed to process query request (grid is stopping).");
@@ -1769,8 +1772,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
- return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
- @Override protected IgniteBiTuple<K, V> onNext() throws IgniteCheckedException {
+ return new GridCloseableIteratorAdapter<Object>() {
+ @Override protected Object onNext() throws IgniteCheckedException {
long start = statsEnabled ? System.nanoTime() : 0L;
IgniteBiTuple<K, V> next = iter.nextX();
@@ -1803,7 +1806,20 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
null));
}
- return next;
+ IgniteClosure transform = qry.transform();
+
+ if (transform == null)
+ return next;
+
+ Cache.Entry<K, V> entry;
+
+ if (qry.keepBinary())
+ entry = cctx.cache().keepBinary().getEntry(next.getKey());
+ else
+ entry = cctx.cache().getEntry(next.getKey());
+
+
+ return transform.apply(entry);
}
@Override protected boolean onHasNext() throws IgniteCheckedException {
@@ -1832,7 +1848,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @return Iterator.
* @throws IgniteCheckedException In case of error.
*/
- private QueryResult<K, V> queryResult(final GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException {
+ private QueryResult<K, V> queryResult(final GridCacheQueryInfo qryInfo,
+ String taskName) throws IgniteCheckedException {
assert qryInfo != null;
final UUID sndId = qryInfo.senderId();
@@ -2845,7 +2862,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
if (locNode)
return new IgniteBiTuple<>(key, val);
- else{
+ else {
if (key instanceof CacheObject)
((CacheObject)key).prepareMarshal(cctx.cacheObjectContext());
@@ -3256,16 +3273,28 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param keepBinary Keep binary flag.
* @return Created query.
*/
- public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
+ public <R> CacheQuery<R> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
@Nullable Integer part, boolean keepBinary) {
+ return createScanQuery(filter, null, part, keepBinary);
+ }
- return new GridCacheQueryAdapter<>(cctx,
+ /**
+ * Creates user's predicate based scan query.
+ *
+ * @param filter Scan filter.
+ * @param part Partition.
+ * @param keepBinary Keep binary flag.
+ * @return Created query.
+ */
+ public <T, R> CacheQuery<R> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
+ @Nullable IgniteClosure<T, R> trans,
+ @Nullable Integer part, boolean keepBinary) {
+
+ return new GridCacheQueryAdapter(cctx,
SCAN,
- null,
- null,
- (IgniteBiPredicate<Object, Object>)filter,
+ filter,
+ trans,
part,
- false,
keepBinary);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index f50fba0..5610bef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -84,7 +84,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
/** */
@GridDirectTransient
- private IgniteClosure<Object, Object> trans;
+ private IgniteClosure<?, ?> trans;
/** */
private byte[] transBytes;
@@ -233,7 +233,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
IgniteBiPredicate<Object, Object> keyValFilter,
@Nullable Integer part,
IgniteReducer<Object, Object> rdc,
- IgniteClosure<Object, Object> trans,
+ IgniteClosure<?, ?> trans,
int pageSize,
boolean incBackups,
Object[] args,
@@ -422,7 +422,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
/**
* @return Transformer.
*/
- public IgniteClosure<Object, Object> transformer() {
+ public IgniteClosure<?, ?> transformer() {
return trans;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java
new file mode 100644
index 0000000..6b13e05
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SpiQuery;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.TextQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test for scan query with transformer.
+ */
+public class GridCacheQueryTransformerSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+ cfg.setMarshaller(null);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGridsMultiThreaded(3);
+
+ Ignition.setClientMode(true);
+
+ startGrid();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetKeys() throws Exception {
+ IgniteCache<Integer, String> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, "val" + i);
+
+ IgniteClosure<Cache.Entry<Integer, String>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, String>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, String> e) {
+ return e.getKey();
+ }
+ };
+
+ List<Integer> keys = cache.query(new ScanQuery<Integer, String>(), transformer).getAll();
+
+ assertEquals(50, keys.size());
+
+ Collections.sort(keys);
+
+ for (int i = 0; i < 50; i++)
+ assertEquals(i, keys.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetKeysFiltered() throws Exception {
+ IgniteCache<Integer, String> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, "val" + i);
+
+ IgniteBiPredicate<Integer, String> filter = new IgniteBiPredicate<Integer, String>() {
+ @Override public boolean apply(Integer k, String v) {
+ return k % 10 == 0;
+ }
+ };
+
+ IgniteClosure<Cache.Entry<Integer, String>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, String>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, String> e) {
+ return e.getKey();
+ }
+ };
+
+ List<Integer> keys = cache.query(new ScanQuery<>(filter), transformer).getAll();
+
+ assertEquals(5, keys.size());
+
+ Collections.sort(keys);
+
+ for (int i = 0; i < 5; i++)
+ assertEquals(i * 10, keys.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetObjectField() throws Exception {
+ IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, new Value("str" + i, i * 100));
+
+ IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+ return e.getValue().idx;
+ }
+ };
+
+ List<Integer> res = cache.query(new ScanQuery<Integer, Value>(), transformer).getAll();
+
+ assertEquals(50, res.size());
+
+ Collections.sort(res);
+
+ for (int i = 0; i < 50; i++)
+ assertEquals(i * 100, res.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetObjectFieldFiltered() throws Exception {
+ IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, new Value("str" + i, i * 100));
+
+ IgniteBiPredicate<Integer, Value> filter = new IgniteBiPredicate<Integer, Value>() {
+ @Override public boolean apply(Integer k, Value v) {
+ return v.idx % 1000 == 0;
+ }
+ };
+
+ IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+ return e.getValue().idx;
+ }
+ };
+
+ List<Integer> res = cache.query(new ScanQuery<>(filter), transformer).getAll();
+
+ assertEquals(5, res.size());
+
+ Collections.sort(res);
+
+ for (int i = 0; i < 5; i++)
+ assertEquals(i * 1000, res.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testKeepBinary() throws Exception {
+ IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, new Value("str" + i, i * 100));
+
+ IgniteCache<Integer, BinaryObject> binaryCache = cache.withKeepBinary();
+
+ IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) {
+ return e.getValue().field("idx");
+ }
+ };
+
+ List<Integer> res = binaryCache.query(new ScanQuery<Integer, BinaryObject>(), transformer).getAll();
+
+ assertEquals(50, res.size());
+
+ Collections.sort(res);
+
+ for (int i = 0; i < 50; i++)
+ assertEquals(i * 100, res.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testKeepBinaryFiltered() throws Exception {
+ IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, new Value("str" + i, i * 100));
+
+ IgniteCache<Integer, BinaryObject> binaryCache = cache.withKeepBinary();
+
+ IgniteBiPredicate<Integer, BinaryObject> filter = new IgniteBiPredicate<Integer, BinaryObject>() {
+ @Override public boolean apply(Integer k, BinaryObject v) {
+ return v.<Integer>field("idx") % 1000 == 0;
+ }
+ };
+
+ IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) {
+ return e.getValue().field("idx");
+ }
+ };
+
+ List<Integer> res = binaryCache.query(new ScanQuery<>(filter), transformer).getAll();
+
+ assertEquals(5, res.size());
+
+ Collections.sort(res);
+
+ for (int i = 0; i < 5; i++)
+ assertEquals(i * 1000, res.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLocal() throws Exception {
+ IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, new Value("str" + i, i * 100));
+
+ Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public List<Integer> call() throws Exception {
+ IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+ return e.getValue().idx;
+ }
+ };
+
+ return ignite.cache("test-cache").query(new ScanQuery<Integer, Value>().setLocal(true),
+ transformer).getAll();
+ }
+ });
+
+ List<Integer> res = new ArrayList<>(F.flatCollections(lists));
+
+ assertEquals(50, res.size());
+
+ Collections.sort(res);
+
+ for (int i = 0; i < 50; i++)
+ assertEquals(i * 100, res.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLocalFiltered() throws Exception {
+ IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, new Value("str" + i, i * 100));
+
+ Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public List<Integer> call() throws Exception {
+ IgniteBiPredicate<Integer, Value> filter = new IgniteBiPredicate<Integer, Value>() {
+ @Override public boolean apply(Integer k, Value v) {
+ return v.idx % 1000 == 0;
+ }
+ };
+
+ IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+ return e.getValue().idx;
+ }
+ };
+
+ return ignite.cache("test-cache").query(new ScanQuery<>(filter).setLocal(true),
+ transformer).getAll();
+ }
+ });
+
+ List<Integer> res = new ArrayList<>(F.flatCollections(lists));
+
+ assertEquals(5, res.size());
+
+ Collections.sort(res);
+
+ for (int i = 0; i < 5; i++)
+ assertEquals(i * 1000, res.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLocalKeepBinary() throws Exception {
+ IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, new Value("str" + i, i * 100));
+
+ Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public List<Integer> call() throws Exception {
+ IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) {
+ return e.getValue().field("idx");
+ }
+ };
+
+ return ignite.cache("test-cache").withKeepBinary().query(
+ new ScanQuery<Integer, BinaryObject>().setLocal(true), transformer).getAll();
+ }
+ });
+
+ List<Integer> res = new ArrayList<>(F.flatCollections(lists));
+
+ assertEquals(50, res.size());
+
+ Collections.sort(res);
+
+ for (int i = 0; i < 50; i++)
+ assertEquals(i * 100, res.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLocalKeepBinaryFiltered() throws Exception {
+ IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+ try {
+ for (int i = 0; i < 50; i++)
+ cache.put(i, new Value("str" + i, i * 100));
+
+ Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public List<Integer> call() throws Exception {
+ IgniteBiPredicate<Integer, BinaryObject> filter = new IgniteBiPredicate<Integer, BinaryObject>() {
+ @Override public boolean apply(Integer k, BinaryObject v) {
+ return v.<Integer>field("idx") % 1000 == 0;
+ }
+ };
+
+ IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) {
+ return e.getValue().field("idx");
+ }
+ };
+
+ return ignite.cache("test-cache").withKeepBinary().query(new ScanQuery<>(filter).setLocal(true),
+ transformer).getAll();
+ }
+ });
+
+ List<Integer> res = new ArrayList<>(F.flatCollections(lists));
+
+ assertEquals(5, res.size());
+
+ Collections.sort(res);
+
+ for (int i = 0; i < 5; i++)
+ assertEquals(i * 1000, res.get(i).intValue());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testUnsupported() throws Exception {
+ final IgniteCache<Integer, Integer> cache = grid().createCache("test-cache");
+
+ final IgniteClosure<Cache.Entry<Integer, Integer>, Integer> transformer =
+ new IgniteClosure<Cache.Entry<Integer, Integer>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, Integer> e) {
+ return null;
+ }
+ };
+
+ try {
+ GridTestUtils.assertThrows(
+ log,
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.query(new SqlQuery<Integer, Integer>(Integer.class, "clause"), transformer);
+
+ return null;
+ }
+ },
+ UnsupportedOperationException.class,
+ "Transformers are supported only for SCAN queries."
+ );
+
+ GridTestUtils.assertThrows(
+ log,
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.query(new SqlFieldsQuery("clause"), new IgniteClosure<List<?>, Object>() {
+ @Override public Object apply(List<?> objects) {
+ return null;
+ }
+ });
+
+ return null;
+ }
+ },
+ UnsupportedOperationException.class,
+ "Transformers are supported only for SCAN queries."
+ );
+
+ GridTestUtils.assertThrows(
+ log,
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.query(new TextQuery<Integer, Integer>(Integer.class, "clause"), transformer);
+
+ return null;
+ }
+ },
+ UnsupportedOperationException.class,
+ "Transformers are supported only for SCAN queries."
+ );
+
+ GridTestUtils.assertThrows(
+ log,
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.query(new SpiQuery<Integer, Integer>(), transformer);
+
+ return null;
+ }
+ },
+ UnsupportedOperationException.class,
+ "Transformers are supported only for SCAN queries."
+ );
+
+ GridTestUtils.assertThrows(
+ log,
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ cache.query(new ContinuousQuery<Integer, Integer>(), transformer);
+
+ return null;
+ }
+ },
+ UnsupportedOperationException.class,
+ "Transformers are supported only for SCAN queries."
+ );
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ */
+ private static class Value {
+ /** */
+ @SuppressWarnings("unused")
+ private String str;
+
+ /** */
+ private int idx;
+
+ /**
+ * @param str String.
+ * @param idx Integer.
+ */
+ public Value(String str, int idx) {
+ this.str = str;
+ this.idx = idx;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index 740b201..71dc964 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.mxbean.CacheMetricsMXBean;
import org.apache.ignite.resources.IgniteInstanceResource;
@@ -172,6 +173,11 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
+ @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) {
+ throw new UnsupportedOperationException("Method should be supported.");
+ }
+
+ /** {@inheritDoc} */
@Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException {
return compute.call(new LocalEntriesTask<K, V>(cacheName, isAsync, peekModes));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 1b1908d..3652acd 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.Ignite
import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQuerySelfTest;
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQuerySelfTest;
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryTransformerSelfTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest;
import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest;
@@ -115,6 +116,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(IgniteBinaryWrappedObjectFieldsQuerySelfTest.class);
suite.addTestSuite(IgniteCacheQueryH2IndexingLeakTest.class);
suite.addTestSuite(IgniteCacheQueryNoRebalanceSelfTest.class);
+ suite.addTestSuite(GridCacheQueryTransformerSelfTest.class);
return suite;
}