You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/08/18 09:08:16 UTC
[1/3] ignite git commit: IGNITE-3692: IGFS: Test fixes.
Repository: ignite
Updated Branches:
refs/heads/ignite-3547-1 [created] 9d200004c
IGNITE-3692: IGFS: Test fixes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/09a3922d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/09a3922d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/09a3922d
Branch: refs/heads/ignite-3547-1
Commit: 09a3922d57f9a4c8fbe6c1056f3ea128869c250e
Parents: 278633e
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Aug 16 12:52:09 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Aug 16 12:52:09 2016 +0300
----------------------------------------------------------------------
.../internal/processors/igfs/IgfsAsyncImpl.java | 5 ++
.../ignite/internal/processors/igfs/IgfsEx.java | 7 ++
.../internal/processors/igfs/IgfsImpl.java | 8 +-
.../processors/igfs/IgfsAbstractSelfTest.java | 86 ++++++++++----------
.../igfs/IgfsBackupFailoverSelfTest.java | 2 +-
.../igfs/IgfsDualAbstractSelfTest.java | 86 +-------------------
.../internal/processors/igfs/IgfsMock.java | 5 ++
.../igfs/HadoopIgfsDualAbstractSelfTest.java | 2 +-
8 files changed, 66 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/09a3922d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
index bf3d22b..07b070e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -322,4 +322,9 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme
@Override public IgfsSecondaryFileSystem asSecondary() {
return igfs.asSecondary();
}
+
+ /** {@inheritDoc} */
+ @Override public void await(IgfsPath... paths) {
+ igfs.await(paths);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/09a3922d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index 4c64bc9..9760f43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -142,4 +142,11 @@ public interface IgfsEx extends IgniteFileSystem {
* @return Secondary file system wrapper.
*/
public IgfsSecondaryFileSystem asSecondary();
+
+ /**
+ * Await for any pending finished writes on the children paths.
+ *
+ * @param paths Paths to check.
+ */
+ public void await(IgfsPath... paths);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/09a3922d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index e1f8e61..6707acc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -374,12 +374,8 @@ public final class IgfsImpl implements IgfsEx {
return busyLock.enterBusy();
}
- /**
- * Await for any pending finished writes on the children paths.
- *
- * @param paths Paths to check.
- */
- void await(IgfsPath... paths) {
+ /** {@inheritDoc} */
+ @Override public void await(IgfsPath... paths) {
assert paths != null;
for (Map.Entry<IgfsPath, IgfsFileWorkerBatch> workerEntry : workerMap.entrySet()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/09a3922d/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index 08cb929..236a589 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -489,7 +489,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
* @throws Exception If failed.
*/
public void testExists() throws Exception {
- create(igfs.asSecondary(), paths(DIR), null);
+ create(igfs, paths(DIR), null);
checkExist(igfs, igfsSecondary, DIR);
}
@@ -600,7 +600,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
IgfsPath file1 = new IgfsPath("/file1");
IgfsPath file2 = new IgfsPath("/file2");
- create(igfs.asSecondary(), null, paths(file1));
+ create(igfs, null, paths(file1));
igfs.rename(file1, file2);
@@ -632,7 +632,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
IgfsPath dir1 = new IgfsPath("/dir1");
IgfsPath dir2 = new IgfsPath("/dir2");
- create(igfs.asSecondary(), paths(dir1), null);
+ create(igfs, paths(dir1), null);
igfs.rename(dir1, dir2);
@@ -661,7 +661,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
* @throws Exception If failed.
*/
public void testMoveFileDestinationRoot() throws Exception {
- create(igfs.asSecondary(), paths(DIR, SUBDIR), paths(FILE));
+ create(igfs, paths(DIR, SUBDIR), paths(FILE));
igfs.rename(FILE, new IgfsPath());
@@ -753,7 +753,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
* @throws Exception If failed.
*/
public void testMoveDirectoryDestinationRoot() throws Exception {
- create(igfs.asSecondary(), paths(DIR, SUBDIR, SUBSUBDIR), null);
+ create(igfs, paths(DIR, SUBDIR, SUBSUBDIR), null);
igfs.rename(SUBSUBDIR, new IgfsPath());
@@ -769,7 +769,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
public void testMoveDirectorySourceParentRoot() throws Exception {
IgfsPath dir = new IgfsPath("/" + SUBSUBDIR.name());
- create(igfs.asSecondary(), paths(DIR_NEW, SUBDIR_NEW, dir), null);
+ create(igfs, paths(DIR_NEW, SUBDIR_NEW, dir), null);
igfs.rename(dir, SUBDIR_NEW);
@@ -1163,7 +1163,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
public void testOpen() throws Exception {
create(igfs, paths(DIR, SUBDIR), null);
- createFile(igfs.asSecondary(), FILE, true, chunk);
+ createFile(igfs, FILE, true, chunk);
checkFileContent(igfs, FILE, chunk);
@@ -1200,7 +1200,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
* @throws Exception If failed.
*/
public void testSetTimes() throws Exception {
- createFile(igfs.asSecondary(), FILE, true, chunk);
+ createFile(igfs, FILE, true, chunk);
checkExist(igfs, igfsSecondary, DIR);
checkExist(igfs, igfsSecondary, SUBDIR);
@@ -1312,7 +1312,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
public void testCreate() throws Exception {
create(igfs, paths(DIR, SUBDIR), null);
- createFile(igfs.asSecondary(), FILE, true, chunk);
+ createFile(igfs, FILE, true, chunk);
checkFile(igfs, igfsSecondary, FILE, chunk);
@@ -1396,7 +1396,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
public void testCreateParentRoot() throws Exception {
IgfsPath file = new IgfsPath("/" + FILE.name());
- createFile(igfs.asSecondary(), file, true, chunk);
+ createFile(igfs, file, true, chunk);
checkFile(igfs, igfsSecondary, file, chunk);
}
@@ -1681,7 +1681,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
assert igfs.exists(path);
}
- awaitFileClose(igfs.asSecondary(), path);
+ awaitFileClose(igfs, path);
checkFileContent(igfs, path, chunk);
}
@@ -1806,7 +1806,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
} finally {
U.closeQuiet(os);
- awaitFileClose(igfs.asSecondary(), path2);
+ awaitFileClose(igfs, path2);
}
try {
@@ -1816,7 +1816,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
} finally {
U.closeQuiet(os);
- awaitFileClose(igfs.asSecondary(), path2);
+ awaitFileClose(igfs, path2);
}
checkFile(igfs, igfsSecondary, path2, chunk, chunk);
@@ -1944,7 +1944,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
if (appendSupported()) {
create(igfs, paths(DIR, SUBDIR), null);
- createFile(igfs.asSecondary(), FILE, false);
+ createFile(igfs, FILE, false);
GridTestUtils.assertThrowsInherited(log(), new Callable<Object>() {
@Override
@@ -1978,7 +1978,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
if (appendSupported()) {
create(igfs, paths(DIR, SUBDIR), null);
- createFile(igfs.asSecondary(), FILE, false);
+ createFile(igfs, FILE, false);
IgfsOutputStream os = null;
@@ -2004,9 +2004,9 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
return;
if (appendSupported()) {
- create(igfs.asSecondary(), paths(DIR, SUBDIR), null);
+ create(igfs, paths(DIR, SUBDIR), null);
- createFile(igfs.asSecondary(), FILE, false);
+ createFile(igfs, FILE, false);
IgfsOutputStream os = null;
@@ -2034,7 +2034,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
if (appendSupported()) {
create(igfs, paths(DIR, SUBDIR), null);
- createFile(igfs.asSecondary(), FILE, false);
+ createFile(igfs, FILE, false);
IgfsOutputStream os = null;
IgniteUuid id = null;
@@ -2089,7 +2089,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
if (appendSupported()) {
create(igfs, paths(DIR, SUBDIR), null);
- createFile(igfs.asSecondary(), FILE, false);
+ createFile(igfs, FILE, false);
IgfsOutputStream os = null;
IgniteUuid id = null;
@@ -2146,7 +2146,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
create(igfs, paths(DIR, SUBDIR), null);
- createFile(igfs.asSecondary(), FILE, false);
+ createFile(igfs, FILE, false);
IgfsOutputStream os = null;
@@ -2176,7 +2176,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
int threadCnt = 10;
for (int i = 0; i < threadCnt; i++)
- createFile(igfs.asSecondary(), new IgfsPath("/file" + i), false);
+ createFile(igfs, new IgfsPath("/file" + i), false);
multithreaded(new Runnable() {
@Override
@@ -2200,7 +2200,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
assert igfs.exists(path);
}
- awaitFileClose(igfs.asSecondary(), path);
+ awaitFileClose(igfs, path);
checkFileContent(igfs, path, chunks);
} catch (IOException | IgniteCheckedException e) {
@@ -2271,7 +2271,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
fut.get();
- awaitFileClose(igfs.asSecondary(), FILE);
+ awaitFileClose(igfs, FILE);
if (err.get() != null) {
X.println("Test failed: rethrowing first error: " + err.get());
@@ -2952,20 +2952,8 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
* @param files Files.
* @throws Exception If failed.
*/
+ @SuppressWarnings("EmptyTryBlock")
public static void create(IgfsImpl igfs, @Nullable IgfsPath[] dirs, @Nullable IgfsPath[] files) throws Exception {
- create(igfs.asSecondary(), dirs, files);
- }
-
- /**
- * Create the given directories and files in the given IGFS.
- *
- * @param igfs IGFS.
- * @param dirs Directories.
- * @param files Files.
- * @throws Exception If failed.
- */
- public static void create(IgfsSecondaryFileSystem igfs, @Nullable IgfsPath[] dirs, @Nullable IgfsPath[] files)
- throws Exception {
if (dirs != null) {
for (IgfsPath dir : dirs)
igfs.mkdirs(dir);
@@ -2973,9 +2961,11 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
if (files != null) {
for (IgfsPath file : files) {
- OutputStream os = igfs.create(file, true);
+ try (OutputStream os = igfs.create(file, true)) {
+ // No-op.
+ }
- os.close();
+ igfs.await(file);
}
}
}
@@ -3013,8 +3003,8 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
* @param chunks Data chunks.
* @throws IOException In case of IO exception.
*/
- protected static void createFile(IgfsSecondaryFileSystem igfs, IgfsPath file, boolean overwrite,
- @Nullable byte[]... chunks) throws IOException {
+ protected static void createFile(IgfsEx igfs, IgfsPath file, boolean overwrite, @Nullable byte[]... chunks)
+ throws IOException {
OutputStream os = null;
try {
@@ -3051,7 +3041,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
IgfsEx igfsEx = uni.igfs();
if (igfsEx != null)
- awaitFileClose(igfsEx.asSecondary(), file);
+ awaitFileClose(igfsEx, file);
}
}
@@ -3077,7 +3067,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
finally {
U.closeQuiet(os);
- awaitFileClose(igfs.asSecondary(), file);
+ awaitFileClose(igfs, file);
}
}
@@ -3101,7 +3091,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
finally {
U.closeQuiet(os);
- awaitFileClose(igfs.asSecondary(), file);
+ awaitFileClose(igfs, file);
}
}
@@ -3135,6 +3125,16 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
}
/**
+ * Await for previously opened output stream to close.
+ *
+ * @param igfs IGFS.
+ * @param file File.
+ */
+ public static void awaitFileClose(@Nullable IgfsEx igfs, IgfsPath file) {
+ igfs.await(file);
+ }
+
+ /**
* Ensure that the given paths exist in the given IGFSs.
*
* @param igfs First IGFS.
http://git-wip-us.apache.org/repos/asf/ignite/blob/09a3922d/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java
index 5be9c09..187aeeb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java
@@ -563,7 +563,7 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest {
U.closeQuiet(ios);
- awaitFileClose(igfs0.asSecondary(), filePath(fileIdx));
+ awaitFileClose(igfs0, filePath(fileIdx));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/09a3922d/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
index b4ca0ca..02027d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
@@ -17,10 +17,8 @@
package org.apache.ignite.internal.processors.igfs;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
@@ -28,7 +26,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
-import java.io.OutputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
@@ -1164,85 +1161,6 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest {
}
/**
- * Ensure that prefetch occurs in case several blocks are read sequentially.
- *
- * @throws Exception If failed.
- */
- @SuppressWarnings("ResultOfMethodCallIgnored")
- public void testOpenPrefetch() throws Exception {
- create(igfsSecondary, paths(DIR, SUBDIR), paths(FILE));
-
- // Write enough data to the secondary file system.
- int blockSize0 = igfs.info(FILE).blockSize();
- final int blockSize = blockSize0 != 0 ? blockSize0 : 8 * 1024;
-
- int totalWritten = 0;
- try (OutputStream out = igfsSecondary.openOutputStream(FILE.toString(), false)) {
- while (totalWritten < blockSize * 2 + chunk.length) {
- out.write(chunk);
-
- totalWritten += chunk.length;
- }
- }
-
- if (propertiesSupported())
- awaitFileClose(igfsSecondaryFileSystem, FILE);
- else
- Thread.sleep(1000);
-
- // Read the first two blocks.
- int totalRead = 0;
-
- IgfsInputStream in = igfs.open(FILE, blockSize);
-
- final byte[] readBuf = new byte[1024];
-
- while (totalRead + readBuf.length <= blockSize * 2) {
- in.read(readBuf);
-
- totalRead += readBuf.length;
- }
-
- // Wait for a while for prefetch to finish.
- IgfsMetaManager meta = igfs.context().meta();
-
- IgfsEntryInfo info = meta.info(meta.fileId(FILE));
-
- assert info != null;
-
- IgfsBlockKey key = new IgfsBlockKey(info.id(), info.affinityKey(), info.evictExclude(), 2);
-
- IgniteCache<IgfsBlockKey, byte[]> dataCache = igfs.context().kernalContext().cache().jcache(
- igfs.configuration().getDataCacheName());
-
- for (int i = 0; i < 10; i++) {
- if (dataCache.containsKey(key))
- break;
- else
- U.sleep(100);
- }
-
- // Remove the file from the secondary file system.
- igfsSecondary.delete(FILE.toString(), false);
-
- // Let's wait for file will be deleted.
- U.sleep(300);
-
- // Read the third block.
- totalRead = 0;
-
- in.seek(blockSize * 2);
-
- while (totalRead + readBuf.length <= blockSize) {
- in.read(readBuf);
-
- totalRead += readBuf.length;
- }
-
- in.close();
- }
-
- /**
* Test create when parent directory is partially missing locally.
*
* @throws Exception If failed.
@@ -1251,7 +1169,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest {
create(igfsSecondary, paths(DIR, SUBDIR), null);
create(igfs, paths(DIR), null);
- createFile(igfs.asSecondary(), FILE, true, chunk);
+ createFile(igfs, FILE, true, chunk);
// Ensure that directory structure was created.
checkExist(igfs, igfsSecondary, SUBDIR);
@@ -1287,7 +1205,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest {
create(igfsSecondary, paths(DIR, SUBDIR), null);
create(igfs, null, null);
- createFile(igfs.asSecondary(), FILE, true, chunk);
+ createFile(igfs, FILE, true, chunk);
checkExist(igfs, igfsSecondary, SUBDIR);
checkFile(igfs, igfsSecondary, FILE, chunk);
http://git-wip-us.apache.org/repos/asf/ignite/blob/09a3922d/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
index a2bd9ca..0138907 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
@@ -376,6 +376,11 @@ public class IgfsMock implements IgfsEx {
}
/** {@inheritDoc} */
+ @Override public void await(IgfsPath... paths) {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteFileSystem withAsync() {
throwUnsupported();
http://git-wip-us.apache.org/repos/asf/ignite/blob/09a3922d/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java
index 40cf493..bb155b4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java
@@ -252,7 +252,7 @@ public abstract class HadoopIgfsDualAbstractSelfTest extends IgfsCommonAbstractT
out.close();
- awaitFileClose(igfsSecondary.asSecondary(), FILE);
+ awaitFileClose(igfsSecondary, FILE);
// Instantiate file system with overridden "seq reads before prefetch" property.
Configuration cfg = new Configuration();
[3/3] ignite git commit: ignite-3547
Posted by sb...@apache.org.
ignite-3547
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9d200004
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9d200004
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9d200004
Branch: refs/heads/ignite-3547-1
Commit: 9d200004c28e83a0b7024611d3e1b485fe3f317d
Parents: 476081b
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 18 12:03:32 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 18 12:03:32 2016 +0300
----------------------------------------------------------------------
.../util/nio/GridTcpNioCommunicationClient.java | 5 +-
.../communication/tcp/TcpCommunicationSpi.java | 49 +++--
.../IgniteCacheConnectionRecoveryTest.java | 202 +++++++++++++++++++
.../IgniteCacheMessageRecoveryAbstractTest.java | 14 +-
...gniteCacheMessageRecoveryIdleConnection.java | 154 --------------
...eCacheMessageRecoveryIdleConnectionTest.java | 157 ++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 6 +-
7 files changed, 399 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 4022bc5..5fe521d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -125,8 +125,11 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
if (log.isDebugEnabled())
log.debug("Failed to send message [client=" + this + ", err=" + e + ']');
- if (e.getCause() instanceof IOException)
+ if (e.getCause() instanceof IOException) {
+ ses.close();
+
return true;
+ }
else
throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2c03b2d..d81b9f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -353,6 +353,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
UUID id = ses.meta(NODE_ID_META);
if (id != null) {
+ GridCommunicationClient client = clients.get(id);
+
+ if (client instanceof GridTcpNioCommunicationClient &&
+ ((GridTcpNioCommunicationClient) client).session() == ses) {
+ client.close();
+
+ clients.remove(id, client);
+ }
+
if (!stopping) {
boolean reconnect = false;
@@ -372,9 +381,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
recoveryData.onNodeLeft();
}
- DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(id,
- ses,
- recoveryData,
+ DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(recoveryData,
reconnect);
commWorker.addProcessDisconnectRequest(disconnectData);
@@ -1400,6 +1407,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
.append(']').append(U.nl());
}
+ sb.append("Communication SPI clients: ").append(U.nl());
+
+ for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) {
+ sb.append(" [node=").append(entry.getKey())
+ .append(", client=").append(entry.getValue())
+ .append(']').append(U.nl());
+ }
+
U.warn(log, sb.toString());
}
@@ -1978,17 +1993,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
client.release();
- client = null;
-
if (!retry)
sentMsgsCnt.increment();
else {
+ clients.remove(node.id(), client);
+
ClusterNode node0 = getSpiContext().node(node.id());
if (node0 == null)
throw new IgniteCheckedException("Failed to send message to remote node " +
"(node has left the grid): " + node.id());
}
+
+ client = null;
}
while (retry);
}
@@ -3187,12 +3204,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
* @param sesInfo Disconnected session information.
*/
private void processDisconnect(DisconnectedSessionInfo sesInfo) {
- GridCommunicationClient client = clients.get(sesInfo.nodeId);
-
- if (client instanceof GridTcpNioCommunicationClient &&
- ((GridTcpNioCommunicationClient) client).session() == sesInfo.ses)
- clients.remove(sesInfo.nodeId, client);
-
if (sesInfo.reconnect) {
GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
@@ -3205,7 +3216,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isDebugEnabled())
log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
- client = reserveClient(node);
+ GridCommunicationClient client = reserveClient(node);
client.release();
}
@@ -3756,29 +3767,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
*/
private static class DisconnectedSessionInfo {
/** */
- private final UUID nodeId;
-
- /** */
- private final GridNioSession ses;
-
- /** */
private final GridNioRecoveryDescriptor recoveryDesc;
/** */
private final boolean reconnect;
/**
- * @param nodeId Node ID.
- * @param ses Session.
* @param recoveryDesc Recovery descriptor.
* @param reconnect Reconnect flag.
*/
- public DisconnectedSessionInfo(UUID nodeId,
- GridNioSession ses,
- @Nullable GridNioRecoveryDescriptor recoveryDesc,
+ DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor recoveryDesc,
boolean reconnect) {
- this.nodeId = nodeId;
- this.ses = ses;
this.recoveryDesc = recoveryDesc;
this.reconnect = reconnect;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java
new file mode 100644
index 0000000..4a07674
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheConnectionRecoveryTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private boolean client;
+
+ /** */
+ private static final int SRVS = 5;
+
+ /** */
+ private static final int CLIENTS = 5;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ cfg.setClientMode(client);
+
+ cfg.setCacheConfiguration(
+ cacheConfiguration("cache1", TRANSACTIONAL),
+ cacheConfiguration("cache2", ATOMIC));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(SRVS);
+
+ client = true;
+
+ startGridsMultiThreaded(SRVS, CLIENTS);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConnectionRecovery() throws Exception {
+ final Map<Integer, Integer> data = new TreeMap<>();
+
+ for (int i = 0; i < 500; i++)
+ data.put(i, i);
+
+ final AtomicInteger idx = new AtomicInteger();
+
+ final long stopTime = U.currentTimeMillis() + 30_000;
+
+ final AtomicReference<CyclicBarrier> barrierRef = new AtomicReference<>();
+
+ final int TEST_THREADS = (CLIENTS + SRVS) * 2;
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int idx0 = idx.getAndIncrement();
+ Ignite node = ignite(idx0 % (SRVS + CLIENTS));
+
+ Thread.currentThread().setName("test-thread-" + idx0 + "-" + node.name());
+
+ IgniteCache cache1 = node.cache("cache1").withAsync();
+ IgniteCache cache2 = node.cache("cache2").withAsync();
+
+ int iter = 0;
+
+ while (U.currentTimeMillis() < stopTime) {
+ try {
+ cache1.putAll(data);
+ cache1.future().get(15, SECONDS);
+
+ cache2.putAll(data);
+ cache2.future().get(15, SECONDS);
+
+ CyclicBarrier b = barrierRef.get();
+
+ if (b != null)
+ b.await(15, SECONDS);
+ }
+ catch (Exception e) {
+ synchronized (IgniteCacheConnectionRecoveryTest.class) {
+ log.error("Failed to execute update, will dump debug information" +
+ " [err=" + e+ ", iter=" + iter + ']', e);
+
+ List<Ignite> nodes = IgnitionEx.allGridsx();
+
+ for (Ignite node0 : nodes)
+ ((IgniteKernal)node0).dumpDebugInfo();
+
+ U.dumpThreads(log);
+ }
+
+ throw e;
+ }
+ }
+
+ return null;
+ }
+ }, TEST_THREADS, "test-thread");
+
+ while (System.currentTimeMillis() < stopTime) {
+ boolean closed = false;
+
+ for (Ignite node : G.allGrids()) {
+ if (IgniteCacheMessageRecoveryAbstractTest.closeSessions(node))
+ closed = true;
+ }
+
+ if (closed) {
+ CyclicBarrier b = new CyclicBarrier(TEST_THREADS + 1, new Runnable() {
+ @Override public void run() {
+ barrierRef.set(null);
+ }
+ });
+
+ barrierRef.set(b);
+
+ b.await();
+ }
+
+ U.sleep(50);
+ }
+
+ fut.get();
+ }
+
+ /**
+ * @param name Cache name.
+ * @param atomicityMode Cache atomicity mode.
+ * @return Configuration.
+ */
+ private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(name);
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setCacheMode(REPLICATED);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ return ccfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
index 16d7e5d..0460a8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
@@ -150,7 +150,11 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
for (int i = 0; i < 30; i++) {
Thread.sleep(1000);
- closed |= closeSessions();
+ Ignite node0 = ignite(ThreadLocalRandom.current().nextInt(0, GRID_CNT));
+
+ log.info("Close sessions for: " + ignite.name());
+
+ closed |= closeSessions(node0);
}
assertTrue(closed);
@@ -163,13 +167,11 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
}
/**
+ * @param ignite Node.
* @throws Exception If failed.
+ * @return {@code True} if closed at least one session.
*/
- private boolean closeSessions() throws Exception {
- Ignite ignite = ignite(ThreadLocalRandom.current().nextInt(0, GRID_CNT));
-
- log.info("Close sessions for: " + ignite.name());
-
+ static boolean closeSessions(Ignite ignite) throws Exception {
TcpCommunicationSpi commSpi = (TcpCommunicationSpi)ignite.configuration().getCommunicationSpi();
Map<UUID, GridCommunicationClient> clients = U.field(commSpi, "clients");
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java
deleted file mode 100644
index 618fe2a..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.IgnitionEx;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-
-/**
- *
- */
-public class IgniteCacheMessageRecoveryIdleConnection extends GridCommonAbstractTest {
- /** */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** */
- private static final int NODES = 3;
-
- /** */
- private static final long IDLE_TIMEOUT = 50;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
-
- TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
-
- commSpi.setIdleConnectionTimeout(IDLE_TIMEOUT);
- commSpi.setSharedMemoryPort(-1);
-
- cfg.setCommunicationSpi(commSpi);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected long getTestTimeout() {
- return 2 * 60_000;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- startGridsMultiThreaded(NODES);
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- super.afterTestsStopped();
-
- stopAllGrids();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testCacheOperationsIdleConnectionCloseTx() throws Exception {
- cacheOperationsIdleConnectionClose(TRANSACTIONAL);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testCacheOperationsIdleConnectionCloseAtomic() throws Exception {
- cacheOperationsIdleConnectionClose(ATOMIC);
- }
-
- /**
- * @param atomicityMode Cache atomicity mode.
- * @throws Exception If failed.
- */
- private void cacheOperationsIdleConnectionClose(CacheAtomicityMode atomicityMode) throws Exception {
- CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
-
- ccfg.setAtomicityMode(atomicityMode);
- ccfg.setCacheMode(REPLICATED);
- ccfg.setWriteSynchronizationMode(FULL_SYNC);
-
- IgniteCache<Object, Object> cache = ignite(0).createCache(ccfg).withAsync();
-
- try {
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
- int iter = 0;
-
- long stopTime = System.currentTimeMillis() + 90_000;
-
- while (System.currentTimeMillis() < stopTime) {
- if (iter++ % 10 == 0)
- log.info("Iteration: " + iter);
-
- cache.put(iter, 1);
-
- IgniteFuture<?> fut = cache.future();
-
- try {
- fut.get(10_000);
- }
- catch (IgniteException e) {
- List<Ignite> nodes = IgnitionEx.allGridsx();
-
- for (Ignite node : nodes)
- ((IgniteKernal)node).dumpDebugInfo();
-
- U.dumpThreads(log);
-
- throw e;
- }
-
- U.sleep(rnd.nextLong(IDLE_TIMEOUT - 10, IDLE_TIMEOUT + 10));
- }
- }
- finally {
- ignite(0).destroyCache(ccfg.getName());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java
new file mode 100644
index 0000000..b9003cd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheMessageRecoveryIdleConnectionTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int NODES = 3;
+
+ /** */
+ private static final long IDLE_TIMEOUT = 50;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+ commSpi.setIdleConnectionTimeout(IDLE_TIMEOUT);
+ commSpi.setSharedMemoryPort(-1);
+
+ cfg.setCommunicationSpi(commSpi);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 2 * 60_000;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(NODES);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCacheOperationsIdleConnectionCloseTx() throws Exception {
+ cacheOperationsIdleConnectionClose(TRANSACTIONAL);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCacheOperationsIdleConnectionCloseAtomic() throws Exception {
+ cacheOperationsIdleConnectionClose(ATOMIC);
+ }
+
+ /**
+ * @param atomicityMode Cache atomicity mode.
+ * @throws Exception If failed.
+ */
+ private void cacheOperationsIdleConnectionClose(CacheAtomicityMode atomicityMode) throws Exception {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setCacheMode(REPLICATED);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ IgniteCache<Object, Object> cache = ignite(0).createCache(ccfg).withAsync();
+
+ try {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ int iter = 0;
+
+ long stopTime = System.currentTimeMillis() + 90_000;
+
+ while (System.currentTimeMillis() < stopTime) {
+ if (iter++ % 50 == 0)
+ log.info("Iteration: " + iter);
+
+ cache.put(iter, 1);
+
+ IgniteFuture<?> fut = cache.future();
+
+ try {
+ fut.get(10_000);
+ }
+ catch (IgniteException e) {
+ log.error("Failed to execute update, will dump debug information" +
+ " [err=" + e+ ", iter=" + iter + ']', e);
+
+ List<Ignite> nodes = IgnitionEx.allGridsx();
+
+ for (Ignite node : nodes)
+ ((IgniteKernal)node).dumpDebugInfo();
+
+ U.dumpThreads(log);
+
+ throw e;
+ }
+
+ U.sleep(rnd.nextLong(IDLE_TIMEOUT - 10, IDLE_TIMEOUT + 10));
+ }
+ }
+ finally {
+ ignite(0).destroyCache(ccfg.getName());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d200004/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 8c3f4de..2c294ba 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -125,7 +125,8 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicNearUp
import org.apache.ignite.internal.processors.cache.distributed.CacheTxNearUpdateTopologyChangeTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheEntrySetIterationPreloadingSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryTest;
-import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageRecoveryIdleConnection;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheConnectionRecoveryTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageRecoveryIdleConnectionTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageWriteTimeoutTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSystemTransactionsSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxMessageRecoveryTest;
@@ -281,7 +282,8 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheAtomicMessageRecoveryTest.class);
suite.addTestSuite(IgniteCacheTxMessageRecoveryTest.class);
suite.addTestSuite(IgniteCacheMessageWriteTimeoutTest.class);
- suite.addTestSuite(IgniteCacheMessageRecoveryIdleConnection.class);
+ suite.addTestSuite(IgniteCacheMessageRecoveryIdleConnectionTest.class);
+ suite.addTestSuite(IgniteCacheConnectionRecoveryTest.class);
GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredEvictionAtomicSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredEvictionSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredAtomicSelfTest.class, ignoredTests);
[2/3] ignite git commit: Fixed isReadFromBackup behaviour for
transaction. This closes #955.
Posted by sb...@apache.org.
Fixed isReadFromBackup behaviour for transaction. This closes #955.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/476081b9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/476081b9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/476081b9
Branch: refs/heads/ignite-3547-1
Commit: 476081b9171b1b8e5ec0a1ffd7e87092da3601d6
Parents: 09a3922
Author: Tikhonov Nikolay <ti...@gmail.com>
Authored: Wed Aug 17 14:31:20 2016 +0300
Committer: Tikhonov Nikolay <ti...@gmail.com>
Committed: Wed Aug 17 14:31:20 2016 +0300
----------------------------------------------------------------------
.../near/GridNearTransactionalCache.java | 2 +-
.../cache/distributed/near/GridNearTxLocal.java | 4 +-
.../CacheTxNotAllowReadFromBackupTest.java | 297 +++++++++++++++++++
.../testsuites/IgniteCacheTestSuite4.java | 2 +
4 files changed, 302 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/476081b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index e954a7f..cf5d2e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -192,7 +192,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
keys,
readThrough,
- /*force primary*/needVer,
+ /*force primary*/needVer || !ctx.config().isReadFromBackup(),
tx,
CU.subjectId(tx, ctx.shared()),
tx.resolveTaskName(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/476081b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index db736a5..62cf74b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -363,7 +363,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
return cacheCtx.colocated().loadAsync(
key,
readThrough,
- /*force primary*/needVer,
+ /*force primary*/needVer || !cacheCtx.config().isReadFromBackup(),
topVer,
CU.subjectId(this, cctx),
resolveTaskName(),
@@ -394,7 +394,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
return cacheCtx.colocated().loadAsync(
keys,
readThrough,
- /*force primary*/needVer,
+ /*force primary*/needVer || !cacheCtx.config().isReadFromBackup(),
topVer,
CU.subjectId(this, cctx),
resolveTaskName(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/476081b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxNotAllowReadFromBackupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxNotAllowReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxNotAllowReadFromBackupTest.java
new file mode 100644
index 0000000..30de2f9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxNotAllowReadFromBackupTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Test for query with BinaryMarshaller and different serialization modes and with reflective serializer.
+ */
+public class CacheTxNotAllowReadFromBackupTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int NODES = 2;
+
+ /** */
+ private static final int KEYS = 1000;
+
+ /** */
+ private static final int BATCH_SIZE = 10;
+
+ /** */
+ private static final int ITERATION_CNT = 5;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(NODES);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testBackupConsistencyReplicated() throws Exception {
+ CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>("test-cache");
+
+ cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+ cfg.setCacheMode(CacheMode.REPLICATED);
+ cfg.setReadFromBackup(false);
+
+ checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+ checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+ checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+ checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testBackupConsistencyReplicatedFullSync() throws Exception {
+ CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>("test-cache");
+
+ cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ cfg.setCacheMode(CacheMode.REPLICATED);
+ cfg.setReadFromBackup(false);
+
+ checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+ checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+ checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+ checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testBackupConsistencyPartitioned() throws Exception {
+ CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>("test-cache");
+
+ cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+ cfg.setCacheMode(CacheMode.PARTITIONED);
+ cfg.setBackups(NODES - 1);
+ cfg.setReadFromBackup(false);
+
+ checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+ checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+ checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+ checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testBackupConsistencyPartitionedFullSync() throws Exception {
+ CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>("test-cache");
+
+ cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ cfg.setCacheMode(CacheMode.PARTITIONED);
+ cfg.setBackups(NODES - 1);
+ cfg.setReadFromBackup(false);
+
+ checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+ checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ checkBackupConsistency(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+ checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+ checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ checkBackupConsistency(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+ checkBackupConsistencyGetAll(cfg, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ * @throws Exception If failed.
+ */
+ private void checkBackupConsistency(CacheConfiguration<Integer, Integer> ccfg, TransactionConcurrency txConcurrency,
+ TransactionIsolation txIsolation) throws Exception {
+ IgniteCache<Integer, Integer> cache = grid(0).getOrCreateCache(ccfg);
+
+ int nodeIdx = ThreadLocalRandom.current().nextInt(NODES);
+
+ try {
+ for (int i = 0; i < ITERATION_CNT; i++) {
+ log.info("Iteration: " + i);
+
+ // Put data in one transaction.
+ try (Transaction tx = grid(nodeIdx).transactions().txStart(txConcurrency, txIsolation)) {
+ for (int key = 0; key < KEYS; key++)
+ cache.put(key, key);
+
+ tx.commit();
+ }
+
+ int missCnt = 0;
+
+ // Try to load data from another transaction.
+ try (Transaction tx = grid(nodeIdx).transactions().txStart(txConcurrency, txIsolation)) {
+ for (int key = 0; key < KEYS; key++)
+ if (cache.get(key) == null)
+ ++missCnt;
+
+ tx.commit();
+ }
+
+ assertEquals("Failed. Found missing get()", 0, missCnt);
+ }
+ }
+ finally {
+ grid(0).destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ * @throws Exception If failed.
+ */
+ private void checkBackupConsistencyGetAll(CacheConfiguration<Integer, Integer> ccfg,
+ TransactionConcurrency txConcurrency, TransactionIsolation txIsolation) throws Exception {
+ IgniteCache<Integer, Integer> cache = grid(0).getOrCreateCache(ccfg);
+
+ int nodeIdx = ThreadLocalRandom.current().nextInt(NODES);
+
+ try {
+ for (int i = 0; i < ITERATION_CNT; i++) {
+ log.info("Iteration: " + i);
+
+ List<Set<Integer>> batches = createBatches();
+
+ // Put data in one transaction.
+ try (Transaction tx = grid(nodeIdx).transactions().txStart(txConcurrency, txIsolation)) {
+ for (int key = 0; key < KEYS; key++)
+ cache.put(key, key);
+
+ tx.commit();
+ }
+
+ // Try to load data from another transaction.
+ try (Transaction tx = grid(nodeIdx).transactions().txStart(txConcurrency, txIsolation)) {
+ for (Set<Integer> batch : batches)
+ assertEquals("Failed. Found missing entries.", batch.size(), cache.getAll(batch).size());
+
+ tx.commit();
+ }
+ }
+ }
+ finally {
+ grid(0).destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * @return Batches.
+ */
+ @NotNull private List<Set<Integer>> createBatches() {
+ List<Set<Integer>> batches = new ArrayList<>(KEYS / BATCH_SIZE + 1);
+
+ int size = BATCH_SIZE;
+ Set<Integer> batch = new HashSet<>();
+
+ for (int key = 0; key < KEYS; key++) {
+ batch.add(key);
+
+ if (--size == 0) {
+ size = BATCH_SIZE;
+ batch = new HashSet<>();
+ batches.add(batch);
+ }
+ }
+ return batches;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/476081b9/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 018fa17..60d59d7 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeDynam
import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartAtomicTest;
import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartTxTest;
import org.apache.ignite.internal.processors.cache.CacheSwapUnswapGetTest;
+import org.apache.ignite.internal.processors.cache.CacheTxNotAllowReadFromBackupTest;
import org.apache.ignite.internal.processors.cache.CrossCacheLockTest;
import org.apache.ignite.internal.processors.cache.GridCacheMarshallingNodeJoinSelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheMultinodeUpdateAtomicNearEnabledSelfTest;
@@ -263,6 +264,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(CacheGetEntryPessimisticReadCommittedSeltTest.class);
suite.addTestSuite(CacheGetEntryPessimisticRepeatableReadSeltTest.class);
suite.addTestSuite(CacheGetEntryPessimisticSerializableSeltTest.class);
+ suite.addTestSuite(CacheTxNotAllowReadFromBackupTest.class);
suite.addTestSuite(CacheStopAndDestroySelfTest.class);