You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/22 11:39:58 UTC

[09/50] [abbrv] ignite git commit: IGNITE-500 CacheLoadingConcurrentGridStartSelfTest fails (DataStreamer data loss at unstable topology in !allowOverwrite mode fixed)

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
index c1251ae..3d3f146 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
@@ -125,8 +125,6 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest {
                 ignite.getOrCreateCache(cacheConfiguration());
 
             try (final DataStreamerImpl dataLdr = (DataStreamerImpl)ignite.dataStreamer(null)) {
-                dataLdr.maxRemapCount(0);
-
                 Random rnd = new Random();
 
                 long endTime = U.currentTimeMillis() + 15_000;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
index 4e981b7..766aa84 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerTimeoutTest.java
@@ -17,11 +17,13 @@
 package org.apache.ignite.internal.processors.datastreamer;
 
 import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteDataStreamerTimeoutException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -36,7 +38,6 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
  * Test timeout for Data streamer.
  */
 public class DataStreamerTimeoutTest extends GridCommonAbstractTest {
-
     /** Cache name. */
     public static final String CACHE_NAME = "cacheName";
 
@@ -46,6 +47,9 @@ public class DataStreamerTimeoutTest extends GridCommonAbstractTest {
     /** Amount of entries. */
     public static final int ENTRY_AMOUNT = 100;
 
+    /** Ordinal of the {@code TestDataReceiver.receive()} invocation that sleeps for {@code 2 * TIMEOUT}. */
+    private static volatile int failOn;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -76,6 +80,8 @@ public class DataStreamerTimeoutTest extends GridCommonAbstractTest {
      * @throws Exception If fail.
      */
     public void testTimeoutOnCloseMethod() throws Exception {
+        failOn = 1;
+
         Ignite ignite = startGrid(1);
 
         boolean thrown = false;
@@ -85,12 +91,10 @@ public class DataStreamerTimeoutTest extends GridCommonAbstractTest {
             ldr.receiver(new TestDataReceiver());
             ldr.perNodeBufferSize(ENTRY_AMOUNT);
 
-            for (int i=0; i < ENTRY_AMOUNT; i++)
+            for (int i = 0; i < ENTRY_AMOUNT; i++)
                 ldr.addData(i, i);
-
         }
-        catch (IgniteDataStreamerTimeoutException e) {
-            assertEquals(e.getMessage(), "Data streamer exceeded timeout on flush.");
+        catch (CacheException | IgniteDataStreamerTimeoutException e) {
             thrown = true;
         }
         finally {
@@ -102,40 +106,68 @@ public class DataStreamerTimeoutTest extends GridCommonAbstractTest {
 
     /**
      * Test timeout on {@code DataStreamer.close()} method
+     *
      * @throws Exception If fail.
      */
-    public void testTimeoutOnAddDataMethod() throws Exception {
-        Ignite ignite = startGrid(1);
+    public void testTimeoutOnAddData() throws Exception {
+        failOn = 1;
 
-        boolean thrown = false;
+        int processed = timeoutOnAddData();
 
-        IgniteDataStreamer ldr = ignite.dataStreamer(CACHE_NAME);
+        assertTrue(processed == (failOn + 1) || processed == failOn);
 
-        try {
-            ldr.timeout(TIMEOUT);
-            ldr.receiver(new TestDataReceiver());
-            ldr.perNodeBufferSize(ENTRY_AMOUNT/2);
-            ldr.perNodeParallelOperations(1);
+        failOn = ENTRY_AMOUNT / 2;
 
-            try {
-                for (int i=0; i < ENTRY_AMOUNT; i++)
-                    ldr.addData(i, i);
-            }
-            catch (IgniteDataStreamerTimeoutException e) {
-                assertEquals(e.getMessage(), "Data streamer exceeded timeout when starts parallel operation.");
+        processed = timeoutOnAddData();
+
+        assertTrue(processed == (failOn + 1) || processed == failOn);
 
+        failOn = ENTRY_AMOUNT;
+
+        processed = timeoutOnAddData();
+
+        assertTrue(processed == (failOn + 1) || processed == failOn);
+    }
+
+    /**
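+     * Streams up to {@code ENTRY_AMOUNT} entries via {@code TestDataReceiver}, asserts a timeout/cache exception was caught, and returns the number of completed {@code addData} calls.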
+     */
+    private int timeoutOnAddData() throws Exception {
+        boolean thrown = false;
+        int processed = 0;
+
+        try {
+            Ignite ignite = startGrid(1);
+
+            try (IgniteDataStreamer ldr = ignite.dataStreamer(CACHE_NAME)) {
+                ldr.timeout(TIMEOUT);
+                ldr.receiver(new TestDataReceiver());
+                ldr.perNodeBufferSize(1);
+                ldr.perNodeParallelOperations(1);
+                ((DataStreamerImpl)ldr).maxRemapCount(0);
+
+                try {
+                    for (int i = 0; i < ENTRY_AMOUNT; i++) {
+                        ldr.addData(i, i);
+
+                        processed++;
+                    }
+                }
+                catch (IllegalStateException e) {
+                    // No-op.
+                }
+            }
+            catch (CacheException | IgniteDataStreamerTimeoutException e) {
                 thrown = true;
             }
-
         }
         finally {
-            if (thrown)
-                ldr.close(true);
-
             stopAllGrids();
         }
 
         assertTrue(thrown);
+
+        return processed;
     }
 
     /**
@@ -143,16 +175,14 @@ public class DataStreamerTimeoutTest extends GridCommonAbstractTest {
      */
     private static class TestDataReceiver implements StreamReceiver {
 
-        /** Is first. */
-        boolean isFirst = true;
+        /** Number of {@code receive()} invocations on this receiver so far. */
+        private final AtomicInteger cnt = new AtomicInteger();
 
         /** {@inheritDoc} */
-        @Override public void receive(IgniteCache cache, Collection collection) throws IgniteException {
+        @Override public void receive(IgniteCache cache, Collection col) throws IgniteException {
             try {
-                if (isFirst)
+                if (cnt.incrementAndGet() == failOn)
                     U.sleep(2 * TIMEOUT);
-
-                isFirst = false;
             }
             catch (IgniteInterruptedCheckedException e) {
                 throw new IgniteException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index dc412a9..0663903 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheIncrementTxTest;
 import org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdateTest;
 import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop;
 import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTestAllowOverwrite;
 import org.apache.ignite.internal.processors.cache.distributed.CacheLockReleaseNodeLeaveTest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionNotLoadedEventSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionedNearDisabledTxMultiThreadedSelfTest;
@@ -214,6 +215,7 @@ public class IgniteCacheTestSuite2 extends TestSuite {
         suite.addTest(new TestSuite(FairAffinityFunctionBackupFilterSelfTest.class));
         suite.addTest(new TestSuite(GridCachePartitionedPreloadLifecycleSelfTest.class));
         suite.addTest(new TestSuite(CacheLoadingConcurrentGridStartSelfTest.class));
+        suite.addTest(new TestSuite(CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.class));
         suite.addTest(new TestSuite(GridCacheDhtPreloadDelayedSelfTest.class));
         suite.addTest(new TestSuite(GridPartitionedBackupLoadSelfTest.class));
         suite.addTest(new TestSuite(GridCachePartitionedLoadCacheSelfTest.class));