You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/11/24 13:24:32 UTC
[22/50] [abbrv] ignite git commit: IGNITE-500
CacheLoadingConcurrentGridStartSelfTest fails (DataStreamer data loss at
unstable topology in !allowOverwrite mode fixed)
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
index c1251ae..3d3f146 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
@@ -125,8 +125,6 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest {
ignite.getOrCreateCache(cacheConfiguration());
try (final DataStreamerImpl dataLdr = (DataStreamerImpl)ignite.dataStreamer(null)) {
- dataLdr.maxRemapCount(0);
-
Random rnd = new Random();
long endTime = U.currentTimeMillis() + 15_000;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
index 4e981b7..766aa84 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
@@ -17,11 +17,13 @@
package org.apache.ignite.internal.processors.datastreamer;
import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteDataStreamerTimeoutException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -36,7 +38,6 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
* Test timeout for Data streamer.
*/
public class DataStreamerTimeoutTest extends GridCommonAbstractTest {
-
/** Cache name. */
public static final String CACHE_NAME = "cacheName";
@@ -46,6 +47,9 @@ public class DataStreamerTimeoutTest extends GridCommonAbstractTest {
/** Amount of entries. */
public static final int ENTRY_AMOUNT = 100;
+ /** Fail on. */
+ private static volatile int failOn;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -76,6 +80,8 @@ public class DataStreamerTimeoutTest extends GridCommonAbstractTest {
* @throws Exception If fail.
*/
public void testTimeoutOnCloseMethod() throws Exception {
+ failOn = 1;
+
Ignite ignite = startGrid(1);
boolean thrown = false;
@@ -85,12 +91,10 @@ public class DataStreamerTimeoutTest extends GridCommonAbstractTest {
ldr.receiver(new TestDataReceiver());
ldr.perNodeBufferSize(ENTRY_AMOUNT);
- for (int i=0; i < ENTRY_AMOUNT; i++)
+ for (int i = 0; i < ENTRY_AMOUNT; i++)
ldr.addData(i, i);
-
}
- catch (IgniteDataStreamerTimeoutException e) {
- assertEquals(e.getMessage(), "Data streamer exceeded timeout on flush.");
+ catch (CacheException | IgniteDataStreamerTimeoutException e) {
thrown = true;
}
finally {
@@ -102,40 +106,68 @@ public class DataStreamerTimeoutTest extends GridCommonAbstractTest {
/**
* Test timeout on {@code DataStreamer.close()} method
+ *
* @throws Exception If fail.
*/
- public void testTimeoutOnAddDataMethod() throws Exception {
- Ignite ignite = startGrid(1);
+ public void testTimeoutOnAddData() throws Exception {
+ failOn = 1;
- boolean thrown = false;
+ int processed = timeoutOnAddData();
- IgniteDataStreamer ldr = ignite.dataStreamer(CACHE_NAME);
+ assertTrue(processed == (failOn + 1) || processed == failOn);
- try {
- ldr.timeout(TIMEOUT);
- ldr.receiver(new TestDataReceiver());
- ldr.perNodeBufferSize(ENTRY_AMOUNT/2);
- ldr.perNodeParallelOperations(1);
+ failOn = ENTRY_AMOUNT / 2;
- try {
- for (int i=0; i < ENTRY_AMOUNT; i++)
- ldr.addData(i, i);
- }
- catch (IgniteDataStreamerTimeoutException e) {
- assertEquals(e.getMessage(), "Data streamer exceeded timeout when starts parallel operation.");
+ processed = timeoutOnAddData();
+
+ assertTrue(processed == (failOn + 1) || processed == failOn);
+ failOn = ENTRY_AMOUNT;
+
+ processed = timeoutOnAddData();
+
+ assertTrue(processed == (failOn + 1) || processed == failOn);
+ }
+
+ /**
+ *
+ */
+ private int timeoutOnAddData() throws Exception {
+ boolean thrown = false;
+ int processed = 0;
+
+ try {
+ Ignite ignite = startGrid(1);
+
+ try (IgniteDataStreamer ldr = ignite.dataStreamer(CACHE_NAME)) {
+ ldr.timeout(TIMEOUT);
+ ldr.receiver(new TestDataReceiver());
+ ldr.perNodeBufferSize(1);
+ ldr.perNodeParallelOperations(1);
+ ((DataStreamerImpl)ldr).maxRemapCount(0);
+
+ try {
+ for (int i = 0; i < ENTRY_AMOUNT; i++) {
+ ldr.addData(i, i);
+
+ processed++;
+ }
+ }
+ catch (IllegalStateException e) {
+ // No-op.
+ }
+ }
+ catch (CacheException | IgniteDataStreamerTimeoutException e) {
thrown = true;
}
-
}
finally {
- if (thrown)
- ldr.close(true);
-
stopAllGrids();
}
assertTrue(thrown);
+
+ return processed;
}
/**
@@ -143,16 +175,14 @@ public class DataStreamerTimeoutTest extends GridCommonAbstractTest {
*/
private static class TestDataReceiver implements StreamReceiver {
- /** Is first. */
- boolean isFirst = true;
+ /** Count. */
+ private final AtomicInteger cnt = new AtomicInteger();
/** {@inheritDoc} */
- @Override public void receive(IgniteCache cache, Collection collection) throws IgniteException {
+ @Override public void receive(IgniteCache cache, Collection col) throws IgniteException {
try {
- if (isFirst)
+ if (cnt.incrementAndGet() == failOn)
U.sleep(2 * TIMEOUT);
-
- isFirst = false;
}
catch (IgniteInterruptedCheckedException e) {
throw new IgniteException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index dc412a9..0663903 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheIncrementTxTest;
import org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdateTest;
import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop;
import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTestAllowOverwrite;
import org.apache.ignite.internal.processors.cache.distributed.CacheLockReleaseNodeLeaveTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionNotLoadedEventSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionedNearDisabledTxMultiThreadedSelfTest;
@@ -214,6 +215,7 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(FairAffinityFunctionBackupFilterSelfTest.class));
suite.addTest(new TestSuite(GridCachePartitionedPreloadLifecycleSelfTest.class));
suite.addTest(new TestSuite(CacheLoadingConcurrentGridStartSelfTest.class));
+ suite.addTest(new TestSuite(CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.class));
suite.addTest(new TestSuite(GridCacheDhtPreloadDelayedSelfTest.class));
suite.addTest(new TestSuite(GridPartitionedBackupLoadSelfTest.class));
suite.addTest(new TestSuite(GridCachePartitionedLoadCacheSelfTest.class));