You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2016/03/23 06:20:51 UTC

ignite git commit: IGNITE-2546 - Added transformers to SCAN queries

Repository: ignite
Updated Branches:
  refs/heads/ignite-2546 [created] 5730c06ae


IGNITE-2546 - Added transformers to SCAN queries


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5730c06a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5730c06a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5730c06a

Branch: refs/heads/ignite-2546
Commit: 5730c06ae3d41edb00f46a6b103a421974eac1a9
Parents: 0013955
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Tue Mar 22 22:20:39 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Tue Mar 22 22:20:39 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteCache.java     |  15 +
 .../processors/cache/IgniteCacheProxy.java      |  81 +++
 .../cache/query/GridCacheQueryManager.java      |  43 +-
 .../GridCacheQueryTransformerSelfTest.java      | 570 +++++++++++++++++++
 .../multijvm/IgniteCacheProcessProxy.java       |   8 +-
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 6 files changed, 688 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5730c06a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index a791e38..5c4e37f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -56,6 +56,7 @@ import org.apache.ignite.lang.IgniteAsyncSupport;
 import org.apache.ignite.lang.IgniteAsyncSupported;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.mxbean.CacheMetricsMXBean;
 import org.jetbrains.annotations.Nullable;
@@ -289,6 +290,20 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
     public <R> QueryCursor<R> query(Query<R> qry);
 
     /**
+     * Queries the cache transforming the entries on the server nodes. Can be used, for example,
+     * to avoid network overhead in case only one field out of the large is required by client.
+     * <p>
+     * Currently transformers are supported ONLY for {@link ScanQuery}. Passing any other
+     * subclass of {@link Query} interface to this method will end up with
+     * {@link UnsupportedOperationException}.
+     *
+     * @param qry Query.
+     * @param transformer Transformer.
+     * @return Cursor.
+     */
+    public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer);
+
+    /**
      * Allows for iteration over local cache entries.
      *
      * @param peekModes Peek modes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5730c06a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 6e8bcbf..91a2ad9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -83,6 +83,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.mxbean.CacheMetricsMXBean;
 import org.apache.ignite.plugin.security.SecurityPermission;
@@ -449,6 +450,56 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     /**
      * @param filter Filter.
      * @param grp Optional cluster group.
+     * @param transformer Transformer.
+     * @return Cursor.
+     * @throws IgniteCheckedException In case of error.
+     */
+    @SuppressWarnings("unchecked")
+    private <R> QueryCursor<R> query(final ScanQuery<K, V> filter, @Nullable ClusterGroup grp,
+        final IgniteClosure transformer) throws IgniteCheckedException {
+        boolean isKeepBinary = opCtx != null && opCtx.isKeepBinary();
+
+        IgniteBiPredicate<K, V> p = filter.getFilter();
+
+        final CacheQuery<Map.Entry<K, V>> qry = ctx.queries().createScanQuery(p, filter.getPartition(), isKeepBinary);
+
+        if (grp != null)
+            qry.projection(grp);
+
+        final CacheQueryFuture<R> fut = ctx.kernalContext().query().executeQuery(ctx,
+            new IgniteOutClosureX<CacheQueryFuture<R>>() {
+                @Override public CacheQueryFuture<R> applyx() throws IgniteCheckedException {
+                    return qry.execute(transformer);
+                }
+            }, false);
+
+        return new QueryCursorImpl<>(new GridCloseableIteratorAdapter<R>() {
+            private R cur;
+
+            @Override protected R onNext() throws IgniteCheckedException {
+                if (!onHasNext())
+                    throw new NoSuchElementException();
+
+                R e = cur;
+
+                cur = null;
+
+                return e;
+            }
+
+            @Override protected boolean onHasNext() throws IgniteCheckedException {
+                return cur != null || (cur = fut.next()) != null;
+            }
+
+            @Override protected void onClose() throws IgniteCheckedException {
+                fut.cancel();
+            }
+        });
+    }
+
+    /**
+     * @param filter Filter.
+     * @param grp Optional cluster group.
      * @return Cursor.
      */
     @SuppressWarnings("unchecked")
@@ -665,6 +716,36 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) {
+        A.notNull(qry, "qry");
+        A.notNull(transformer, "transformer");
+
+        if (!(qry instanceof ScanQuery))
+            throw new UnsupportedOperationException("Transformers are supported only for SCAN queries.");
+
+        GridCacheGateway<K, V> gate = this.gate;
+
+        CacheOperationContext prev = onEnter(gate, opCtx);
+
+        try {
+            ctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+            validate(qry);
+
+            return query((ScanQuery<K, V>)qry, projection(qry.isLocal()), transformer);
+        }
+        catch (Exception e) {
+            if (e instanceof CacheException)
+                throw (CacheException)e;
+
+            throw new CacheException(e);
+        }
+        finally {
+            onLeave(gate, prev);
+        }
+    }
+
     /**
      * @return {@code true} If this is a replicated cache and we are on a data node.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5730c06a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index df95e2e..786052a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -38,6 +38,7 @@ import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import javax.cache.Cache;
 import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
@@ -1337,13 +1339,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             try {
                 // Preparing query closures.
-                IgniteClosure<Map.Entry<K, V>, Object> trans =
-                    (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer();
-
-                IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer();
+                IgniteClosure<Cache.Entry<K, V>, Object> trans =
+                    (IgniteClosure<Cache.Entry<K, V>, Object>)qryInfo.transformer();
 
                 injectResources(trans);
-                injectResources(rdc);
 
                 GridCacheQueryAdapter<?> qry = qryInfo.query();
 
@@ -1504,27 +1503,15 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                         }
                     }
 
-                    Map.Entry<K, V> entry = F.t(key, val);
-
-                    // Unwrap entry for reducer or transformer only.
-                    if (rdc != null || trans != null)
-                        entry = (Map.Entry<K, V>)cctx.unwrapBinaryIfNeeded(entry, qry.keepBinary());
-
-                    // Reduce.
-                    if (rdc != null) {
-                        if (!rdc.collect(entry) || !iter.hasNext()) {
-                            onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null);
+                    // Unwrap entry for transformer only.
+                    if (trans != null) {
+                        key = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary());
+                        val = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary());
 
-                            pageSent = true;
-
-                            break;
-                        }
-                        else
-                            continue;
+                        data.add(trans.apply(new CacheEntryImpl<>(key, val)));
                     }
-
-                    data.add(trans != null ? trans.apply(entry) :
-                        !loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
+                    else
+                        data.add(!loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
 
                     if (!loc) {
                         if (++cnt == pageSize || !iter.hasNext()) {
@@ -1548,12 +1535,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     }
                 }
 
-                if (!pageSent) {
-                    if (rdc == null)
-                        onPageReady(loc, qryInfo, data, true, null);
-                    else
-                        onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null);
-                }
+                if (!pageSent)
+                    onPageReady(loc, qryInfo, data, true, null);
             }
             catch (Throwable e) {
                 if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))

http://git-wip-us.apache.org/repos/asf/ignite/blob/5730c06a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java
new file mode 100644
index 0000000..6b13e05
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SpiQuery;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.TextQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test for scan query with transformer.
+ */
+public class GridCacheQueryTransformerSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+        cfg.setMarshaller(null);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(3);
+
+        Ignition.setClientMode(true);
+
+        startGrid();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetKeys() throws Exception {
+        IgniteCache<Integer, String> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, "val" + i);
+
+            IgniteClosure<Cache.Entry<Integer, String>, Integer> transformer =
+                new IgniteClosure<Cache.Entry<Integer, String>, Integer>() {
+                    @Override public Integer apply(Cache.Entry<Integer, String> e) {
+                        return e.getKey();
+                    }
+                };
+
+            List<Integer> keys = cache.query(new ScanQuery<Integer, String>(), transformer).getAll();
+
+            assertEquals(50, keys.size());
+
+            Collections.sort(keys);
+
+            for (int i = 0; i < 50; i++)
+                assertEquals(i, keys.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetKeysFiltered() throws Exception {
+        IgniteCache<Integer, String> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, "val" + i);
+
+            IgniteBiPredicate<Integer, String> filter = new IgniteBiPredicate<Integer, String>() {
+                @Override public boolean apply(Integer k, String v) {
+                    return k % 10 == 0;
+                }
+            };
+
+            IgniteClosure<Cache.Entry<Integer, String>, Integer> transformer =
+                new IgniteClosure<Cache.Entry<Integer, String>, Integer>() {
+                    @Override public Integer apply(Cache.Entry<Integer, String> e) {
+                        return e.getKey();
+                    }
+                };
+
+            List<Integer> keys = cache.query(new ScanQuery<>(filter), transformer).getAll();
+
+            assertEquals(5, keys.size());
+
+            Collections.sort(keys);
+
+            for (int i = 0; i < 5; i++)
+                assertEquals(i * 10, keys.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetObjectField() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+                new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+                    @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+                        return e.getValue().idx;
+                    }
+                };
+
+            List<Integer> res = cache.query(new ScanQuery<Integer, Value>(), transformer).getAll();
+
+            assertEquals(50, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < 50; i++)
+                assertEquals(i * 100, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetObjectFieldFiltered() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            IgniteBiPredicate<Integer, Value> filter = new IgniteBiPredicate<Integer, Value>() {
+                @Override public boolean apply(Integer k, Value v) {
+                    return v.idx % 1000 == 0;
+                }
+            };
+
+            IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+                new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+                    @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+                        return e.getValue().idx;
+                    }
+                };
+
+            List<Integer> res = cache.query(new ScanQuery<>(filter), transformer).getAll();
+
+            assertEquals(5, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < 5; i++)
+                assertEquals(i * 1000, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testKeepBinary() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            IgniteCache<Integer, BinaryObject> binaryCache = cache.withKeepBinary();
+
+            IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer =
+                new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() {
+                    @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) {
+                        return e.getValue().field("idx");
+                    }
+                };
+
+            List<Integer> res = binaryCache.query(new ScanQuery<Integer, BinaryObject>(), transformer).getAll();
+
+            assertEquals(50, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < 50; i++)
+                assertEquals(i * 100, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testKeepBinaryFiltered() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            IgniteCache<Integer, BinaryObject> binaryCache = cache.withKeepBinary();
+
+            IgniteBiPredicate<Integer, BinaryObject> filter = new IgniteBiPredicate<Integer, BinaryObject>() {
+                @Override public boolean apply(Integer k, BinaryObject v) {
+                    return v.<Integer>field("idx") % 1000 == 0;
+                }
+            };
+
+            IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer =
+                new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() {
+                    @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) {
+                        return e.getValue().field("idx");
+                    }
+                };
+
+            List<Integer> res = binaryCache.query(new ScanQuery<>(filter), transformer).getAll();
+
+            assertEquals(5, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < 5; i++)
+                assertEquals(i * 1000, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocal() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() {
+                @IgniteInstanceResource
+                private Ignite ignite;
+
+                @Override public List<Integer> call() throws Exception {
+                    IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+                        new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+                            @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+                                return e.getValue().idx;
+                            }
+                        };
+
+                    return ignite.cache("test-cache").query(new ScanQuery<Integer, Value>().setLocal(true),
+                        transformer).getAll();
+                }
+            });
+
+            List<Integer> res = new ArrayList<>(F.flatCollections(lists));
+
+            assertEquals(50, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < 50; i++)
+                assertEquals(i * 100, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalFiltered() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() {
+                @IgniteInstanceResource
+                private Ignite ignite;
+
+                @Override public List<Integer> call() throws Exception {
+                    IgniteBiPredicate<Integer, Value> filter = new IgniteBiPredicate<Integer, Value>() {
+                        @Override public boolean apply(Integer k, Value v) {
+                            return v.idx % 1000 == 0;
+                        }
+                    };
+
+                    IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+                        new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+                            @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+                                return e.getValue().idx;
+                            }
+                        };
+
+                    return ignite.cache("test-cache").query(new ScanQuery<>(filter).setLocal(true),
+                        transformer).getAll();
+                }
+            });
+
+            List<Integer> res = new ArrayList<>(F.flatCollections(lists));
+
+            assertEquals(5, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < 5; i++)
+                assertEquals(i * 1000, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalKeepBinary() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() {
+                @IgniteInstanceResource
+                private Ignite ignite;
+
+                @Override public List<Integer> call() throws Exception {
+                    IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer =
+                        new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() {
+                            @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) {
+                                return e.getValue().field("idx");
+                            }
+                        };
+
+                    return ignite.cache("test-cache").withKeepBinary().query(
+                        new ScanQuery<Integer, BinaryObject>().setLocal(true), transformer).getAll();
+                }
+            });
+
+            List<Integer> res = new ArrayList<>(F.flatCollections(lists));
+
+            assertEquals(50, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < 50; i++)
+                assertEquals(i * 100, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalKeepBinaryFiltered() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() {
+                @IgniteInstanceResource
+                private Ignite ignite;
+
+                @Override public List<Integer> call() throws Exception {
+                    IgniteBiPredicate<Integer, BinaryObject> filter = new IgniteBiPredicate<Integer, BinaryObject>() {
+                        @Override public boolean apply(Integer k, BinaryObject v) {
+                            return v.<Integer>field("idx") % 1000 == 0;
+                        }
+                    };
+
+                    IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer =
+                        new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() {
+                            @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) {
+                                return e.getValue().field("idx");
+                            }
+                        };
+
+                    return ignite.cache("test-cache").withKeepBinary().query(new ScanQuery<>(filter).setLocal(true),
+                        transformer).getAll();
+                }
+            });
+
+            List<Integer> res = new ArrayList<>(F.flatCollections(lists));
+
+            assertEquals(5, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < 5; i++)
+                assertEquals(i * 1000, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUnsupported() throws Exception {
+        final IgniteCache<Integer, Integer> cache = grid().createCache("test-cache");
+
+        final IgniteClosure<Cache.Entry<Integer, Integer>, Integer> transformer =
+            new IgniteClosure<Cache.Entry<Integer, Integer>, Integer>() {
+                @Override public Integer apply(Cache.Entry<Integer, Integer> e) {
+                    return null;
+                }
+            };
+
+        try {
+            GridTestUtils.assertThrows(
+                log,
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        cache.query(new SqlQuery<Integer, Integer>(Integer.class, "clause"), transformer);
+
+                        return null;
+                    }
+                },
+                UnsupportedOperationException.class,
+                "Transformers are supported only for SCAN queries."
+            );
+
+            GridTestUtils.assertThrows(
+                log,
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        cache.query(new SqlFieldsQuery("clause"), new IgniteClosure<List<?>, Object>() {
+                            @Override public Object apply(List<?> objects) {
+                                return null;
+                            }
+                        });
+
+                        return null;
+                    }
+                },
+                UnsupportedOperationException.class,
+                "Transformers are supported only for SCAN queries."
+            );
+
+            GridTestUtils.assertThrows(
+                log,
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        cache.query(new TextQuery<Integer, Integer>(Integer.class, "clause"), transformer);
+
+                        return null;
+                    }
+                },
+                UnsupportedOperationException.class,
+                "Transformers are supported only for SCAN queries."
+            );
+
+            GridTestUtils.assertThrows(
+                log,
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        cache.query(new SpiQuery<Integer, Integer>(), transformer);
+
+                        return null;
+                    }
+                },
+                UnsupportedOperationException.class,
+                "Transformers are supported only for SCAN queries."
+            );
+
+            GridTestUtils.assertThrows(
+                log,
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        cache.query(new ContinuousQuery<Integer, Integer>(), transformer);
+
+                        return null;
+                    }
+                },
+                UnsupportedOperationException.class,
+                "Transformers are supported only for SCAN queries."
+            );
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     */
+    private static class Value {
+        /** */
+        @SuppressWarnings("unused")
+        private String str;
+
+        /** */
+        private int idx;
+
+        /**
+         * @param str String.
+         * @param idx Integer.
+         */
+        public Value(String str, int idx) {
+            this.str = str;
+            this.idx = idx;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5730c06a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index 7286fb3..d8b3c2e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.mxbean.CacheMetricsMXBean;
 import org.apache.ignite.resources.IgniteInstanceResource;
@@ -167,6 +168,11 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) {
+        throw new UnsupportedOperationException("Method should be supported.");
+    }
+
+    /** {@inheritDoc} */
     @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException {
         return compute.call(new LocalEntriesTask<K, V>(cacheName, isAsync, peekModes));
     }
@@ -1486,4 +1492,4 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
             return async ? cache.withAsync() : cache;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5730c06a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 0aa3560..b6b3463 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.Ignite
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryTransformerSelfTest;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterPartitionedAtomicTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterPartitionedTxTest;
@@ -177,6 +178,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheQuerySerializationSelfTest.class);
         suite.addTestSuite(IgniteBinaryObjectFieldsQuerySelfTest.class);
         suite.addTestSuite(IgniteBinaryWrappedObjectFieldsQuerySelfTest.class);
+        suite.addTestSuite(GridCacheQueryTransformerSelfTest.class);
 
         // Scan queries.
         suite.addTestSuite(CacheScanPartitionQueryFallbackSelfTest.class);