You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/11/03 14:21:22 UTC
[1/3] ignite git commit: IGNITE-426 Fixed tests.
Repository: ignite
Updated Branches:
refs/heads/ignite-426-2-reb 2d053b258 -> 20324bc0f
http://git-wip-us.apache.org/repos/asf/ignite/blob/20324bc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedSelfTest.java
new file mode 100644
index 0000000..84c9131
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedSelfTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFailoverAtomicReplicatedSelfTest
+ extends CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return REPLICATED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicWriteOrderMode writeOrderMode() {
+ return PRIMARY;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20324bc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
deleted file mode 100644
index db5b8cb..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
-import org.apache.ignite.cache.CacheMode;
-
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-
-/**
- *
- */
-public class CacheContinuousQueryFailoverAtomicReplicatedTest
- extends CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest {
- /** {@inheritDoc} */
- @Override protected CacheMode cacheMode() {
- return REPLICATED;
- }
-
- /** {@inheritDoc} */
- @Override protected CacheAtomicWriteOrderMode writeOrderMode() {
- return PRIMARY;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20324bc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedSelfTest.java
new file mode 100644
index 0000000..7c4f180
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedSelfTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFailoverTxReplicatedSelfTest extends CacheContinuousQueryFailoverTxSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return REPLICATED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20324bc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java
deleted file mode 100644
index 746f0eb..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import org.apache.ignite.cache.CacheMode;
-
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-
-/**
- *
- */
-public class CacheContinuousQueryFailoverTxReplicatedTest extends CacheContinuousQueryFailoverTxTest {
- /** {@inheritDoc} */
- @Override protected CacheMode cacheMode() {
- return REPLICATED;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20324bc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxSelfTest.java
new file mode 100644
index 0000000..789a105
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxSelfTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFailoverTxSelfTest extends CacheContinuousQueryFailoverAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20324bc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxTest.java
deleted file mode 100644
index 8e3a575..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-
-/**
- *
- */
-public class CacheContinuousQueryFailoverTxTest extends CacheContinuousQueryFailoverAbstractTest {
- /** {@inheritDoc} */
- @Override protected CacheMode cacheMode() {
- return PARTITIONED;
- }
-
- /** {@inheritDoc} */
- @Override protected CacheAtomicityMode atomicityMode() {
- return TRANSACTIONAL;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20324bc0/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java
index 108b11d..ba5e15b 100644
--- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PSameClassLoaderSelfTest.java
@@ -23,7 +23,11 @@ import java.net.URLClassLoader;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.config.GridTestProperties;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
@@ -41,6 +45,9 @@ public class GridP2PSameClassLoaderSelfTest extends GridCommonAbstractTest {
private static final String TEST_TASK2_NAME = "org.apache.ignite.tests.p2p.P2PTestTaskExternalPath2";
/** */
+ private static final TcpDiscoveryIpFinder FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
private static final ClassLoader CLASS_LOADER;
/** Current deployment mode. Used in {@link #getConfiguration(String)}. */
@@ -66,6 +73,7 @@ public class GridP2PSameClassLoaderSelfTest extends GridCommonAbstractTest {
cfg.setDeploymentMode(depMode);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setHeartbeatFrequency(500);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(FINDER);
cfg.setCacheConfiguration();
@@ -81,10 +89,16 @@ public class GridP2PSameClassLoaderSelfTest extends GridCommonAbstractTest {
@SuppressWarnings({"unchecked"})
private void processTest(boolean isIsolatedDifferentTask, boolean isIsolatedDifferentNode) throws Exception {
try {
- Ignite ignite1 = startGrid(1);
+ final Ignite ignite1 = startGrid(1);
Ignite ignite2 = startGrid(2);
Ignite ignite3 = startGrid(3);
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return ignite1.cluster().nodes().size() == 3;
+ }
+ }, 20000L);
+
Class task1 = CLASS_LOADER.loadClass(TEST_TASK1_NAME);
Class task2 = CLASS_LOADER.loadClass(TEST_TASK2_NAME);
http://git-wip-us.apache.org/repos/asf/ignite/blob/20324bc0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index e54cf98..8e20f9d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -18,82 +18,13 @@
package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
-import org.apache.ignite.internal.processors.cache.CacheLocalQueryMetricsSelfTest;
-import org.apache.ignite.internal.processors.cache.CachePartitionedQueryMetricsDistributedSelfTest;
-import org.apache.ignite.internal.processors.cache.CachePartitionedQueryMetricsLocalSelfTest;
-import org.apache.ignite.internal.processors.cache.CacheReplicatedQueryMetricsDistributedSelfTest;
-import org.apache.ignite.internal.processors.cache.CacheReplicatedQueryMetricsLocalSelfTest;
-import org.apache.ignite.internal.processors.cache.CacheScanPartitionQueryFallbackSelfTest;
-import org.apache.ignite.internal.processors.cache.GridCacheCrossCacheQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.GridCacheQueryIndexDisabledSelfTest;
-import org.apache.ignite.internal.processors.cache.GridCacheQueryIndexingDisabledSelfTest;
-import org.apache.ignite.internal.processors.cache.GridCacheQueryInternalKeysSelfTest;
-import org.apache.ignite.internal.processors.cache.GridCacheQuerySerializationSelfTest;
-import org.apache.ignite.internal.processors.cache.GridCacheReduceQueryMultithreadedSelfTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheCollocatedQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheFieldsQueryNoDataSelfTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheLargeResultSelfTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapEvictQueryTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapTieredMultithreadedSelfTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingQueryErrorTest;
-import org.apache.ignite.internal.processors.cache.IgniteCachePartitionedQueryMultiThreadedSelfTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheQueryEvictsMultiThreadedSelfTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheQueryIndexSelfTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheQueryLoadSelfTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheQueryMultiThreadedOffHeapTieredSelfTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheQueryMultiThreadedSelfTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheQueryOffheapMultiThreadedSelfTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheSqlQueryMultiThreadedSelfTest;
-import org.apache.ignite.internal.processors.cache.SqlFieldsQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicFieldsQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicNearEnabledFieldsQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicNearEnabledQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheAtomicQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheClientQueryReplicatedNodeRestartSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedFieldsQueryP2PEnabledSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedFieldsQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedQueryP2PDisabledSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest2;
-import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQueryP2PDisabledSelfTest;
-import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicP2PDisabledSelfTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicSelfTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalAtomicSelfTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalSelfTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionedOnlySelfTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionedP2PDisabledSelfTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionedSelfTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicSelfTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedOneNodeSelfTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedP2PDisabledSelfTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedSelfTest;
-import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest;
-import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryAtomicSelfTest;
-import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryLocalSelfTest;
-import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryPartitionedSelfTest;
-import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryReplicatedSelfTest;
-import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest;
-import org.apache.ignite.internal.processors.query.h2.sql.BaseH2CompareQueryTest;
-import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest;
-import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryTest;
-import org.apache.ignite.spi.communication.tcp.GridOrderedMessageCancelSelfTest;
/**
* Test suite for cache queries.
@@ -172,10 +103,10 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
suite.addTestSuite(IgniteCacheContinuousQueryClientReconnectTest.class);
suite.addTestSuite(IgniteCacheContinuousQueryClientTxReconnectTest.class);
- suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.class);
- suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedTest.class);
- suite.addTestSuite(CacheContinuousQueryFailoverTxTest.class);
- suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedTest.class);
+ suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest.class);
+ suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedSelfTest.class);
+ suite.addTestSuite(CacheContinuousQueryFailoverTxSelfTest.class);
+ suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedSelfTest.class);
// Reduce fields queries.
// suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);
[3/3] ignite git commit: IGNITE-426 Fixed tests.
Posted by nt...@apache.org.
IGNITE-426 Fixed tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/20324bc0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/20324bc0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/20324bc0
Branch: refs/heads/ignite-426-2-reb
Commit: 20324bc0f0d5b125d8f7ba18c8824e97a5ffde59
Parents: 2d053b2
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Nov 3 16:21:19 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Nov 3 16:21:19 2015 +0300
----------------------------------------------------------------------
...ContinuousQueryFailoverAbstractSelfTest.java | 2531 ++++++++++++++++++
...acheContinuousQueryFailoverAbstractTest.java | 2522 -----------------
...ryFailoverAtomicNearEnabledSelfSelfTest.java | 46 +
...FailoverAtomicPrimaryWriteOrderSelfTest.java | 44 +
...ueryFailoverAtomicPrimaryWriteOrderTest.java | 44 -
...usQueryFailoverAtomicReplicatedSelfTest.java | 40 +
...inuousQueryFailoverAtomicReplicatedTest.java | 40 -
...inuousQueryFailoverTxReplicatedSelfTest.java | 32 +
...ContinuousQueryFailoverTxReplicatedTest.java | 32 -
.../CacheContinuousQueryFailoverTxSelfTest.java | 39 +
.../CacheContinuousQueryFailoverTxTest.java | 39 -
.../p2p/GridP2PSameClassLoaderSelfTest.java | 16 +-
.../IgniteCacheQuerySelfTestSuite.java | 85 +-
13 files changed, 2755 insertions(+), 2755 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/20324bc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
new file mode 100644
index 0000000..edf257e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -0,0 +1,2531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.PAX;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int BACKUP_ACK_THRESHOLD = 100;
+
+ /** */
+ private static volatile boolean err;
+
+ /** */
+ private boolean client;
+
+ /** */
+ private int backups = 1;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ TestCommunicationSpi commSpi = new TestCommunicationSpi();
+
+ commSpi.setSharedMemoryPort(-1);
+ commSpi.setIdleConnectionTimeout(100);
+
+ cfg.setCommunicationSpi(commSpi);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(cacheMode());
+ ccfg.setAtomicityMode(atomicityMode());
+ ccfg.setAtomicWriteOrderMode(writeOrderMode());
+ ccfg.setBackups(backups);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setNearConfiguration(nearCacheConfiguration());
+
+ cfg.setCacheConfiguration(ccfg);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /**
+ * @return Near cache configuration.
+ */
+ protected NearCacheConfiguration nearCacheConfiguration() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 5 * 60_000;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ err = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @return Cache mode.
+ */
+ protected abstract CacheMode cacheMode();
+
+ /**
+ * @return Atomicity mode.
+ */
+ protected abstract CacheAtomicityMode atomicityMode();
+
+ /**
+ * @return Write order mode for atomic cache.
+ */
+ protected CacheAtomicWriteOrderMode writeOrderMode() {
+ return PRIMARY;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFirstFilteredEvent() throws Exception {
+ this.backups = 2;
+
+ final int SRV_NODES = 4;
+
+ startGridsMultiThreaded(SRV_NODES);
+
+ client = true;
+
+ Ignite qryClient = startGrid(SRV_NODES);
+
+ client = false;
+
+ IgniteCache<Object, Object> qryClnCache = qryClient.cache(null);
+
+ final CacheEventListener3 lsnr = new CacheEventListener3();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ qry.setRemoteFilter(new CacheEventFilter());
+
+ try (QueryCursor<?> cur = qryClnCache.query(qry)) {
+ List<Integer> keys = testKeys(grid(0).cache(null), 1);
+
+ for (Integer key : keys)
+ qryClnCache.put(key, -1);
+
+ qryClnCache.put(keys.get(0), 100);
+ }
+
+ assertEquals(lsnr.evts.size(), 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRebalanceVersion() throws Exception {
+ Ignite ignite0 = startGrid(0);
+ GridDhtPartitionTopology top0 = ((IgniteKernal)ignite0).context().cache().context().cacheContext(1).topology();
+
+ assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(1)));
+ assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(2)));
+
+ Ignite ignite1 = startGrid(1);
+ GridDhtPartitionTopology top1 = ((IgniteKernal)ignite1).context().cache().context().cacheContext(1).topology();
+
+ waitRebalanceFinished(ignite0, 2);
+ waitRebalanceFinished(ignite1, 2);
+
+ assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(3)));
+ assertFalse(top1.rebalanceFinished(new AffinityTopologyVersion(3)));
+
+ Ignite ignite2 = startGrid(2);
+ GridDhtPartitionTopology top2 = ((IgniteKernal)ignite2).context().cache().context().cacheContext(1).topology();
+
+ waitRebalanceFinished(ignite0, 3);
+ waitRebalanceFinished(ignite1, 3);
+ waitRebalanceFinished(ignite2, 3);
+
+ assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(4)));
+ assertFalse(top1.rebalanceFinished(new AffinityTopologyVersion(4)));
+ assertFalse(top2.rebalanceFinished(new AffinityTopologyVersion(4)));
+
+ client = true;
+
+ Ignite ignite3 = startGrid(3);
+ GridDhtPartitionTopology top3 = ((IgniteKernal)ignite3).context().cache().context().cacheContext(1).topology();
+
+ assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(4)));
+ assertTrue(top1.rebalanceFinished(new AffinityTopologyVersion(4)));
+ assertTrue(top2.rebalanceFinished(new AffinityTopologyVersion(4)));
+ assertTrue(top3.rebalanceFinished(new AffinityTopologyVersion(4)));
+
+ stopGrid(1);
+
+ waitRebalanceFinished(ignite0, 5);
+ waitRebalanceFinished(ignite2, 5);
+ waitRebalanceFinished(ignite3, 5);
+
+ stopGrid(3);
+
+ assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(6)));
+ assertTrue(top2.rebalanceFinished(new AffinityTopologyVersion(6)));
+
+ stopGrid(0);
+
+ waitRebalanceFinished(ignite2, 7);
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @param topVer Topology version.
+ * @throws Exception If failed.
+ */
+ private void waitRebalanceFinished(Ignite ignite, long topVer) throws Exception {
+ final AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer);
+
+ final GridDhtPartitionTopology top =
+ ((IgniteKernal)ignite).context().cache().context().cacheContext(1).topology();
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return top.rebalanceFinished(topVer0);
+ }
+ }, 5000);
+
+ assertTrue(top.rebalanceFinished(topVer0));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOneBackup() throws Exception {
+ checkBackupQueue(1, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOneBackupClientUpdate() throws Exception {
+ checkBackupQueue(1, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartStopQuery() throws Exception {
+ this.backups = 1;
+
+ final int SRV_NODES = 3;
+
+ startGridsMultiThreaded(SRV_NODES);
+
+ client = true;
+
+ final Ignite qryClient = startGrid(SRV_NODES);
+
+ client = false;
+
+ IgniteCache<Object, Object> clnCache = qryClient.cache(null);
+
+ IgniteOutClosure<IgniteCache<Integer, Integer>> rndCache =
+ new IgniteOutClosure<IgniteCache<Integer, Integer>>() {
+ int cnt = 0;
+
+ @Override public IgniteCache<Integer, Integer> apply() {
+ ++cnt;
+
+ return grid(cnt % SRV_NODES + 1).cache(null);
+ }
+ };
+
+ Ignite igniteSrv = ignite(0);
+
+ IgniteCache<Object, Object> srvCache = igniteSrv.cache(null);
+
+ List<Integer> keys = testKeys(srvCache, 3);
+
+ int keyCnt = keys.size();
+
+ for (int j = 0; j < 50; ++j) {
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ final CacheEventListener3 lsnr = new CacheEventListener3();
+
+ qry.setLocalListener(lsnr);
+
+ qry.setRemoteFilter(lsnr);
+
+ int keyIter = 0;
+
+ for (; keyIter < keyCnt / 2; keyIter++) {
+ int key = keys.get(keyIter);
+
+ rndCache.apply().put(key, key);
+ }
+
+ assert lsnr.evts.isEmpty();
+
+ QueryCursor<Cache.Entry<Object, Object>> query = clnCache.query(qry);
+
+ Map<Object, T2<Object, Object>> updates = new HashMap<>();
+
+ final List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
+
+ Affinity<Object> aff = affinity(srvCache);
+
+ boolean filtered = false;
+
+ for (; keyIter < keys.size(); keyIter++) {
+ int key = keys.get(keyIter);
+
+ int val = filtered ? 1 : 2;
+
+ log.info("Put [key=" + key + ", val=" + val + ", part=" + aff.partition(key) + ']');
+
+ T2<Object, Object> t = updates.get(key);
+
+ if (t == null) {
+ // Check filtered.
+ if (!filtered) {
+ updates.put(key, new T2<>((Object)val, null));
+
+ expEvts.add(new T3<>((Object)key, (Object)val, null));
+ }
+ }
+ else {
+ // Check filtered.
+ if (!filtered) {
+ updates.put(key, new T2<>((Object)val, (Object)t.get1()));
+
+ expEvts.add(new T3<>((Object)key, (Object)val, (Object)t.get1()));
+ }
+ }
+
+ rndCache.apply().put(key, val);
+
+ filtered = !filtered;
+ }
+
+ checkEvents(expEvts, lsnr, false);
+
+ query.close();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLeftPrimaryAndBackupNodes() throws Exception {
+ if (cacheMode() == REPLICATED)
+ return;
+
+ this.backups = 1;
+
+ final int SRV_NODES = 3;
+
+ startGridsMultiThreaded(SRV_NODES);
+
+ client = true;
+
+ final Ignite qryClient = startGrid(SRV_NODES);
+
+ client = false;
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ final CacheEventListener3 lsnr = new CacheEventListener3();
+
+ qry.setLocalListener(lsnr);
+
+ qry.setRemoteFilter(lsnr);
+
+ IgniteCache<Object, Object> clnCache = qryClient.cache(null);
+
+ QueryCursor<Cache.Entry<Object, Object>> query = clnCache.query(qry);
+
+ Ignite igniteSrv = ignite(0);
+
+ IgniteCache<Object, Object> srvCache = igniteSrv.cache(null);
+
+ Affinity<Object> aff = affinity(srvCache);
+
+ List<Integer> keys = testKeys(srvCache, 1);
+
+ Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(keys.get(0));
+
+ Collection<UUID> ids = F.transform(nodes, new C1<ClusterNode, UUID>() {
+ @Override public UUID apply(ClusterNode node) {
+ return node.id();
+ }
+ });
+
+ int keyIter = 0;
+
+ boolean filtered = false;
+
+ Map<Object, T2<Object, Object>> updates = new HashMap<>();
+
+ final List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
+
+ for (; keyIter < keys.size() / 2; keyIter++) {
+ int key = keys.get(keyIter);
+
+ log.info("Put [key=" + key + ", part=" + aff.partition(key)
+ + ", filtered=" + filtered + ']');
+
+ T2<Object, Object> t = updates.get(key);
+
+ Integer val = filtered ?
+ (key % 2 == 0 ? key + 1 : key) :
+ key * 2;
+
+ if (t == null) {
+ updates.put(key, new T2<>((Object)val, null));
+
+ if (!filtered)
+ expEvts.add(new T3<>((Object)key, (Object)val, null));
+ }
+ else {
+ updates.put(key, new T2<>((Object)val, (Object)key));
+
+ if (!filtered)
+ expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
+ }
+
+ srvCache.put(key, val);
+
+ filtered = !filtered;
+ }
+
+ checkEvents(expEvts, lsnr, false);
+
+ List<Thread> stopThreads = new ArrayList<>(3);
+
+ // Stop nodes which owning this partition.
+ for (int i = 0; i < SRV_NODES; i++) {
+ Ignite ignite = ignite(i);
+
+ if (ids.contains(ignite.cluster().localNode().id())) {
+ final int i0 = i;
+
+ TestCommunicationSpi spi = (TestCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+ spi.skipAllMsg = true;
+
+ stopThreads.add(new Thread() {
+ @Override public void run() {
+ stopGrid(i0, true);
+ }
+ });
+ }
+ }
+
+ // Stop and join threads.
+ for (Thread t : stopThreads)
+ t.start();
+
+ for (Thread t : stopThreads)
+ t.join();
+
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ // (SRV_NODES + 1 client node) - 1 primary - backup nodes.
+ return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /** client node */)
+ - 1 /** Primary node */ - backups;
+ }
+ }, 5000L);
+
+ for (; keyIter < keys.size(); keyIter++) {
+ int key = keys.get(keyIter);
+
+ log.info("Put [key=" + key + ", filtered=" + filtered + ']');
+
+ T2<Object, Object> t = updates.get(key);
+
+ Integer val = filtered ?
+ (key % 2 == 0 ? key + 1 : key) :
+ key * 2;
+
+ if (t == null) {
+ updates.put(key, new T2<>((Object)val, null));
+
+ if (!filtered)
+ expEvts.add(new T3<>((Object)key, (Object)val, null));
+ }
+ else {
+ updates.put(key, new T2<>((Object)val, (Object)key));
+
+ if (!filtered)
+ expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
+ }
+
+ clnCache.put(key, val);
+
+ filtered = !filtered;
+ }
+
+ checkEvents(expEvts, lsnr, false);
+
+ query.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRemoteFilter() throws Exception {
+ this.backups = 2;
+
+ final int SRV_NODES = 4;
+
+ startGridsMultiThreaded(SRV_NODES);
+
+ client = true;
+
+ Ignite qryClient = startGrid(SRV_NODES);
+
+ client = false;
+
+ IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
+
+ if (cacheMode() != REPLICATED)
+ assertEquals(backups, qryClientCache.getConfiguration(CacheConfiguration.class).getBackups());
+
+ Affinity<Object> aff = qryClient.affinity(null);
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ final CacheEventListener3 lsnr = new CacheEventListener3();
+
+ qry.setLocalListener(lsnr);
+
+ qry.setRemoteFilter(lsnr);
+
+ int PARTS = 10;
+
+ QueryCursor<?> cur = qryClientCache.query(qry);
+
+ Map<Object, T2<Object, Object>> updates = new HashMap<>();
+
+ final List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
+
+ for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) {
+ log.info("Stop iteration: " + i);
+
+ TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
+
+ Ignite ignite = ignite(i);
+
+ IgniteCache<Object, Object> cache = ignite.cache(null);
+
+ List<Integer> keys = testKeys(cache, PARTS);
+
+ boolean first = true;
+
+ boolean filtered = false;
+
+ for (Integer key : keys) {
+ log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key)
+ + ", filtered=" + filtered + ']');
+
+ T2<Object, Object> t = updates.get(key);
+
+ Integer val = filtered ?
+ (key % 2 == 0 ? key + 1 : key) :
+ key * 2;
+
+ if (t == null) {
+ updates.put(key, new T2<>((Object)val, null));
+
+ if (!filtered)
+ expEvts.add(new T3<>((Object)key, (Object)val, null));
+ }
+ else {
+ updates.put(key, new T2<>((Object)val, (Object)key));
+
+ if (!filtered)
+ expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
+ }
+
+ cache.put(key, val);
+
+ if (first) {
+ spi.skipMsg = true;
+
+ first = false;
+ }
+
+ filtered = !filtered;
+ }
+
+ stopGrid(i);
+
+ boolean check = GridTestUtils.waitForCondition(new PAX() {
+ @Override public boolean applyx() throws IgniteCheckedException {
+ return expEvts.size() == lsnr.keys.size();
+ }
+ }, 5000L);
+
+ if (!check) {
+ Set<Integer> keys0 = new HashSet<>(keys);
+
+ keys0.removeAll(lsnr.keys);
+
+ log.info("Missed events for keys: " + keys0);
+
+ fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + keys0.size() + ']');
+ }
+
+ checkEvents(expEvts, lsnr, false);
+ }
+
+ cur.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testThreeBackups() throws Exception {
+ if (cacheMode() == REPLICATED)
+ return;
+
+ checkBackupQueue(3, false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isDebug() {
+ return true;
+ }
+
+ /**
+ * @param backups Number of backups.
+ * @param updateFromClient If {@code true} executes cache update from client node.
+ * @throws Exception If failed.
+ */
+ private void checkBackupQueue(int backups, boolean updateFromClient) throws Exception {
+ this.backups = atomicityMode() == CacheAtomicityMode.ATOMIC ? backups :
+ backups < 2 ? 2 : backups;
+
+ final int SRV_NODES = 4;
+
+ startGridsMultiThreaded(SRV_NODES);
+
+ client = true;
+
+ Ignite qryClient = startGrid(SRV_NODES);
+
+ client = false;
+
+ IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
+
+ Affinity<Object> aff = qryClient.affinity(null);
+
+ CacheEventListener1 lsnr = new CacheEventListener1(false);
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ QueryCursor<?> cur = qryClientCache.query(qry);
+
+ int PARTS = 10;
+
+ Map<Object, T2<Object, Object>> updates = new HashMap<>();
+
+ List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
+
+ for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) {
+ log.info("Stop iteration: " + i);
+
+ TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
+
+ Ignite ignite = ignite(i);
+
+ IgniteCache<Object, Object> cache = ignite.cache(null);
+
+ List<Integer> keys = testKeys(cache, PARTS);
+
+ CountDownLatch latch = new CountDownLatch(keys.size());
+
+ lsnr.latch = latch;
+
+ boolean first = true;
+
+ for (Integer key : keys) {
+ log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key) + ']');
+
+ T2<Object, Object> t = updates.get(key);
+
+ if (updateFromClient) {
+ if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
+ try (Transaction tx = qryClient.transactions().txStart()) {
+ qryClientCache.put(key, key);
+
+ tx.commit();
+ }
+ catch (CacheException | ClusterTopologyException e) {
+ log.warning("Failed put. [Key=" + key + ", val=" + key + "]");
+
+ continue;
+ }
+ }
+ else
+ qryClientCache.put(key, key);
+ }
+ else {
+ if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
+ try (Transaction tx = ignite.transactions().txStart()) {
+ cache.put(key, key);
+
+ tx.commit();
+ }
+ catch (CacheException | ClusterTopologyException e) {
+ log.warning("Failed put. [Key=" + key + ", val=" + key + "]");
+
+ continue;
+ }
+ }
+ else
+ cache.put(key, key);
+ }
+
+ if (t == null) {
+ updates.put(key, new T2<>((Object)key, null));
+
+ expEvts.add(new T3<>((Object)key, (Object)key, null));
+ }
+ else {
+ updates.put(key, new T2<>((Object)key, (Object)key));
+
+ expEvts.add(new T3<>((Object)key, (Object)key, (Object)key));
+ }
+
+ if (first) {
+ spi.skipMsg = true;
+
+ first = false;
+ }
+ }
+
+ stopGrid(i);
+
+ if (!latch.await(5, SECONDS)) {
+ Set<Integer> keys0 = new HashSet<>(keys);
+
+ keys0.removeAll(lsnr.keys);
+
+ log.info("Missed events for keys: " + keys0);
+
+ fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
+ }
+
+ checkEvents(expEvts, lsnr);
+ }
+
+ for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) {
+ log.info("Start iteration: " + i);
+
+ Ignite ignite = startGrid(i);
+
+ IgniteCache<Object, Object> cache = ignite.cache(null);
+
+ List<Integer> keys = testKeys(cache, PARTS);
+
+ CountDownLatch latch = new CountDownLatch(keys.size());
+
+ lsnr.latch = latch;
+
+ for (Integer key : keys) {
+ log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key) + ']');
+
+ T2<Object, Object> t = updates.get(key);
+
+ if (t == null) {
+ updates.put(key, new T2<>((Object)key, null));
+
+ expEvts.add(new T3<>((Object)key, (Object)key, null));
+ }
+ else {
+ updates.put(key, new T2<>((Object)key, (Object)key));
+
+ expEvts.add(new T3<>((Object)key, (Object)key, (Object)key));
+ }
+
+ if (updateFromClient)
+ qryClientCache.put(key, key);
+ else
+ cache.put(key, key);
+ }
+
+ if (!latch.await(10, SECONDS)) {
+ Set<Integer> keys0 = new HashSet<>(keys);
+
+ keys0.removeAll(lsnr.keys);
+
+ log.info("Missed events for keys: " + keys0);
+
+ fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
+ }
+
+ checkEvents(expEvts, lsnr);
+ }
+
+ cur.close();
+
+ assertFalse("Unexpected error during test, see log for details.", err);
+ }
+
+ /**
+ * @param expEvts Expected events.
+ * @param lsnr Listener.
+ */
+ private void checkEvents(List<T3<Object, Object, Object>> expEvts, CacheEventListener1 lsnr) {
+ for (T3<Object, Object, Object> exp : expEvts) {
+ CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1());
+
+ assertNotNull("No event for key: " + exp.get1(), e);
+ assertEquals("Unexpected value: " + e, exp.get2(), e.getValue());
+ }
+
+ expEvts.clear();
+
+ lsnr.evts.clear();
+ }
+
+ /**
+ * @param expEvts Expected events.
+ * @param lsnr Listener.
+ * @param lostAllow If {@code true} than won't assert on lost events.
+ */
+ private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr,
+ boolean lostAllow) throws Exception {
+ GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return expEvts.size() == lsnr.size();
+ }
+ }, 2000L);
+
+ Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size());
+
+ for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet())
+ prevMap.put(e.getKey(), new ArrayList<>(e.getValue()));
+
+ List<T3<Object, Object, Object>> lostEvents = new ArrayList<>();
+
+ for (T3<Object, Object, Object> exp : expEvts) {
+ List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1());
+
+ if (F.eq(exp.get2(), exp.get3()))
+ continue;
+
+ if (rcvdEvts == null || rcvdEvts.isEmpty()) {
+ lostEvents.add(exp);
+
+ continue;
+ }
+
+ Iterator<CacheEntryEvent<?, ?>> iter = rcvdEvts.iterator();
+
+ boolean found = false;
+
+ while (iter.hasNext()) {
+ CacheEntryEvent<?, ?> e = iter.next();
+
+ if ((exp.get2() != null && e.getValue() != null && exp.get2().equals(e.getValue()))
+ && equalOldValue(e, exp)) {
+ found = true;
+
+ iter.remove();
+
+ break;
+ }
+ }
+
+ // Lost event is acceptable.
+ if (!found)
+ lostEvents.add(exp);
+ }
+
+ boolean dup = false;
+
+ // Check duplicate.
+ if (!lsnr.evts.isEmpty()) {
+ for (List<CacheEntryEvent<?, ?>> evts : lsnr.evts.values()) {
+ if (!evts.isEmpty()) {
+ for (CacheEntryEvent<?, ?> e : evts) {
+ boolean found = false;
+
+ for (T3<Object, Object, Object> lostEvt : lostEvents) {
+ if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) {
+ found = true;
+
+ lostEvents.remove(lostEvt);
+
+ break;
+ }
+ }
+
+ if (!found) {
+ dup = true;
+
+ break;
+ }
+ }
+ }
+ }
+
+ if (dup) {
+ for (T3<Object, Object, Object> e : lostEvents)
+ log.error("Lost event: " + e);
+
+ for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) {
+ if (!e.isEmpty()) {
+ for (CacheEntryEvent<?, ?> event : e) {
+ List<CacheEntryEvent<?, ?>> entries = new ArrayList<>();
+
+ for (CacheEntryEvent<?, ?> ev0 : prevMap.get(event.getKey())) {
+ if (F.eq(event.getValue(), ev0.getValue()) && F.eq(event.getOldValue(),
+ ev0.getOldValue()))
+ entries.add(ev0);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ if (!lostAllow && !lostEvents.isEmpty()) {
+ log.error("Lost event cnt: " + lostEvents.size());
+
+ for (T3<Object, Object, Object> e : lostEvents)
+ log.error("Lost event: " + e);
+
+ fail("Lose events, see log for details.");
+ }
+
+ log.error("Lost event cnt: " + lostEvents.size());
+
+ expEvts.clear();
+
+ lsnr.evts.clear();
+ lsnr.vals.clear();
+ }
+
+ /**
+ * @param e Event
+ * @param expVals expected value
+ * @return {@code True} if entries has the same key, value and oldValue. If cache start without backups
+ * than oldValue ignoring in comparison.
+ */
+ private boolean equalOldValue(CacheEntryEvent<?, ?> e, T3<Object, Object, Object> expVals) {
+ return (e.getOldValue() == null && expVals.get3() == null) // Both null
+ || (e.getOldValue() != null && expVals.get3() != null // Equals
+ && e.getOldValue().equals(expVals.get3()))
+ || (backups == 0); // If we start without backup than oldValue might be lose.
+ }
+
+ /**
+ * @param expEvts Expected events.
+ * @param lsnr Listener.
+ */
+ private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener3 lsnr,
+ boolean allowLoseEvent) throws Exception {
+ if (!allowLoseEvent)
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return lsnr.evts.size() == expEvts.size();
+ }
+ }, 2000L);
+
+ for (T3<Object, Object, Object> exp : expEvts) {
+ CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1());
+
+ assertNotNull("No event for key: " + exp.get1(), e);
+ assertEquals("Unexpected value: " + e, exp.get2(), e.getValue());
+
+ if (allowLoseEvent)
+ lsnr.evts.remove(exp.get1());
+ }
+
+ if (allowLoseEvent)
+ assert lsnr.evts.isEmpty();
+
+ expEvts.clear();
+
+ lsnr.evts.clear();
+ lsnr.keys.clear();
+ }
+
+ /**
+ * @param cache Cache.
+ * @param parts Number of partitions.
+ * @return Keys.
+ */
+ private List<Integer> testKeys(IgniteCache<Object, Object> cache, int parts) {
+ Ignite ignite = cache.unwrap(Ignite.class);
+
+ List<Integer> res = new ArrayList<>();
+
+ Affinity<Object> aff = ignite.affinity(cache.getName());
+
+ ClusterNode node = ignite.cluster().localNode();
+
+ int[] nodeParts = aff.primaryPartitions(node);
+
+ final int KEYS_PER_PART = 50;
+
+ for (int i = 0; i < parts; i++) {
+ int part = nodeParts[i];
+
+ int cnt = 0;
+
+ for (int key = 0; key < 100_000; key++) {
+ if (aff.partition(key) == part && aff.isPrimary(node, key)) {
+ res.add(key);
+
+ if (++cnt == KEYS_PER_PART)
+ break;
+ }
+ }
+
+ assertEquals(KEYS_PER_PART, cnt);
+ }
+
+ assertEquals(parts * KEYS_PER_PART, res.size());
+
+ return res;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testBackupQueueCleanupClientQuery() throws Exception {
+ startGridsMultiThreaded(2);
+
+ client = true;
+
+ Ignite qryClient = startGrid(2);
+
+ CacheEventListener1 lsnr = new CacheEventListener1(false);
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ QueryCursor<?> cur = qryClient.cache(null).query(qry);
+
+ final Collection<Object> backupQueue = backupQueue(ignite(1));
+
+ assertEquals(0, backupQueue.size());
+
+ IgniteCache<Object, Object> cache0 = ignite(0).cache(null);
+
+ List<Integer> keys = primaryKeys(cache0, BACKUP_ACK_THRESHOLD);
+
+ CountDownLatch latch = new CountDownLatch(keys.size());
+
+ lsnr.latch = latch;
+
+ for (Integer key : keys) {
+ log.info("Put: " + key);
+
+ cache0.put(key, key);
+ }
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return backupQueue.isEmpty();
+ }
+ }, 2000);
+
+ assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD);
+
+ if (!latch.await(5, SECONDS))
+ fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
+
+ keys = primaryKeys(cache0, BACKUP_ACK_THRESHOLD / 2);
+
+ latch = new CountDownLatch(keys.size());
+
+ lsnr.latch = latch;
+
+ for (Integer key : keys)
+ cache0.put(key, key);
+
+ final long ACK_FREQ = 5000;
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return backupQueue.isEmpty();
+ }
+ }, ACK_FREQ + 2000);
+
+ assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty());
+
+ if (!latch.await(5, SECONDS))
+ fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
+
+ cur.close();
+
+ assertFalse("Unexpected error during test, see log for details.", err);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testBackupQueueCleanupServerQuery() throws Exception {
+ Ignite qryClient = startGridsMultiThreaded(2);
+
+ CacheEventListener1 lsnr = new CacheEventListener1(false);
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ IgniteCache<Object, Object> cache = qryClient.cache(null);
+
+ QueryCursor<?> cur = cache.query(qry);
+
+ final Collection<Object> backupQueue = backupQueue(ignite(1));
+
+ assertEquals(0, backupQueue.size());
+
+ List<Integer> keys = primaryKeys(cache, BACKUP_ACK_THRESHOLD);
+
+ CountDownLatch latch = new CountDownLatch(keys.size());
+
+ lsnr.latch = latch;
+
+ for (Integer key : keys) {
+ log.info("Put: " + key);
+
+ cache.put(key, key);
+ }
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return backupQueue.isEmpty();
+ }
+ }, 3000);
+
+ assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD);
+
+ if (!latch.await(5, SECONDS))
+ fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
+
+ cur.close();
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @return Backup queue for test query.
+ */
+ private Collection<Object> backupQueue(Ignite ignite) {
+ GridContinuousProcessor proc = ((IgniteKernal)ignite).context().continuous();
+
+ ConcurrentMap<Object, Object> infos = GridTestUtils.getFieldValue(proc, "rmtInfos");
+
+ Collection<Object> backupQueue = null;
+
+ for (Object info : infos.values()) {
+ GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
+
+ if (hnd.isForQuery() && hnd.cacheName() == null) {
+ backupQueue = GridTestUtils.getFieldValue(hnd, "backupQueue");
+
+ break;
+ }
+ }
+
+ assertNotNull(backupQueue);
+
+ return backupQueue;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFailover() throws Exception {
+ this.backups = 2;
+
+ final int SRV_NODES = 4;
+
+ startGridsMultiThreaded(SRV_NODES);
+
+ client = true;
+
+ final Ignite qryCln = startGrid(SRV_NODES);
+
+ client = false;
+
+ final IgniteCache<Object, Object> qryClnCache = qryCln.cache(null);
+
+ final CacheEventListener2 lsnr = new CacheEventListener2();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ QueryCursor<?> cur = qryClnCache.query(qry);
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>();
+
+ boolean processorPut = false;
+
+ IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ final int idx = SRV_NODES + 1;
+
+ while (!stop.get() && !err) {
+ log.info("Start node: " + idx);
+
+ startGrid(idx);
+
+ Thread.sleep(200);
+
+ log.info("Stop node: " + idx);
+
+ try {
+ stopGrid(idx);
+ }
+ catch (Exception e) {
+ log.warning("Failed to stop nodes.", e);
+ }
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ assertTrue(checkLatch.compareAndSet(null, latch));
+
+ if (!stop.get()) {
+ log.info("Wait for event check.");
+
+ assertTrue(latch.await(1, MINUTES));
+ }
+ }
+
+ return null;
+ }
+ });
+
+ final Map<Integer, Integer> vals = new HashMap<>();
+
+ final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>();
+
+ try {
+ long stopTime = System.currentTimeMillis() + 60_000;
+
+ final int PARTS = qryCln.affinity(null).partitions();
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (System.currentTimeMillis() < stopTime) {
+ Integer key = rnd.nextInt(PARTS);
+
+ Integer prevVal = vals.get(key);
+ Integer val = vals.get(key);
+
+ if (val == null)
+ val = 0;
+ else
+ val = val + 1;
+
+ if (processorPut && prevVal != null) {
+ if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
+ try (Transaction tx = qryCln.transactions().txStart()) {
+ qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
+ @Override public Void process(MutableEntry<Object, Object> e,
+ Object... arg) throws EntryProcessorException {
+ e.setValue(arg[0]);
+
+ return null;
+ }
+ }, val);
+
+ tx.commit();
+ }
+ catch (CacheException | ClusterTopologyException e) {
+ log.warning("Failed put. [Key=" + key + ", val=" + val + "]");
+
+ continue;
+ }
+ }
+ else
+ qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
+ @Override public Void process(MutableEntry<Object, Object> e,
+ Object... arg) throws EntryProcessorException {
+ e.setValue(arg[0]);
+
+ return null;
+ }
+ }, val);
+ }
+ else {
+ if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
+ try (Transaction tx = qryCln.transactions().txStart()) {
+ qryClnCache.put(key, val);
+
+ tx.commit();
+ }
+ catch (CacheException | ClusterTopologyException e) {
+ log.warning("Failed put. [Key=" + key + ", val=" + val + "]");
+
+ continue;
+ }
+ }
+ else
+ qryClnCache.put(key, val);
+ }
+
+ processorPut = !processorPut;
+
+ vals.put(key, val);
+
+ List<T2<Integer, Integer>> keyEvts = expEvts.get(key);
+
+ if (keyEvts == null) {
+ keyEvts = new ArrayList<>();
+
+ expEvts.put(key, keyEvts);
+ }
+
+ keyEvts.add(new T2<>(val, prevVal));
+
+ CountDownLatch latch = checkLatch.get();
+
+ if (latch != null) {
+ log.info("Check events.");
+
+ checkLatch.set(null);
+
+ boolean success = false;
+
+ try {
+ if (err)
+ break;
+
+ boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return checkEvents(false, expEvts, lsnr);
+ }
+ }, 10_000);
+
+ if (!check)
+ assertTrue(checkEvents(true, expEvts, lsnr));
+
+ success = true;
+
+ log.info("Events checked.");
+ }
+ finally {
+ if (!success)
+ err = true;
+
+ latch.countDown();
+ }
+ }
+ }
+ }
+ finally {
+ stop.set(true);
+ }
+
+ CountDownLatch latch = checkLatch.get();
+
+ if (latch != null)
+ latch.countDown();
+
+ restartFut.get();
+
+ boolean check = true;
+
+ if (!expEvts.isEmpty())
+ check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return checkEvents(false, expEvts, lsnr);
+ }
+ }, 10_000);
+
+ if (!check)
+ assertTrue(checkEvents(true, expEvts, lsnr));
+
+ cur.close();
+
+ assertFalse("Unexpected error during test, see log for details.", err);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFailoverFilter() throws Exception {
+ this.backups = 2;
+
+ final int SRV_NODES = 4;
+
+ startGridsMultiThreaded(SRV_NODES);
+
+ client = true;
+
+ Ignite qryClient = startGrid(SRV_NODES);
+
+ client = false;
+
+ IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
+
+ final CacheEventListener2 lsnr = new CacheEventListener2();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ qry.setRemoteFilter(new CacheEventFilter());
+
+ QueryCursor<?> cur = qryClientCache.query(qry);
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>();
+
+ IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ final int idx = SRV_NODES + 1;
+
+ while (!stop.get() && !err) {
+ log.info("Start node: " + idx);
+
+ startGrid(idx);
+
+ Thread.sleep(200);
+
+ log.info("Stop node: " + idx);
+
+ stopGrid(idx);
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ assertTrue(checkLatch.compareAndSet(null, latch));
+
+ if (!stop.get()) {
+ log.info("Wait for event check.");
+
+ assertTrue(latch.await(1, MINUTES));
+ }
+ }
+
+ return null;
+ }
+ });
+
+ final Map<Integer, Integer> vals = new HashMap<>();
+
+ final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>();
+
+ try {
+ long stopTime = System.currentTimeMillis() + 60_000;
+
+ final int PARTS = qryClient.affinity(null).partitions();
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ boolean filtered = false;
+
+ boolean processorPut = false;
+
+ while (System.currentTimeMillis() < stopTime) {
+ Integer key = rnd.nextInt(PARTS);
+
+ Integer prevVal = vals.get(key);
+ Integer val = vals.get(key);
+
+ if (val == null)
+ val = 0;
+ else
+ val = Math.abs(val) + 1;
+
+ if (filtered)
+ val = -val;
+
+ if (processorPut && prevVal != null) {
+ qryClientCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
+ @Override public Void process(MutableEntry<Object, Object> entry,
+ Object... arguments) throws EntryProcessorException {
+ entry.setValue(arguments[0]);
+
+ return null;
+ }
+ }, val);
+ }
+ else
+ qryClientCache.put(key, val);
+
+ processorPut = !processorPut;
+
+ vals.put(key, val);
+
+ if (val >= 0) {
+ List<T2<Integer, Integer>> keyEvts = expEvts.get(key);
+
+ if (keyEvts == null) {
+ keyEvts = new ArrayList<>();
+
+ expEvts.put(key, keyEvts);
+ }
+
+ keyEvts.add(new T2<>(val, prevVal));
+ }
+
+ filtered = !filtered;
+
+ CountDownLatch latch = checkLatch.get();
+
+ if (latch != null) {
+ log.info("Check events.");
+
+ checkLatch.set(null);
+
+ boolean success = false;
+
+ try {
+ if (err)
+ break;
+
+ boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return checkEvents(false, expEvts, lsnr);
+ }
+ }, 10_000);
+
+ if (!check)
+ assertTrue(checkEvents(true, expEvts, lsnr));
+
+ success = true;
+
+ log.info("Events checked.");
+ }
+ finally {
+ if (!success)
+ err = true;
+
+ latch.countDown();
+ }
+ }
+ }
+ }
+ finally {
+ stop.set(true);
+ }
+
+ CountDownLatch latch = checkLatch.get();
+
+ if (latch != null)
+ latch.countDown();
+
+ restartFut.get();
+
+ boolean check = true;
+
+ if (!expEvts.isEmpty()) {
+ check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return checkEvents(false, expEvts, lsnr);
+ }
+ }, 10_000);
+ }
+
+ if (!check)
+ assertTrue(checkEvents(true, expEvts, lsnr));
+
+ cur.close();
+
+ assertFalse("Unexpected error during test, see log for details.", err);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFailoverStartStopBackup() throws Exception {
+ failoverStartStopFilter(2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartStop() throws Exception {
+ this.backups = 2;
+
+ final int SRV_NODES = 4;
+
+ startGridsMultiThreaded(SRV_NODES);
+
+ client = true;
+
+ Ignite qryClient = startGrid(SRV_NODES);
+
+ client = false;
+
+ IgniteCache<Object, Object> qryClnCache = qryClient.cache(null);
+
+ Affinity<Object> aff = qryClient.affinity(null);
+
+ final CacheEventListener2 lsnr = new CacheEventListener2();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ qry.setRemoteFilter(new CacheEventFilter());
+
+ QueryCursor<?> cur = qryClnCache.query(qry);
+
+ for (int i = 0; i < 10; i++) {
+ final int idx = i % (SRV_NODES - 1);
+
+ log.info("Stop node: " + idx);
+
+ stopGrid(idx);
+
+ awaitPartitionMapExchange();
+
+ List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>();
+
+ for (int j = 0; j < aff.partitions(); j++) {
+ Integer oldVal = (Integer)qryClnCache.get(j);
+
+ qryClnCache.put(j, i);
+
+ afterRestEvents.add(new T3<>((Object)j, (Object)i, (Object)oldVal));
+ }
+
+ checkEvents(new ArrayList<>(afterRestEvents), lsnr, false);
+
+ log.info("Start node: " + idx);
+
+ startGrid(idx);
+ }
+
+ cur.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void failoverStartStopFilter(int backups) throws Exception {
+ this.backups = backups;
+
+ final int SRV_NODES = 4;
+
+ startGridsMultiThreaded(SRV_NODES);
+
+ client = true;
+
+ Ignite qryClient = startGrid(SRV_NODES);
+
+ client = false;
+
+ IgniteCache<Object, Object> qryClnCache = qryClient.cache(null);
+
+ final CacheEventListener2 lsnr = new CacheEventListener2();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ qry.setRemoteFilter(new CacheEventFilter());
+
+ QueryCursor<?> cur = qryClnCache.query(qry);
+
+ CacheEventListener2 dinLsnr = null;
+
+ QueryCursor<?> dinQry = null;
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>();
+
+ IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ while (!stop.get() && !err) {
+ final int idx = ThreadLocalRandom.current().nextInt(SRV_NODES - 1);
+
+ log.info("Stop node: " + idx);
+
+ awaitPartitionMapExchange();
+
+ Thread.sleep(400);
+
+ stopGrid(idx);
+
+ awaitPartitionMapExchange();
+
+ Thread.sleep(400);
+
+ log.info("Start node: " + idx);
+
+ startGrid(idx);
+
+ Thread.sleep(200);
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ assertTrue(checkLatch.compareAndSet(null, latch));
+
+ if (!stop.get()) {
+ log.info("Wait for event check.");
+
+ assertTrue(latch.await(1, MINUTES));
+ }
+ }
+
+ return null;
+ }
+ });
+
+ final Map<Integer, Integer> vals = new HashMap<>();
+
+ final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>();
+
+ final List<T3<Object, Object, Object>> expEvtsNewLsnr = new ArrayList<>();
+
+ final List<T3<Object, Object, Object>> expEvtsLsnr = new ArrayList<>();
+
+ try {
+ long stopTime = System.currentTimeMillis() + 60_000;
+
+ // Start new filter each 5 sec.
+ long startFilterTime = System.currentTimeMillis() + 5_000;
+
+ final int PARTS = qryClient.affinity(null).partitions();
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ boolean filtered = false;
+
+ boolean processorPut = false;
+
+ while (System.currentTimeMillis() < stopTime) {
+ Integer key = rnd.nextInt(PARTS);
+
+ Integer prevVal = vals.get(key);
+ Integer val = vals.get(key);
+
+ if (System.currentTimeMillis() > startFilterTime) {
+ // Stop filter and check events.
+ if (dinQry != null) {
+ dinQry.close();
+
+ log.info("Continuous query listener closed. Await events: " + expEvtsNewLsnr.size());
+
+ checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0);
+ }
+
+ dinLsnr = new CacheEventListener2();
+
+ ContinuousQuery<Object, Object> newQry = new ContinuousQuery<>();
+
+ newQry.setLocalListener(dinLsnr);
+
+ newQry.setRemoteFilter(new CacheEventFilter());
+
+ dinQry = qryClnCache.query(newQry);
+
+ log.info("Continuous query listener started.");
+
+ startFilterTime = System.currentTimeMillis() + 5_000;
+ }
+
+ if (val == null)
+ val = 0;
+ else
+ val = Math.abs(val) + 1;
+
+ if (filtered)
+ val = -val;
+
+ if (processorPut && prevVal != null) {
+ qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
+ @Override public Void process(MutableEntry<Object, Object> entry,
+ Object... arguments) throws EntryProcessorException {
+ entry.setValue(arguments[0]);
+
+ return null;
+ }
+ }, val);
+ }
+ else
+ qryClnCache.put(key, val);
+
+ processorPut = !processorPut;
+
+ vals.put(key, val);
+
+ if (val >= 0) {
+ List<T2<Integer, Integer>> keyEvts = expEvts.get(key);
+
+ if (keyEvts == null) {
+ keyEvts = new ArrayList<>();
+
+ expEvts.put(key, keyEvts);
+ }
+
+ keyEvts.add(new T2<>(val, prevVal));
+
+ T3<Object, Object, Object> tupVal = new T3<>((Object)key, (Object)val, (Object)prevVal);
+
+ expEvtsLsnr.add(tupVal);
+
+ if (dinQry != null)
+ expEvtsNewLsnr.add(tupVal);
+ }
+
+ filtered = !filtered;
+
+ CountDownLatch latch = checkLatch.get();
+
+ if (latch != null) {
+ log.info("Check events.");
+
+ checkLatch.set(null);
+
+ boolean success = false;
+
+ try {
+ if (err)
+ break;
+
+ checkEvents(expEvtsLsnr, lsnr, backups == 0);
+
+ success = true;
+
+ log.info("Events checked.");
+ }
+ finally {
+ if (!success)
+ err = true;
+
+ latch.countDown();
+ }
+ }
+ }
+ }
+ finally {
+ stop.set(true);
+ }
+
+ CountDownLatch latch = checkLatch.get();
+
+ if (latch != null)
+ latch.countDown();
+
+ restartFut.get();
+
+ checkEvents(expEvtsLsnr, lsnr, backups == 0);
+
+ lsnr.evts.clear();
+ lsnr.vals.clear();
+
+ if (dinQry != null) {
+ checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0);
+
+ dinLsnr.evts.clear();
+ dinLsnr.vals.clear();
+ }
+
+ List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>();
+
+ for (int i = 0; i < qryClient.affinity(null).partitions(); i++) {
+ Integer oldVal = (Integer)qryClnCache.get(i);
+
+ qryClnCache.put(i, i);
+
+ afterRestEvents.add(new T3<>((Object)i, (Object)i, (Object)oldVal));
+ }
+
+ checkEvents(new ArrayList<>(afterRestEvents), lsnr, false);
+
+ cur.close();
+
+ if (dinQry != null) {
+ checkEvents(new ArrayList<>(afterRestEvents), dinLsnr, false);
+
+ dinQry.close();
+ }
+
+ assertFalse("Unexpected error during test, see log for details.", err);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultiThreadedFailover() throws Exception {
+ this.backups = 2;
+
+ final int SRV_NODES = 4;
+
+ startGridsMultiThreaded(SRV_NODES);
+
+ client = true;
+
+ final Ignite qryCln = startGrid(SRV_NODES);
+
+ client = false;
+
+ final IgniteCache<Object, Object> qryClnCache = qryCln.cache(null);
+
+ final CacheEventListener2 lsnr = new CacheEventListener2();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ QueryCursor<?> cur = qryClnCache.query(qry);
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ final int THREAD = 4;
+
+ final int PARTS = THREAD;
+
+ final List<List<T3<Object, Object, Object>>> expEvts = new ArrayList<>(THREAD + 5);
+
+ for (int i = 0; i < THREAD; i++)
+ expEvts.add(i, new ArrayList<T3<Object, Object, Object>>());
+
+ final AtomicReference<CyclicBarrier> checkBarrier = new AtomicReference<>();
+
+ IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ final int idx = SRV_NODES + 1;
+
+ while (!stop.get() && !err) {
+ log.info("Start node: " + idx);
+
+ startGrid(idx);
+
+ awaitPartitionMapExchange();
+
+ Thread.sleep(100);
+
+ try {
+ log.info("Stop node: " + idx);
+
+ stopGrid(idx);
+
+ awaitPartitionMapExchange();
+
+ Thread.sleep(100);
+ }
+ catch (Exception e) {
+ log.warning("Failed to stop nodes.", e);
+ }
+
+ CyclicBarrier bar = new CyclicBarrier(THREAD + 1 /* plus start/stop thread */, new Runnable() {
+ @Override public void run() {
+ try {
+ GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ int size = 0;
+
+ for (List<T3<Object, Object, Object>> evt : expEvts)
+ size += evt.size();
+
+ return lsnr.size() <= size;
+ }
+ }, 2000L);
+
+ List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>();
+
+ for (List<T3<Object, Object, Object>> evt : expEvts)
+ expEvts0.addAll(evt);
+
+ checkEvents(expEvts0, lsnr, false);
+
+ for (List<T3<Object, Object, Object>> evt : expEvts)
+ evt.clear();
+ }
+ catch (Exception e) {
+ log.error("Failed.", e);
+
+ err = true;
+
+ stop.set(true);
+ }
+ finally {
+ checkBarrier.set(null);
+ }
+ }
+ });
+
+ assertTrue(checkBarrier.compareAndSet(null, bar));
+
+ if (!stop.get() && !err)
+ bar.await(5, MINUTES);
+ }
+
+ return null;
+ }
+ });
+
+ final long stopTime = System.currentTimeMillis() + 60_000;
+
+ final AtomicInteger valCntr = new AtomicInteger(0);
+
+ final AtomicInteger threadSeq = new AtomicInteger(0);
+
+ GridTestUtils.runMultiThreaded(new Runnable() {
+ @Override public void run() {
+ try {
+ final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ final int threadId = threadSeq.getAndIncrement();
+
+ log.error("Thread id: " + threadId);
+
+ while (System.currentTimeMillis() < stopTime && !stop.get() && !err) {
+ Integer key = rnd.nextInt(PARTS);
+
+ Integer val = valCntr.incrementAndGet();
+
+ Integer prevVal = (Integer)qryClnCache.getAndPut(key, val);
+
+ expEvts.get(threadId).add(new T3<>((Object)key, (Object)val, (Object)prevVal));
+
+ CyclicBarrier bar = checkBarrier.get();
+
+ if (bar != null)
+ bar.await();
+ }
+ }
+ catch (Exception e){
+ log.error("Failed.", e);
+
+ err = true;
+
+ stop.set(true);
+ }
+ finally {
+ stop.set(true);
+ }
+ }
+ }, THREAD, "update-thread");
+
+ restartFut.get();
+
+ List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>();
+
+ for (List<T3<Object, Object, Object>> evt : expEvts) {
+ expEvts0.addAll(evt);
+
+ evt.clear();
+ }
+
+ if (!expEvts0.isEmpty())
+ checkEvents(expEvts0, lsnr, true);
+
+ cur.close();
+
+ assertFalse("Unexpected error during test, see log for details.", err);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultiThreaded() throws Exception {
+ this.backups = 2;
+
+ final int SRV_NODES = 3;
+
+ startGridsMultiThreaded(SRV_NODES);
+
+ client = true;
+
+ Ignite qryClient = startGrid(SRV_NODES);
+
+ final IgniteCache<Object, Object> cache = qryClient.cache(null);
+
+ CacheEventListener1 lsnr = new CacheEventListener1(true);
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ QueryCursor<?> cur = cache.query(qry);
+
+ client = false;
+
+ final int SRV_IDX = SRV_NODES - 1;
+
+ List<Integer> keys = primaryKeys(ignite(SRV_IDX).cache(null), 10);
+
+ final int THREADS = 10;
+
+ for (int i = 0; i < keys.size(); i++) {
+ log.info("Iteration: " + i);
+
+ Ignite srv = ignite(SRV_IDX);
+
+ TestCommunicationSpi spi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi();
+
+ spi.sndFirstOnly = new AtomicBoolean(false);
+
+ final Integer key = keys.get(i);
+
+ final AtomicInteger val = new AtomicInteger();
+
+ CountDownLatch latch = new CountDownLatch(THREADS);
+
+ lsnr.latch = latch;
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Integer val0 = val.getAndIncrement();
+
+ cache.put(key, val0);
+
+ return null;
+ }
+ }, THREADS, "update-thread");
+
+ fut.get();
+
+ stopGrid(SRV_IDX);
+
+ if (!latch.await(5, SECONDS))
+ fail("Failed to wait for notifications [exp=" + THREADS + ", left=" + lsnr.latch.getCount() + ']');
+
+ assertEquals(THREADS, lsnr.allEvts.size());
+
+ Set<Integer> vals = new HashSet<>();
+
+ boolean err = false;
+
+ for (CacheEntryEvent<?, ?> evt : lsnr.allEvts) {
+ assertEquals(key, evt.getKey());
+ assertNotNull(evt.getValue());
+
+ if (!vals.add((Integer)evt.getValue())) {
+ err = true;
+
+ log.info("Extra event: " + evt);
+ }
+ }
+
+ for (int v = 0; v < THREADS; v++) {
+ if (!vals.contains(v)) {
+ err = true;
+
+ log.info("Event for value not received: " + v);
+ }
+ }
+
+ assertFalse("Invalid events, see log for details.", err);
+
+ lsnr.allEvts.clear();
+
+ startGrid(SRV_IDX);
+ }
+
+ cur.close();
+ }
+
+ /**
+ * @param logAll If {@code true} logs all unexpected values.
+ * @param expEvts Expected values.
+ * @param lsnr Listener.
+ * @return Check status.
+ */
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ private boolean checkEvents(boolean logAll,
+ Map<Integer, List<T2<Integer, Integer>>> expEvts,
+ CacheEventListener2 lsnr) {
+ assertTrue(!expEvts.isEmpty());
+
+ boolean pass = true;
+
+ for (Map.Entry<Integer, List<T2<Integer, Integer>>> e : expEvts.entrySet()) {
+ Integer key = e.getKey();
+ List<T2<Integer, Integer>> exp = e.getValue();
+
+ List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(key);
+
+ if (rcvdEvts == null) {
+ pass = false;
+
+ log.info("No events for key [key=" + key + ", exp=" + e.getValue() + ']');
+
+ if (!logAll)
+ return false;
+ }
+ else {
+ synchronized (rcvdEvts) {
+ if (rcvdEvts.size() != exp.size()) {
+ pass = false;
+
+ log.info("Missed or extra events for key [key=" + key +
+ ", exp=" + e.getValue() +
+ ", rcvd=" + rcvdEvts + ']');
+
+ if (!logAll)
+ return false;
+ }
+
+ int cnt = Math.min(rcvdEvts.size(), exp.size());
+
+ for (int i = 0; i < cnt; i++) {
+ T2<Integer, Integer> expEvt = exp.get(i);
+ CacheEntryEvent<?, ?> rcvdEvt = rcvdEvts.get(i);
+
+ if (pass) {
+ assertEquals(key, rcvdEvt.getKey());
+ assertEquals(expEvt.get1(), rcvdEvt.getValue());
+ }
+ else {
+ if (!key.equals(rcvdEvt.getKey()) || !expEvt.get1().equals(rcvdEvt.getValue()))
+ log.warning("Missed events. [key=" + key + ", actKey=" + rcvdEvt.getKey()
+ + ", expVal=" + expEvt.get1() + ", actVal=" + rcvdEvt.getValue() + "]");
+ }
+ }
+
+ if (!pass) {
+ for (int i = cnt; i < exp.size(); i++) {
+ T2<Integer, Integer> val = exp.get(i);
+
+ log.warning("Missed events. [key=" + key + ", expVal=" + val.get1()
+ + ", prevVal=" + val.get2() + "]");
+ }
+ }
+ }
+ }
+ }
+
+ if (pass) {
+ expEvts.clear();
+ lsnr.evts.clear();
+ }
+
+ return pass;
+ }
+
+ /**
+ *
+ */
+ private static class CacheEventListener1 implements CacheEntryUpdatedListener<Object, Object> {
+ /** */
+ private volatile CountDownLatch latch;
+
+ /** */
+ private GridConcurrentHashSet<Integer> keys = new GridConcurrentHashSet<>();
+
+ /** */
+ private ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>();
+
+ /** */
+ private List<CacheEntryEvent<?, ?>> allEvts;
+
+ /** */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /**
+ * @param saveAll Save all events flag.
+ */
+ CacheEventListener1(boolean saveAll) {
+ if (saveAll)
+ allEvts = new ArrayList<>();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+ try {
+ for (CacheEntryEvent<?, ?> evt : evts) {
+ CountDownLatch latch = this.latch;
+
+ log.info("Received cache event [evt=" + evt +
+ ", left=" + (latch != null ? latch.getCount() : null) + ']');
+
+ this.evts.put(evt.getKey(), evt);
+
+ keys.add((Integer)evt.getKey());
+
+ if (allEvts != null)
+ allEvts.add(evt);
+
+ assertTrue(latch != null);
+ assertTrue(latch.getCount() > 0);
+
+ latch.countDown();
+
+ if (latch.getCount() == 0) {
+ this.latch = null;
+
+ keys.clear();
+ }
+ }
+ }
+ catch (Throwable e) {
+ err = true;
+
+ log.error("Unexpected error", e);
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private static class CacheEventListener2 implements CacheEntryUpdatedListener<Object, Object> {
+ /** */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** */
+ private final ConcurrentHashMap<Integer, Integer> vals = new ConcurrentHashMap<>();
+
+ /** */
+ private final ConcurrentHashMap<Integer, List<CacheEntryEvent<?, ?>>> evts = new ConcurrentHashMap<>();
+
+ /**
+ * @return Count events.
+ */
+ public int size() {
+ int size = 0;
+
+ for (List<CacheEntryEvent<?, ?>> e : evts.values())
+ size += e.size();
+
+ return size;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
+ throws CacheEntryListenerException {
+ try {
+ for (CacheEntryEvent<?, ?> evt : evts) {
+ Integer key = (Integer)evt.getKey();
+ Integer val = (Integer)evt.getValue();
+
+ assertNotNull(key);
+ assertNotNull(val);
+
+ Integer prevVal = vals.get(key);
+
+ boolean dup = false;
+
+ if (prevVal != null && prevVal.equals(val))
+ dup = true;
+
+ if (!dup) {
+ vals.put(key, val);
+
+ List<CacheEntryEvent<?, ?>> keyEvts = this.evts.get(key);
+
+ if (keyEvts == null) {
+ keyEvts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
+
+ this.evts.put(key, keyEvts);
+ }
+
+ keyEvts.add(evt);
+ }
+ }
+ }
+ catch (Throwable e) {
+ err = true;
+
+ log.error("Unexpected error", e);
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ public static class CacheEventListener3 implements CacheEntryUpdatedListener<Object, Object>,
+ CacheEntryEventSerializableFilter<Object, Object> {
+ /** Keys. */
+ GridConcurrentHashSet<Integer> keys = new GridConcurrentHashSet<>();
+
+ /** Events. */
+ private final ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> events) throws CacheEntryListenerException {
+ for (CacheEntryEvent<?, ?> e : events) {
+ Integer key = (Integer)e.getKey();
+
+ keys.add(key);
+
+ assert evts.put(key, e) == null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> e) throws CacheEntryListenerException {
+ return (Integer)e.getValue() % 2 == 0;
+ }
+ }
+
+ /**
+ *
+ */
+ public static class CacheEventFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException {
+ return ((Integer)event.getValue()) >= 0;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestCommunicationSpi extends TcpCommunicationSpi {
+ /** */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** */
+ private volatile boolean skipMsg;
+
+ /** */
+ private volatile boolean skipAllMsg;
+
+ /** */
+ private volatile AtomicBoolean sndFirstOnly;
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
+ throws IgniteSpiException {
+ Object msg0 = ((GridIoMessage)msg).message();
+
+ if (skipAllMsg)
+ return;
+
+ if (msg0 instanceof GridContinuousMessage) {
+ if (skipMsg) {
+ if (log.isDebugEnabled())
+ log.debug("Skip continuous message: " + msg0);
+
+ return;
+ }
+ else {
+ AtomicBoolean sndFirstOnly = this.sndFirstOnly;
+
+ if (sndFirstOnly != null && !sndFirstOnly.compareAndSet(false, true)) {
+ if (log.isDebugEnabled())
+ log.debug("Skip continuous message: " + msg0);
+
+ return;
+ }
+ }
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ }
+}
[2/3] ignite git commit: IGNITE-426 Fixed tests.
Posted by nt...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/20324bc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
deleted file mode 100644
index 95781e0..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ /dev/null
@@ -1,2522 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.cache.Cache;
-import javax.cache.CacheException;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryListenerException;
-import javax.cache.event.CacheEntryUpdatedListener;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.MutableEntry;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
-import org.apache.ignite.cache.CacheEntryProcessor;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cache.query.ContinuousQuery;
-import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterTopologyException;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
-import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
-import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.PA;
-import org.apache.ignite.internal.util.typedef.PAX;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.T3;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgniteOutClosure;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.resources.LoggerResource;
-import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.ignite.transactions.Transaction;
-
-import static java.util.concurrent.TimeUnit.MINUTES;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-
-/**
- *
- */
-public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommonAbstractTest {
- /** */
- private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
- /** */
- private static final int BACKUP_ACK_THRESHOLD = 100;
-
- /** */
- private static volatile boolean err;
-
- /** */
- private boolean client;
-
- /** */
- private int backups = 1;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
-
- TestCommunicationSpi commSpi = new TestCommunicationSpi();
-
- commSpi.setSharedMemoryPort(-1);
- commSpi.setIdleConnectionTimeout(100);
-
- cfg.setCommunicationSpi(commSpi);
-
- CacheConfiguration ccfg = new CacheConfiguration();
-
- ccfg.setCacheMode(cacheMode());
- ccfg.setAtomicityMode(atomicityMode());
- ccfg.setAtomicWriteOrderMode(writeOrderMode());
- ccfg.setBackups(backups);
- ccfg.setWriteSynchronizationMode(FULL_SYNC);
-
- cfg.setCacheConfiguration(ccfg);
-
- cfg.setClientMode(client);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected long getTestTimeout() {
- return 5 * 60_000;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
- err = false;
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- super.afterTest();
-
- stopAllGrids();
- }
-
- /**
- * @return Cache mode.
- */
- protected abstract CacheMode cacheMode();
-
- /**
- * @return Atomicity mode.
- */
- protected abstract CacheAtomicityMode atomicityMode();
-
- /**
- * @return Write order mode for atomic cache.
- */
- protected CacheAtomicWriteOrderMode writeOrderMode() {
- return PRIMARY;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testFirstFilteredEvent() throws Exception {
- this.backups = 2;
-
- final int SRV_NODES = 4;
-
- startGridsMultiThreaded(SRV_NODES);
-
- client = true;
-
- Ignite qryClient = startGrid(SRV_NODES);
-
- client = false;
-
- IgniteCache<Object, Object> qryClnCache = qryClient.cache(null);
-
- final CacheEventListener3 lsnr = new CacheEventListener3();
-
- ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
-
- qry.setLocalListener(lsnr);
-
- qry.setRemoteFilter(new CacheEventFilter());
-
- try (QueryCursor<?> cur = qryClnCache.query(qry)) {
- List<Integer> keys = testKeys(grid(0).cache(null), 1);
-
- for (Integer key : keys)
- qryClnCache.put(key, -1);
-
- qryClnCache.put(keys.get(0), 100);
- }
-
- assertEquals(lsnr.evts.size(), 1);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testRebalanceVersion() throws Exception {
- Ignite ignite0 = startGrid(0);
- GridDhtPartitionTopology top0 = ((IgniteKernal)ignite0).context().cache().context().cacheContext(1).topology();
-
- assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(1)));
- assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(2)));
-
- Ignite ignite1 = startGrid(1);
- GridDhtPartitionTopology top1 = ((IgniteKernal)ignite1).context().cache().context().cacheContext(1).topology();
-
- waitRebalanceFinished(ignite0, 2);
- waitRebalanceFinished(ignite1, 2);
-
- assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(3)));
- assertFalse(top1.rebalanceFinished(new AffinityTopologyVersion(3)));
-
- Ignite ignite2 = startGrid(2);
- GridDhtPartitionTopology top2 = ((IgniteKernal)ignite2).context().cache().context().cacheContext(1).topology();
-
- waitRebalanceFinished(ignite0, 3);
- waitRebalanceFinished(ignite1, 3);
- waitRebalanceFinished(ignite2, 3);
-
- assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(4)));
- assertFalse(top1.rebalanceFinished(new AffinityTopologyVersion(4)));
- assertFalse(top2.rebalanceFinished(new AffinityTopologyVersion(4)));
-
- client = true;
-
- Ignite ignite3 = startGrid(3);
- GridDhtPartitionTopology top3 = ((IgniteKernal)ignite3).context().cache().context().cacheContext(1).topology();
-
- assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(4)));
- assertTrue(top1.rebalanceFinished(new AffinityTopologyVersion(4)));
- assertTrue(top2.rebalanceFinished(new AffinityTopologyVersion(4)));
- assertTrue(top3.rebalanceFinished(new AffinityTopologyVersion(4)));
-
- stopGrid(1);
-
- waitRebalanceFinished(ignite0, 5);
- waitRebalanceFinished(ignite2, 5);
- waitRebalanceFinished(ignite3, 5);
-
- stopGrid(3);
-
- assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(6)));
- assertTrue(top2.rebalanceFinished(new AffinityTopologyVersion(6)));
-
- stopGrid(0);
-
- waitRebalanceFinished(ignite2, 7);
- }
-
- /**
- * @param ignite Ignite.
- * @param topVer Topology version.
- * @throws Exception If failed.
- */
- private void waitRebalanceFinished(Ignite ignite, long topVer) throws Exception {
- final AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer);
-
- final GridDhtPartitionTopology top =
- ((IgniteKernal)ignite).context().cache().context().cacheContext(1).topology();
-
- GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return top.rebalanceFinished(topVer0);
- }
- }, 5000);
-
- assertTrue(top.rebalanceFinished(topVer0));
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testOneBackup() throws Exception {
- checkBackupQueue(1, false);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testOneBackupClientUpdate() throws Exception {
- checkBackupQueue(1, true);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testStartStopQuery() throws Exception {
- this.backups = 1;
-
- final int SRV_NODES = 3;
-
- startGridsMultiThreaded(SRV_NODES);
-
- client = true;
-
- final Ignite qryClient = startGrid(SRV_NODES);
-
- client = false;
-
- IgniteCache<Object, Object> clnCache = qryClient.cache(null);
-
- IgniteOutClosure<IgniteCache<Integer, Integer>> rndCache =
- new IgniteOutClosure<IgniteCache<Integer, Integer>>() {
- int cnt = 0;
-
- @Override public IgniteCache<Integer, Integer> apply() {
- ++cnt;
-
- return grid(cnt % SRV_NODES + 1).cache(null);
- }
- };
-
- Ignite igniteSrv = ignite(0);
-
- IgniteCache<Object, Object> srvCache = igniteSrv.cache(null);
-
- List<Integer> keys = testKeys(srvCache, 3);
-
- int keyCnt = keys.size();
-
- for (int j = 0; j < 50; ++j) {
- ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
-
- final CacheEventListener3 lsnr = new CacheEventListener3();
-
- qry.setLocalListener(lsnr);
-
- qry.setRemoteFilter(lsnr);
-
- int keyIter = 0;
-
- for (; keyIter < keyCnt / 2; keyIter++) {
- int key = keys.get(keyIter);
-
- rndCache.apply().put(key, key);
- }
-
- assert lsnr.evts.isEmpty();
-
- QueryCursor<Cache.Entry<Object, Object>> query = clnCache.query(qry);
-
- Map<Object, T2<Object, Object>> updates = new HashMap<>();
-
- final List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
-
- Affinity<Object> aff = affinity(srvCache);
-
- boolean filtered = false;
-
- for (; keyIter < keys.size(); keyIter++) {
- int key = keys.get(keyIter);
-
- int val = filtered ? 1 : 2;
-
- log.info("Put [key=" + key + ", val=" + val + ", part=" + aff.partition(key) + ']');
-
- T2<Object, Object> t = updates.get(key);
-
- if (t == null) {
- // Check filtered.
- if (!filtered) {
- updates.put(key, new T2<>((Object)val, null));
-
- expEvts.add(new T3<>((Object)key, (Object)val, null));
- }
- }
- else {
- // Check filtered.
- if (!filtered) {
- updates.put(key, new T2<>((Object)val, (Object)t.get1()));
-
- expEvts.add(new T3<>((Object)key, (Object)val, (Object)t.get1()));
- }
- }
-
- rndCache.apply().put(key, val);
-
- filtered = !filtered;
- }
-
- checkEvents(expEvts, lsnr, false);
-
- query.close();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testLeftPrimaryAndBackupNodes() throws Exception {
- if (cacheMode() == REPLICATED)
- return;
-
- this.backups = 1;
-
- final int SRV_NODES = 3;
-
- startGridsMultiThreaded(SRV_NODES);
-
- client = true;
-
- final Ignite qryClient = startGrid(SRV_NODES);
-
- client = false;
-
- ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
-
- final CacheEventListener3 lsnr = new CacheEventListener3();
-
- qry.setLocalListener(lsnr);
-
- qry.setRemoteFilter(lsnr);
-
- IgniteCache<Object, Object> clnCache = qryClient.cache(null);
-
- QueryCursor<Cache.Entry<Object, Object>> query = clnCache.query(qry);
-
- Ignite igniteSrv = ignite(0);
-
- IgniteCache<Object, Object> srvCache = igniteSrv.cache(null);
-
- Affinity<Object> aff = affinity(srvCache);
-
- List<Integer> keys = testKeys(srvCache, 1);
-
- Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(keys.get(0));
-
- Collection<UUID> ids = F.transform(nodes, new C1<ClusterNode, UUID>() {
- @Override public UUID apply(ClusterNode node) {
- return node.id();
- }
- });
-
- int keyIter = 0;
-
- boolean filtered = false;
-
- Map<Object, T2<Object, Object>> updates = new HashMap<>();
-
- final List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
-
- for (; keyIter < keys.size() / 2; keyIter++) {
- int key = keys.get(keyIter);
-
- log.info("Put [key=" + key + ", part=" + aff.partition(key)
- + ", filtered=" + filtered + ']');
-
- T2<Object, Object> t = updates.get(key);
-
- Integer val = filtered ?
- (key % 2 == 0 ? key + 1 : key) :
- key * 2;
-
- if (t == null) {
- updates.put(key, new T2<>((Object)val, null));
-
- if (!filtered)
- expEvts.add(new T3<>((Object)key, (Object)val, null));
- }
- else {
- updates.put(key, new T2<>((Object)val, (Object)key));
-
- if (!filtered)
- expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
- }
-
- srvCache.put(key, val);
-
- filtered = !filtered;
- }
-
- checkEvents(expEvts, lsnr, false);
-
- List<Thread> stopThreads = new ArrayList<>(3);
-
- // Stop nodes which owning this partition.
- for (int i = 0; i < SRV_NODES; i++) {
- Ignite ignite = ignite(i);
-
- if (ids.contains(ignite.cluster().localNode().id())) {
- final int i0 = i;
-
- TestCommunicationSpi spi = (TestCommunicationSpi)ignite.configuration().getCommunicationSpi();
-
- spi.skipAllMsg = true;
-
- stopThreads.add(new Thread() {
- @Override public void run() {
- stopGrid(i0, true);
- }
- });
- }
- }
-
- // Stop and join threads.
- for (Thread t : stopThreads)
- t.start();
-
- for (Thread t : stopThreads)
- t.join();
-
- assert GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- // (SRV_NODES + 1 client node) - 1 primary - backup nodes.
- return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /** client node */)
- - 1 /** Primary node */ - backups;
- }
- }, 5000L);
-
- for (; keyIter < keys.size(); keyIter++) {
- int key = keys.get(keyIter);
-
- log.info("Put [key=" + key + ", filtered=" + filtered + ']');
-
- T2<Object, Object> t = updates.get(key);
-
- Integer val = filtered ?
- (key % 2 == 0 ? key + 1 : key) :
- key * 2;
-
- if (t == null) {
- updates.put(key, new T2<>((Object)val, null));
-
- if (!filtered)
- expEvts.add(new T3<>((Object)key, (Object)val, null));
- }
- else {
- updates.put(key, new T2<>((Object)val, (Object)key));
-
- if (!filtered)
- expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
- }
-
- clnCache.put(key, val);
-
- filtered = !filtered;
- }
-
- checkEvents(expEvts, lsnr, false);
-
- query.close();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testRemoteFilter() throws Exception {
- this.backups = 2;
-
- final int SRV_NODES = 4;
-
- startGridsMultiThreaded(SRV_NODES);
-
- client = true;
-
- Ignite qryClient = startGrid(SRV_NODES);
-
- client = false;
-
- IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
-
- if (cacheMode() != REPLICATED)
- assertEquals(backups, qryClientCache.getConfiguration(CacheConfiguration.class).getBackups());
-
- Affinity<Object> aff = qryClient.affinity(null);
-
- ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
-
- final CacheEventListener3 lsnr = new CacheEventListener3();
-
- qry.setLocalListener(lsnr);
-
- qry.setRemoteFilter(lsnr);
-
- int PARTS = 10;
-
- QueryCursor<?> cur = qryClientCache.query(qry);
-
- Map<Object, T2<Object, Object>> updates = new HashMap<>();
-
- final List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
-
- for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) {
- log.info("Stop iteration: " + i);
-
- TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
-
- Ignite ignite = ignite(i);
-
- IgniteCache<Object, Object> cache = ignite.cache(null);
-
- List<Integer> keys = testKeys(cache, PARTS);
-
- boolean first = true;
-
- boolean filtered = false;
-
- for (Integer key : keys) {
- log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key)
- + ", filtered=" + filtered + ']');
-
- T2<Object, Object> t = updates.get(key);
-
- Integer val = filtered ?
- (key % 2 == 0 ? key + 1 : key) :
- key * 2;
-
- if (t == null) {
- updates.put(key, new T2<>((Object)val, null));
-
- if (!filtered)
- expEvts.add(new T3<>((Object)key, (Object)val, null));
- }
- else {
- updates.put(key, new T2<>((Object)val, (Object)key));
-
- if (!filtered)
- expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
- }
-
- cache.put(key, val);
-
- if (first) {
- spi.skipMsg = true;
-
- first = false;
- }
-
- filtered = !filtered;
- }
-
- stopGrid(i);
-
- boolean check = GridTestUtils.waitForCondition(new PAX() {
- @Override public boolean applyx() throws IgniteCheckedException {
- return expEvts.size() == lsnr.keys.size();
- }
- }, 5000L);
-
- if (!check) {
- Set<Integer> keys0 = new HashSet<>(keys);
-
- keys0.removeAll(lsnr.keys);
-
- log.info("Missed events for keys: " + keys0);
-
- fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + keys0.size() + ']');
- }
-
- checkEvents(expEvts, lsnr, false);
- }
-
- cur.close();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testThreeBackups() throws Exception {
- if (cacheMode() == REPLICATED)
- return;
-
- checkBackupQueue(3, false);
- }
-
- /** {@inheritDoc} */
- @Override public boolean isDebug() {
- return true;
- }
-
- /**
- * @param backups Number of backups.
- * @param updateFromClient If {@code true} executes cache update from client node.
- * @throws Exception If failed.
- */
- private void checkBackupQueue(int backups, boolean updateFromClient) throws Exception {
- this.backups = atomicityMode() == CacheAtomicityMode.ATOMIC ? backups :
- backups < 2 ? 2 : backups;
-
- final int SRV_NODES = 4;
-
- startGridsMultiThreaded(SRV_NODES);
-
- client = true;
-
- Ignite qryClient = startGrid(SRV_NODES);
-
- client = false;
-
- IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
-
- Affinity<Object> aff = qryClient.affinity(null);
-
- CacheEventListener1 lsnr = new CacheEventListener1(false);
-
- ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
-
- qry.setLocalListener(lsnr);
-
- QueryCursor<?> cur = qryClientCache.query(qry);
-
- int PARTS = 10;
-
- Map<Object, T2<Object, Object>> updates = new HashMap<>();
-
- List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
-
- for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) {
- log.info("Stop iteration: " + i);
-
- TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
-
- Ignite ignite = ignite(i);
-
- IgniteCache<Object, Object> cache = ignite.cache(null);
-
- List<Integer> keys = testKeys(cache, PARTS);
-
- CountDownLatch latch = new CountDownLatch(keys.size());
-
- lsnr.latch = latch;
-
- boolean first = true;
-
- for (Integer key : keys) {
- log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key) + ']');
-
- T2<Object, Object> t = updates.get(key);
-
- if (updateFromClient) {
- if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
- try (Transaction tx = qryClient.transactions().txStart()) {
- qryClientCache.put(key, key);
-
- tx.commit();
- }
- catch (CacheException | ClusterTopologyException e) {
- log.warning("Failed put. [Key=" + key + ", val=" + key + "]");
-
- continue;
- }
- }
- else
- qryClientCache.put(key, key);
- }
- else {
- if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
- try (Transaction tx = ignite.transactions().txStart()) {
- cache.put(key, key);
-
- tx.commit();
- }
- catch (CacheException | ClusterTopologyException e) {
- log.warning("Failed put. [Key=" + key + ", val=" + key + "]");
-
- continue;
- }
- }
- else
- cache.put(key, key);
- }
-
- if (t == null) {
- updates.put(key, new T2<>((Object)key, null));
-
- expEvts.add(new T3<>((Object)key, (Object)key, null));
- }
- else {
- updates.put(key, new T2<>((Object)key, (Object)key));
-
- expEvts.add(new T3<>((Object)key, (Object)key, (Object)key));
- }
-
- if (first) {
- spi.skipMsg = true;
-
- first = false;
- }
- }
-
- stopGrid(i);
-
- if (!latch.await(5, SECONDS)) {
- Set<Integer> keys0 = new HashSet<>(keys);
-
- keys0.removeAll(lsnr.keys);
-
- log.info("Missed events for keys: " + keys0);
-
- fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
- }
-
- checkEvents(expEvts, lsnr);
- }
-
- for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) {
- log.info("Start iteration: " + i);
-
- Ignite ignite = startGrid(i);
-
- IgniteCache<Object, Object> cache = ignite.cache(null);
-
- List<Integer> keys = testKeys(cache, PARTS);
-
- CountDownLatch latch = new CountDownLatch(keys.size());
-
- lsnr.latch = latch;
-
- for (Integer key : keys) {
- log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key) + ']');
-
- T2<Object, Object> t = updates.get(key);
-
- if (t == null) {
- updates.put(key, new T2<>((Object)key, null));
-
- expEvts.add(new T3<>((Object)key, (Object)key, null));
- }
- else {
- updates.put(key, new T2<>((Object)key, (Object)key));
-
- expEvts.add(new T3<>((Object)key, (Object)key, (Object)key));
- }
-
- if (updateFromClient)
- qryClientCache.put(key, key);
- else
- cache.put(key, key);
- }
-
- if (!latch.await(10, SECONDS)) {
- Set<Integer> keys0 = new HashSet<>(keys);
-
- keys0.removeAll(lsnr.keys);
-
- log.info("Missed events for keys: " + keys0);
-
- fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
- }
-
- checkEvents(expEvts, lsnr);
- }
-
- cur.close();
-
- assertFalse("Unexpected error during test, see log for details.", err);
- }
-
- /**
- * @param expEvts Expected events.
- * @param lsnr Listener.
- */
- private void checkEvents(List<T3<Object, Object, Object>> expEvts, CacheEventListener1 lsnr) {
- for (T3<Object, Object, Object> exp : expEvts) {
- CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1());
-
- assertNotNull("No event for key: " + exp.get1(), e);
- assertEquals("Unexpected value: " + e, exp.get2(), e.getValue());
- }
-
- expEvts.clear();
-
- lsnr.evts.clear();
- }
-
- /**
- * @param expEvts Expected events.
- * @param lsnr Listener.
- * @param lostAllow If {@code true} than won't assert on lost events.
- */
- private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr,
- boolean lostAllow) throws Exception {
- GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return expEvts.size() == lsnr.size();
- }
- }, 2000L);
-
- Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size());
-
- for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet())
- prevMap.put(e.getKey(), new ArrayList<>(e.getValue()));
-
- List<T3<Object, Object, Object>> lostEvents = new ArrayList<>();
-
- for (T3<Object, Object, Object> exp : expEvts) {
- List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1());
-
- if (F.eq(exp.get2(), exp.get3()))
- continue;
-
- if (rcvdEvts == null || rcvdEvts.isEmpty()) {
- lostEvents.add(exp);
-
- continue;
- }
-
- Iterator<CacheEntryEvent<?, ?>> iter = rcvdEvts.iterator();
-
- boolean found = false;
-
- while (iter.hasNext()) {
- CacheEntryEvent<?, ?> e = iter.next();
-
- if ((exp.get2() != null && e.getValue() != null && exp.get2().equals(e.getValue()))
- && equalOldValue(e, exp)) {
- found = true;
-
- iter.remove();
-
- break;
- }
- }
-
- // Lost event is acceptable.
- if (!found)
- lostEvents.add(exp);
- }
-
- boolean dup = false;
-
- // Check duplicate.
- if (!lsnr.evts.isEmpty()) {
- for (List<CacheEntryEvent<?, ?>> evts : lsnr.evts.values()) {
- if (!evts.isEmpty()) {
- for (CacheEntryEvent<?, ?> e : evts) {
- boolean found = false;
-
- for (T3<Object, Object, Object> lostEvt : lostEvents) {
- if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) {
- found = true;
-
- lostEvents.remove(lostEvt);
-
- break;
- }
- }
-
- if (!found) {
- dup = true;
-
- break;
- }
- }
- }
- }
-
- if (dup) {
- for (T3<Object, Object, Object> e : lostEvents)
- log.error("Lost event: " + e);
-
- for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) {
- if (!e.isEmpty()) {
- for (CacheEntryEvent<?, ?> event : e) {
- List<CacheEntryEvent<?, ?>> entries = new ArrayList<>();
-
- for (CacheEntryEvent<?, ?> ev0 : prevMap.get(event.getKey())) {
- if (F.eq(event.getValue(), ev0.getValue()) && F.eq(event.getOldValue(),
- ev0.getOldValue()))
- entries.add(ev0);
- }
- }
- }
- }
- }
- }
-
- if (!lostAllow && !lostEvents.isEmpty()) {
- log.error("Lost event cnt: " + lostEvents.size());
-
- for (T3<Object, Object, Object> e : lostEvents)
- log.error("Lost event: " + e);
-
- fail("Lose events, see log for details.");
- }
-
- log.error("Lost event cnt: " + lostEvents.size());
-
- expEvts.clear();
-
- lsnr.evts.clear();
- lsnr.vals.clear();
- }
-
- /**
- * @param e Event
- * @param expVals expected value
- * @return {@code True} if entries has the same key, value and oldValue. If cache start without backups
- * than oldValue ignoring in comparison.
- */
- private boolean equalOldValue(CacheEntryEvent<?, ?> e, T3<Object, Object, Object> expVals) {
- return (e.getOldValue() == null && expVals.get3() == null) // Both null
- || (e.getOldValue() != null && expVals.get3() != null // Equals
- && e.getOldValue().equals(expVals.get3()))
- || (backups == 0); // If we start without backup than oldValue might be lose.
- }
-
- /**
- * @param expEvts Expected events.
- * @param lsnr Listener.
- */
- private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener3 lsnr,
- boolean allowLoseEvent) throws Exception {
- if (!allowLoseEvent)
- assert GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- return lsnr.evts.size() == expEvts.size();
- }
- }, 2000L);
-
- for (T3<Object, Object, Object> exp : expEvts) {
- CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1());
-
- assertNotNull("No event for key: " + exp.get1(), e);
- assertEquals("Unexpected value: " + e, exp.get2(), e.getValue());
-
- if (allowLoseEvent)
- lsnr.evts.remove(exp.get1());
- }
-
- if (allowLoseEvent)
- assert lsnr.evts.isEmpty();
-
- expEvts.clear();
-
- lsnr.evts.clear();
- lsnr.keys.clear();
- }
-
- /**
- * @param cache Cache.
- * @param parts Number of partitions.
- * @return Keys.
- */
- private List<Integer> testKeys(IgniteCache<Object, Object> cache, int parts) {
- Ignite ignite = cache.unwrap(Ignite.class);
-
- List<Integer> res = new ArrayList<>();
-
- Affinity<Object> aff = ignite.affinity(cache.getName());
-
- ClusterNode node = ignite.cluster().localNode();
-
- int[] nodeParts = aff.primaryPartitions(node);
-
- final int KEYS_PER_PART = 50;
-
- for (int i = 0; i < parts; i++) {
- int part = nodeParts[i];
-
- int cnt = 0;
-
- for (int key = 0; key < 100_000; key++) {
- if (aff.partition(key) == part && aff.isPrimary(node, key)) {
- res.add(key);
-
- if (++cnt == KEYS_PER_PART)
- break;
- }
- }
-
- assertEquals(KEYS_PER_PART, cnt);
- }
-
- assertEquals(parts * KEYS_PER_PART, res.size());
-
- return res;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testBackupQueueCleanupClientQuery() throws Exception {
- startGridsMultiThreaded(2);
-
- client = true;
-
- Ignite qryClient = startGrid(2);
-
- CacheEventListener1 lsnr = new CacheEventListener1(false);
-
- ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
-
- qry.setLocalListener(lsnr);
-
- QueryCursor<?> cur = qryClient.cache(null).query(qry);
-
- final Collection<Object> backupQueue = backupQueue(ignite(1));
-
- assertEquals(0, backupQueue.size());
-
- IgniteCache<Object, Object> cache0 = ignite(0).cache(null);
-
- List<Integer> keys = primaryKeys(cache0, BACKUP_ACK_THRESHOLD);
-
- CountDownLatch latch = new CountDownLatch(keys.size());
-
- lsnr.latch = latch;
-
- for (Integer key : keys) {
- log.info("Put: " + key);
-
- cache0.put(key, key);
- }
-
- GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return backupQueue.isEmpty();
- }
- }, 2000);
-
- assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD);
-
- if (!latch.await(5, SECONDS))
- fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
-
- keys = primaryKeys(cache0, BACKUP_ACK_THRESHOLD / 2);
-
- latch = new CountDownLatch(keys.size());
-
- lsnr.latch = latch;
-
- for (Integer key : keys)
- cache0.put(key, key);
-
- final long ACK_FREQ = 5000;
-
- GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return backupQueue.isEmpty();
- }
- }, ACK_FREQ + 2000);
-
- assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty());
-
- if (!latch.await(5, SECONDS))
- fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
-
- cur.close();
-
- assertFalse("Unexpected error during test, see log for details.", err);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testBackupQueueCleanupServerQuery() throws Exception {
- Ignite qryClient = startGridsMultiThreaded(2);
-
- CacheEventListener1 lsnr = new CacheEventListener1(false);
-
- ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
-
- qry.setLocalListener(lsnr);
-
- IgniteCache<Object, Object> cache = qryClient.cache(null);
-
- QueryCursor<?> cur = cache.query(qry);
-
- final Collection<Object> backupQueue = backupQueue(ignite(1));
-
- assertEquals(0, backupQueue.size());
-
- List<Integer> keys = primaryKeys(cache, BACKUP_ACK_THRESHOLD);
-
- CountDownLatch latch = new CountDownLatch(keys.size());
-
- lsnr.latch = latch;
-
- for (Integer key : keys) {
- log.info("Put: " + key);
-
- cache.put(key, key);
- }
-
- GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return backupQueue.isEmpty();
- }
- }, 3000);
-
- assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD);
-
- if (!latch.await(5, SECONDS))
- fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
-
- cur.close();
- }
-
- /**
- * @param ignite Ignite.
- * @return Backup queue for test query.
- */
- private Collection<Object> backupQueue(Ignite ignite) {
- GridContinuousProcessor proc = ((IgniteKernal)ignite).context().continuous();
-
- ConcurrentMap<Object, Object> infos = GridTestUtils.getFieldValue(proc, "rmtInfos");
-
- Collection<Object> backupQueue = null;
-
- for (Object info : infos.values()) {
- GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
-
- if (hnd.isForQuery() && hnd.cacheName() == null) {
- backupQueue = GridTestUtils.getFieldValue(hnd, "backupQueue");
-
- break;
- }
- }
-
- assertNotNull(backupQueue);
-
- return backupQueue;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testFailover() throws Exception {
- this.backups = 2;
-
- final int SRV_NODES = 4;
-
- startGridsMultiThreaded(SRV_NODES);
-
- client = true;
-
- final Ignite qryCln = startGrid(SRV_NODES);
-
- client = false;
-
- final IgniteCache<Object, Object> qryClnCache = qryCln.cache(null);
-
- final CacheEventListener2 lsnr = new CacheEventListener2();
-
- ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
-
- qry.setLocalListener(lsnr);
-
- QueryCursor<?> cur = qryClnCache.query(qry);
-
- final AtomicBoolean stop = new AtomicBoolean();
-
- final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>();
-
- boolean processorPut = false;
-
- IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
- final int idx = SRV_NODES + 1;
-
- while (!stop.get() && !err) {
- log.info("Start node: " + idx);
-
- startGrid(idx);
-
- Thread.sleep(200);
-
- log.info("Stop node: " + idx);
-
- try {
- stopGrid(idx);
- }
- catch (Exception e) {
- log.warning("Failed to stop nodes.", e);
- }
-
- CountDownLatch latch = new CountDownLatch(1);
-
- assertTrue(checkLatch.compareAndSet(null, latch));
-
- if (!stop.get()) {
- log.info("Wait for event check.");
-
- assertTrue(latch.await(1, MINUTES));
- }
- }
-
- return null;
- }
- });
-
- final Map<Integer, Integer> vals = new HashMap<>();
-
- final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>();
-
- try {
- long stopTime = System.currentTimeMillis() + 60_000;
-
- final int PARTS = qryCln.affinity(null).partitions();
-
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
- while (System.currentTimeMillis() < stopTime) {
- Integer key = rnd.nextInt(PARTS);
-
- Integer prevVal = vals.get(key);
- Integer val = vals.get(key);
-
- if (val == null)
- val = 0;
- else
- val = val + 1;
-
- if (processorPut && prevVal != null) {
- if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
- try (Transaction tx = qryCln.transactions().txStart()) {
- qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
- @Override public Void process(MutableEntry<Object, Object> e,
- Object... arg) throws EntryProcessorException {
- e.setValue(arg[0]);
-
- return null;
- }
- }, val);
-
- tx.commit();
- }
- catch (CacheException | ClusterTopologyException e) {
- log.warning("Failed put. [Key=" + key + ", val=" + val + "]");
-
- continue;
- }
- }
- else
- qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
- @Override public Void process(MutableEntry<Object, Object> e,
- Object... arg) throws EntryProcessorException {
- e.setValue(arg[0]);
-
- return null;
- }
- }, val);
- }
- else {
- if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) {
- try (Transaction tx = qryCln.transactions().txStart()) {
- qryClnCache.put(key, val);
-
- tx.commit();
- }
- catch (CacheException | ClusterTopologyException e) {
- log.warning("Failed put. [Key=" + key + ", val=" + val + "]");
-
- continue;
- }
- }
- else
- qryClnCache.put(key, val);
- }
-
- processorPut = !processorPut;
-
- vals.put(key, val);
-
- List<T2<Integer, Integer>> keyEvts = expEvts.get(key);
-
- if (keyEvts == null) {
- keyEvts = new ArrayList<>();
-
- expEvts.put(key, keyEvts);
- }
-
- keyEvts.add(new T2<>(val, prevVal));
-
- CountDownLatch latch = checkLatch.get();
-
- if (latch != null) {
- log.info("Check events.");
-
- checkLatch.set(null);
-
- boolean success = false;
-
- try {
- if (err)
- break;
-
- boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return checkEvents(false, expEvts, lsnr);
- }
- }, 10_000);
-
- if (!check)
- assertTrue(checkEvents(true, expEvts, lsnr));
-
- success = true;
-
- log.info("Events checked.");
- }
- finally {
- if (!success)
- err = true;
-
- latch.countDown();
- }
- }
- }
- }
- finally {
- stop.set(true);
- }
-
- CountDownLatch latch = checkLatch.get();
-
- if (latch != null)
- latch.countDown();
-
- restartFut.get();
-
- boolean check = true;
-
- if (!expEvts.isEmpty())
- check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return checkEvents(false, expEvts, lsnr);
- }
- }, 10_000);
-
- if (!check)
- assertTrue(checkEvents(true, expEvts, lsnr));
-
- cur.close();
-
- assertFalse("Unexpected error during test, see log for details.", err);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testFailoverFilter() throws Exception {
- this.backups = 2;
-
- final int SRV_NODES = 4;
-
- startGridsMultiThreaded(SRV_NODES);
-
- client = true;
-
- Ignite qryClient = startGrid(SRV_NODES);
-
- client = false;
-
- IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
-
- final CacheEventListener2 lsnr = new CacheEventListener2();
-
- ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
-
- qry.setLocalListener(lsnr);
-
- qry.setRemoteFilter(new CacheEventFilter());
-
- QueryCursor<?> cur = qryClientCache.query(qry);
-
- final AtomicBoolean stop = new AtomicBoolean();
-
- final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>();
-
- IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
- final int idx = SRV_NODES + 1;
-
- while (!stop.get() && !err) {
- log.info("Start node: " + idx);
-
- startGrid(idx);
-
- Thread.sleep(200);
-
- log.info("Stop node: " + idx);
-
- stopGrid(idx);
-
- CountDownLatch latch = new CountDownLatch(1);
-
- assertTrue(checkLatch.compareAndSet(null, latch));
-
- if (!stop.get()) {
- log.info("Wait for event check.");
-
- assertTrue(latch.await(1, MINUTES));
- }
- }
-
- return null;
- }
- });
-
- final Map<Integer, Integer> vals = new HashMap<>();
-
- final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>();
-
- try {
- long stopTime = System.currentTimeMillis() + 60_000;
-
- final int PARTS = qryClient.affinity(null).partitions();
-
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
- boolean filtered = false;
-
- boolean processorPut = false;
-
- while (System.currentTimeMillis() < stopTime) {
- Integer key = rnd.nextInt(PARTS);
-
- Integer prevVal = vals.get(key);
- Integer val = vals.get(key);
-
- if (val == null)
- val = 0;
- else
- val = Math.abs(val) + 1;
-
- if (filtered)
- val = -val;
-
- if (processorPut && prevVal != null) {
- qryClientCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
- @Override public Void process(MutableEntry<Object, Object> entry,
- Object... arguments) throws EntryProcessorException {
- entry.setValue(arguments[0]);
-
- return null;
- }
- }, val);
- }
- else
- qryClientCache.put(key, val);
-
- processorPut = !processorPut;
-
- vals.put(key, val);
-
- if (val >= 0) {
- List<T2<Integer, Integer>> keyEvts = expEvts.get(key);
-
- if (keyEvts == null) {
- keyEvts = new ArrayList<>();
-
- expEvts.put(key, keyEvts);
- }
-
- keyEvts.add(new T2<>(val, prevVal));
- }
-
- filtered = !filtered;
-
- CountDownLatch latch = checkLatch.get();
-
- if (latch != null) {
- log.info("Check events.");
-
- checkLatch.set(null);
-
- boolean success = false;
-
- try {
- if (err)
- break;
-
- boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return checkEvents(false, expEvts, lsnr);
- }
- }, 10_000);
-
- if (!check)
- assertTrue(checkEvents(true, expEvts, lsnr));
-
- success = true;
-
- log.info("Events checked.");
- }
- finally {
- if (!success)
- err = true;
-
- latch.countDown();
- }
- }
- }
- }
- finally {
- stop.set(true);
- }
-
- CountDownLatch latch = checkLatch.get();
-
- if (latch != null)
- latch.countDown();
-
- restartFut.get();
-
- boolean check = true;
-
- if (!expEvts.isEmpty()) {
- check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return checkEvents(false, expEvts, lsnr);
- }
- }, 10_000);
- }
-
- if (!check)
- assertTrue(checkEvents(true, expEvts, lsnr));
-
- cur.close();
-
- assertFalse("Unexpected error during test, see log for details.", err);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testFailoverStartStopBackup() throws Exception {
- failoverStartStopFilter(2);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testStartStop() throws Exception {
- this.backups = 2;
-
- final int SRV_NODES = 4;
-
- startGridsMultiThreaded(SRV_NODES);
-
- client = true;
-
- Ignite qryClient = startGrid(SRV_NODES);
-
- client = false;
-
- IgniteCache<Object, Object> qryClnCache = qryClient.cache(null);
-
- Affinity<Object> aff = qryClient.affinity(null);
-
- final CacheEventListener2 lsnr = new CacheEventListener2();
-
- ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
-
- qry.setLocalListener(lsnr);
-
- qry.setRemoteFilter(new CacheEventFilter());
-
- QueryCursor<?> cur = qryClnCache.query(qry);
-
- for (int i = 0; i < 10; i++) {
- final int idx = i % (SRV_NODES - 1);
-
- log.info("Stop node: " + idx);
-
- stopGrid(idx);
-
- awaitPartitionMapExchange();
-
- List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>();
-
- for (int j = 0; j < aff.partitions(); j++) {
- Integer oldVal = (Integer)qryClnCache.get(j);
-
- qryClnCache.put(j, i);
-
- afterRestEvents.add(new T3<>((Object)j, (Object)i, (Object)oldVal));
- }
-
- checkEvents(new ArrayList<>(afterRestEvents), lsnr, false);
-
- log.info("Start node: " + idx);
-
- startGrid(idx);
- }
-
- cur.close();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void failoverStartStopFilter(int backups) throws Exception {
- this.backups = backups;
-
- final int SRV_NODES = 4;
-
- startGridsMultiThreaded(SRV_NODES);
-
- client = true;
-
- Ignite qryClient = startGrid(SRV_NODES);
-
- client = false;
-
- IgniteCache<Object, Object> qryClnCache = qryClient.cache(null);
-
- final CacheEventListener2 lsnr = new CacheEventListener2();
-
- ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
-
- qry.setLocalListener(lsnr);
-
- qry.setRemoteFilter(new CacheEventFilter());
-
- QueryCursor<?> cur = qryClnCache.query(qry);
-
- CacheEventListener2 dinLsnr = null;
-
- QueryCursor<?> dinQry = null;
-
- final AtomicBoolean stop = new AtomicBoolean();
-
- final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>();
-
- IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
- while (!stop.get() && !err) {
- final int idx = ThreadLocalRandom.current().nextInt(SRV_NODES - 1);
-
- log.info("Stop node: " + idx);
-
- awaitPartitionMapExchange();
-
- Thread.sleep(400);
-
- stopGrid(idx);
-
- awaitPartitionMapExchange();
-
- Thread.sleep(400);
-
- log.info("Start node: " + idx);
-
- startGrid(idx);
-
- Thread.sleep(200);
-
- CountDownLatch latch = new CountDownLatch(1);
-
- assertTrue(checkLatch.compareAndSet(null, latch));
-
- if (!stop.get()) {
- log.info("Wait for event check.");
-
- assertTrue(latch.await(1, MINUTES));
- }
- }
-
- return null;
- }
- });
-
- final Map<Integer, Integer> vals = new HashMap<>();
-
- final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>();
-
- final List<T3<Object, Object, Object>> expEvtsNewLsnr = new ArrayList<>();
-
- final List<T3<Object, Object, Object>> expEvtsLsnr = new ArrayList<>();
-
- try {
- long stopTime = System.currentTimeMillis() + 60_000;
-
- // Start new filter each 5 sec.
- long startFilterTime = System.currentTimeMillis() + 5_000;
-
- final int PARTS = qryClient.affinity(null).partitions();
-
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
- boolean filtered = false;
-
- boolean processorPut = false;
-
- while (System.currentTimeMillis() < stopTime) {
- Integer key = rnd.nextInt(PARTS);
-
- Integer prevVal = vals.get(key);
- Integer val = vals.get(key);
-
- if (System.currentTimeMillis() > startFilterTime) {
- // Stop filter and check events.
- if (dinQry != null) {
- dinQry.close();
-
- log.info("Continuous query listener closed. Await events: " + expEvtsNewLsnr.size());
-
- checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0);
- }
-
- dinLsnr = new CacheEventListener2();
-
- ContinuousQuery<Object, Object> newQry = new ContinuousQuery<>();
-
- newQry.setLocalListener(dinLsnr);
-
- newQry.setRemoteFilter(new CacheEventFilter());
-
- dinQry = qryClnCache.query(newQry);
-
- log.info("Continuous query listener started.");
-
- startFilterTime = System.currentTimeMillis() + 5_000;
- }
-
- if (val == null)
- val = 0;
- else
- val = Math.abs(val) + 1;
-
- if (filtered)
- val = -val;
-
- if (processorPut && prevVal != null) {
- qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() {
- @Override public Void process(MutableEntry<Object, Object> entry,
- Object... arguments) throws EntryProcessorException {
- entry.setValue(arguments[0]);
-
- return null;
- }
- }, val);
- }
- else
- qryClnCache.put(key, val);
-
- processorPut = !processorPut;
-
- vals.put(key, val);
-
- if (val >= 0) {
- List<T2<Integer, Integer>> keyEvts = expEvts.get(key);
-
- if (keyEvts == null) {
- keyEvts = new ArrayList<>();
-
- expEvts.put(key, keyEvts);
- }
-
- keyEvts.add(new T2<>(val, prevVal));
-
- T3<Object, Object, Object> tupVal = new T3<>((Object)key, (Object)val, (Object)prevVal);
-
- expEvtsLsnr.add(tupVal);
-
- if (dinQry != null)
- expEvtsNewLsnr.add(tupVal);
- }
-
- filtered = !filtered;
-
- CountDownLatch latch = checkLatch.get();
-
- if (latch != null) {
- log.info("Check events.");
-
- checkLatch.set(null);
-
- boolean success = false;
-
- try {
- if (err)
- break;
-
- checkEvents(expEvtsLsnr, lsnr, backups == 0);
-
- success = true;
-
- log.info("Events checked.");
- }
- finally {
- if (!success)
- err = true;
-
- latch.countDown();
- }
- }
- }
- }
- finally {
- stop.set(true);
- }
-
- CountDownLatch latch = checkLatch.get();
-
- if (latch != null)
- latch.countDown();
-
- restartFut.get();
-
- checkEvents(expEvtsLsnr, lsnr, backups == 0);
-
- lsnr.evts.clear();
- lsnr.vals.clear();
-
- if (dinQry != null) {
- checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0);
-
- dinLsnr.evts.clear();
- dinLsnr.vals.clear();
- }
-
- List<T3<Object, Object, Object>> afterRestEvents = new ArrayList<>();
-
- for (int i = 0; i < qryClient.affinity(null).partitions(); i++) {
- Integer oldVal = (Integer)qryClnCache.get(i);
-
- qryClnCache.put(i, i);
-
- afterRestEvents.add(new T3<>((Object)i, (Object)i, (Object)oldVal));
- }
-
- checkEvents(new ArrayList<>(afterRestEvents), lsnr, false);
-
- cur.close();
-
- if (dinQry != null) {
- checkEvents(new ArrayList<>(afterRestEvents), dinLsnr, false);
-
- dinQry.close();
- }
-
- assertFalse("Unexpected error during test, see log for details.", err);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testMultiThreadedFailover() throws Exception {
- this.backups = 2;
-
- final int SRV_NODES = 4;
-
- startGridsMultiThreaded(SRV_NODES);
-
- client = true;
-
- final Ignite qryCln = startGrid(SRV_NODES);
-
- client = false;
-
- final IgniteCache<Object, Object> qryClnCache = qryCln.cache(null);
-
- final CacheEventListener2 lsnr = new CacheEventListener2();
-
- ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
-
- qry.setLocalListener(lsnr);
-
- QueryCursor<?> cur = qryClnCache.query(qry);
-
- final AtomicBoolean stop = new AtomicBoolean();
-
- final int THREAD = 4;
-
- final int PARTS = THREAD;
-
- final List<List<T3<Object, Object, Object>>> expEvts = new ArrayList<>(THREAD + 5);
-
- for (int i = 0; i < THREAD; i++)
- expEvts.add(i, new ArrayList<T3<Object, Object, Object>>());
-
- final AtomicReference<CyclicBarrier> checkBarrier = new AtomicReference<>();
-
- IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
- final int idx = SRV_NODES + 1;
-
- while (!stop.get() && !err) {
- log.info("Start node: " + idx);
-
- startGrid(idx);
-
- awaitPartitionMapExchange();
-
- Thread.sleep(100);
-
- try {
- log.info("Stop node: " + idx);
-
- stopGrid(idx);
-
- awaitPartitionMapExchange();
-
- Thread.sleep(100);
- }
- catch (Exception e) {
- log.warning("Failed to stop nodes.", e);
- }
-
- CyclicBarrier bar = new CyclicBarrier(THREAD + 1 /* plus start/stop thread */, new Runnable() {
- @Override public void run() {
- try {
- GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
- int size = 0;
-
- for (List<T3<Object, Object, Object>> evt : expEvts)
- size += evt.size();
-
- return lsnr.size() <= size;
- }
- }, 2000L);
-
- List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>();
-
- for (List<T3<Object, Object, Object>> evt : expEvts)
- expEvts0.addAll(evt);
-
- checkEvents(expEvts0, lsnr, false);
-
- for (List<T3<Object, Object, Object>> evt : expEvts)
- evt.clear();
- }
- catch (Exception e) {
- log.error("Failed.", e);
-
- err = true;
-
- stop.set(true);
- }
- finally {
- checkBarrier.set(null);
- }
- }
- });
-
- assertTrue(checkBarrier.compareAndSet(null, bar));
-
- if (!stop.get() && !err)
- bar.await(5, MINUTES);
- }
-
- return null;
- }
- });
-
- final long stopTime = System.currentTimeMillis() + 60_000;
-
- final AtomicInteger valCntr = new AtomicInteger(0);
-
- final AtomicInteger threadSeq = new AtomicInteger(0);
-
- GridTestUtils.runMultiThreaded(new Runnable() {
- @Override public void run() {
- try {
- final ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
- final int threadId = threadSeq.getAndIncrement();
-
- log.error("Thread id: " + threadId);
-
- while (System.currentTimeMillis() < stopTime && !stop.get() && !err) {
- Integer key = rnd.nextInt(PARTS);
-
- Integer val = valCntr.incrementAndGet();
-
- Integer prevVal = (Integer)qryClnCache.getAndPut(key, val);
-
- expEvts.get(threadId).add(new T3<>((Object)key, (Object)val, (Object)prevVal));
-
- CyclicBarrier bar = checkBarrier.get();
-
- if (bar != null)
- bar.await();
- }
- }
- catch (Exception e){
- log.error("Failed.", e);
-
- err = true;
-
- stop.set(true);
- }
- finally {
- stop.set(true);
- }
- }
- }, THREAD, "update-thread");
-
- restartFut.get();
-
- List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>();
-
- for (List<T3<Object, Object, Object>> evt : expEvts) {
- expEvts0.addAll(evt);
-
- evt.clear();
- }
-
- if (!expEvts0.isEmpty())
- checkEvents(expEvts0, lsnr, true);
-
- cur.close();
-
- assertFalse("Unexpected error during test, see log for details.", err);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testMultiThreaded() throws Exception {
- this.backups = 2;
-
- final int SRV_NODES = 3;
-
- startGridsMultiThreaded(SRV_NODES);
-
- client = true;
-
- Ignite qryClient = startGrid(SRV_NODES);
-
- final IgniteCache<Object, Object> cache = qryClient.cache(null);
-
- CacheEventListener1 lsnr = new CacheEventListener1(true);
-
- ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
-
- qry.setLocalListener(lsnr);
-
- QueryCursor<?> cur = cache.query(qry);
-
- client = false;
-
- final int SRV_IDX = SRV_NODES - 1;
-
- List<Integer> keys = primaryKeys(ignite(SRV_IDX).cache(null), 10);
-
- final int THREADS = 10;
-
- for (int i = 0; i < keys.size(); i++) {
- log.info("Iteration: " + i);
-
- Ignite srv = ignite(SRV_IDX);
-
- TestCommunicationSpi spi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi();
-
- spi.sndFirstOnly = new AtomicBoolean(false);
-
- final Integer key = keys.get(i);
-
- final AtomicInteger val = new AtomicInteger();
-
- CountDownLatch latch = new CountDownLatch(THREADS);
-
- lsnr.latch = latch;
-
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- Integer val0 = val.getAndIncrement();
-
- cache.put(key, val0);
-
- return null;
- }
- }, THREADS, "update-thread");
-
- fut.get();
-
- stopGrid(SRV_IDX);
-
- if (!latch.await(5, SECONDS))
- fail("Failed to wait for notifications [exp=" + THREADS + ", left=" + lsnr.latch.getCount() + ']');
-
- assertEquals(THREADS, lsnr.allEvts.size());
-
- Set<Integer> vals = new HashSet<>();
-
- boolean err = false;
-
- for (CacheEntryEvent<?, ?> evt : lsnr.allEvts) {
- assertEquals(key, evt.getKey());
- assertNotNull(evt.getValue());
-
- if (!vals.add((Integer)evt.getValue())) {
- err = true;
-
- log.info("Extra event: " + evt);
- }
- }
-
- for (int v = 0; v < THREADS; v++) {
- if (!vals.contains(v)) {
- err = true;
-
- log.info("Event for value not received: " + v);
- }
- }
-
- assertFalse("Invalid events, see log for details.", err);
-
- lsnr.allEvts.clear();
-
- startGrid(SRV_IDX);
- }
-
- cur.close();
- }
-
- /**
- * @param logAll If {@code true} logs all unexpected values.
- * @param expEvts Expected values.
- * @param lsnr Listener.
- * @return Check status.
- */
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
- private boolean checkEvents(boolean logAll,
- Map<Integer, List<T2<Integer, Integer>>> expEvts,
- CacheEventListener2 lsnr) {
- assertTrue(!expEvts.isEmpty());
-
- boolean pass = true;
-
- for (Map.Entry<Integer, List<T2<Integer, Integer>>> e : expEvts.entrySet()) {
- Integer key = e.getKey();
- List<T2<Integer, Integer>> exp = e.getValue();
-
- List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(key);
-
- if (rcvdEvts == null) {
- pass = false;
-
- log.info("No events for key [key=" + key + ", exp=" + e.getValue() + ']');
-
- if (!logAll)
- return false;
- }
- else {
- synchronized (rcvdEvts) {
- if (rcvdEvts.size() != exp.size()) {
- pass = false;
-
- log.info("Missed or extra events for key [key=" + key +
- ", exp=" + e.getValue() +
- ", rcvd=" + rcvdEvts + ']');
-
- if (!logAll)
- return false;
- }
-
- int cnt = Math.min(rcvdEvts.size(), exp.size());
-
- for (int i = 0; i < cnt; i++) {
- T2<Integer, Integer> expEvt = exp.get(i);
- CacheEntryEvent<?, ?> rcvdEvt = rcvdEvts.get(i);
-
- if (pass) {
- assertEquals(key, rcvdEvt.getKey());
- assertEquals(expEvt.get1(), rcvdEvt.getValue());
- }
- else {
- if (!key.equals(rcvdEvt.getKey()) || !expEvt.get1().equals(rcvdEvt.getValue()))
- log.warning("Missed events. [key=" + key + ", actKey=" + rcvdEvt.getKey()
- + ", expVal=" + expEvt.get1() + ", actVal=" + rcvdEvt.getValue() + "]");
- }
- }
-
- if (!pass) {
- for (int i = cnt; i < exp.size(); i++) {
- T2<Integer, Integer> val = exp.get(i);
-
- log.warning("Missed events. [key=" + key + ", expVal=" + val.get1()
- + ", prevVal=" + val.get2() + "]");
- }
- }
- }
- }
- }
-
- if (pass) {
- expEvts.clear();
- lsnr.evts.clear();
- }
-
- return pass;
- }
-
- /**
- *
- */
- private static class CacheEventListener1 implements CacheEntryUpdatedListener<Object, Object> {
- /** */
- private volatile CountDownLatch latch;
-
- /** */
- private GridConcurrentHashSet<Integer> keys = new GridConcurrentHashSet<>();
-
- /** */
- private ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>();
-
- /** */
- private List<CacheEntryEvent<?, ?>> allEvts;
-
- /** */
- @LoggerResource
- private IgniteLogger log;
-
- /**
- * @param saveAll Save all events flag.
- */
- CacheEventListener1(boolean saveAll) {
- if (saveAll)
- allEvts = new ArrayList<>();
- }
-
- /** {@inheritDoc} */
- @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
- try {
- for (CacheEntryEvent<?, ?> evt : evts) {
- CountDownLatch latch = this.latch;
-
- log.info("Received cache event [evt=" + evt +
- ", left=" + (latch != null ? latch.getCount() : null) + ']');
-
- this.evts.put(evt.getKey(), evt);
-
- keys.add((Integer)evt.getKey());
-
- if (allEvts != null)
- allEvts.add(evt);
-
- assertTrue(latch != null);
- assertTrue(latch.getCount() > 0);
-
- latch.countDown();
-
- if (latch.getCount() == 0) {
- this.latch = null;
-
- keys.clear();
- }
- }
- }
- catch (Throwable e) {
- err = true;
-
- log.error("Unexpected error", e);
- }
- }
- }
-
- /**
- *
- */
- private static class CacheEventListener2 implements CacheEntryUpdatedListener<Object, Object> {
- /** */
- @LoggerResource
- private IgniteLogger log;
-
- /** */
- private final ConcurrentHashMap<Integer, Integer> vals = new ConcurrentHashMap<>();
-
- /** */
- private final ConcurrentHashMap<Integer, List<CacheEntryEvent<?, ?>>> evts = new ConcurrentHashMap<>();
-
- /**
- * @return Count events.
- */
- public int size() {
- int size = 0;
-
- for (List<CacheEntryEvent<?, ?>> e : evts.values())
- size += e.size();
-
- return size;
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
- throws CacheEntryListenerException {
- try {
- for (CacheEntryEvent<?, ?> evt : evts) {
- Integer key = (Integer)evt.getKey();
- Integer val = (Integer)evt.getValue();
-
- assertNotNull(key);
- assertNotNull(val);
-
- Integer prevVal = vals.get(key);
-
- boolean dup = false;
-
- if (prevVal != null && prevVal.equals(val))
- dup = true;
-
- if (!dup) {
- vals.put(key, val);
-
- List<CacheEntryEvent<?, ?>> keyEvts = this.evts.get(key);
-
- if (keyEvts == null) {
- keyEvts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
-
- this.evts.put(key, keyEvts);
- }
-
- keyEvts.add(evt);
- }
- }
- }
- catch (Throwable e) {
- err = true;
-
- log.error("Unexpected error", e);
- }
- }
- }
-
- /**
- *
- */
- public static class CacheEventListener3 implements CacheEntryUpdatedListener<Object, Object>,
- CacheEntryEventSerializableFilter<Object, Object> {
- /** Keys. */
- GridConcurrentHashSet<Integer> keys = new GridConcurrentHashSet<>();
-
- /** Events. */
- private final ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>();
-
- /** {@inheritDoc} */
- @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> events) throws CacheEntryListenerException {
- for (CacheEntryEvent<?, ?> e : events) {
- Integer key = (Integer)e.getKey();
-
- keys.add(key);
-
- assert evts.put(key, e) == null;
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean evaluate(CacheEntryEvent<?, ?> e) throws CacheEntryListenerException {
- return (Integer)e.getValue() % 2 == 0;
- }
- }
-
- /**
- *
- */
- public static class CacheEventFilter implements CacheEntryEventSerializableFilter<Object, Object> {
- /** {@inheritDoc} */
- @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException {
- return ((Integer)event.getValue()) >= 0;
- }
- }
-
- /**
- *
- */
- private static class TestCommunicationSpi extends TcpCommunicationSpi {
- /** */
- @LoggerResource
- private IgniteLogger log;
-
- /** */
- private volatile boolean skipMsg;
-
- /** */
- private volatile boolean skipAllMsg;
-
- /** */
- private volatile AtomicBoolean sndFirstOnly;
-
- /** {@inheritDoc} */
- @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
- throws IgniteSpiException {
- Object msg0 = ((GridIoMessage)msg).message();
-
- if (skipAllMsg)
- return;
-
- if (msg0 instanceof GridContinuousMessage) {
- if (skipMsg) {
- if (log.isDebugEnabled())
- log.debug("Skip continuous message: " + msg0);
-
- return;
- }
- else {
- AtomicBoolean sndFirstOnly = this.sndFirstOnly;
-
- if (sndFirstOnly != null && !sndFirstOnly.compareAndSet(false, true)) {
- if (log.isDebugEnabled())
- log.debug("Skip continuous message: " + msg0);
-
- return;
- }
- }
- }
-
- super.sendMessage(node, msg, ackC);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20324bc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicNearEnabledSelfSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicNearEnabledSelfSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicNearEnabledSelfSelfTest.java
new file mode 100644
index 0000000..b3c18a9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicNearEnabledSelfSelfTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFailoverAtomicNearEnabledSelfSelfTest
+ extends CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicWriteOrderMode writeOrderMode() {
+ return PRIMARY;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearCacheConfiguration() {
+ return super.nearCacheConfiguration();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20324bc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest.java
new file mode 100644
index 0000000..e33db45
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest extends CacheContinuousQueryFailoverAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicWriteOrderMode writeOrderMode() {
+ return PRIMARY;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.ATOMIC;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20324bc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
deleted file mode 100644
index 8bd7ea7..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
-
-/**
- *
- */
-public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest extends CacheContinuousQueryFailoverAbstractTest {
- /** {@inheritDoc} */
- @Override protected CacheAtomicWriteOrderMode writeOrderMode() {
- return PRIMARY;
- }
-
- /** {@inheritDoc} */
- @Override protected CacheMode cacheMode() {
- return CacheMode.PARTITIONED;
- }
-
- /** {@inheritDoc} */
- @Override protected CacheAtomicityMode atomicityMode() {
- return CacheAtomicityMode.ATOMIC;
- }
-}