You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/04/16 13:04:15 UTC
[ignite] branch master updated: IGNITE-11698 Fixed invalid
classloader select for p2p - Fixes #6421.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 426e25a IGNITE-11698 Fixed invalid classloader select for p2p - Fixes #6421.
426e25a is described below
commit 426e25aa4c8f71f0e0339ed9866775922d57901c
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Tue Apr 16 15:48:58 2019 +0300
IGNITE-11698 Fixed invalid classloader select for p2p - Fixes #6421.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../cache/GridCacheDeploymentManager.java | 62 ++++--
...GridP2PComputeWithNestedEntryProcessorTest.java | 246 +++++++++++++++++++++
.../ignite/testsuites/IgniteP2PSelfTestSuite.java | 2 +
...heDeploymenComputeWithNestedEntryProcessor.java | 77 +++++++
.../tests/p2p/pedicates/BinaryPredicate.java | 49 ++++
.../tests/p2p/pedicates/CompositePredicate.java | 53 +++++
.../p2p/pedicates/FirstConsideredPredicate.java | 33 +++
.../p2p/pedicates/SecondConsideredPredicate.java | 33 +++
8 files changed, 537 insertions(+), 18 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
index 7bba1de..f0f2ed3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
@@ -92,6 +92,9 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
/** */
private boolean depEnabled;
+ /** Class loader id for local thread. */
+ private ThreadLocal<IgniteUuid> localLdrId = new ThreadLocal<>();
+
/** {@inheritDoc} */
@Override public void start0() throws IgniteCheckedException {
globalLdr = new CacheClassLoader(cctx.gridConfig().getClassLoader());
@@ -365,6 +368,8 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
DeploymentMode mode,
Map<UUID, IgniteUuid> participants
) {
+ localLdrId.set(ldrId);
+
assert depEnabled;
if (mode == PRIVATE || mode == ISOLATED) {
@@ -803,31 +808,25 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
}
}
- for (CachedDeploymentInfo<K, V> t : deps.values()) {
- UUID sndId = t.senderId();
- IgniteUuid ldrId = t.loaderId();
- String userVer = t.userVersion();
- DeploymentMode mode = t.mode();
- Map<UUID, IgniteUuid> participants = t.participants();
-
- GridDeployment d = cctx.gridDeploy().getGlobalDeployment(
- mode,
- name,
- name,
- userVer,
- sndId,
- ldrId,
- participants,
- F.<ClusterNode>alwaysTrue());
+ IgniteUuid curLdrId = localLdrId.get();
- if (d != null) {
- Class cls = d.deployedClass(name);
+ if (curLdrId != null) {
+ CachedDeploymentInfo<K, V> t = deps.get(curLdrId);
+
+ if (t != null) {
+ Class<?> cls = tryToloadClassFromCacheDep(name, t);
if (cls != null)
return cls;
}
}
+ for (CachedDeploymentInfo<K, V> t : deps.values()) {
+ Class<?> cls = tryToloadClassFromCacheDep(name, t);
+ if (cls != null)
+ return cls;
+ }
+
Class cls = getParent().loadClass(name);
if (cls != null)
@@ -837,6 +836,33 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
}
/**
+ * @param name Name of the class to load.
+ * @param deploymentInfo Cached deployment info describing the deployment to load the class from.
+ * @return Loaded class, or {@code null} if the deployment cannot be obtained or does not provide the class.
+ */
+    @Nullable private Class<?> tryToloadClassFromCacheDep(String name, CachedDeploymentInfo<K, V> deploymentInfo) {
+        // Look up the deployment described by the cached info; 'name' is passed for both name arguments.
+        GridDeployment dep = cctx.gridDeploy().getGlobalDeployment(
+            deploymentInfo.mode(),
+            name,
+            name,
+            deploymentInfo.userVersion(),
+            deploymentInfo.senderId(),
+            deploymentInfo.loaderId(),
+            deploymentInfo.participants(),
+            F.<ClusterNode>alwaysTrue());
+
+        // No deployment means this cached info cannot provide the class.
+        if (dep == null)
+            return null;
+
+        return dep.deployedClass(name);
+    }
+
+ /**
* @param name Name of the class.
* @return {@code True} if locally excluded.
*/
diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeWithNestedEntryProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeWithNestedEntryProcessorTest.java
new file mode 100644
index 0000000..c848dc2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeWithNestedEntryProcessorTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.p2p;
+
+import java.lang.reflect.Constructor;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DeploymentMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.config.GridTestProperties;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testframework.junits.common.GridCommonTest;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Test;
+
+/**
+ * Checks peer class loading when a compute job sent from a client node runs a nested entry processor
+ * and the same client then executes scan queries with peer-deployed composite predicates.
+ */
+@GridCommonTest(group = "P2P")
+public class GridP2PComputeWithNestedEntryProcessorTest extends GridCommonAbstractTest {
+    /** Class loader used to load the externally built P2P test classes. */
+    private static final ClassLoader TEST_CLASS_LOADER;
+
+    /** First nested filter class name. */
+    private static final String FIRST_FILTER_NAME = "org.apache.ignite.tests.p2p.pedicates.FirstConsideredPredicate";
+
+    /** Second nested filter class name. */
+    private static final String SECOND_FILTER_NAME = "org.apache.ignite.tests.p2p.pedicates.SecondConsideredPredicate";
+
+    /** Composite filter class name. */
+    private static final String COMPOSITE_FILTER_NAME = "org.apache.ignite.tests.p2p.pedicates.CompositePredicate";
+
+    /** Class name of the compute job that invokes a nested entry processor. */
+    private static final String COMPUTE_WITH_NESTED_PROCESSOR = "org.apache.ignite.tests.p2p.CacheDeploymenComputeWithNestedEntryProcessor";
+
+    /** Number of entries loaded into the cache. */
+    public static final int ENTRIES = 100;
+
+    /** Current deployment mode. Used in {@link #getConfiguration(String)}. */
+    private DeploymentMode depMode;
+
+    /** */
+    private static final TcpDiscoveryIpFinder FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Initialize ClassLoader. */
+    static {
+        try {
+            TEST_CLASS_LOADER = new URLClassLoader(
+                new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))},
+                GridP2PComputeWithNestedEntryProcessorTest.class.getClassLoader());
+        }
+        catch (MalformedURLException e) {
+            throw new RuntimeException("Define property p2p.uri.cls", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
+            .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME))
+            .setDeploymentMode(depMode);
+
+        // Instances whose name starts with "client" are started in client mode.
+        if (igniteInstanceName.startsWith("client"))
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /**
+     * Test GridDeploymentMode.CONTINUOUS mode.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testContinuousMode() throws Exception {
+        depMode = DeploymentMode.CONTINUOUS;
+
+        processTest();
+    }
+
+    /**
+     * Test GridDeploymentMode.SHARED mode.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSharedMode() throws Exception {
+        depMode = DeploymentMode.SHARED;
+
+        processTest();
+    }
+
+    /**
+     * Starts two nodes, loads the cache and then ten times in a row starts a fresh client that broadcasts
+     * the compute job and runs the scan queries.
+     *
+     * @throws Exception If failed.
+     */
+    public void processTest() throws Exception {
+        try {
+            Ignite ignite = startGrids(2);
+
+            createAndLoadCache(ignite);
+
+            // The client is restarted on every iteration.
+            for (int i = 0; i < 10; i++) {
+                try (Ignite client = startGrid("client")) {
+                    IgniteCache cache = client.cache(DEFAULT_CACHE_NAME).withKeepBinary();
+
+                    Integer key = primaryKey(ignite(1).cache(DEFAULT_CACHE_NAME));
+
+                    for (Boolean res : runJob(client, 10_000, DEFAULT_CACHE_NAME, key))
+                        assertTrue(res);
+
+                    scanCacheData(cache);
+                }
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Fills the default cache with {@link #ENTRIES} binary objects: even keys get an object without fields,
+     * odd keys get an object with the {@code isDeleted} field set.
+     *
+     * @param ignite Ignite.
+     */
+    private void createAndLoadCache(Ignite ignite) {
+        IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary();
+
+        for (int i = 0; i < ENTRIES; i++) {
+            if (i % 2 == 0) {
+                cache.put(i, ignite.binary()
+                    .builder("boType")
+                    .build());
+            }
+            else
+                cache.put(i, ignite.binary()
+                    .builder("boType")
+                    .setField("isDeleted", true)
+                    .build());
+        }
+    }
+
+    /**
+     * Broadcasts the compute job loaded from the test class loader.
+     *
+     * @param ignite Ignite instance.
+     * @param timeout Timeout.
+     * @param cacheName Cache name passed to the job.
+     * @param param Parameter (cache key) passed to the job.
+     * @return Job results returned by the broadcast.
+     * @throws Exception If failed.
+     */
+    private Collection<Boolean> runJob(Ignite ignite, long timeout,
+        String cacheName, int param) throws Exception {
+        Constructor ctor = TEST_CLASS_LOADER.loadClass(COMPUTE_WITH_NESTED_PROCESSOR)
+            .getConstructor(String.class, int.class);
+
+        return ignite.compute().withTimeout(timeout).broadcast((IgniteCallable<Boolean>)ctor.newInstance(cacheName, param));
+    }
+
+    /**
+     * Runs all scan query checks against the cache.
+     *
+     * @param cache Ignite cache.
+     * @throws Exception If failed.
+     */
+    private void scanCacheData(IgniteCache cache) throws Exception {
+        scanByCompositeFirstPredicate(cache);
+        scanByCompositeSecondPredicate(cache);
+        scanByCompositeFirstSecondPredicate(cache);
+    }
+
+    /**
+     * Scans with a composite filter wrapping only the first predicate; half of the entries must match.
+     *
+     * @param cache Ignite cache.
+     * @throws Exception If failed.
+     */
+    private void scanByCompositeFirstPredicate(IgniteCache cache) throws Exception {
+        IgniteBiPredicate compositeFilter = compositeOf(FIRST_FILTER_NAME);
+
+        List list = cache.query(new ScanQuery().setFilter(compositeFilter)).getAll();
+
+        assertEquals(ENTRIES / 2, list.size());
+    }
+
+    /**
+     * Scans with a composite filter wrapping only the second predicate; half of the entries must match.
+     *
+     * @param cache Ignite cache.
+     * @throws Exception If failed.
+     */
+    private void scanByCompositeSecondPredicate(IgniteCache cache) throws Exception {
+        IgniteBiPredicate compositeFilter = compositeOf(SECOND_FILTER_NAME);
+
+        List list = cache.query(new ScanQuery().setFilter(compositeFilter)).getAll();
+
+        assertEquals(ENTRIES / 2, list.size());
+    }
+
+    /**
+     * Scans with a composite filter wrapping both predicates; they exclude each other, so nothing must match.
+     *
+     * @param cache Ignite cache.
+     * @throws Exception If failed.
+     */
+    private void scanByCompositeFirstSecondPredicate(IgniteCache cache) throws Exception {
+        IgniteBiPredicate compositeFilter = compositeOf(FIRST_FILTER_NAME, SECOND_FILTER_NAME);
+
+        List list = cache.query(new ScanQuery().setPageSize(10).setFilter(compositeFilter)).getAll();
+
+        assertEquals(0, list.size());
+    }
+
+    /**
+     * Creates a composite filter from the test class loader and registers the given nested predicates in it.
+     *
+     * @param nestedClsNames Class names of the nested predicates, in registration order.
+     * @return Composite filter instance.
+     * @throws Exception If failed.
+     */
+    private static IgniteBiPredicate compositeOf(String... nestedClsNames) throws Exception {
+        Class<?> compositeCls = TEST_CLASS_LOADER.loadClass(COMPOSITE_FILTER_NAME);
+
+        IgniteBiPredicate composite = (IgniteBiPredicate)compositeCls.getConstructor().newInstance();
+
+        // The composite class is not on the test class path, hence the reflective call.
+        for (String nestedClsName : nestedClsNames)
+            U.invoke(compositeCls, composite, "addPredicate", newPredicate(nestedClsName));
+
+        return composite;
+    }
+
+    /**
+     * Instantiates a predicate from the test class loader through its no-arg constructor.
+     *
+     * @param clsName Predicate class name.
+     * @return Predicate instance.
+     * @throws Exception If failed.
+     */
+    private static IgniteBiPredicate newPredicate(String clsName) throws Exception {
+        return (IgniteBiPredicate)TEST_CLASS_LOADER.loadClass(clsName).getConstructor().newInstance();
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
index 85010c9..6651a69 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
import org.apache.ignite.internal.managers.deployment.GridDeploymentMessageCountSelfTest;
import org.apache.ignite.p2p.DeploymentClassLoaderCallableTest;
import org.apache.ignite.p2p.GridP2PClassLoadingSelfTest;
+import org.apache.ignite.p2p.GridP2PComputeWithNestedEntryProcessorTest;
import org.apache.ignite.p2p.GridP2PContinuousDeploymentSelfTest;
import org.apache.ignite.p2p.GridP2PDifferentClassLoaderSelfTest;
import org.apache.ignite.p2p.GridP2PDoubleDeploymentSelfTest;
@@ -64,6 +65,7 @@ import org.junit.runners.Suite;
SharedDeploymentTest.class,
P2PScanQueryUndeployTest.class,
GridDeploymentMessageCountSelfTest.class,
+ GridP2PComputeWithNestedEntryProcessorTest.class,
})
public class IgniteP2PSelfTestSuite {
}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymenComputeWithNestedEntryProcessor.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymenComputeWithNestedEntryProcessor.java
new file mode 100644
index 0000000..cf4e433
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymenComputeWithNestedEntryProcessor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+
+/**
+ * Callable that logs where it runs and then invokes a nested entry processor for a single cache key,
+ * returning the processor's result.
+ */
+public class CacheDeploymenComputeWithNestedEntryProcessor implements IgniteCallable<Boolean> {
+    /** Ignite instance (annotated for resource injection). */
+    @IgniteInstanceResource
+    Ignite ignite;
+
+    /** Logger (annotated for resource injection). */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** Key the nested entry processor is invoked for. */
+    private int key;
+
+    /** Name of the cache the key is looked up in. */
+    private String cacheName;
+
+    /**
+     * No-arg constructor.
+     */
+    public CacheDeploymenComputeWithNestedEntryProcessor() {
+        // No-op.
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param key Key.
+     */
+    public CacheDeploymenComputeWithNestedEntryProcessor(String cacheName, int key) {
+        this.cacheName = cacheName;
+        this.key = key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean call() {
+        String msg = "!!!!! I am Compute with nested entry processor " + key + " on " + ignite.name();
+
+        log.info(msg);
+
+        NestedEntryProcessor proc = new NestedEntryProcessor();
+
+        return ignite.cache(cacheName).withKeepBinary().invoke(key, proc);
+    }
+
+    /** Entry processor that reports whether the entry currently has a value. */
+    private static class NestedEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
+        /** {@inheritDoc} */
+        @Override public Boolean process(MutableEntry entry, Object... arguments) throws EntryProcessorException {
+            Object val = entry.getValue();
+
+            return val != null;
+        }
+    }
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/BinaryPredicate.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/BinaryPredicate.java
new file mode 100644
index 0000000..b3ecc47
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/BinaryPredicate.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.pedicates;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+
+/**
+ * Base class for cache filters that decide on the binary value alone.
+ *
+ * @param <K> Cache key type.
+ */
+public abstract class BinaryPredicate<K> implements IgniteBiPredicate<K, BinaryObject> {
+    /** Ignite instance (annotated for resource injection). */
+    @IgniteInstanceResource
+    protected Ignite ignite;
+
+    /** Logger (annotated for resource injection). */
+    @LoggerResource
+    protected IgniteLogger log;
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(K key, BinaryObject bo) {
+        // The key takes no part in the decision; only the value is checked.
+        return this.apply(bo);
+    }
+
+    /**
+     * Checks a single binary value.
+     *
+     * @param bo Object to check.
+     * @return Predicate result.
+     */
+    public abstract boolean apply(BinaryObject bo);
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/CompositePredicate.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/CompositePredicate.java
new file mode 100644
index 0000000..7517cf5
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/CompositePredicate.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.pedicates;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.ignite.binary.BinaryObject;
+
+/**
+ * Predicate that passes an object only if every registered nested predicate passes it.
+ * With no nested predicates registered every object passes.
+ *
+ * @param <K> Cache key type.
+ */
+public class CompositePredicate<K> extends BinaryPredicate<K> implements Serializable {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 238742479L;
+
+    /** Nested predicates, applied in registration order. */
+    private final Collection<BinaryPredicate> predicates = new ArrayList<>();
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(BinaryObject bo) {
+        log.info("CompositePredicate on " + ignite.configuration().getIgniteInstanceName());
+
+        for (BinaryPredicate predicate : predicates) {
+            // Hand this predicate's resources down before delegating: the nested predicates in this package
+            // dereference both 'log' and 'ignite'.
+            // NOTE(review): assumes nested predicates may not be injected on their own - confirm.
+            predicate.ignite = ignite;
+            predicate.log = log;
+
+            // Stop at the first nested predicate that rejects the object.
+            if (!predicate.apply(bo))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Registers a nested predicate.
+     *
+     * @param predicate Binary predicate.
+     */
+    public void addPredicate(BinaryPredicate predicate) {
+        predicates.add(predicate);
+    }
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/FirstConsideredPredicate.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/FirstConsideredPredicate.java
new file mode 100644
index 0000000..39bd176
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/FirstConsideredPredicate.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.pedicates;
+
+import org.apache.ignite.binary.BinaryObject;
+
+/** Predicate that accepts only binary objects having the {@code isDeleted} field. */
+public class FirstConsideredPredicate extends BinaryPredicate<Object> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 238742455L;
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(BinaryObject bo) {
+        log.info("FirstConsideredPredicate on " + ignite.configuration().getIgniteInstanceName());
+
+        // Only the presence of the field is checked, not its value.
+        return bo.hasField("isDeleted");
+    }
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/SecondConsideredPredicate.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/SecondConsideredPredicate.java
new file mode 100644
index 0000000..d9f4706
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/SecondConsideredPredicate.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.pedicates;
+
+import org.apache.ignite.binary.BinaryObject;
+
+/** Predicate that accepts only binary objects lacking the {@code isDeleted} field. */
+public class SecondConsideredPredicate extends BinaryPredicate<Object> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 238742456L;
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(BinaryObject bo) {
+        log.info("SecondConsideredPredicate on " + ignite.configuration().getIgniteInstanceName());
+
+        // Only the absence of the field is checked, not its value.
+        return !bo.hasField("isDeleted");
+    }
+}