You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/08/18 09:08:16 UTC

[1/3] ignite git commit: IGNITE-3692: IGFS: Test fixes.

Repository: ignite
Updated Branches:
  refs/heads/ignite-3547-1 [created] 9d200004c


IGNITE-3692: IGFS: Test fixes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/09a3922d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/09a3922d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/09a3922d

Branch: refs/heads/ignite-3547-1
Commit: 09a3922d57f9a4c8fbe6c1056f3ea128869c250e
Parents: 278633e
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Aug 16 12:52:09 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Aug 16 12:52:09 2016 +0300

----------------------------------------------------------------------
 .../internal/processors/igfs/IgfsAsyncImpl.java |  5 ++
 .../ignite/internal/processors/igfs/IgfsEx.java |  7 ++
 .../internal/processors/igfs/IgfsImpl.java      |  8 +-
 .../processors/igfs/IgfsAbstractSelfTest.java   | 86 ++++++++++----------
 .../igfs/IgfsBackupFailoverSelfTest.java        |  2 +-
 .../igfs/IgfsDualAbstractSelfTest.java          | 86 +-------------------
 .../internal/processors/igfs/IgfsMock.java      |  5 ++
 .../igfs/HadoopIgfsDualAbstractSelfTest.java    |  2 +-
 8 files changed, 66 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/09a3922d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
index bf3d22b..07b070e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -322,4 +322,9 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme
     @Override public IgfsSecondaryFileSystem asSecondary() {
         return igfs.asSecondary();
     }
+
+    /** {@inheritDoc} */
+    @Override public void await(IgfsPath... paths) {
+        igfs.await(paths);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/09a3922d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index 4c64bc9..9760f43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -142,4 +142,11 @@ public interface IgfsEx extends IgniteFileSystem {
      * @return Secondary file system wrapper.
      */
     public IgfsSecondaryFileSystem asSecondary();
+
+    /**
+     * Await for any pending finished writes on the children paths.
+     *
+     * @param paths Paths to check.
+     */
+    public void await(IgfsPath... paths);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/09a3922d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index e1f8e61..6707acc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -374,12 +374,8 @@ public final class IgfsImpl implements IgfsEx {
         return busyLock.enterBusy();
     }
 
-    /**
-     * Await for any pending finished writes on the children paths.
-     *
-     * @param paths Paths to check.
-     */
-    void await(IgfsPath... paths) {
+    /** {@inheritDoc} */
+    @Override public void await(IgfsPath... paths) {
         assert paths != null;
 
         for (Map.Entry<IgfsPath, IgfsFileWorkerBatch> workerEntry : workerMap.entrySet()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/09a3922d/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index 08cb929..236a589 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -489,7 +489,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testExists() throws Exception {
-        create(igfs.asSecondary(), paths(DIR), null);
+        create(igfs, paths(DIR), null);
 
         checkExist(igfs, igfsSecondary, DIR);
     }
@@ -600,7 +600,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
         IgfsPath file1 = new IgfsPath("/file1");
         IgfsPath file2 = new IgfsPath("/file2");
 
-        create(igfs.asSecondary(), null, paths(file1));
+        create(igfs, null, paths(file1));
 
         igfs.rename(file1, file2);
 
@@ -632,7 +632,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
         IgfsPath dir1 = new IgfsPath("/dir1");
         IgfsPath dir2 = new IgfsPath("/dir2");
 
-        create(igfs.asSecondary(), paths(dir1), null);
+        create(igfs, paths(dir1), null);
 
         igfs.rename(dir1, dir2);
 
@@ -661,7 +661,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testMoveFileDestinationRoot() throws Exception {
-        create(igfs.asSecondary(), paths(DIR, SUBDIR), paths(FILE));
+        create(igfs, paths(DIR, SUBDIR), paths(FILE));
 
         igfs.rename(FILE, new IgfsPath());
 
@@ -753,7 +753,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testMoveDirectoryDestinationRoot() throws Exception {
-        create(igfs.asSecondary(), paths(DIR, SUBDIR, SUBSUBDIR), null);
+        create(igfs, paths(DIR, SUBDIR, SUBSUBDIR), null);
 
         igfs.rename(SUBSUBDIR, new IgfsPath());
 
@@ -769,7 +769,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
     public void testMoveDirectorySourceParentRoot() throws Exception {
         IgfsPath dir = new IgfsPath("/" + SUBSUBDIR.name());
 
-        create(igfs.asSecondary(), paths(DIR_NEW, SUBDIR_NEW, dir), null);
+        create(igfs, paths(DIR_NEW, SUBDIR_NEW, dir), null);
 
         igfs.rename(dir, SUBDIR_NEW);
 
@@ -1163,7 +1163,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
     public void testOpen() throws Exception {
         create(igfs, paths(DIR, SUBDIR), null);
 
-        createFile(igfs.asSecondary(), FILE, true, chunk);
+        createFile(igfs, FILE, true, chunk);
 
         checkFileContent(igfs, FILE, chunk);
 
@@ -1200,7 +1200,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testSetTimes() throws Exception {
-        createFile(igfs.asSecondary(), FILE, true, chunk);
+        createFile(igfs, FILE, true, chunk);
 
         checkExist(igfs, igfsSecondary, DIR);
         checkExist(igfs, igfsSecondary, SUBDIR);
@@ -1312,7 +1312,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
     public void testCreate() throws Exception {
         create(igfs, paths(DIR, SUBDIR), null);
 
-        createFile(igfs.asSecondary(), FILE, true, chunk);
+        createFile(igfs, FILE, true, chunk);
 
         checkFile(igfs, igfsSecondary, FILE, chunk);
 
@@ -1396,7 +1396,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
     public void testCreateParentRoot() throws Exception {
         IgfsPath file = new IgfsPath("/" + FILE.name());
 
-        createFile(igfs.asSecondary(), file, true, chunk);
+        createFile(igfs, file, true, chunk);
 
         checkFile(igfs, igfsSecondary, file, chunk);
     }
@@ -1681,7 +1681,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
                         assert igfs.exists(path);
                     }
 
-                    awaitFileClose(igfs.asSecondary(), path);
+                    awaitFileClose(igfs, path);
 
                     checkFileContent(igfs, path, chunk);
                 }
@@ -1806,7 +1806,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
             } finally {
                 U.closeQuiet(os);
 
-                awaitFileClose(igfs.asSecondary(), path2);
+                awaitFileClose(igfs, path2);
             }
 
             try {
@@ -1816,7 +1816,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
             } finally {
                 U.closeQuiet(os);
 
-                awaitFileClose(igfs.asSecondary(), path2);
+                awaitFileClose(igfs, path2);
             }
 
             checkFile(igfs, igfsSecondary, path2, chunk, chunk);
@@ -1944,7 +1944,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
         if (appendSupported()) {
             create(igfs, paths(DIR, SUBDIR), null);
 
-            createFile(igfs.asSecondary(), FILE, false);
+            createFile(igfs, FILE, false);
 
             GridTestUtils.assertThrowsInherited(log(), new Callable<Object>() {
                 @Override
@@ -1978,7 +1978,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
         if (appendSupported()) {
             create(igfs, paths(DIR, SUBDIR), null);
 
-            createFile(igfs.asSecondary(), FILE, false);
+            createFile(igfs, FILE, false);
 
             IgfsOutputStream os = null;
 
@@ -2004,9 +2004,9 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
             return;
 
         if (appendSupported()) {
-            create(igfs.asSecondary(), paths(DIR, SUBDIR), null);
+            create(igfs, paths(DIR, SUBDIR), null);
 
-            createFile(igfs.asSecondary(), FILE, false);
+            createFile(igfs, FILE, false);
 
             IgfsOutputStream os = null;
 
@@ -2034,7 +2034,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
         if (appendSupported()) {
             create(igfs, paths(DIR, SUBDIR), null);
 
-            createFile(igfs.asSecondary(), FILE, false);
+            createFile(igfs, FILE, false);
 
             IgfsOutputStream os = null;
             IgniteUuid id = null;
@@ -2089,7 +2089,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
         if (appendSupported()) {
             create(igfs, paths(DIR, SUBDIR), null);
 
-            createFile(igfs.asSecondary(), FILE, false);
+            createFile(igfs, FILE, false);
 
             IgfsOutputStream os = null;
             IgniteUuid id = null;
@@ -2146,7 +2146,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
 
             create(igfs, paths(DIR, SUBDIR), null);
 
-            createFile(igfs.asSecondary(), FILE, false);
+            createFile(igfs, FILE, false);
 
             IgfsOutputStream os = null;
 
@@ -2176,7 +2176,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
             int threadCnt = 10;
 
             for (int i = 0; i < threadCnt; i++)
-                createFile(igfs.asSecondary(), new IgfsPath("/file" + i), false);
+                createFile(igfs, new IgfsPath("/file" + i), false);
 
             multithreaded(new Runnable() {
                 @Override
@@ -2200,7 +2200,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
                             assert igfs.exists(path);
                         }
 
-                        awaitFileClose(igfs.asSecondary(), path);
+                        awaitFileClose(igfs, path);
 
                         checkFileContent(igfs, path, chunks);
                     } catch (IOException | IgniteCheckedException e) {
@@ -2271,7 +2271,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
 
             fut.get();
 
-            awaitFileClose(igfs.asSecondary(), FILE);
+            awaitFileClose(igfs, FILE);
 
             if (err.get() != null) {
                 X.println("Test failed: rethrowing first error: " + err.get());
@@ -2952,20 +2952,8 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
      * @param files Files.
      * @throws Exception If failed.
      */
+    @SuppressWarnings("EmptyTryBlock")
     public static void create(IgfsImpl igfs, @Nullable IgfsPath[] dirs, @Nullable IgfsPath[] files) throws Exception {
-        create(igfs.asSecondary(), dirs, files);
-    }
-
-    /**
-     * Create the given directories and files in the given IGFS.
-     *
-     * @param igfs IGFS.
-     * @param dirs Directories.
-     * @param files Files.
-     * @throws Exception If failed.
-     */
-    public static void create(IgfsSecondaryFileSystem igfs, @Nullable IgfsPath[] dirs, @Nullable IgfsPath[] files)
-        throws Exception {
         if (dirs != null) {
             for (IgfsPath dir : dirs)
                 igfs.mkdirs(dir);
@@ -2973,9 +2961,11 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
 
         if (files != null) {
             for (IgfsPath file : files) {
-                OutputStream os = igfs.create(file, true);
+                try (OutputStream os = igfs.create(file, true)) {
+                    // No-op.
+                }
 
-                os.close();
+                igfs.await(file);
             }
         }
     }
@@ -3013,8 +3003,8 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
      * @param chunks Data chunks.
      * @throws IOException In case of IO exception.
      */
-    protected static void createFile(IgfsSecondaryFileSystem igfs, IgfsPath file, boolean overwrite,
-        @Nullable byte[]... chunks) throws IOException {
+    protected static void createFile(IgfsEx igfs, IgfsPath file, boolean overwrite, @Nullable byte[]... chunks)
+        throws IOException {
         OutputStream os = null;
 
         try {
@@ -3051,7 +3041,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
             IgfsEx igfsEx = uni.igfs();
 
             if (igfsEx != null)
-                awaitFileClose(igfsEx.asSecondary(), file);
+                awaitFileClose(igfsEx, file);
         }
     }
 
@@ -3077,7 +3067,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
         finally {
             U.closeQuiet(os);
 
-            awaitFileClose(igfs.asSecondary(), file);
+            awaitFileClose(igfs, file);
         }
     }
 
@@ -3101,7 +3091,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
         finally {
             U.closeQuiet(os);
 
-            awaitFileClose(igfs.asSecondary(), file);
+            awaitFileClose(igfs, file);
         }
     }
 
@@ -3135,6 +3125,16 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
     }
 
     /**
+     * Await for previously opened output stream to close.
+     *
+     * @param igfs IGFS.
+     * @param file File.
+     */
+    public static void awaitFileClose(@Nullable IgfsEx igfs, IgfsPath file) {
+        igfs.await(file);
+    }
+
+    /**
      * Ensure that the given paths exist in the given IGFSs.
      *
      * @param igfs First IGFS.

http://git-wip-us.apache.org/repos/asf/ignite/blob/09a3922d/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java
index 5be9c09..187aeeb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java
@@ -563,7 +563,7 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest {
 
             U.closeQuiet(ios);
 
-            awaitFileClose(igfs0.asSecondary(), filePath(fileIdx));
+            awaitFileClose(igfs0, filePath(fileIdx));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/09a3922d/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
index b4ca0ca..02027d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.ignite.internal.processors.igfs;
 
-import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsInputStream;
 import org.apache.ignite.igfs.IgfsMode;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
@@ -28,7 +26,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
-import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
@@ -1164,85 +1161,6 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest {
     }
 
     /**
-     * Ensure that prefetch occurs in case several blocks are read sequentially.
-     *
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings("ResultOfMethodCallIgnored")
-    public void testOpenPrefetch() throws Exception {
-        create(igfsSecondary, paths(DIR, SUBDIR), paths(FILE));
-
-        // Write enough data to the secondary file system.
-        int blockSize0 = igfs.info(FILE).blockSize();
-        final int blockSize = blockSize0 != 0 ? blockSize0 : 8 * 1024;
-
-        int totalWritten = 0;
-        try (OutputStream out = igfsSecondary.openOutputStream(FILE.toString(), false)) {
-            while (totalWritten < blockSize * 2 + chunk.length) {
-                out.write(chunk);
-
-                totalWritten += chunk.length;
-            }
-        }
-
-        if (propertiesSupported())
-            awaitFileClose(igfsSecondaryFileSystem, FILE);
-        else
-            Thread.sleep(1000);
-
-        // Read the first two blocks.
-        int totalRead = 0;
-
-        IgfsInputStream in = igfs.open(FILE, blockSize);
-
-        final byte[] readBuf = new byte[1024];
-
-        while (totalRead + readBuf.length <= blockSize * 2) {
-            in.read(readBuf);
-
-            totalRead += readBuf.length;
-        }
-
-        // Wait for a while for prefetch to finish.
-        IgfsMetaManager meta = igfs.context().meta();
-
-        IgfsEntryInfo info = meta.info(meta.fileId(FILE));
-
-        assert info != null;
-
-        IgfsBlockKey key = new IgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), 2);
-
-        IgniteCache<IgfsBlockKey, byte[]> dataCache = igfs.context().kernalContext().cache().jcache(
-            igfs.configuration().getDataCacheName());
-
-        for (int i = 0; i < 10; i++) {
-            if (dataCache.containsKey(key))
-                break;
-            else
-                U.sleep(100);
-        }
-
-        // Remove the file from the secondary file system.
-        igfsSecondary.delete(FILE.toString(), false);
-
-        // Let's wait for file will be deleted.
-        U.sleep(300);
-
-        // Read the third block.
-        totalRead = 0;
-
-        in.seek(blockSize * 2);
-
-        while (totalRead + readBuf.length <= blockSize) {
-            in.read(readBuf);
-
-            totalRead += readBuf.length;
-        }
-
-        in.close();
-    }
-
-    /**
      * Test create when parent directory is partially missing locally.
      *
      * @throws Exception If failed.
@@ -1251,7 +1169,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest {
         create(igfsSecondary, paths(DIR, SUBDIR), null);
         create(igfs, paths(DIR), null);
 
-        createFile(igfs.asSecondary(), FILE, true, chunk);
+        createFile(igfs, FILE, true, chunk);
 
         // Ensure that directory structure was created.
         checkExist(igfs, igfsSecondary, SUBDIR);
@@ -1287,7 +1205,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest {
         create(igfsSecondary, paths(DIR, SUBDIR), null);
         create(igfs, null, null);
 
-        createFile(igfs.asSecondary(), FILE, true, chunk);
+        createFile(igfs, FILE, true, chunk);
 
         checkExist(igfs, igfsSecondary, SUBDIR);
         checkFile(igfs, igfsSecondary, FILE, chunk);

http://git-wip-us.apache.org/repos/asf/ignite/blob/09a3922d/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
index a2bd9ca..0138907 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
@@ -376,6 +376,11 @@ public class IgfsMock implements IgfsEx {
     }
 
     /** {@inheritDoc} */
+    @Override public void await(IgfsPath... paths) {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteFileSystem withAsync() {
         throwUnsupported();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/09a3922d/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java
index 40cf493..bb155b4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java
@@ -252,7 +252,7 @@ public abstract class HadoopIgfsDualAbstractSelfTest extends IgfsCommonAbstractT
 
         out.close();
 
-        awaitFileClose(igfsSecondary.asSecondary(), FILE);
+        awaitFileClose(igfsSecondary, FILE);
 
         // Instantiate file system with overridden "seq reads before prefetch" property.
         Configuration cfg = new Configuration();


[3/3] ignite git commit: ignite-3547

Posted by sb...@apache.org.
ignite-3547


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9d200004
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9d200004
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9d200004

Branch: refs/heads/ignite-3547-1
Commit: 9d200004c28e83a0b7024611d3e1b485fe3f317d
Parents: 476081b
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 18 12:03:32 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 18 12:03:32 2016 +0300

----------------------------------------------------------------------
 .../util/nio/GridTcpNioCommunicationClient.java |   5 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  49 +++--
 .../IgniteCacheConnectionRecoveryTest.java      | 202 +++++++++++++++++++
 .../IgniteCacheMessageRecoveryAbstractTest.java |  14 +-
 ...gniteCacheMessageRecoveryIdleConnection.java | 154 --------------
 ...eCacheMessageRecoveryIdleConnectionTest.java | 157 ++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   6 +-
 7 files changed, 399 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 4022bc5..5fe521d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -125,8 +125,11 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
                 if (log.isDebugEnabled())
                     log.debug("Failed to send message [client=" + this + ", err=" + e + ']');
 
-                if (e.getCause() instanceof IOException)
+                if (e.getCause() instanceof IOException) {
+                    ses.close();
+
                     return true;
+                }
                 else
                     throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2c03b2d..d81b9f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -353,6 +353,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 UUID id = ses.meta(NODE_ID_META);
 
                 if (id != null) {
+                    GridCommunicationClient client = clients.get(id);
+
+                    if (client instanceof GridTcpNioCommunicationClient &&
+                        ((GridTcpNioCommunicationClient) client).session() == ses) {
+                        client.close();
+
+                        clients.remove(id, client);
+                    }
+
                     if (!stopping) {
                         boolean reconnect = false;
 
@@ -372,9 +381,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                 recoveryData.onNodeLeft();
                         }
 
-                        DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(id,
-                            ses,
-                            recoveryData,
+                        DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(recoveryData,
                             reconnect);
 
                         commWorker.addProcessDisconnectRequest(disconnectData);
@@ -1400,6 +1407,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     .append(']').append(U.nl());
             }
 
+            sb.append("Communication SPI clients: ").append(U.nl());
+
+            for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) {
+                sb.append("    [node=").append(entry.getKey())
+                    .append(", client=").append(entry.getValue())
+                    .append(']').append(U.nl());
+            }
+
             U.warn(log, sb.toString());
         }
 
@@ -1978,17 +1993,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     client.release();
 
-                    client = null;
-
                     if (!retry)
                         sentMsgsCnt.increment();
                     else {
+                        clients.remove(node.id(), client);
+
                         ClusterNode node0 = getSpiContext().node(node.id());
 
                         if (node0 == null)
                             throw new IgniteCheckedException("Failed to send message to remote node " +
                                 "(node has left the grid): " + node.id());
                     }
+
+                    client = null;
                 }
                 while (retry);
             }
@@ -3187,12 +3204,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
          * @param sesInfo Disconnected session information.
          */
         private void processDisconnect(DisconnectedSessionInfo sesInfo) {
-            GridCommunicationClient client = clients.get(sesInfo.nodeId);
-
-            if (client instanceof GridTcpNioCommunicationClient &&
-                ((GridTcpNioCommunicationClient) client).session() == sesInfo.ses)
-                    clients.remove(sesInfo.nodeId, client);
-
             if (sesInfo.reconnect) {
                 GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
 
@@ -3205,7 +3216,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (log.isDebugEnabled())
                         log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
 
-                    client = reserveClient(node);
+                    GridCommunicationClient client = reserveClient(node);
 
                     client.release();
                 }
@@ -3756,29 +3767,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      */
     private static class DisconnectedSessionInfo {
         /** */
-        private final UUID nodeId;
-
-        /** */
-        private final GridNioSession ses;
-
-        /** */
         private final GridNioRecoveryDescriptor recoveryDesc;
 
         /** */
         private final boolean reconnect;
 
         /**
-         * @param nodeId Node ID.
-         * @param ses Session.
          * @param recoveryDesc Recovery descriptor.
          * @param reconnect Reconnect flag.
          */
-        public DisconnectedSessionInfo(UUID nodeId,
-            GridNioSession ses,
-            @Nullable GridNioRecoveryDescriptor recoveryDesc,
+        DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor recoveryDesc,
             boolean reconnect) {
-            this.nodeId = nodeId;
-            this.ses = ses;
             this.recoveryDesc = recoveryDesc;
             this.reconnect = reconnect;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java
new file mode 100644
index 0000000..4a07674
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheConnectionRecoveryTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private static final int SRVS = 5;
+
+    /** */
+    private static final int CLIENTS = 5;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        cfg.setCacheConfiguration(
+            cacheConfiguration("cache1", TRANSACTIONAL),
+            cacheConfiguration("cache2", ATOMIC));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(SRVS);
+
+        client = true;
+
+        startGridsMultiThreaded(SRVS, CLIENTS);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRecovery() throws Exception {
+        final Map<Integer, Integer> data = new TreeMap<>();
+
+        for (int i = 0; i < 500; i++)
+            data.put(i, i);
+
+        final AtomicInteger idx = new AtomicInteger();
+
+        final long stopTime = U.currentTimeMillis() + 30_000;
+
+        final AtomicReference<CyclicBarrier> barrierRef = new AtomicReference<>();
+
+        final int TEST_THREADS = (CLIENTS + SRVS) * 2;
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int idx0 = idx.getAndIncrement();
+                Ignite node = ignite(idx0 % (SRVS + CLIENTS));
+
+                Thread.currentThread().setName("test-thread-" + idx0 + "-" + node.name());
+
+                IgniteCache cache1 = node.cache("cache1").withAsync();
+                IgniteCache cache2 = node.cache("cache2").withAsync();
+
+                int iter = 0;
+
+                while (U.currentTimeMillis() < stopTime) {
+                    try {
+                        cache1.putAll(data);
+                        cache1.future().get(15, SECONDS);
+
+                        cache2.putAll(data);
+                        cache2.future().get(15, SECONDS);
+
+                        CyclicBarrier b = barrierRef.get();
+
+                        if (b != null)
+                            b.await(15, SECONDS);
+                    }
+                    catch (Exception e) {
+                        synchronized (IgniteCacheConnectionRecoveryTest.class) {
+                            log.error("Failed to execute update, will dump debug information" +
+                                " [err=" + e+ ", iter=" + iter + ']', e);
+
+                            List<Ignite> nodes = IgnitionEx.allGridsx();
+
+                            for (Ignite node0 : nodes)
+                                ((IgniteKernal)node0).dumpDebugInfo();
+
+                            U.dumpThreads(log);
+                        }
+
+                        throw e;
+                    }
+                }
+
+                return null;
+            }
+        }, TEST_THREADS, "test-thread");
+
+        while  (System.currentTimeMillis() < stopTime) {
+            boolean closed = false;
+
+            for (Ignite node : G.allGrids()) {
+                if (IgniteCacheMessageRecoveryAbstractTest.closeSessions(node))
+                    closed = true;
+            }
+
+            if (closed) {
+                CyclicBarrier b = new CyclicBarrier(TEST_THREADS + 1, new Runnable() {
+                    @Override public void run() {
+                        barrierRef.set(null);
+                    }
+                });
+
+                barrierRef.set(b);
+
+                b.await();
+            }
+
+            U.sleep(50);
+        }
+
+        fut.get();
+    }
+
+    /**
+     * @param name Cache name.
+     * @param atomicityMode Cache atomicity mode.
+     * @return Configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(REPLICATED);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return ccfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
index 16d7e5d..0460a8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
@@ -150,7 +150,11 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
             for (int i = 0; i < 30; i++) {
                 Thread.sleep(1000);
 
-                closed |= closeSessions();
+                Ignite node0 = ignite(ThreadLocalRandom.current().nextInt(0, GRID_CNT));
+
+                log.info("Close sessions for: " + ignite.name());
+
+                closed |= closeSessions(node0);
             }
 
             assertTrue(closed);
@@ -163,13 +167,11 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
     }
 
     /**
+     * @param ignite Node.
      * @throws Exception If failed.
+     * @return {@code True} if closed at least one session.
      */
-    private boolean closeSessions() throws Exception {
-        Ignite ignite = ignite(ThreadLocalRandom.current().nextInt(0, GRID_CNT));
-
-        log.info("Close sessions for: " + ignite.name());
-
+    static boolean closeSessions(Ignite ignite) throws Exception {
         TcpCommunicationSpi commSpi = (TcpCommunicationSpi)ignite.configuration().getCommunicationSpi();
 
         Map<UUID, GridCommunicationClient> clients = U.field(commSpi, "clients");

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java
deleted file mode 100644
index 618fe2a..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.IgnitionEx;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-
-/**
- *
- */
-public class IgniteCacheMessageRecoveryIdleConnection extends GridCommonAbstractTest {
-    /** */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private static final int NODES = 3;
-
-    /** */
-    private static final long IDLE_TIMEOUT = 50;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
-
-        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
-
-        commSpi.setIdleConnectionTimeout(IDLE_TIMEOUT);
-        commSpi.setSharedMemoryPort(-1);
-
-        cfg.setCommunicationSpi(commSpi);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long getTestTimeout() {
-        return 2 * 60_000;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        startGridsMultiThreaded(NODES);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        super.afterTestsStopped();
-
-        stopAllGrids();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testCacheOperationsIdleConnectionCloseTx() throws Exception {
-        cacheOperationsIdleConnectionClose(TRANSACTIONAL);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testCacheOperationsIdleConnectionCloseAtomic() throws Exception {
-        cacheOperationsIdleConnectionClose(ATOMIC);
-    }
-
-    /**
-     * @param atomicityMode Cache atomicity mode.
-     * @throws Exception If failed.
-     */
-    private void cacheOperationsIdleConnectionClose(CacheAtomicityMode atomicityMode) throws Exception {
-        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
-
-        ccfg.setAtomicityMode(atomicityMode);
-        ccfg.setCacheMode(REPLICATED);
-        ccfg.setWriteSynchronizationMode(FULL_SYNC);
-
-        IgniteCache<Object, Object> cache = ignite(0).createCache(ccfg).withAsync();
-
-        try {
-            ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
-            int iter = 0;
-
-            long stopTime = System.currentTimeMillis() + 90_000;
-
-            while (System.currentTimeMillis() < stopTime) {
-                if (iter++ % 10 == 0)
-                    log.info("Iteration: " + iter);
-
-                cache.put(iter, 1);
-
-                IgniteFuture<?> fut = cache.future();
-
-                try {
-                    fut.get(10_000);
-                }
-                catch (IgniteException e) {
-                    List<Ignite> nodes = IgnitionEx.allGridsx();
-
-                    for (Ignite node : nodes)
-                        ((IgniteKernal)node).dumpDebugInfo();
-
-                    U.dumpThreads(log);
-
-                    throw e;
-                }
-
-                U.sleep(rnd.nextLong(IDLE_TIMEOUT - 10, IDLE_TIMEOUT + 10));
-            }
-        }
-        finally {
-            ignite(0).destroyCache(ccfg.getName());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java
new file mode 100644
index 0000000..b9003cd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheMessageRecoveryIdleConnectionTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 3;
+
+    /** */
+    private static final long IDLE_TIMEOUT = 50;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        commSpi.setIdleConnectionTimeout(IDLE_TIMEOUT);
+        commSpi.setSharedMemoryPort(-1);
+
+        cfg.setCommunicationSpi(commSpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 2 * 60_000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheOperationsIdleConnectionCloseTx() throws Exception {
+        cacheOperationsIdleConnectionClose(TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheOperationsIdleConnectionCloseAtomic() throws Exception {
+        cacheOperationsIdleConnectionClose(ATOMIC);
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    private void cacheOperationsIdleConnectionClose(CacheAtomicityMode atomicityMode) throws Exception {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(REPLICATED);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        IgniteCache<Object, Object> cache = ignite(0).createCache(ccfg).withAsync();
+
+        try {
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            int iter = 0;
+
+            long stopTime = System.currentTimeMillis() + 90_000;
+
+            while (System.currentTimeMillis() < stopTime) {
+                if (iter++ % 50 == 0)
+                    log.info("Iteration: " + iter);
+
+                cache.put(iter, 1);
+
+                IgniteFuture<?> fut = cache.future();
+
+                try {
+                    fut.get(10_000);
+                }
+                catch (IgniteException e) {
+                    log.error("Failed to execute update, will dump debug information" +
+                        " [err=" + e+ ", iter=" + iter + ']', e);
+
+                    List<Ignite> nodes = IgnitionEx.allGridsx();
+
+                    for (Ignite node : nodes)
+                        ((IgniteKernal)node).dumpDebugInfo();
+
+                    U.dumpThreads(log);
+
+                    throw e;
+                }
+
+                U.sleep(rnd.nextLong(IDLE_TIMEOUT - 10, IDLE_TIMEOUT + 10));
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 8c3f4de..2c294ba 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -125,7 +125,8 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicNearUp
 import org.apache.ignite.internal.processors.cache.distributed.CacheTxNearUpdateTopologyChangeTest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheEntrySetIterationPreloadingSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryTest;
-import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageRecoveryIdleConnection;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheConnectionRecoveryTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageRecoveryIdleConnectionTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageWriteTimeoutTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSystemTransactionsSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxMessageRecoveryTest;
@@ -281,7 +282,8 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheAtomicMessageRecoveryTest.class);
         suite.addTestSuite(IgniteCacheTxMessageRecoveryTest.class);
         suite.addTestSuite(IgniteCacheMessageWriteTimeoutTest.class);
-        suite.addTestSuite(IgniteCacheMessageRecoveryIdleConnection.class);
+        suite.addTestSuite(IgniteCacheMessageRecoveryIdleConnectionTest.class);
+        suite.addTestSuite(IgniteCacheConnectionRecoveryTest.class);
         GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredEvictionAtomicSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredEvictionSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredAtomicSelfTest.class, ignoredTests);


[2/3] ignite git commit: Fixed isReadFromBackup behaviour for transaction. This closes #955.

Posted by sb...@apache.org.
Fixed isReadFromBackup behaviour for transaction. This closes #955.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/476081b9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/476081b9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/476081b9

Branch: refs/heads/ignite-3547-1
Commit: 476081b9171b1b8e5ec0a1ffd7e87092da3601d6
Parents: 09a3922
Author: Tikhonov Nikolay <ti...@gmail.com>
Authored: Wed Aug 17 14:31:20 2016 +0300
Committer: Tikhonov Nikolay <ti...@gmail.com>
Committed: Wed Aug 17 14:31:20 2016 +0300

----------------------------------------------------------------------
 .../near/GridNearTransactionalCache.java        |   2 +-
 .../cache/distributed/near/GridNearTxLocal.java |   4 +-
 .../CacheTxNotAllowReadFromBackupTest.java      | 297 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 4 files changed, 302 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/476081b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index e954a7f..cf5d2e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -192,7 +192,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
         GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
             keys,
             readThrough,
-            /*force primary*/needVer,
+            /*force primary*/needVer || !ctx.config().isReadFromBackup(),
             tx,
             CU.subjectId(tx, ctx.shared()),
             tx.resolveTaskName(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/476081b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index db736a5..62cf74b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -363,7 +363,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                 return cacheCtx.colocated().loadAsync(
                     key,
                     readThrough,
-                    /*force primary*/needVer,
+                    /*force primary*/needVer || !cacheCtx.config().isReadFromBackup(),
                     topVer,
                     CU.subjectId(this, cctx),
                     resolveTaskName(),
@@ -394,7 +394,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                 return cacheCtx.colocated().loadAsync(
                     keys,
                     readThrough,
-                    /*force primary*/needVer,
+                    /*force primary*/needVer || !cacheCtx.config().isReadFromBackup(),
                     topVer,
                     CU.subjectId(this, cctx),
                     resolveTaskName(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/476081b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxNotAllowReadFromBackupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxNotAllowReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxNotAllowReadFromBackupTest.java
new file mode 100644
index 0000000..30de2f9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxNotAllowReadFromBackupTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Test for query with BinaryMarshaller and different serialization modes and with reflective serializer.
+ */
+public class CacheTxNotAllowReadFromBackupTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 2;
+
+    /** */
+    private static final int KEYS = 1000;
+
+    /** */
+    private static final int BATCH_SIZE  = 10;
+
+    /** */
+    private static final int ITERATION_CNT = 5;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBackupConsistencyReplicated() throws Exception {
+        CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>("test-cache");
+
+        cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+        cfg.setCacheMode(CacheMode.REPLICATED);
+        cfg.setReadFromBackup(false);
+
+        checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+        checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+        checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+        checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+        checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+        checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBackupConsistencyReplicatedFullSync() throws Exception {
+        CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>("test-cache");
+
+        cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cfg.setCacheMode(CacheMode.REPLICATED);
+        cfg.setReadFromBackup(false);
+
+        checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+        checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+        checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+        checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+        checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+        checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBackupConsistencyPartitioned() throws Exception {
+        CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>("test-cache");
+
+        cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+        cfg.setCacheMode(CacheMode.PARTITIONED);
+        cfg.setBackups(NODES - 1);
+        cfg.setReadFromBackup(false);
+
+        checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+        checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+        checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+        checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+        checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+        checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBackupConsistencyPartitionedFullSync() throws Exception {
+        CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>("test-cache");
+
+        cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cfg.setCacheMode(CacheMode.PARTITIONED);
+        cfg.setBackups(NODES - 1);
+        cfg.setReadFromBackup(false);
+
+        checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+        checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+        checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+        checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+        checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+        checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+        checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void checkBackupConsistency(CacheConfiguration<Integer, Integer> ccfg, TransactionConcurrency txConcurrency,
+        TransactionIsolation txIsolation) throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).getOrCreateCache(ccfg);
+
+        int nodeIdx = ThreadLocalRandom.current().nextInt(NODES);
+
+        try {
+            for (int i = 0; i < ITERATION_CNT; i++) {
+                log.info("Iteration: " + i);
+
+                // Put data in one transaction.
+                try (Transaction tx = grid(nodeIdx).transactions().txStart(txConcurrency, txIsolation)) {
+                    for (int key = 0; key < KEYS; key++)
+                        cache.put(key, key);
+
+                    tx.commit();
+                }
+
+                int missCnt = 0;
+
+                // Try to load data from another transaction.
+                try (Transaction tx = grid(nodeIdx).transactions().txStart(txConcurrency, txIsolation)) {
+                    for (int key = 0; key < KEYS; key++)
+                        if (cache.get(key) == null)
+                            ++missCnt;
+
+                    tx.commit();
+                }
+
+                assertEquals("Failed. Found missing get()", 0, missCnt);
+            }
+        }
+        finally {
+            grid(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void checkBackupConsistencyGetAll(CacheConfiguration<Integer, Integer> ccfg,
+        TransactionConcurrency txConcurrency, TransactionIsolation txIsolation) throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).getOrCreateCache(ccfg);
+
+        int nodeIdx = ThreadLocalRandom.current().nextInt(NODES);
+
+        try {
+            for (int i = 0; i < ITERATION_CNT; i++) {
+                log.info("Iteration: " + i);
+
+                List<Set<Integer>> batches = createBatches();
+
+                // Put data in one transaction.
+                try (Transaction tx = grid(nodeIdx).transactions().txStart(txConcurrency, txIsolation)) {
+                    for (int key = 0; key < KEYS; key++)
+                        cache.put(key, key);
+
+                    tx.commit();
+                }
+
+                // Try to load data from another transaction.
+                try (Transaction tx = grid(nodeIdx).transactions().txStart(txConcurrency, txIsolation)) {
+                    for (Set<Integer> batch : batches)
+                        assertEquals("Failed. Found missing entries.", batch.size(), cache.getAll(batch).size());
+
+                    tx.commit();
+                }
+            }
+        }
+        finally {
+            grid(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @return Batches.
+     */
+    @NotNull private List<Set<Integer>> createBatches() {
+        List<Set<Integer>> batches = new ArrayList<>(KEYS / BATCH_SIZE + 1);
+
+        int size = BATCH_SIZE;
+        Set<Integer> batch = new HashSet<>();
+
+        for (int key = 0; key < KEYS; key++) {
+            batch.add(key);
+
+            if (--size == 0) {
+                size = BATCH_SIZE;
+                batch = new HashSet<>();
+                batches.add(batch);
+            }
+        }
+        return batches;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/476081b9/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 018fa17..60d59d7 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeDynam
 import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartAtomicTest;
 import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartTxTest;
 import org.apache.ignite.internal.processors.cache.CacheSwapUnswapGetTest;
+import org.apache.ignite.internal.processors.cache.CacheTxNotAllowReadFromBackupTest;
 import org.apache.ignite.internal.processors.cache.CrossCacheLockTest;
 import org.apache.ignite.internal.processors.cache.GridCacheMarshallingNodeJoinSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheMultinodeUpdateAtomicNearEnabledSelfTest;
@@ -263,6 +264,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(CacheGetEntryPessimisticReadCommittedSeltTest.class);
         suite.addTestSuite(CacheGetEntryPessimisticRepeatableReadSeltTest.class);
         suite.addTestSuite(CacheGetEntryPessimisticSerializableSeltTest.class);
+        suite.addTestSuite(CacheTxNotAllowReadFromBackupTest.class);
 
         suite.addTestSuite(CacheStopAndDestroySelfTest.class);