You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/02/16 17:36:23 UTC
[22/50] [abbrv] ignite git commit: Merge branch 'ignite-1.7.6'
Merge branch 'ignite-1.7.6'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aaeda721
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aaeda721
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aaeda721
Branch: refs/heads/ignite-comm-balance-master
Commit: aaeda7214f738dff2fbd865e83250413a9b7cc0f
Parents: e1c3dda f350578
Author: agura <ag...@apache.org>
Authored: Thu Feb 9 20:29:48 2017 +0300
Committer: agura <ag...@apache.org>
Committed: Thu Feb 9 20:29:48 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 3 +
.../org/apache/ignite/cache/QueryEntity.java | 21 +
.../org/apache/ignite/cache/query/SqlQuery.java | 25 +
.../processors/cache/EntryGetResult.java | 65 +++
.../processors/cache/GridCacheAdapter.java | 125 +++--
.../processors/cache/GridCacheContext.java | 4 +-
.../processors/cache/GridCacheEntryEx.java | 42 +-
.../processors/cache/GridCacheMapEntry.java | 140 +++++-
.../processors/cache/GridCacheUtils.java | 3 +
.../processors/cache/IgniteCacheProxy.java | 3 +
.../processors/cache/ReaderArguments.java | 74 +++
.../distributed/dht/GridDhtCacheAdapter.java | 9 +-
.../cache/distributed/dht/GridDhtGetFuture.java | 83 ++--
.../distributed/dht/GridDhtGetSingleFuture.java | 75 ++-
.../dht/GridPartitionedGetFuture.java | 10 +-
.../dht/GridPartitionedSingleGetFuture.java | 10 +-
.../dht/atomic/GridDhtAtomicCache.java | 12 +-
.../dht/colocated/GridDhtColocatedCache.java | 10 +-
.../distributed/near/GridNearGetFuture.java | 19 +-
.../local/atomic/GridLocalAtomicCache.java | 11 +-
.../cache/query/GridCacheQueryManager.java | 83 +++-
.../continuous/CacheContinuousQueryHandler.java | 81 +++-
.../cache/transactions/IgniteTxHandler.java | 2 +-
.../transactions/IgniteTxLocalAdapter.java | 65 +--
.../processors/query/GridQueryIndexing.java | 4 +-
.../processors/query/GridQueryProcessor.java | 79 ++--
.../query/GridQueryTypeDescriptor.java | 7 +
.../communication/tcp/TcpCommunicationSpi.java | 16 +
.../ignite/spi/discovery/tcp/ClientImpl.java | 90 +++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 63 ++-
.../messages/TcpDiscoveryAbstractMessage.java | 21 +
modules/core/src/test/config/log4j-test.xml | 6 +
.../cache/CacheConcurrentReadThroughTest.java | 184 ++++++++
.../processors/cache/GridCacheTestEntryEx.java | 30 +-
.../near/GridNearCacheStoreUpdateTest.java | 466 +++++++++++++++++++
.../GridNearOffheapCacheStoreUpdateTest.java | 35 ++
.../cache/query/IndexingSpiQuerySelfTest.java | 69 ++-
.../IndexingSpiQueryWithH2IndexingSelfTest.java | 36 ++
.../tcp/TcpCommunicationSpiDropNodesTest.java | 322 +++++++++++++
.../TcpCommunicationSpiFaultyClientTest.java | 265 +++++++++++
.../ignite/testframework/GridTestNode.java | 1 +
.../testframework/junits/GridAbstractTest.java | 2 +
.../testsuites/IgniteCacheTestSuite2.java | 7 +
.../IgniteSpiCommunicationSelfTestSuite.java | 5 +
.../IgniteSpiDiscoverySelfTestSuite.java | 2 +-
.../processors/query/h2/IgniteH2Indexing.java | 30 +-
...CacheScanPartitionQueryFallbackSelfTest.java | 2 +-
.../cache/IgniteCacheAbstractQuerySelfTest.java | 294 ++++++++++++
.../IgniteCachePartitionedQuerySelfTest.java | 85 ++++
.../h2/GridIndexingSpiAbstractSelfTest.java | 29 +-
50 files changed, 2719 insertions(+), 406 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 8d0a962,59665bb..264fa14
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@@ -1995,12 -2023,11 +2016,12 @@@ public abstract class GridCacheAdapter<
GridCacheEntryEx entry = entryEx(key);
try {
- GridCacheVersion verSet = entry.versionedValue(cacheVal,
- ver,
+ T2<CacheObject, GridCacheVersion> verVal = entry.versionedValue(
+ cacheVal,
+ res.version(),
null,
- expiry);
-
- boolean set = verSet != null;
++ expiry,
+ readerArgs);
if (log.isDebugEnabled())
log.debug("Set value loaded from store into entry [" +
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 9e9b496,51f423a..f26288f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@@ -725,15 -755,15 +755,17 @@@ public interface GridCacheEntryEx
* @param val New value.
* @param curVer Version to match or {@code null} if match is not required.
* @param newVer Version to set.
+ * @param loadExpiryPlc Expiry policy if entry is loaded from store.
- * @return Non null version if value was set.
+ * @param readerArgs Reader will be added if not null.
+ * @return Current version and value.
* @throws IgniteCheckedException If index could not be updated.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- public GridCacheVersion versionedValue(CacheObject val,
+ public T2<CacheObject, GridCacheVersion> versionedValue(CacheObject val,
@Nullable GridCacheVersion curVer,
@Nullable GridCacheVersion newVer,
- @Nullable IgniteCacheExpiryPolicy loadExpiryPlc)
++ @Nullable IgniteCacheExpiryPolicy loadExpiryPlc,
+ @Nullable ReaderArguments readerArgs)
throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 52b779d,59e4181..942ae21
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@@ -3549,14 -3609,28 +3609,29 @@@ public abstract class GridCacheMapEntr
}
/** {@inheritDoc} */
- @Override public synchronized GridCacheVersion versionedValue(CacheObject val,
+ @Override public synchronized void clearReserveForLoad(GridCacheVersion ver) throws IgniteCheckedException {
+ if (obsoleteVersionExtras() != null)
+ return;
+
+ if (ver.equals(this.ver)) {
+ assert evictionDisabled() : this;
+
+ flags &= ~IS_EVICT_DISABLED;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized T2<CacheObject, GridCacheVersion> versionedValue(CacheObject val,
GridCacheVersion curVer,
GridCacheVersion newVer,
- @Nullable IgniteCacheExpiryPolicy loadExpiryPlc)
++ @Nullable IgniteCacheExpiryPolicy loadExpiryPlc,
+ @Nullable ReaderArguments readerArgs)
- throws IgniteCheckedException, GridCacheEntryRemovedException
- {
+ throws IgniteCheckedException, GridCacheEntryRemovedException {
+
checkObsolete();
+ addReaderIfNeed(readerArgs);
+
if (curVer == null || curVer.equals(ver)) {
if (val != this.val) {
GridCacheMvcc mvcc = mvccExtras();
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e657f32,f601e0a..1b6179e
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@@ -2381,7 -2293,7 +2383,7 @@@ public class GridDhtAtomicCache<K, V> e
try {
GridCacheVersion ver = entry.version();
-- entry.versionedValue(ctx.toCacheObject(v), null, ver, null);
++ entry.versionedValue(ctx.toCacheObject(v), null, ver, null, null);
}
catch (GridCacheEntryRemovedException e) {
assert false : "Entry should not get obsolete while holding lock [entry=" + entry +
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 63b0717,83edab4..a9a7d7c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@@ -1253,8 -1312,8 +1312,8 @@@ public class CacheContinuousQueryHandle
try {
cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
}
- catch (ClusterTopologyCheckedException e) {
+ catch (ClusterTopologyCheckedException ignored) {
- IgniteLogger log = ctx.log(getClass());
+ IgniteLogger log = ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY);
if (log.isDebugEnabled())
log.debug("Failed to send acknowledge message, node left " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 7ceb701,f05d90d..cd4c55c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@@ -439,8 -437,9 +440,9 @@@ public abstract class IgniteTxLocalAdap
CU.subjectId(this, cctx),
null,
resolveTaskName(),
- expiryPlc,
+ expiryPlc0,
- txEntry == null ? keepBinary : txEntry.keepBinary());
+ txEntry == null ? keepBinary : txEntry.keepBinary(),
+ null);
if (res == null) {
if (misses == null)
@@@ -476,20 -475,22 +478,23 @@@
CacheObject cacheVal = cacheCtx.toCacheObject(val);
while (true) {
- GridCacheEntryEx entry = cacheCtx.cache().entryEx(key);
+ GridCacheEntryEx entry = cacheCtx.cache().entryEx(key, topVer);
try {
- GridCacheVersion setVer = entry.versionedValue(cacheVal, ver, null, null);
-
- boolean set = setVer != null;
+ T2<CacheObject, GridCacheVersion> verVal = entry.versionedValue(cacheVal,
+ ver,
+ null,
++ null,
+ null);
- if (set)
- ver = setVer;
+ if (log.isDebugEnabled()) {
+ log.debug("Set value loaded from store into entry [" +
+ "oldVer=" + ver +
+ ", newVer=" + verVal.get2() +
+ ", entry=" + entry + ']');
+ }
- if (log.isDebugEnabled())
- log.debug("Set value loaded from store into entry [set=" + set +
- ", curVer=" + ver + ", newVer=" + setVer + ", " +
- "entry=" + entry + ']');
+ ver = verVal.get2();
break;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 9fdadf3,8db68b4..e76ab40
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@@ -664,10 -686,10 +685,11 @@@ public class GridCacheTestEntryEx exten
}
/** @inheritDoc */
- @Override public GridCacheVersion versionedValue(CacheObject val,
+ @Override public T2<CacheObject, GridCacheVersion> versionedValue(CacheObject val,
GridCacheVersion curVer,
GridCacheVersion newVer,
- IgniteCacheExpiryPolicy loadExpiryPlc) {
++ @Nullable IgniteCacheExpiryPolicy loadExpiryPlc,
+ @Nullable ReaderArguments readerArgs) {
assert false;
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
index 0000000,d29231e..b530e36
mode 000000,100644..100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
@@@ -1,0 -1,322 +1,322 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.spi.communication.tcp;
+
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.Map;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.CyclicBarrier;
+ import java.util.concurrent.TimeUnit;
+ import org.apache.ignite.IgniteCheckedException;
+ import org.apache.ignite.cluster.ClusterNode;
+ import org.apache.ignite.configuration.IgniteConfiguration;
+ import org.apache.ignite.events.Event;
+ import org.apache.ignite.internal.IgniteEx;
+ import org.apache.ignite.internal.IgniteInternalFuture;
+ import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+ import org.apache.ignite.internal.util.nio.GridCommunicationClient;
+ import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.lang.IgniteBiPredicate;
+ import org.apache.ignite.lang.IgnitePredicate;
+ import org.apache.ignite.lang.IgniteRunnable;
+ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+ import org.apache.ignite.testframework.GridTestUtils;
+ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+
+ /**
+ *
+ */
+ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Nodes count. */
+ private static final int NODES_CNT = 4;
+
+ /** Block. */
+ private static volatile boolean block;
+
+ /** Predicate. */
+ private static IgniteBiPredicate<ClusterNode, ClusterNode> pred;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setClockSyncFrequency(300000);
+ cfg.setFailureDetectionTimeout(1000);
+
+ TestCommunicationSpi spi = new TestCommunicationSpi();
+
+ spi.setIdleConnectionTimeout(100);
+ spi.setSharedMemoryPort(-1);
+
+ TcpDiscoverySpi discoSpi = (TcpDiscoverySpi) cfg.getDiscoverySpi();
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setCommunicationSpi(spi);
+ cfg.setDiscoverySpi(discoSpi);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ block = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOneNode() throws Exception {
+ pred = new IgniteBiPredicate<ClusterNode, ClusterNode>() {
+ @Override public boolean apply(ClusterNode locNode, ClusterNode rmtNode) {
+ return block && rmtNode.order() == 3;
+ }
+ };
+
+ startGrids(NODES_CNT);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ grid(0).events().localListen(new IgnitePredicate<Event>() {
+ @Override
+ public boolean apply(Event event) {
+ latch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_FAILED);
+
+ U.sleep(1000); // Wait for write timeout and closing idle connections.
+
+ block = true;
+
+ grid(0).compute().broadcast(new IgniteRunnable() {
+ @Override public void run() {
+ // No-op.
+ }
+ });
+
+ assertTrue(latch.await(15, TimeUnit.SECONDS));
+
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return grid(3).cluster().topologyVersion() == NODES_CNT + 1;
+ }
+ }, 5000));
+
+ for (int i = 0; i < 10; i++) {
+ U.sleep(1000);
+
+ assertEquals(NODES_CNT - 1, grid(0).cluster().nodes().size());
+
+ int liveNodesCnt = 0;
+
+ for (int j = 0; j < NODES_CNT; j++) {
+ IgniteEx ignite;
+
+ try {
+ ignite = grid(j);
+
+ log.info("Checking topology for grid(" + j + "): " + ignite.cluster().nodes());
+
+ ClusterNode locNode = ignite.localNode();
+
+ if (locNode.order() != 3) {
+ assertEquals(NODES_CNT - 1, ignite.cluster().nodes().size());
+
+ for (ClusterNode node : ignite.cluster().nodes())
+ assertTrue(node.order() != 3);
+
+ liveNodesCnt++;
+ }
+ }
+ catch (Exception e) {
+ log.info("Checking topology for grid(" + j + "): no grid in topology.");
+ }
+ }
+
+ assertEquals(NODES_CNT - 1, liveNodesCnt);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTwoNodesEachOther() throws Exception {
+ pred = new IgniteBiPredicate<ClusterNode, ClusterNode>() {
+ @Override public boolean apply(ClusterNode locNode, ClusterNode rmtNode) {
+ return block && (locNode.order() == 2 || locNode.order() == 4) &&
+ (rmtNode.order() == 2 || rmtNode.order() == 4);
+ }
+ };
+
+ startGrids(NODES_CNT);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ grid(0).events().localListen(new IgnitePredicate<Event>() {
+ @Override
+ public boolean apply(Event event) {
+ latch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_FAILED);
+
+ U.sleep(1000); // Wait for write timeout and closing idle connections.
+
+ block = true;
+
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ IgniteInternalFuture<Void> fut1 = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ barrier.await();
+
+ grid(1).compute().withNoFailover().broadcast(new IgniteRunnable() {
+ @Override public void run() {
+ // No-op.
+ }
+ });
+
+ return null;
+ }
+ });
+
+ IgniteInternalFuture<Void> fut2 = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ barrier.await();
+
+ grid(3).compute().withNoFailover().broadcast(new IgniteRunnable() {
+ @Override public void run() {
+ // No-op.
+ }
+ });
+
+ return null;
+ }
+ });
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return grid(2).cluster().nodes().size() == NODES_CNT - 1;
+ }
+ }, 5000);
+
+ try {
+ fut1.get();
+ }
+ catch (IgniteCheckedException e) {
+ // No-op.
+ }
+
+ try {
+ fut2.get();
+ }
+ catch (IgniteCheckedException e) {
+ // No-op.
+ }
+
+ long failedNodeOrder = 1 + 2 + 3 + 4;
+
+ for (ClusterNode node : grid(0).cluster().nodes())
+ failedNodeOrder -= node.order();
+
+ for (int i = 0; i < 10; i++) {
+ U.sleep(1000);
+
+ assertEquals(NODES_CNT - 1, grid(0).cluster().nodes().size());
+
+ int liveNodesCnt = 0;
+
+ for (int j = 0; j < NODES_CNT; j++) {
+ IgniteEx ignite;
+
+ try {
+ ignite = grid(j);
+
+ log.info("Checking topology for grid(" + j + "): " + ignite.cluster().nodes());
+
+ ClusterNode locNode = ignite.localNode();
+
+ if (locNode.order() != failedNodeOrder) {
+ assertEquals(NODES_CNT - 1, ignite.cluster().nodes().size());
+
+ for (ClusterNode node : ignite.cluster().nodes())
+ assertTrue(node.order() != failedNodeOrder);
+
+ liveNodesCnt++;
+ }
+ }
+ catch (Exception e) {
+ log.info("Checking topology for grid(" + j + "): no grid in topology.");
+ }
+ }
+
+ assertEquals(NODES_CNT - 1, liveNodesCnt);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** {@inheritDoc} */
- @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException {
++ @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
+ if (pred.apply(getLocalNode(), node)) {
+ Map<String, Object> attrs = new HashMap<>(node.attributes());
+
+ attrs.put(createAttributeName(ATTR_ADDRS), Collections.singleton("127.0.0.1"));
+ attrs.put(createAttributeName(ATTR_PORT), 47200);
+ attrs.put(createAttributeName(ATTR_EXT_ADDRS), Collections.emptyList());
+ attrs.put(createAttributeName(ATTR_HOST_NAMES), Collections.emptyList());
+
+ ((TcpDiscoveryNode)node).setAttributes(attrs);
+ }
+
- return super.createTcpClient(node);
++ return super.createTcpClient(node, connIdx);
+ }
+
+ /**
+ * @param name Name.
+ */
+ private String createAttributeName(String name) {
+ return getClass().getSimpleName() + '.' + name;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
index 0000000,6e99487..c21e6ce
mode 000000,100644..100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
@@@ -1,0 -1,270 +1,265 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.spi.communication.tcp;
+
+ import java.io.IOException;
-import java.io.OutputStream;
+ import java.net.InetAddress;
+ import java.net.ServerSocket;
-import java.net.Socket;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.Map;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.TimeUnit;
-import org.apache.ignite.Ignite;
+ import org.apache.ignite.IgniteCheckedException;
+ import org.apache.ignite.IgniteException;
+ import org.apache.ignite.cluster.ClusterNode;
+ import org.apache.ignite.configuration.IgniteConfiguration;
+ import org.apache.ignite.events.Event;
+ import org.apache.ignite.internal.IgniteInternalFuture;
+ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+ import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+ import org.apache.ignite.internal.util.nio.GridCommunicationClient;
+ import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.lang.IgnitePredicate;
+ import org.apache.ignite.lang.IgniteRunnable;
+ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
-import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
+ import org.apache.ignite.testframework.GridTestUtils;
+ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+
+ /**
+ * Tests that faulty client will be failed if connection can't be established.
+ */
+ public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Predicate. */
+ private static final IgnitePredicate<ClusterNode> PRED = new IgnitePredicate<ClusterNode>() {
+ @Override public boolean apply(ClusterNode node) {
+ return block && node.order() == 3;
+ }
+ };
+
+ /** Client mode. */
+ private static boolean clientMode;
+
+ /** Block. */
+ private static volatile boolean block;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setClockSyncFrequency(300000);
+ cfg.setFailureDetectionTimeout(1000);
+ cfg.setClientMode(clientMode);
+
+ TestCommunicationSpi spi = new TestCommunicationSpi();
+
+ spi.setIdleConnectionTimeout(100);
+ spi.setSharedMemoryPort(-1);
+
+ TcpDiscoverySpi discoSpi = (TcpDiscoverySpi) cfg.getDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+ discoSpi.setClientReconnectDisabled(true);
+
+ cfg.setCommunicationSpi(spi);
+ cfg.setDiscoverySpi(discoSpi);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ block = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoServerOnHost() throws Exception {
+ testFailClient(null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNotAcceptedConnection() throws Exception {
+ testFailClient(new FakeServer());
+ }
+
+ /**
+ * @param srv Server.
+ * @throws Exception If failed.
+ */
+ private void testFailClient(FakeServer srv) throws Exception {
+ IgniteInternalFuture<Long> fut = null;
+
+ try {
+ if (srv != null)
+ fut = GridTestUtils.runMultiThreadedAsync(srv, 1, "fake-server");
+
+ clientMode = false;
+
+ startGrids(2);
+
+ clientMode = true;
+
+ startGrid(2);
+ startGrid(3);
+
+ U.sleep(1000); // Wait for write timeout and closing idle connections.
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ grid(0).events().localListen(new IgnitePredicate<Event>() {
+ @Override
+ public boolean apply(Event event) {
+ latch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_FAILED);
+
+ block = true;
+
+ try {
+ grid(0).compute(grid(0).cluster().forClients()).withNoFailover().broadcast(new IgniteRunnable() {
+ @Override public void run() {
+ // No-op.
+ }
+ });
+ }
+ catch (IgniteException e) {
+ // No-op.
+ }
+
+ assertTrue(latch.await(3, TimeUnit.SECONDS));
+
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return grid(0).cluster().forClients().nodes().size() == 1;
+ }
+ }, 5000));
+
+ for (int i = 0; i < 5; i++) {
+ U.sleep(1000);
+
+ log.info("Check topology (" + (i + 1) + "): " + grid(0).cluster().nodes());
+
+ assertEquals(1, grid(0).cluster().forClients().nodes().size());
+ }
+ }
+ finally {
+ if (srv != null) {
+ srv.stop();
+
+ assert fut != null;
+
+ fut.get();
+ }
+
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Server that emulates connection troubles.
+ */
+ private static class FakeServer implements Runnable {
+ /** Server. */
+ private final ServerSocket srv;
+
+ /** Stop. */
+ private volatile boolean stop;
+
+ /**
+ * Default constructor.
+ */
+ FakeServer() throws IOException {
+ this.srv = new ServerSocket(47200, 50, InetAddress.getByName("127.0.0.1"));
+ }
+
+ /**
+ *
+ */
+ public void stop() {
+ stop = true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ while (!stop) {
+ try {
+ U.sleep(10);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ // No-op.
+ }
+ }
+ }
+ finally {
+ U.closeQuiet(srv);
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** {@inheritDoc} */
- @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException {
++ @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
+ if (PRED.apply(node)) {
+ Map<String, Object> attrs = new HashMap<>(node.attributes());
+
+ attrs.put(createAttributeName(ATTR_ADDRS), Collections.singleton("127.0.0.1"));
+ attrs.put(createAttributeName(ATTR_PORT), 47200);
+ attrs.put(createAttributeName(ATTR_EXT_ADDRS), Collections.emptyList());
+ attrs.put(createAttributeName(ATTR_HOST_NAMES), Collections.emptyList());
+
+ ((TcpDiscoveryNode)node).setAttributes(attrs);
+ }
+
- return super.createTcpClient(node);
++ return super.createTcpClient(node, connIdx);
+ }
+
+ /**
+ * @param name Name.
+ */
+ private String createAttributeName(String name) {
+ return getClass().getSimpleName() + '.' + name;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 69a65fe,cbf2ebd..9416621
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@@ -1077,10 -1051,9 +1077,10 @@@ public class IgniteH2Indexing implement
final TableDescriptor tbl = tableDescriptor(spaceName, type);
if (tbl == null)
- throw new CacheException("Failed to find SQL table for type: " + type.name());
+ throw new IgniteSQLException("Failed to find SQL table for type: " + type.name(),
+ IgniteQueryErrorCode.TABLE_NOT_FOUND);
- String sql = generateQuery(qry, tbl);
+ String sql = generateQuery(qry, alias, tbl);
Connection conn = connectionForThread(tbl.schemaName());
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------