You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/04/16 13:04:15 UTC

[ignite] branch master updated: IGNITE-11698 Fixed invalid classloader select for p2p - Fixes #6421.

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 426e25a  IGNITE-11698 Fixed invalid classloader select for p2p - Fixes #6421.
426e25a is described below

commit 426e25aa4c8f71f0e0339ed9866775922d57901c
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Tue Apr 16 15:48:58 2019 +0300

    IGNITE-11698 Fixed invalid classloader select for p2p - Fixes #6421.
    
    Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
 .../cache/GridCacheDeploymentManager.java          |  62 ++++--
 ...GridP2PComputeWithNestedEntryProcessorTest.java | 246 +++++++++++++++++++++
 .../ignite/testsuites/IgniteP2PSelfTestSuite.java  |   2 +
 ...heDeploymenComputeWithNestedEntryProcessor.java |  77 +++++++
 .../tests/p2p/pedicates/BinaryPredicate.java       |  49 ++++
 .../tests/p2p/pedicates/CompositePredicate.java    |  53 +++++
 .../p2p/pedicates/FirstConsideredPredicate.java    |  33 +++
 .../p2p/pedicates/SecondConsideredPredicate.java   |  33 +++
 8 files changed, 537 insertions(+), 18 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
index 7bba1de..f0f2ed3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
@@ -92,6 +92,9 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
     /** */
     private boolean depEnabled;
 
+    /** Class loader id most recently set by the current thread. */
+    private final ThreadLocal<IgniteUuid> localLdrId = new ThreadLocal<>();
+
     /** {@inheritDoc} */
     @Override public void start0() throws IgniteCheckedException {
         globalLdr = new CacheClassLoader(cctx.gridConfig().getClassLoader());
@@ -365,6 +368,8 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
         DeploymentMode mode,
         Map<UUID, IgniteUuid> participants
     ) {
+        localLdrId.set(ldrId);
+
         assert depEnabled;
 
         if (mode == PRIVATE || mode == ISOLATED) {
@@ -803,31 +808,25 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
                 }
             }
 
-            for (CachedDeploymentInfo<K, V> t : deps.values()) {
-                UUID sndId = t.senderId();
-                IgniteUuid ldrId = t.loaderId();
-                String userVer = t.userVersion();
-                DeploymentMode mode = t.mode();
-                Map<UUID, IgniteUuid> participants = t.participants();
-
-                GridDeployment d = cctx.gridDeploy().getGlobalDeployment(
-                    mode,
-                    name,
-                    name,
-                    userVer,
-                    sndId,
-                    ldrId,
-                    participants,
-                    F.<ClusterNode>alwaysTrue());
+            IgniteUuid curLdrId = localLdrId.get();
 
-                if (d != null) {
-                    Class cls = d.deployedClass(name);
+            if (curLdrId != null) {
+                CachedDeploymentInfo<K, V> t = deps.get(curLdrId);
+
+                if (t != null) {
+                    Class<?> cls = tryToloadClassFromCacheDep(name, t);
 
                     if (cls != null)
                         return cls;
                 }
             }
 
+            for (CachedDeploymentInfo<K, V> t : deps.values()) {
+                Class<?> cls = tryToloadClassFromCacheDep(name, t);
+                if (cls != null)
+                    return cls;
+            }
+
             Class cls = getParent().loadClass(name);
 
             if (cls != null)
@@ -837,6 +836,33 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         /**
+         * Tries to load the class through the global deployment described by the given cached deployment info.
+         *
+         * @param name Class name.
+         * @param deploymentInfo Grid cached deployment info.
+         * @return Loaded class, or {@code null} if no deployment was found or it could not provide the class.
+         */
+        @Nullable private Class<?> tryToloadClassFromCacheDep(String name, CachedDeploymentInfo<K, V> deploymentInfo) {
+            UUID sndId = deploymentInfo.senderId();
+            IgniteUuid ldrId = deploymentInfo.loaderId();
+            String userVer = deploymentInfo.userVersion();
+            DeploymentMode mode = deploymentInfo.mode();
+            Map<UUID, IgniteUuid> participants = deploymentInfo.participants();
+
+            GridDeployment d = cctx.gridDeploy().getGlobalDeployment(
+                mode,
+                name,
+                name,
+                userVer,
+                sndId,
+                ldrId,
+                participants,
+                F.<ClusterNode>alwaysTrue());
+
+            return d != null ? d.deployedClass(name) : null;
+        }
+
+        /**
          * @param name Name of the class.
          * @return {@code True} if locally excluded.
          */
diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeWithNestedEntryProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeWithNestedEntryProcessorTest.java
new file mode 100644
index 0000000..c848dc2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeWithNestedEntryProcessorTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.p2p;
+
+import java.lang.reflect.Constructor;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DeploymentMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.config.GridTestProperties;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.testframework.junits.common.GridCommonTest;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Test;
+
+/**
+ * Tests P2P class loading for a compute job with a nested entry processor and for composite scan query filters.
+ */
+@GridCommonTest(group = "P2P")
+public class GridP2PComputeWithNestedEntryProcessorTest extends GridCommonAbstractTest {
+    /** Class loader over the URL given by the {@code p2p.uri.cls} test property. */
+    private static final ClassLoader TEST_CLASS_LOADER;
+
+    /** First filter class name. */
+    private static final String FIRST_FILTER_NAME = "org.apache.ignite.tests.p2p.pedicates.FirstConsideredPredicate";
+
+    /** Second filter class name. */
+    private static final String SECOND_FILTER_NAME = "org.apache.ignite.tests.p2p.pedicates.SecondConsideredPredicate";
+
+    /** Composite filter class name. */
+    private static final String COMPOSITE_FILTER_NAME = "org.apache.ignite.tests.p2p.pedicates.CompositePredicate";
+
+    /** Compute job class name. */
+    private static final String COMPUTE_WITH_NESTED_PROCESSOR = "org.apache.ignite.tests.p2p.CacheDeploymenComputeWithNestedEntryProcessor";
+
+    /** Number of cache entries. */
+    public static final int ENTRIES = 100;
+
+    /** Current deployment mode. Used in {@link #getConfiguration(String)}. */
+    private DeploymentMode depMode;
+
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Initialize ClassLoader. */
+    static {
+        try {
+            TEST_CLASS_LOADER = new URLClassLoader(
+                new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))},
+                GridP2PComputeWithNestedEntryProcessorTest.class.getClassLoader());
+        }
+        catch (MalformedURLException e) {
+            throw new RuntimeException("Define property p2p.uri.cls", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
+            .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME))
+            .setDeploymentMode(depMode);
+
+        if (igniteInstanceName.startsWith("client"))
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /**
+     * Test GridDeploymentMode.CONTINUOUS mode.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testContinuousMode() throws Exception {
+        depMode = DeploymentMode.CONTINUOUS;
+
+        processTest();
+    }
+
+    /**
+     * Test GridDeploymentMode.SHARED mode.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSharedMode() throws Exception {
+        depMode = DeploymentMode.SHARED;
+
+        processTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void processTest() throws Exception {
+        try {
+            Ignite ignite = startGrids(2);
+
+            createAndLoadCache(ignite);
+
+            for (int i = 0; i < 10; i++) {
+                try (Ignite client = startGrid("client")) {
+                    IgniteCache cache = client.cache(DEFAULT_CACHE_NAME).withKeepBinary();
+
+                    Integer key = primaryKey(ignite(1).cache(DEFAULT_CACHE_NAME));
+
+                    for (Boolean res : runJob(client, 10_000, DEFAULT_CACHE_NAME, key))
+                        assertTrue(res);
+
+                    scanCacheData(cache);
+                }
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param ignite Ignite.
+     */
+    private void createAndLoadCache(Ignite ignite) {
+        IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary();
+
+        for (int i = 0; i < ENTRIES; i++) {
+            if (i % 2 == 0) {
+                cache.put(i, ignite.binary()
+                    .builder("boType")
+                    .build());
+            }
+            else
+                cache.put(i, ignite.binary()
+                    .builder("boType")
+                    .setField("isDeleted", true)
+                    .build());
+        }
+    }
+
+    /**
+     * @param ignite Ignite instance.
+     * @param timeout Timeout.
+     * @param cacheName Cache name.
+     * @param param Cache key handed to the job.
+     * @return Results of the broadcast job.
+     * @throws Exception If failed.
+     */
+    private Collection<Boolean> runJob(Ignite ignite, long timeout,
+        String cacheName, int param) throws Exception {
+        Constructor ctor = TEST_CLASS_LOADER.loadClass(COMPUTE_WITH_NESTED_PROCESSOR)
+            .getConstructor(String.class, int.class);
+
+        return ignite.compute().withTimeout(timeout).broadcast((IgniteCallable<Boolean>)ctor.newInstance(cacheName, param));
+    }
+
+    /**
+     * @param cache Ignite cache.
+     * @throws Exception If failed.
+     */
+    private void scanCacheData(IgniteCache cache) throws Exception {
+        scanByCompositeFirstPredicate(cache);
+        scanByCompositeSecondPredicate(cache);
+        scanByCompositeFirstSecondPredicate(cache);
+    }
+
+    /**
+     * @param cache Ignite cache.
+     * @throws Exception If failed.
+     */
+    private void scanByCompositeFirstPredicate(IgniteCache cache) throws Exception {
+        IgniteBiPredicate firstFilter = (IgniteBiPredicate)TEST_CLASS_LOADER.loadClass(FIRST_FILTER_NAME)
+            .getConstructor().newInstance();
+        IgniteBiPredicate compositeFilter = (IgniteBiPredicate)TEST_CLASS_LOADER.loadClass(COMPOSITE_FILTER_NAME)
+            .getConstructor().newInstance();
+
+        U.invoke(TEST_CLASS_LOADER.loadClass(COMPOSITE_FILTER_NAME), compositeFilter, "addPredicate", firstFilter);
+
+        List list = cache.query(new ScanQuery().setFilter(compositeFilter)).getAll();
+
+        assertEquals(ENTRIES / 2, list.size());
+    }
+
+    /**
+     * @param cache Ignite cache.
+     * @throws Exception If failed.
+     */
+    private void scanByCompositeSecondPredicate(IgniteCache cache) throws Exception {
+        IgniteBiPredicate secondFilter = (IgniteBiPredicate)TEST_CLASS_LOADER.loadClass(SECOND_FILTER_NAME)
+            .getConstructor().newInstance();
+        IgniteBiPredicate compositeFilter = (IgniteBiPredicate)TEST_CLASS_LOADER.loadClass(COMPOSITE_FILTER_NAME)
+            .getConstructor().newInstance();
+
+        U.invoke(TEST_CLASS_LOADER.loadClass(COMPOSITE_FILTER_NAME), compositeFilter, "addPredicate", secondFilter);
+
+        List list = cache.query(new ScanQuery().setFilter(compositeFilter)).getAll();
+
+        assertEquals(ENTRIES / 2, list.size());
+    }
+
+    /**
+     * @param cache Ignite cache.
+     * @throws Exception If failed.
+     */
+    private void scanByCompositeFirstSecondPredicate(IgniteCache cache) throws Exception {
+        IgniteBiPredicate firstFilter = (IgniteBiPredicate)TEST_CLASS_LOADER.loadClass(FIRST_FILTER_NAME)
+            .getConstructor().newInstance();
+        IgniteBiPredicate secondFilter = (IgniteBiPredicate)TEST_CLASS_LOADER.loadClass(SECOND_FILTER_NAME)
+            .getConstructor().newInstance();
+        IgniteBiPredicate compositeFilter = (IgniteBiPredicate)TEST_CLASS_LOADER.loadClass(COMPOSITE_FILTER_NAME)
+            .getConstructor().newInstance();
+
+        U.invoke(TEST_CLASS_LOADER.loadClass(COMPOSITE_FILTER_NAME), compositeFilter, "addPredicate", firstFilter);
+        U.invoke(TEST_CLASS_LOADER.loadClass(COMPOSITE_FILTER_NAME), compositeFilter, "addPredicate", secondFilter);
+
+        List list = cache.query(new ScanQuery().setPageSize(10).setFilter(compositeFilter)).getAll();
+
+        assertEquals(0, list.size());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
index 85010c9..6651a69 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentMessageCountSelfTest;
 import org.apache.ignite.p2p.DeploymentClassLoaderCallableTest;
 import org.apache.ignite.p2p.GridP2PClassLoadingSelfTest;
+import org.apache.ignite.p2p.GridP2PComputeWithNestedEntryProcessorTest;
 import org.apache.ignite.p2p.GridP2PContinuousDeploymentSelfTest;
 import org.apache.ignite.p2p.GridP2PDifferentClassLoaderSelfTest;
 import org.apache.ignite.p2p.GridP2PDoubleDeploymentSelfTest;
@@ -64,6 +65,7 @@ import org.junit.runners.Suite;
     SharedDeploymentTest.class,
     P2PScanQueryUndeployTest.class,
     GridDeploymentMessageCountSelfTest.class,
+    GridP2PComputeWithNestedEntryProcessorTest.class,
 })
 public class IgniteP2PSelfTestSuite {
 }
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymenComputeWithNestedEntryProcessor.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymenComputeWithNestedEntryProcessor.java
new file mode 100644
index 0000000..cf4e433
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymenComputeWithNestedEntryProcessor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+
+/**
+ * Compute job that invokes a nested entry processor on the given key of the given cache.
+ */
+public class CacheDeploymenComputeWithNestedEntryProcessor implements IgniteCallable<Boolean> {
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    Ignite ignite;
+
+    /** Logger. */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** Key. */
+    private int key;
+
+    /** Cache name. */
+    private String cacheName;
+
+    /**
+     * Default constructor.
+     */
+    public CacheDeploymenComputeWithNestedEntryProcessor() {
+        // No-op.
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param key Key.
+     */
+    public CacheDeploymenComputeWithNestedEntryProcessor(String cacheName, int key) {
+        this.key = key;
+        this.cacheName = cacheName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean call() {
+        log.info("!!!!! I am Compute with nested entry processor " + key + " on " + ignite.name());
+
+        return ignite.cache(cacheName).withKeepBinary().invoke(key, new NestedEntryProcessor());
+    }
+
+    /** Entry processor that reports whether the entry has a non-null value. */
+    private static class NestedEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
+        /** {@inheritDoc} */
+        @Override public Boolean process(MutableEntry<Object, Object> entry, Object... arguments) throws EntryProcessorException {
+            return entry.getValue() != null;
+        }
+    }
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/BinaryPredicate.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/BinaryPredicate.java
new file mode 100644
index 0000000..b3ecc47
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/BinaryPredicate.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.pedicates;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+
+/**
+ * Base key-value predicate that ignores the key and delegates to {@link #apply(BinaryObject)}.
+ */
+public abstract class BinaryPredicate<K> implements IgniteBiPredicate<K, BinaryObject> {
+    /** Ignite. */
+    @IgniteInstanceResource
+    protected Ignite ignite;
+
+    /** Logger. */
+    @LoggerResource
+    protected IgniteLogger log;
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(K key, BinaryObject bo) {
+        return apply(bo);
+    }
+
+    /**
+     * @param bo Object to check.
+     * @return Predicate result.
+     */
+    public abstract boolean apply(BinaryObject bo);
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/CompositePredicate.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/CompositePredicate.java
new file mode 100644
index 0000000..7517cf5
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/CompositePredicate.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.pedicates;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.ignite.binary.BinaryObject;
+
+/** Predicate that passes only if every added predicate passes; it stops at the first one that fails. */
+public class CompositePredicate<K> extends BinaryPredicate<K> implements Serializable {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 238742479L;
+
+    /** Predicates. */
+    private final Collection<BinaryPredicate> predicates = new ArrayList<>();
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(BinaryObject bo) {
+        log.info("CompositePredicate on " + ignite.configuration().getIgniteInstanceName());
+
+        for (BinaryPredicate predicate : predicates) {
+            predicate.ignite = ignite;
+
+            if (!predicate.apply(bo))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * @param predicate Binary predicate.
+     */
+    public void addPredicate(BinaryPredicate predicate) {
+        predicates.add(predicate);
+    }
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/FirstConsideredPredicate.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/FirstConsideredPredicate.java
new file mode 100644
index 0000000..39bd176
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/FirstConsideredPredicate.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.pedicates;
+
+import org.apache.ignite.binary.BinaryObject;
+
+/** Predicate that accepts binary objects which have the {@code isDeleted} field. */
+public class FirstConsideredPredicate extends BinaryPredicate {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 238742455L;
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(BinaryObject bo) {
+        log.info("FirstConsideredPredicate on " + ignite.configuration().getIgniteInstanceName());
+
+        return bo.hasField("isDeleted");
+    }
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/SecondConsideredPredicate.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/SecondConsideredPredicate.java
new file mode 100644
index 0000000..d9f4706
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/pedicates/SecondConsideredPredicate.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.pedicates;
+
+import org.apache.ignite.binary.BinaryObject;
+
+/** Predicate that accepts binary objects which do not have the {@code isDeleted} field. */
+public class SecondConsideredPredicate extends BinaryPredicate {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 238742456L;
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(BinaryObject bo) {
+        log.info("SecondConsideredPredicate on " + ignite.configuration().getIgniteInstanceName());
+
+        return !bo.hasField("isDeleted");
+    }
+}