You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/08/17 15:51:52 UTC

incubator-ignite git commit: Squashed commit for ignite-1239

Repository: incubator-ignite
Updated Branches:
  refs/heads/master 7635e5894 -> 45c813af7


Squashed commit for ignite-1239


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/45c813af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/45c813af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/45c813af

Branch: refs/heads/master
Commit: 45c813af7eb4a11ec59d3477a6a0b68791f1d7f2
Parents: 7635e58
Author: Denis Magda <dm...@gridgain.com>
Authored: Mon Aug 17 16:51:41 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Mon Aug 17 16:51:41 2015 +0300

----------------------------------------------------------------------
 .../GridDhtUnreservedPartitionException.java    | 66 ++++++++++++++
 .../cache/query/GridCacheQueryAdapter.java      | 56 ++++++++++--
 .../cache/query/GridCacheQueryManager.java      | 71 ++++++++++-----
 ...CacheScanPartitionQueryFallbackSelfTest.java | 96 ++++++++++++++++++++
 4 files changed, 261 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45c813af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java
new file mode 100644
index 0000000..d824a47
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.affinity.*;
+
+/**
+ * Exception thrown when a partition reservation fails. Carries the partition and the affinity topology version.
+ */
+public class GridDhtUnreservedPartitionException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Partition that could not be reserved. */
+    private final int part;
+
+    /** Affinity topology version supplied by the code that detected the failed reservation. */
+    private final AffinityTopologyVersion topVer;
+
+    /**
+     * @param part Partition that could not be reserved.
+     * @param topVer Affinity topology version supplied by the thrower.
+     * @param msg Message.
+     */
+    public GridDhtUnreservedPartitionException(int part, AffinityTopologyVersion topVer, String msg) {
+        super(msg);
+
+        this.part = part;
+        this.topVer = topVer;
+    }
+
+    /**
+     * @return Partition that could not be reserved.
+     */
+    public int partition() {
+        return part;
+    }
+
+    /**
+     * @return Affinity topology version passed to the constructor.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return getClass() + " [part=" + part + ", topVer=" + topVer + ", msg=" + getMessage() + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45c813af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 953cb9a..90f9b9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -457,7 +458,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
             return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
                 qryMgr.queryFieldsDistributed(bean, nodes));
         else if (type == SCAN && part != null && nodes.size() > 1)
-            return new CacheQueryFallbackFuture<>(nodes, bean, qryMgr);
+            return new CacheQueryFallbackFuture<>(nodes, part, bean, qryMgr, cctx);
         else
             return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
     }
@@ -554,7 +555,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         private volatile GridCacheQueryFutureAdapter<?, ?, R> fut;
 
         /** Backups. */
-        private final Queue<ClusterNode> nodes;
+        private volatile Queue<ClusterNode> nodes;
+
+        /** Topology version of the last detected {@link GridDhtUnreservedPartitionException}. */
+        private volatile AffinityTopologyVersion unreservedTopVer;
+
+        /** Number of times to retry the query on the nodes failed with {@link GridDhtUnreservedPartitionException}. */
+        private volatile int unreservedNodesRetryCnt = 5;
 
         /** Bean. */
         private final GridCacheQueryBean bean;
@@ -562,16 +569,26 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         /** Query manager. */
         private final GridCacheQueryManager qryMgr;
 
+        /** Cache context. */
+        private final GridCacheContext cctx;
+
+        /** Partition. */
+        private final int part;
+
         /**
          * @param nodes Backups.
+         * @param part Partition.
          * @param bean Bean.
          * @param qryMgr Query manager.
+         * @param cctx Cache context.
          */
-        public CacheQueryFallbackFuture(Collection<ClusterNode> nodes, GridCacheQueryBean bean,
-            GridCacheQueryManager qryMgr) {
+        public CacheQueryFallbackFuture(Collection<ClusterNode> nodes, int part, GridCacheQueryBean bean,
+            GridCacheQueryManager qryMgr, GridCacheContext cctx) {
             this.nodes = fallbacks(nodes);
             this.bean = bean;
             this.qryMgr = qryMgr;
+            this.cctx = cctx;
+            this.part = part;
 
             init();
         }
@@ -598,7 +615,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
          */
         @SuppressWarnings("unchecked")
         private void init() {
-            ClusterNode node = nodes.poll();
+            final ClusterNode node = nodes.poll();
 
             GridCacheQueryFutureAdapter<?, ?, R> fut0 = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ?
                 qryMgr.queryLocal(bean) :
@@ -613,8 +630,33 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
                         onDone(e);
                     }
                     catch (IgniteCheckedException e) {
-                        if (F.isEmpty(nodes))
-                            onDone(e);
+                        if (e.hasCause(GridDhtUnreservedPartitionException.class)) {
+                            unreservedTopVer = ((GridDhtUnreservedPartitionException)e.getCause()).topologyVersion();
+
+                            assert unreservedTopVer != null;
+                        }
+
+                        if (F.isEmpty(nodes)) {
+                            final AffinityTopologyVersion topVer = unreservedTopVer;
+
+                            if (topVer != null && --unreservedNodesRetryCnt > 0) {
+                                cctx.affinity().affinityReadyFuture(topVer).listen(
+                                    new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                                        @Override public void apply(
+                                            IgniteInternalFuture<AffinityTopologyVersion> future) {
+
+                                            nodes = fallbacks(cctx.topology().owners(part, topVer));
+
+                                            // Race is impossible here because query retries are executed one by one.
+                                            unreservedTopVer = null;
+
+                                            init();
+                                        }
+                                    });
+                            }
+                            else
+                                onDone(e);
+                        }
                         else
                             init();
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45c813af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 5d3f6a3..bfe5ecc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -39,6 +39,7 @@ import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.indexing.*;
+
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
@@ -170,7 +171,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     }
 
     /**
-     *  Leaves busy state.
+     * Leaves busy state.
      */
     private void leaveBusy() {
         busyLock.leaveBusy();
@@ -794,7 +795,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                         // double check for owning state
                         if (locPart == null || locPart.state() != OWNING || !locPart.reserve() ||
                             locPart.state() != OWNING)
-                            throw new GridDhtInvalidPartitionException(part, "Partition can't be reserved");
+                            throw new GridDhtUnreservedPartitionException(part,
+                                cctx.affinity().affinityTopologyVersion(), "Partition can not be reserved");
 
                         iter = new Iterator<K>() {
                             private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();
@@ -1083,6 +1085,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             boolean rmvRes = true;
 
+            FieldsResult res = null;
+
             final boolean statsEnabled = cctx.config().isStatisticsEnabled();
 
             final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
@@ -1109,7 +1113,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                 String taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash());
 
-                FieldsResult res = qryInfo.local() ?
+                res = qryInfo.local() ?
                     executeFieldsQuery(qry, qryInfo.arguments(), qryInfo.local(), qry.subjectId(), taskName,
                     recipient(qryInfo.senderId(), qryInfo.requestId())) :
                     fieldsQueryResult(qryInfo, taskName);
@@ -1232,7 +1236,19 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     throw (Error)e;
             }
             finally {
-                if (rmvRes)
+                if (qryInfo.local()) {
+                    // Don't we need to always remove local iterators?
+                    if (rmvRes && res != null) {
+                        try {
+                            res.closeIfNotShared(recipient(qryInfo.senderId(), qryInfo.requestId()));
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to close local iterator [qry=" + qryInfo + ", node=" +
+                                cctx.nodeId() + "]", e);
+                        }
+                    }
+                }
+                else if (rmvRes)
                     removeFieldsQueryResult(qryInfo.senderId(), qryInfo.requestId());
             }
         }
@@ -1260,6 +1276,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         try {
             boolean loc = qryInfo.local();
 
+            QueryResult<K, V> res = null;
+
             if (log.isDebugEnabled())
                 log.debug("Running query: " + qryInfo);
 
@@ -1286,8 +1304,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 IgniteSpiCloseableIterator<IgniteBiTuple<K, V>> iter;
                 GridCacheQueryType type;
 
-                QueryResult<K, V> res;
-
                 res = loc ?
                     executeQuery(qry, qryInfo.arguments(), loc, qry.subjectId(), taskName,
                     recipient(qryInfo.senderId(), qryInfo.requestId())) :
@@ -1350,7 +1366,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                         log.debug("Record [key=" + key +
                             ", val=" + val +
                             ", incBackups=" + incBackups +
-                            ", priNode=" + (primaryNode != null ? U.id8(primaryNode.id())  : null) +
+                            ", priNode=" + (primaryNode != null ? U.id8(primaryNode.id()) : null) +
                             ", node=" + U.id8(cctx.localNode().id()) + ']');
                     }
 
@@ -1496,7 +1512,19 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     throw (Error)e;
             }
             finally {
-                if (rmvIter)
+                if (loc) {
+                    // Local iterators are always removed.
+                    if (res != null) {
+                        try {
+                            res.closeIfNotShared(recipient(qryInfo.senderId(), qryInfo.requestId()));
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to close local iterator [qry=" + qryInfo + ", node=" +
+                                cctx.nodeId() + "]", e);
+                        }
+                    }
+                }
+                else if (rmvIter)
                     removeQueryResult(qryInfo.senderId(), qryInfo.requestId());
             }
         }
@@ -1552,7 +1580,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @return Iterator.
      * @throws IgniteCheckedException In case of error.
      */
-    @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter",
+    @SuppressWarnings({
+        "SynchronizationOnLocalVariableOrMethodParameter",
         "NonPrivateFieldAccessedInSynchronizedContext"})
     private QueryResult<K, V> queryResult(Map<Long, GridFutureAdapter<QueryResult<K, V>>> futs,
         GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException {
@@ -1680,7 +1709,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @return Fields query result.
      * @throws IgniteCheckedException In case of error.
      */
-    @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter",
+    @SuppressWarnings({
+        "SynchronizationOnLocalVariableOrMethodParameter",
         "NonPrivateFieldAccessedInSynchronizedContext"})
     private FieldsResult fieldsQueryResult(Map<Long, GridFutureAdapter<FieldsResult>> resMap,
         GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException {
@@ -1868,8 +1898,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     /**
      * @param <K> Key type.
      * @param <V> Value type.
-     * @return Predicate.
      * @param includeBackups Include backups.
+     * @return Predicate.
      */
     @SuppressWarnings("unchecked")
     @Nullable public <K, V> IndexingQueryFilter backupsFilter(boolean includeBackups) {
@@ -1933,7 +1963,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
         /** {@inheritDoc} */
         @Override public Collection<CacheSqlMetadata> call() {
-            final GridKernalContext ctx = ((IgniteKernal) ignite).context();
+            final GridKernalContext ctx = ((IgniteKernal)ignite).context();
 
             Collection<String> cacheNames = F.viewReadOnly(ctx.cache().caches(),
                 new C1<IgniteInternalCache<?, ?>, String>() {
@@ -2507,7 +2537,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             if (!filter.apply(key, val))
                 return null;
 
-            return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value())) ;
+            return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value()));
         }
     }
 
@@ -2546,7 +2576,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             idx++;
 
-            while(idx < iters.size()) {
+            while (idx < iters.size()) {
                 iter = iters.get(idx);
 
                 if (iter.hasNextX())
@@ -2598,7 +2628,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
         /** {@inheritDoc} */
         @Override public <T> T unwrap(Class<T> clazz) {
-            if(clazz.isAssignableFrom(getClass()))
+            if (clazz.isAssignableFrom(getClass()))
                 return clazz.cast(this);
 
             throw new IllegalArgumentException();
@@ -2627,7 +2657,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             assert res;
         }
 
-
         /**
          * Close if this result does not have any other recipients.
          *
@@ -2958,8 +2987,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     }
 
     /**
-     * Creates user's full text query, queried class, and query clause.
-     * For more information refer to {@link CacheQuery} documentation.
+     * Creates user's full text query, queried class, and query clause. For more information refer to {@link CacheQuery}
+     * documentation.
      *
      * @param clsName Query class name.
      * @param search Search clause.
@@ -2982,14 +3011,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     }
 
     /**
-     * Creates user's SQL fields query for given clause. For more information refer to
-     * {@link CacheQuery} documentation.
+     * Creates user's SQL fields query for given clause. For more information refer to {@link CacheQuery}
+     * documentation.
      *
      * @param qry Query.
      * @param keepPortable Keep portable flag.
      * @return Created query.
      */
-     public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean keepPortable) {
+    public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean keepPortable) {
         A.notNull(qry, "qry");
 
         return new GridCacheQueryAdapter<>(cctx,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45c813af/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
index 84ceafd..f422e9c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -19,9 +19,11 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -157,6 +159,90 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
     }
 
     /**
+     * Scan should activate fallback mechanism when new nodes join topology and rebalancing happens in parallel with
+     * scan query. Extra nodes are restarted in background threads while other threads run partition scan queries.
+     *
+     * @throws Exception In case of error.
+     */
+    public void testScanFallbackOnRebalancing() throws Exception {
+        cacheMode = CacheMode.PARTITIONED;
+        clientMode = false;
+        backups = 1;
+        commSpiFactory = new TestFallbackOnRebalancingCommunicationSpiFactory();
+
+        final AtomicBoolean done = new AtomicBoolean(false); // Declared before 'try' so 'finally' can raise it.
+
+        try {
+            Ignite ignite = startGrids(GRID_CNT);
+
+            fillCache(ignite); // Only the side effect is needed; query threads obtain their own cache proxies.
+
+            final AtomicInteger idx = new AtomicInteger(GRID_CNT);
+
+            IgniteInternalFuture<?> fut1 = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        int id = idx.getAndIncrement(); // Each thread restarts its own node index >= GRID_CNT.
+
+                        while (!done.get()) {
+                            startGrid(id);
+                            Thread.sleep(3000);
+
+                            stopGrid(id);
+
+                            if (done.get())
+                                return null;
+
+                            Thread.sleep(3000);
+                        }
+
+                        return null;
+                    }
+                }, GRID_CNT);
+
+            final AtomicInteger nodeIdx = new AtomicInteger();
+
+            IgniteInternalFuture<?> fut2 = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        int nodeId = nodeIdx.getAndIncrement(); // Each query thread uses its own initial node.
+
+                        IgniteCacheProxy<Integer, Integer> cache = (IgniteCacheProxy<Integer, Integer>)
+                            grid(nodeId).<Integer, Integer>cache(null);
+
+                        while (!done.get()) {
+                            IgniteBiTuple<Integer, UUID> tup = remotePartition(cache.context());
+
+                            int part = tup.get1();
+
+                            try {
+                                CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(
+                                    null, part, false);
+
+                                doTestScanQuery(qry);
+                            }
+                            catch (ClusterGroupEmptyCheckedException e) {
+                                log.warning("Invalid partition: " + part, e);
+                            }
+                        }
+
+                        return null;
+                    }
+                }, GRID_CNT);
+
+            Thread.sleep(60 * 1000); // Let node restarts and scan queries overlap for one minute.
+
+            done.set(true);
+            fut2.get();
+            fut1.get();
+        }
+        finally {
+            done.set(true); // Also stop the background loops on failure, before the grids are torn down.
+            stopAllGrids();
+        }
+    }
+
+    /**
      * Scan should try first remote node and fallbacks to second remote node.
      *
      * @throws Exception If failed.
@@ -408,4 +494,14 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
             };
         }
     }
+
+    /**
+     * Factory that returns a {@link TcpCommunicationSpi} created with its no-argument constructor.
+     */
+    private static class TestFallbackOnRebalancingCommunicationSpiFactory implements CommunicationSpiFactory {
+        /** {@inheritDoc} */
+        @Override public TcpCommunicationSpi create() {
+            return new TcpCommunicationSpi();
+        }
+    }
 }