You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/11 10:15:55 UTC

[2/2] ignite git commit: Merge branch master into ignite-3477-master

Merge branch master into ignite-3477-master


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3eb05de5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3eb05de5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3eb05de5

Branch: refs/heads/ignite-3477-master
Commit: 3eb05de5e2ee0d14567167bfe8547441cae69523
Parents: 8122099 aeacad6
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Apr 11 13:16:03 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Apr 11 13:16:03 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   8 +-
 .../GridCachePartitionExchangeManager.java      |   6 +-
 .../processors/cache/GridCachePreloader.java    |   4 +-
 .../cache/GridCachePreloaderAdapter.java        |   4 +-
 .../dht/preloader/GridDhtPartitionDemander.java |   2 +-
 .../dht/preloader/GridDhtPartitionSupplier.java |   8 +-
 .../GridDhtPartitionSupplyMessage.java          | 103 +++--
 .../GridDhtPartitionSupplyMessageV2.java        | 422 -------------------
 .../dht/preloader/GridDhtPreloader.java         |   2 +-
 .../resources/META-INF/classnames.properties    |   1 -
 .../cache/ClusterStateAbstractTest.java         |  10 +-
 .../CacheLateAffinityAssignmentTest.java        |   6 +-
 .../IgniteCacheReadFromBackupTest.java          |   6 +-
 .../atomic/IgniteCacheAtomicProtocolTest.java   |   3 +-
 14 files changed, 84 insertions(+), 501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index b80ad04,7c2599a..f7f0aff
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@@ -332,10 -537,8 +334,10 @@@ class GridDhtPartitionSupplier 
                                      if (!reply(node, d, s, scId))
                                          return;
  
-                                     s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
+                                     s = new GridDhtPartitionSupplyMessage(d.updateSequence(),
 -                                        cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
 +                                        cctx.cacheId(),
 +                                        d.topologyVersion(),
 +                                        cctx.deploymentEnabled());
                                  }
                              }
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index cc30321,a01be28..ee461ab
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@@ -66,9 -64,9 +64,14 @@@ public class GridDhtPartitionSupplyMess
      @GridDirectCollection(int.class)
      private Collection<Integer> missed;
  
++    /** Partitions for which we were able to get historical iterator. */
++    @GridToStringInclude
++    @GridDirectCollection(int.class)
++    private Collection<Integer> clean;
++
      /** Entries. */
      @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
-     private Map<Integer, CacheEntryInfoCollection> infos = new HashMap<>();
+     private Map<Integer, CacheEntryInfoCollection> infos;
  
      /** Message size. */
      @GridDirectTransient
@@@ -159,6 -142,6 +147,25 @@@
      }
  
      /**
++     * @param p Partition to clean.
++     */
++    void clean(int p) {
++        if (clean == null)
++            clean = new HashSet<>();
++
++        if (clean.add(p))
++            msgSize += 4;
++    }
++
++    /**
++     * @param p Partition to check.
++     * @return Check result.
++     */
++    boolean isClean(int p) {
++        return clean != null && clean.contains(p);
++    }
++
++    /**
       * @param p Missed partition.
       */
      void missed(int p) {
@@@ -288,7 -274,7 +298,7 @@@
  
          switch (writer.state()) {
              case 3:
-                 if (!writer.writeBoolean("ack", ack))
 -                if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
++                if (!writer.writeCollection("clean", clean, MessageCollectionItemType.INT))
                      return false;
  
                  writer.incrementState();
@@@ -312,13 -298,7 +322,13 @@@
                  writer.incrementState();
  
              case 7:
-                 if (!writer.writeLong("updateSeq", updateSeq))
++                if (!writer.writeMessage("topVer", topVer))
 +                    return false;
 +
 +                writer.incrementState();
 +
 +            case 8:
-                 if (!writer.writeInt("workerId", workerId))
+                 if (!writer.writeLong("updateSeq", updateSeq))
                      return false;
  
                  writer.incrementState();
@@@ -340,7 -320,7 +350,7 @@@
  
          switch (reader.state()) {
              case 3:
-                 ack = reader.readBoolean("ack");
 -                infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
++                clean = reader.readCollection("clean", MessageCollectionItemType.INT);
  
                  if (!reader.isLastRead())
                      return false;
@@@ -372,15 -352,7 +382,15 @@@
                  reader.incrementState();
  
              case 7:
-                 updateSeq = reader.readLong("updateSeq");
++                topVer = reader.readMessage("topVer");
 +
 +                if (!reader.isLastRead())
 +                    return false;
 +
 +                reader.incrementState();
 +
 +            case 8:
-                 workerId = reader.readInt("workerId");
+                 updateSeq = reader.readLong("updateSeq");
  
                  if (!reader.isLastRead())
                      return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --cc modules/core/src/main/resources/META-INF/classnames.properties
index 473f176,8c5a72e..335a33f
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@@ -763,9 -740,9 +763,8 @@@ org.apache.ignite.internal.processors.c
  org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId
  org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap
  org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap
 -org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier$1
  org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier$SupplyContextPhase
  org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage
- org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2
  org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage
  org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$1
  org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$2

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java
index f095e79,0000000..ce7829a
mode 100644,000000..100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java
@@@ -1,439 -1,0 +1,439 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + *
 + */
 +
 +package org.apache.ignite.internal.processors.cache;
 +
 +import java.util.Collection;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.locks.Lock;
 +import org.apache.ignite.Ignite;
 +import org.apache.ignite.IgniteCache;
 +import org.apache.ignite.IgniteException;
 +import org.apache.ignite.cluster.ClusterNode;
 +import org.apache.ignite.configuration.CacheConfiguration;
 +import org.apache.ignite.configuration.IgniteConfiguration;
 +import org.apache.ignite.internal.IgniteEx;
 +import org.apache.ignite.internal.IgniteInternalFuture;
 +import org.apache.ignite.internal.managers.communication.GridIoMessage;
 +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
- import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
++import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 +import org.apache.ignite.internal.util.GridConcurrentHashSet;
 +import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 +import org.apache.ignite.internal.util.typedef.internal.U;
 +import org.apache.ignite.lang.IgniteInClosure;
 +import org.apache.ignite.plugin.extensions.communication.Message;
 +import org.apache.ignite.spi.IgniteSpiException;
 +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 +import org.apache.ignite.testframework.GridTestUtils;
 +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 +import org.apache.ignite.transactions.Transaction;
 +
 +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 +
 +/**
 + *
 + */
 +@SuppressWarnings("TooBroadScope")
 +public abstract class ClusterStateAbstractTest extends GridCommonAbstractTest {
 +    /** Entry count. */
 +    public static final int ENTRY_CNT = 5000;
 +
 +    /** */
 +    public static final int GRID_CNT = 4;
 +
 +    /** */
 +    private static final String CACHE_NAME = "cache1";
 +
 +    /** */
 +    private static final Collection<Class> forbidden = new GridConcurrentHashSet<>();
 +
 +    /** */
 +    private static AtomicReference<Exception> errEncountered = new AtomicReference<>();
 +
 +    /** */
 +    private boolean activeOnStart = true;
 +
 +    /** */
 +    private boolean client;
 +
 +    /** {@inheritDoc} */
 +    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
 +        IgniteConfiguration cfg = super.getConfiguration(gridName);
 +
 +        cfg.setActiveOnStart(activeOnStart);
 +
 +        cfg.setCacheConfiguration(cacheConfiguration(CACHE_NAME));
 +
 +        if (client)
 +            cfg.setClientMode(true);
 +
 +        cfg.setCommunicationSpi(new TestCommunicationSpi());
 +
 +        return cfg;
 +    }
 +
 +    /**
 +     * @param cacheName Cache name.
 +     * @return Cache configuration.
 +     */
 +    protected abstract CacheConfiguration cacheConfiguration(String cacheName);
 +
 +    /** {@inheritDoc} */
 +    @Override protected void afterTest() throws Exception {
 +        stopAllGrids();
 +
 +        forbidden.clear();
 +
 +        Exception err = errEncountered.getAndSet(null);
 +
 +        if (err != null)
 +            throw err;
 +    }
 +
 +    /**
 +     * @throws Exception if failed.
 +     */
 +    public void testDynamicCacheStart() throws Exception {
 +        activeOnStart = false;
 +
-         forbidden.add(GridDhtPartitionSupplyMessageV2.class);
++        forbidden.add(GridDhtPartitionSupplyMessage.class);
 +        forbidden.add(GridDhtPartitionDemandMessage.class);
 +
 +        startGrids(GRID_CNT);
 +
 +        checkInactive(GRID_CNT);
 +
 +        forbidden.clear();
 +
 +        grid(0).active(true);
 +
 +        IgniteCache<Object, Object> cache2 = grid(0).createCache(new CacheConfiguration<>("cache2"));
 +
 +        for (int k = 0; k < ENTRY_CNT; k++)
 +            cache2.put(k, k);
 +
 +        grid(0).active(false);
 +
 +        checkInactive(GRID_CNT);
 +
 +        stopAllGrids();
 +    }
 +
 +    /**
 +     * @throws Exception if failed.
 +     */
 +    public void testNoRebalancing() throws Exception {
 +        activeOnStart = false;
 +
-         forbidden.add(GridDhtPartitionSupplyMessageV2.class);
++        forbidden.add(GridDhtPartitionSupplyMessage.class);
 +        forbidden.add(GridDhtPartitionDemandMessage.class);
 +
 +        startGrids(GRID_CNT);
 +
 +        checkInactive(GRID_CNT);
 +
 +        forbidden.clear();
 +
 +        grid(0).active(true);
 +
 +        awaitPartitionMapExchange();
 +
 +        final IgniteCache<Object, Object> cache = grid(0).cache(CACHE_NAME);
 +
 +        for (int k = 0; k < ENTRY_CNT; k++)
 +            cache.put(k, k);
 +
 +        for (int g = 0; g < GRID_CNT; g++) {
 +            // Tests that state changes are propagated to existing and new nodes.
 +            assertTrue(grid(g).active());
 +
 +            IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME);
 +
 +            for (int k = 0; k < ENTRY_CNT; k++)
 +                assertEquals(k,  cache0.get(k));
 +        }
 +
 +        // Check that new node startup and shutdown works fine after activation.
 +        startGrid(GRID_CNT);
 +        startGrid(GRID_CNT + 1);
 +
 +        for (int g = 0; g < GRID_CNT + 2; g++) {
 +            IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME);
 +
 +            for (int k = 0; k < ENTRY_CNT; k++)
 +                assertEquals("Failed for [grid=" + g + ", key=" + k + ']', k, cache0.get(k));
 +        }
 +
 +        stopGrid(GRID_CNT + 1);
 +
 +        for (int g = 0; g < GRID_CNT + 1; g++)
 +            grid(g).cache(CACHE_NAME).rebalance().get();
 +
 +        stopGrid(GRID_CNT);
 +
 +        for (int g = 0; g < GRID_CNT; g++) {
 +            IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME);
 +
 +            for (int k = 0; k < ENTRY_CNT; k++)
 +                assertEquals(k,  cache0.get(k));
 +        }
 +
 +        grid(0).active(false);
 +
 +        GridTestUtils.waitForCondition(new GridAbsPredicate() {
 +            @Override public boolean apply() {
 +                for (int g = 0; g < GRID_CNT; g++) {
 +                    if (grid(g).active())
 +                        return false;
 +                }
 +
 +                return true;
 +            }
 +        }, 5000);
 +
 +        checkInactive(GRID_CNT);
 +
-         forbidden.add(GridDhtPartitionSupplyMessageV2.class);
++        forbidden.add(GridDhtPartitionSupplyMessage.class);
 +        forbidden.add(GridDhtPartitionDemandMessage.class);
 +
 +        // Should stop without exchange.
 +        stopAllGrids();
 +    }
 +
 +    /**
 +     * @throws Exception if failed.
 +     */
 +    public void testActivationFromClient() throws Exception {
-         forbidden.add(GridDhtPartitionSupplyMessageV2.class);
++        forbidden.add(GridDhtPartitionSupplyMessage.class);
 +        forbidden.add(GridDhtPartitionDemandMessage.class);
 +
 +        activeOnStart = false;
 +
 +        startGrids(GRID_CNT);
 +
 +        client = true;
 +
 +        startGrid(GRID_CNT);
 +
 +        checkInactive(GRID_CNT + 1);
 +
 +        Ignite cl = grid(GRID_CNT);
 +
 +        forbidden.clear();
 +
 +        cl.active(true);
 +
 +        awaitPartitionMapExchange();
 +
 +        IgniteCache<Object, Object> cache = cl.cache(CACHE_NAME);
 +
 +        for (int k = 0; k < ENTRY_CNT; k++)
 +            cache.put(k, k);
 +
 +        for (int g = 0; g < GRID_CNT + 1; g++) {
 +            // Tests that state changes are propagated to existing and new nodes.
 +            assertTrue(grid(g).active());
 +
 +            IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME);
 +
 +            for (int k = 0; k < ENTRY_CNT; k++)
 +                assertEquals(k,  cache0.get(k));
 +        }
 +
 +        cl.active(false);
 +
 +        GridTestUtils.waitForCondition(new GridAbsPredicate() {
 +            @Override public boolean apply() {
 +                for (int g = 0; g < GRID_CNT + 1; g++) {
 +                    if (grid(g).active())
 +                        return false;
 +                }
 +
 +                return true;
 +            }
 +        }, 5000);
 +
 +        checkInactive(GRID_CNT + 1);
 +    }
 +
 +    /**
 +     * Tests that state doesn't change until all acquired locks are released.
 +     *
 +     * @throws Exception If fails.
 +     */
 +    public void testDeactivationWithPendingLock() throws Exception {
 +        fail("https://issues.apache.org/jira/browse/IGNITE-4931");
 +
 +        startGrids(GRID_CNT);
 +
 +        final CountDownLatch finishedLatch = new CountDownLatch(1);
 +
 +        Lock lock = grid(0).cache(CACHE_NAME).lock(1);
 +
 +        IgniteInternalFuture<?> fut;
 +
 +        lock.lock();
 +
 +        try {
 +            fut = multithreadedAsync(new Runnable() {
 +                @Override public void run() {
 +                    grid(1).active(false);
 +
 +                    finishedLatch.countDown();
 +                }
 +            }, 1);
 +
 +            U.sleep(2000);
 +
 +            assert !fut.isDone();
 +
 +            boolean hasActive = false;
 +
 +            for (int g = 0; g < GRID_CNT; g++) {
 +                IgniteEx grid = grid(g);
 +
 +                if (grid.active()) {
 +                    hasActive = true;
 +
 +                    break;
 +                }
 +
 +            }
 +
 +            assertTrue(hasActive);
 +        }
 +        finally {
 +            lock.unlock();
 +        }
 +
 +        fut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
 +
 +        checkInactive(GRID_CNT);
 +
 +        finishedLatch.await();
 +    }
 +
 +    /**
 +     * Tests that state doesn't change until all pending transactions are finished.
 +     *
 +     * @throws Exception If fails.
 +     */
 +    public void testDeactivationWithPendingTransaction() throws Exception {
 +        fail("https://issues.apache.org/jira/browse/IGNITE-4931");
 +
 +        startGrids(GRID_CNT);
 +
 +        final CountDownLatch finishedLatch = new CountDownLatch(1);
 +
 +        final Ignite ignite0 = grid(0);
 +
 +        final IgniteCache<Object, Object> cache0 = ignite0.cache(CACHE_NAME);
 +
 +        IgniteInternalFuture<?> fut;
 +
 +        try (Transaction tx = ignite0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
 +            cache0.get(1);
 +
 +            fut = multithreadedAsync(new Runnable() {
 +                @Override public void run() {
 +                    ignite0.active(false);
 +
 +                    finishedLatch.countDown();
 +                }
 +            }, 1);
 +
 +            U.sleep(2000);
 +
 +            assert !fut.isDone();
 +
 +            boolean hasActive = false;
 +
 +            for (int g = 0; g < GRID_CNT; g++) {
 +                IgniteEx grid = grid(g);
 +
 +                if (grid.active()) {
 +                    hasActive = true;
 +
 +                    break;
 +                }
 +
 +            }
 +
 +            assertTrue(hasActive);
 +
 +            cache0.put(1, 2);
 +
 +            tx.commit();
 +        }
 +
 +        fut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
 +
 +        checkInactive(GRID_CNT);
 +
 +        ignite0.active(true);
 +
 +        for (int g = 0; g < GRID_CNT; g++)
 +            assertEquals(2, grid(g).cache(CACHE_NAME).get(1));
 +
 +        finishedLatch.await();
 +    }
 +
 +    /**
 +     *
 +     */
 +    private void checkInactive(int cnt) {
 +        for (int g = 0; g < cnt; g++)
 +            assertFalse(grid(g).active());
 +    }
 +
 +    /**
 +     *
 +     */
 +    private static class TestCommunicationSpi extends TcpCommunicationSpi {
 +        /** {@inheritDoc} */
 +        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
 +            checkForbidden((GridIoMessage)msg);
 +
 +            super.sendMessage(node, msg, ackC);
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
 +            checkForbidden((GridIoMessage)msg);
 +
 +            super.sendMessage(node, msg);
 +        }
 +
 +        /**
 +         * @param msg Message to check.
 +         */
 +        private void checkForbidden(GridIoMessage msg) {
 +            if (forbidden.contains(msg.message().getClass())) {
 +                IgniteSpiException err = new IgniteSpiException("Message is forbidden for this test: " + msg.message());
 +
 +                // Set error in case if this exception is not visible to the user code.
 +                errEncountered.compareAndSet(null, err);
 +
 +                throw err;
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------