You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by il...@apache.org on 2020/07/07 13:42:38 UTC
[ignite] branch ignite-2.9 updated: IGNITE-13212 Fixed p2p
deployment of scan queries transformer class - Fixes #8000.
This is an automated email from the ASF dual-hosted git repository.
ilyak pushed a commit to branch ignite-2.9
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-2.9 by this push:
new 8faaae9 IGNITE-13212 Fixed p2p deployment of scan queries transformer class - Fixes #8000.
8faaae9 is described below
commit 8faaae93199555ee170ffecc356ebde3b5dc93f8
Author: Sergey Chugunov <sc...@gridgain.com>
AuthorDate: Tue Jul 7 16:07:07 2020 +0300
IGNITE-13212 Fixed p2p deployment of scan queries transformer class - Fixes #8000.
Signed-off-by: Ilya Kasnacheev <il...@gmail.com>
---
.../query/GridCacheDistributedQueryManager.java | 5 +-
...GridP2PComputeWithNestedEntryProcessorTest.java | 4 +-
.../p2p/GridP2PScanQueryWithTransformerTest.java | 406 +++++++++++++++++++++
.../ignite/testsuites/IgniteP2PSelfTestSuite.java | 4 +-
.../tests/p2p/cache/ScanQueryTestTransformer.java | 42 +++
.../p2p/cache/ScanQueryTestTransformerWrapper.java | 41 +++
6 files changed, 498 insertions(+), 4 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index b03909f..3abebf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -539,6 +539,9 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
Boolean dataPageScanEnabled = qry.query().isDataPageScanEnabled();
MvccSnapshot mvccSnapshot = qry.query().mvccSnapshot();
+ boolean deployFilterOrTransformer = (qry.query().scanFilter() != null || qry.query().transform() != null)
+ && cctx.gridDeploy().enabled();
+
final GridCacheQueryRequest req = new GridCacheQueryRequest(
cctx.cacheId(),
reqId,
@@ -562,7 +565,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
queryTopologyVersion(),
mvccSnapshot,
// Force deployment anyway if scan query is used.
- cctx.deploymentEnabled() || (qry.query().scanFilter() != null && cctx.gridDeploy().enabled()),
+ cctx.deploymentEnabled() || deployFilterOrTransformer,
dataPageScanEnabled);
addQueryFuture(req.id(), fut);
diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeWithNestedEntryProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeWithNestedEntryProcessorTest.java
index f3b1731..4959c0d 100644
--- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeWithNestedEntryProcessorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeWithNestedEntryProcessorTest.java
@@ -136,7 +136,7 @@ public class GridP2PComputeWithNestedEntryProcessorTest extends GridCommonAbstra
assertTrue(key >= ENTRIES || res);
}
- scnaCacheData(cache);
+ scanCacheData(cache);
}
}
}
@@ -183,7 +183,7 @@ public class GridP2PComputeWithNestedEntryProcessorTest extends GridCommonAbstra
* @param cache Ignite cache.
* @throws Exception If failed.
*/
- private void scnaCacheData(IgniteCache cache) throws Exception {
+ private void scanCacheData(IgniteCache cache) throws Exception {
scanByCopositeFirstPredicate(cache);
scanByCopositeSecondPredicate(cache);
scanByCopositeFirstSecondPredicate(cache);
diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PScanQueryWithTransformerTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PScanQueryWithTransformerTest.java
new file mode 100644
index 0000000..28a1e6f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PScanQueryWithTransformerTest.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.p2p;
+
+import java.lang.reflect.Constructor;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.config.GridTestProperties;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class GridP2PScanQueryWithTransformerTest extends GridCommonAbstractTest {
+ /** Test class loader. */
+ private static final ClassLoader TEST_CLASS_LOADER;
+
+ /** Name of explicit class used as a Transformer for Scan Query. */
+ private static final String TRANSFORMER_CLASS_NAME = "org.apache.ignite.tests.p2p.cache.ScanQueryTestTransformer";
+
+ /** Name of class-wrapper for anonymous class used as a Transformer for Scan Query. */
+ private static final String TRANSFORMER_CLO_WRAPPER_CLASS_NAME =
+ "org.apache.ignite.tests.p2p.cache.ScanQueryTestTransformerWrapper";
+
+ /** */
+ private static final int SCALE_FACTOR = 7;
+
+ /** */
+ private static final int CACHE_SIZE = 10;
+
+ /** Initialize ClassLoader. */
+ static {
+ try {
+ TEST_CLASS_LOADER = new URLClassLoader(
+ new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))},
+ GridP2PScanQueryWithTransformerTest.class.getClassLoader());
+ }
+ catch (MalformedURLException e) {
+ throw new RuntimeException("Define property p2p.uri.cls", e);
+ }
+ }
+
+ /** */
+ private boolean p2pEnabled;
+
+ /** */
+ private ClassLoader clsLoader;
+
+ /** */
+ private IgniteLogger logger;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setPeerClassLoadingEnabled(p2pEnabled);
+ cfg.setClassLoader(clsLoader);
+ if (logger != null)
+ cfg.setGridLogger(logger);
+
+ return cfg;
+ }
+
+ /**
+ * Verifies that Scan Query Transformer is loaded by p2p mechanism <b>from client node</b>
+ * when it is missing on server's classpath.
+ *
+ * Scan Query result set is examined by iteration over Cursor.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testScanQueryCursorFromClientNodeWithExplicitClass() throws Exception {
+ p2pEnabled = true;
+
+ executeP2PClassLoadingEnabledTest(true);
+ }
+
+ /**
+ * Verifies that Scan Query Transformer is loaded by p2p mechanism <b>from another server node</b>
+ * when it is missing on server's classpath.
+ *
+ * Scan Query result set is examined by iteration over Cursor.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testScanQueryCursorFromServerNodeWithExplicitClass() throws Exception {
+ p2pEnabled = true;
+
+ executeP2PClassLoadingEnabledTest(false);
+ }
+
+ /**
+ * Verifies that Scan Query Transformer is loaded by p2p mechanism <b>from client node</b>
+ * when it is missing on server's classpath.
+ *
+ * Scan Query result set is examined by getAll call from Cursor..
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testScanQueryGetAllFromClientNodeWithExplicitClass() throws Exception {
+ p2pEnabled = true;
+
+ IgniteEx ig0 = startGrid(0);
+
+ IgniteCache<Integer, Integer> cache = ig0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+ populateCache(cache);
+
+ IgniteEx client = startClientGrid(1);
+
+ IgniteCache<Object, Object> clientCache = client.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ QueryCursor<Integer> query = clientCache.query(new ScanQuery<Integer, Integer>(), loadTransformerClass());
+
+ List<Integer> results = query.getAll();
+
+ assertNotNull(results);
+ assertEquals(CACHE_SIZE, results.size());
+ }
+
+ /**
+ * Verifies that Scan Query Transformer is loaded by p2p mechanism <b>from client node</b>
+ * when it is missing on server's classpath.
+ *
+ * Transformer class is implemented as an anonymous instance of {@link IgniteClosure} interface.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testScanQueryCursorFromClientNodeWithAnonymousClass() throws Exception {
+ p2pEnabled = true;
+
+ IgniteEx ig0 = startGrid(0);
+
+ IgniteCache<Integer, Integer> cache = ig0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+ int sumPopulated = populateCache(cache);
+
+ IgniteEx client = startClientGrid(1);
+
+ IgniteCache<Object, Object> clientCache = client.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ QueryCursor<Integer> query = clientCache.query(new ScanQuery<Integer, Integer>(), loadTransformerClosure());
+
+ int sumQueried = 0;
+
+ for (Integer val : query)
+ sumQueried += val;
+
+ assertTrue(sumQueried == sumPopulated * SCALE_FACTOR);
+ }
+
+ /**
+ * Verifies that execution <b>from client node</b> of Scan Query with Transformer
+ * fails if Transformer class is missing on server node and p2p class loading is disabled.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testScanQueryFromClientFailsIfP2PClassLoadingIsDisabled() throws Exception {
+ p2pEnabled = false;
+
+ executeP2PClassLoadingDisabledTest(true);
+ }
+
+ /**
+ * Verifies that execution <b>from server node</b> of Scan Query with Transformer
+ * fails if Transformer class is missing on server node and p2p class loading is disabled.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testScanQueryFromServerFailsIfP2PClassLoadingIsDisabled() throws Exception {
+ p2pEnabled = false;
+
+ executeP2PClassLoadingDisabledTest(false);
+ }
+
+ /**
+ * Verifies that deployment isn't needed if Transformer class is available on classpath of all nodes.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSharedTransformerWorksWhenP2PIsDisabled() throws Exception {
+ p2pEnabled = false;
+
+ IgniteEx ig0 = startGrid(0);
+
+ IgniteCache<Integer, Integer> cache = ig0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+ populateCache(cache);
+
+ IgniteEx client = startClientGrid(1);
+
+ IgniteCache<Object, Object> clientCache = client.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ QueryCursor<Integer> query = clientCache.query(new ScanQuery<Integer, Integer>(),
+ new SharedTransformer(SCALE_FACTOR));
+
+ List<Integer> results = query.getAll();
+
+ assertNotNull(results);
+ assertEquals(CACHE_SIZE, results.size());
+ }
+
+ /**
+ * Executes scenario with successful p2p loading of Transformer class
+ * with client or server node sending Scan Query request and iterating over result set.
+ *
+ * @param withClientNode Flag to execute scan query from client or server node.
+ * @throws Exception If failed.
+ */
+ private void executeP2PClassLoadingEnabledTest(boolean withClientNode) throws Exception {
+ ListeningTestLogger listeningLogger = new ListeningTestLogger();
+ LogListener clsDeployedMsgLsnr = LogListener.matches(
+ "Class was deployed in SHARED or CONTINUOUS mode: " +
+ "class org.apache.ignite.tests.p2p.cache.ScanQueryTestTransformer")
+ .build();
+ listeningLogger.registerListener(clsDeployedMsgLsnr);
+ logger = listeningLogger;
+
+ IgniteEx ig0 = startGrid(0);
+
+ logger = null;
+
+ IgniteCache<Integer, Integer> cache = ig0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+ int sumPopulated = populateCache(cache);
+
+ IgniteEx requestingNode;
+
+ if (withClientNode)
+ requestingNode = startClientGrid(1);
+ else {
+ clsLoader = TEST_CLASS_LOADER;
+ requestingNode = startGrid(1);
+ }
+
+ IgniteCache<Object, Object> reqNodeCache = requestingNode.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ QueryCursor<Integer> query = reqNodeCache.query(new ScanQuery<Integer, Integer>(), loadTransformerClass());
+
+ int sumQueried = 0;
+
+ for (Integer val : query)
+ sumQueried += val;
+
+ assertTrue(sumQueried == sumPopulated * SCALE_FACTOR);
+ assertTrue(clsDeployedMsgLsnr.check());
+ }
+
+ /**
+ * Executes scenario with p2p loading of Transformer class failed
+ * with client or server node sending Scan Query request.
+ *
+ * @param withClientNode Flag to execute scan query from client or server node.
+ * @throws Exception If test scenario failed.
+ */
+ private void executeP2PClassLoadingDisabledTest(boolean withClientNode) throws Exception {
+ ListeningTestLogger listeningLogger = new ListeningTestLogger();
+ LogListener clsDeployedMsgLsnr = LogListener.matches(
+ "Class was deployed in SHARED or CONTINUOUS mode: " +
+ "class org.apache.ignite.tests.p2p.cache.ScanQueryTestTransformerWrapper")
+ .build();
+ listeningLogger.registerListener(clsDeployedMsgLsnr);
+ logger = listeningLogger;
+
+ IgniteEx ig0 = startGrid(0);
+
+ logger = null;
+
+ IgniteCache<Integer, Integer> cache = ig0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+ populateCache(cache);
+
+ IgniteEx requestNode;
+
+ if (withClientNode)
+ requestNode = startClientGrid(1);
+ else {
+ clsLoader = TEST_CLASS_LOADER;
+ requestNode = startGrid(1);
+ }
+
+ IgniteCache<Object, Object> reqNodeCache = requestNode.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ QueryCursor<Integer> query = reqNodeCache.query(new ScanQuery<Integer, Integer>(), loadTransformerClosure());
+
+ try {
+ List<Integer> all = query.getAll();
+ }
+ catch (Exception e) {
+ //No-op.
+
+ checkTopology(2);
+
+ assertFalse(clsDeployedMsgLsnr.check());
+
+ return;
+ }
+
+ fail("Expected exception on executing scan query hasn't been not thrown.");
+ }
+
+ /**
+ * @param cache Cache to populate.
+ */
+ private int populateCache(IgniteCache cache) {
+ int sum = 0;
+
+ for (int i = 0; i < CACHE_SIZE; i++) {
+ sum += i;
+
+ cache.put(i, i);
+ }
+
+ return sum;
+ }
+
+ /**
+ * Loads class for query transformer from another package so server doesn't have access to it.
+ *
+ * @return Instance of transformer class.
+ * @throws Exception If load has failed.
+ */
+ private IgniteClosure loadTransformerClass() throws Exception {
+ Constructor ctor = TEST_CLASS_LOADER.loadClass(TRANSFORMER_CLASS_NAME).getConstructor(int.class);
+
+ return (IgniteClosure)ctor.newInstance(SCALE_FACTOR);
+ }
+
+ /**
+ * Loads anonymous class for query transformer from another package
+ * so executing server node doesn't have access to it.
+ *
+ * @return Instance of anonymous class implementing {@link IgniteClosure} to be used as a transformer.
+ * @throws Exception If load has failed.
+ */
+ private IgniteClosure loadTransformerClosure() throws Exception {
+ Constructor<?> ctor = TEST_CLASS_LOADER.loadClass(TRANSFORMER_CLO_WRAPPER_CLASS_NAME).getConstructor(int.class);
+
+ Object wrapper = ctor.newInstance(SCALE_FACTOR);
+
+ return GridTestUtils.getFieldValue(wrapper, "clo");
+ }
+
+ /** */
+ private static final class SharedTransformer implements IgniteClosure<Cache.Entry<Integer, Integer>, Integer> {
+ /** */
+ private final int scaleFactor;
+
+ /** */
+ private SharedTransformer(int factor) {
+ scaleFactor = factor;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer apply(Cache.Entry<Integer, Integer> entry) {
+ return entry.getValue() * scaleFactor;
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
index 9fdc3fb..a054104 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
@@ -33,6 +33,7 @@ import org.apache.ignite.p2p.GridP2PNodeLeftSelfTest;
import org.apache.ignite.p2p.GridP2PRecursionTaskSelfTest;
import org.apache.ignite.p2p.GridP2PRemoteClassLoadersSelfTest;
import org.apache.ignite.p2p.GridP2PSameClassLoaderSelfTest;
+import org.apache.ignite.p2p.GridP2PScanQueryWithTransformerTest;
import org.apache.ignite.p2p.GridP2PTimeoutSelfTest;
import org.apache.ignite.p2p.GridP2PUndeploySelfTest;
import org.apache.ignite.p2p.P2PScanQueryUndeployTest;
@@ -67,7 +68,8 @@ import org.junit.runners.Suite;
P2PScanQueryUndeployTest.class,
GridDeploymentMessageCountSelfTest.class,
GridP2PComputeWithNestedEntryProcessorTest.class,
- GridP2PCountTiesLoadClassDirectlyFromClassLoaderTest.class
+ GridP2PCountTiesLoadClassDirectlyFromClassLoaderTest.class,
+ GridP2PScanQueryWithTransformerTest.class
})
public class IgniteP2PSelfTestSuite {
}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/ScanQueryTestTransformer.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/ScanQueryTestTransformer.java
new file mode 100644
index 0000000..8647f6c
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/ScanQueryTestTransformer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.cache;
+
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.Query;
+import org.apache.ignite.lang.IgniteClosure;
+
+/**
+ * Test class to verify p2p class loading for transformer of Scan Query
+ * (see {@link IgniteCache#query(Query, IgniteClosure)}).
+ */
+public class ScanQueryTestTransformer implements IgniteClosure<Cache.Entry<Integer, Integer>, Integer> {
+ /** */
+ private final int scaleFactor;
+
+ /** */
+ public ScanQueryTestTransformer(int scaleFactor) {
+ this.scaleFactor = scaleFactor;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer apply(Cache.Entry<Integer, Integer> entry) {
+ return entry.getValue() * scaleFactor;
+ }
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/ScanQueryTestTransformerWrapper.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/ScanQueryTestTransformerWrapper.java
new file mode 100644
index 0000000..18152b5
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/ScanQueryTestTransformerWrapper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.cache;
+
+import javax.cache.Cache;
+import org.apache.ignite.lang.IgniteClosure;
+
+/** */
+public class ScanQueryTestTransformerWrapper {
+ /** */
+ private final int scaleFactor;
+
+ /** */
+ private final IgniteClosure<Cache.Entry<Integer, Integer>, Integer> clo = new IgniteClosure<Cache.Entry<Integer, Integer>, Integer>() {
+ @Override public Integer apply(Cache.Entry<Integer, Integer> entry) {
+ return entry.getValue() * scaleFactor;
+ }
+ };
+
+ /**
+ * @param scaleFactor Scale factor.
+ */
+ public ScanQueryTestTransformerWrapper(int scaleFactor) {
+ this.scaleFactor = scaleFactor;
+ }
+}