You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/22 16:44:58 UTC
[7/8] ignite git commit: ignite-5075
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6eed51a2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6eed51a2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6eed51a2
Branch: refs/heads/ignite-5075-pds
Commit: 6eed51a2e9937c8afadd44eac1a6b5437b603fdb
Parents: aafa8df
Author: sboikov <sb...@gridgain.com>
Authored: Mon May 22 19:34:15 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon May 22 19:34:15 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 4 +-
.../cache/GridCacheClearAllRunnable.java | 2 +-
.../cache/IgniteCacheOffheapManager.java | 33 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 32 +-
.../GridDistributedCacheAdapter.java | 2 +-
.../cache/query/GridCacheQueryManager.java | 4 +-
.../continuous/CacheContinuousQueryManager.java | 2 +-
.../processors/cache/IgniteCacheGroupsTest.java | 498 ++++++++++++++++---
.../cache/IgniteCacheGroupsSqlTest.java | 199 +++++++-
9 files changed, 676 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 0cf49be..8bd072b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -704,7 +704,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteCacheOffheapManager offheapMgr = ctx.isNear() ? ctx.near().dht().context().offheap() : ctx.offheap();
- its.add(offheapMgr.<K, V>entriesIterator(ctx, modes.primary, modes.backup, topVer, ctx.keepBinary()));
+ its.add(offheapMgr.<K, V>cacheEntriesIterator(ctx, modes.primary, modes.backup, topVer, ctx.keepBinary()));
}
}
else if (modes.heap) {
@@ -2895,7 +2895,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
List<K> keys = new ArrayList<>(Math.min(REMOVE_ALL_KEYS_BATCH, size()));
do {
- for (Iterator<CacheDataRow> it = ctx.offheap().iteratorForCache(ctx.cacheId(), true, true, null);
+ for (Iterator<CacheDataRow> it = ctx.offheap().cacheIterator(ctx.cacheId(), true, true, null);
it.hasNext() && keys.size() < REMOVE_ALL_KEYS_BATCH; )
keys.add((K)it.next().key());
http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
index ca89650..d37cecb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
@@ -82,7 +82,7 @@ public class GridCacheClearAllRunnable<K, V> implements Runnable {
if (!ctx.isNear()) {
if (id == 0)
- ctx.offheap().clear(ctx, readers);
+ ctx.offheap().clearCache(ctx, readers);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index d344e20..55485bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -142,7 +142,8 @@ public interface IgniteCacheOffheapManager {
* @param c Closure.
* @throws IgniteCheckedException If failed.
*/
- public boolean expire(GridCacheContext cctx, IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, int amount) throws IgniteCheckedException;
+ public boolean expire(GridCacheContext cctx, IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, int amount)
+ throws IgniteCheckedException;
/**
* Gets the number of entries pending expire.
@@ -212,7 +213,7 @@ public interface IgniteCacheOffheapManager {
* @return Rows iterator.
* @throws IgniteCheckedException If failed.
*/
- public GridIterator<CacheDataRow> iteratorForCache(int cacheId,
+ public GridIterator<CacheDataRow> cacheIterator(int cacheId,
boolean primary,
boolean backup,
final AffinityTopologyVersion topVer)
@@ -224,17 +225,18 @@ public interface IgniteCacheOffheapManager {
* @return Partition data iterator.
* @throws IgniteCheckedException If failed.
*/
- public GridIterator<CacheDataRow> iteratorForCache(int cacheId, final int part) throws IgniteCheckedException;
+ public GridIterator<CacheDataRow> cachePartitionIterator(int cacheId, final int part) throws IgniteCheckedException;
/**
* @param part Partition number.
* @return Iterator for given partition.
- * @throws IgniteCheckedException
+ * @throws IgniteCheckedException If failed.
*/
public GridIterator<CacheDataRow> partitionIterator(final int part) throws IgniteCheckedException;
/**
* @param part Partition.
+ * @param topVer Topology version.
* @param partCntr Partition counter to get historical data if available.
* @return Partition data iterator.
* @throws IgniteCheckedException If failed.
@@ -243,6 +245,7 @@ public interface IgniteCacheOffheapManager {
throws IgniteCheckedException;
/**
+ * @param cctx Cache context.
* @param primary Primary entries flag.
* @param backup Backup entries flag.
* @param topVer Topology version.
@@ -250,7 +253,7 @@ public interface IgniteCacheOffheapManager {
* @return Entries iterator.
* @throws IgniteCheckedException If failed.
*/
- public <K, V> GridCloseableIterator<Cache.Entry<K, V>> entriesIterator(
+ public <K, V> GridCloseableIterator<Cache.Entry<K, V>> cacheEntriesIterator(
GridCacheContext cctx,
final boolean primary,
final boolean backup,
@@ -258,13 +261,16 @@ public interface IgniteCacheOffheapManager {
final boolean keepBinary) throws IgniteCheckedException;
/**
+ * @param cacheId Cache ID.
* @param part Partition.
* @return Iterator.
* @throws IgniteCheckedException If failed.
*/
- public GridCloseableIterator<KeyCacheObject> keysIterator(final int part) throws IgniteCheckedException;
+ public GridCloseableIterator<KeyCacheObject> cacheKeysIterator(int cacheId, final int part)
+ throws IgniteCheckedException;
/**
+ * @param cacheId Cache ID.
* @param primary Primary entries flag.
* @param backup Backup entries flag.
* @param topVer Topology version.
@@ -280,7 +286,7 @@ public interface IgniteCacheOffheapManager {
* @param cctx Cache context.
* @param readers {@code True} to clear readers.
*/
- public void clear(GridCacheContext cctx, boolean readers);
+ public void clearCache(GridCacheContext cctx, boolean readers);
/**
* @param cacheId Cache ID.
@@ -313,7 +319,9 @@ public interface IgniteCacheOffheapManager {
public void dropRootPageForIndex(String idxName) throws IgniteCheckedException;
/**
+ * @param idxName Index name.
* @return Reuse list for index tree.
+ * @throws IgniteCheckedException If failed.
*/
public ReuseList reuseListForIndex(String idxName) throws IgniteCheckedException;
@@ -469,6 +477,17 @@ public interface IgniteCacheOffheapManager {
KeyCacheObject upper) throws IgniteCheckedException;
/**
+ * @param cacheId Cache ID.
+ * @param lower Lower bound.
+ * @param upper Upper bound.
+ * @param x Implementation-specific argument; {@code null} always means that the full detached data row must be returned.
+ * @return Data cursor.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower,
+ KeyCacheObject upper, Object x) throws IgniteCheckedException;
+
+ /**
* Destroys the tree associated with the store.
*
* @throws IgniteCheckedException If failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 8da7357..9631268 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -425,7 +425,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
* @param readers {@code True} to clear readers.
*/
@SuppressWarnings("unchecked")
- @Override public void clear(GridCacheContext cctx, boolean readers) {
+ @Override public void clearCache(GridCacheContext cctx, boolean readers) {
GridCacheVersion obsoleteVer = null;
GridIterator<CacheDataRow> it = iterator(cctx.cacheId(), cacheDataStores().iterator());
@@ -470,13 +470,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
- @Override public <K, V> GridCloseableIterator<Cache.Entry<K, V>> entriesIterator(
+ @Override public <K, V> GridCloseableIterator<Cache.Entry<K, V>> cacheEntriesIterator(
final GridCacheContext cctx,
final boolean primary,
final boolean backup,
final AffinityTopologyVersion topVer,
final boolean keepBinary) throws IgniteCheckedException {
- final Iterator<CacheDataRow> it = iteratorForCache(cctx.cacheId(), primary, backup, topVer);
+ final Iterator<CacheDataRow> it = cacheIterator(cctx.cacheId(), primary, backup, topVer);
return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() {
/** */
@@ -517,13 +517,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public GridCloseableIterator<KeyCacheObject> keysIterator(final int part) throws IgniteCheckedException {
+ @Override public GridCloseableIterator<KeyCacheObject> cacheKeysIterator(int cacheId, final int part) throws IgniteCheckedException {
CacheDataStore data = partitionData(part);
if (data == null)
return new GridEmptyCloseableIterator<>();
- final GridCursor<? extends CacheDataRow> cur = data.cursor();
+ final GridCursor<? extends CacheDataRow> cur =
+ data.cursor(cacheId, null, null, CacheDataRowAdapter.RowData.KEY_ONLY);
return new GridCloseableIteratorAdapter<KeyCacheObject>() {
/** */
@@ -553,7 +554,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public GridIterator<CacheDataRow> iteratorForCache(
+ @Override public GridIterator<CacheDataRow> cacheIterator(
int cacheId,
boolean primary,
boolean backups,
@@ -563,7 +564,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public GridIterator<CacheDataRow> iteratorForCache(int cacheId, int part) throws IgniteCheckedException {
+ @Override public GridIterator<CacheDataRow> cachePartitionIterator(int cacheId, int part) throws IgniteCheckedException {
CacheDataStore data = partitionData(part);
if (data == null)
@@ -1300,6 +1301,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
/** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower,
KeyCacheObject upper) throws IgniteCheckedException {
+ return cursor(cacheId, lower, upper, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower,
+ KeyCacheObject upper, Object x) throws IgniteCheckedException {
SearchRow lowerRow;
SearchRow upperRow;
@@ -1314,7 +1321,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
upperRow = upper != null ? new SearchRow(UNDEFINED_CACHE_ID, upper) : null;
}
- return dataTree.find(lowerRow, upperRow);
+ return dataTree.find(lowerRow, upperRow, x);
}
/** {@inheritDoc} */
@@ -1352,12 +1359,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
Exception ex = null;
- SearchRow bound = new SearchRow(cacheId);
-
- GridCursor<? extends CacheDataRow> cursor = dataTree.find(bound, bound, CacheDataRowAdapter.RowData.KEY_ONLY);
+ GridCursor<? extends CacheDataRow> cur =
+ cursor(cacheId, null, null, CacheDataRowAdapter.RowData.KEY_ONLY);
- while (cursor.next()) {
- CacheDataRow row = cursor.get();
+ while (cur.next()) {
+ CacheDataRow row = cur.get();
assert row.link() != 0 : row;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 0955a51..daf4b91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -459,7 +459,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
return false;
try {
- GridCloseableIterator<KeyCacheObject> iter = dht.context().offheap().keysIterator(part);
+ GridCloseableIterator<KeyCacheObject> iter = dht.context().offheap().cacheKeysIterator(ctx.cacheId(), part);
if (iter != null) {
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 12a8126..f189bd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -863,12 +863,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
locPart = locPart0;
- it = cctx.offheap().iteratorForCache(cctx.cacheId(), part);
+ it = cctx.offheap().cachePartitionIterator(cctx.cacheId(), part);
}
else {
locPart = null;
- it = cctx.offheap().iteratorForCache(cctx.cacheId(), true, backups, topVer);
+ it = cctx.offheap().cacheIterator(cctx.cacheId(), true, backups, topVer);
}
return new PeekValueExpiryAwareIterator(it, plc, topVer, keyValFilter, qry.keepBinary(), locNode) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 03e1e1c..d83c033 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -655,7 +655,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
if (notifyExisting) {
- final Iterator<CacheDataRow> it = cctx.offheap().iteratorForCache(cctx.cacheId(),
+ final Iterator<CacheDataRow> it = cctx.offheap().cacheIterator(cctx.cacheId(),
true,
true,
AffinityTopologyVersion.NONE);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index ee76c6e..1a133dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -35,15 +35,19 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import javax.cache.Cache;
import javax.cache.CacheException;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
import javax.cache.configuration.Factory;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheExistsException;
import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.cache.CacheInterceptorAdapter;
@@ -58,6 +62,7 @@ import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.binary.BinaryMarshaller;
@@ -66,6 +71,8 @@ import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -388,6 +395,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
}
/**
+ * @param cacheMode Cache mode.
+ * @param atomicityMode Cache atomicity mode.
* @throws Exception If failed.
*/
private void scanQuery(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
@@ -430,13 +439,17 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
}
}
else {
- cache1 = ignite(local ? 0 : 1).cache(CACHE1);
- cache2 = ignite(local ? 0 : 2).cache(CACHE2);
+ // Async put ops.
+ int ldrs = 4;
- for (int i = 0; i < keys ; i++) {
- cache1.put(i, data1[i]);
- cache2.put(i, data2[i]);
+ List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
+
+ for (int i = 0; i < ldrs ; i++) {
+ cls.add(putOperation(local ? 0 : 1, ldrs, i, CACHE1, data1));
+ cls.add(putOperation(local ? 0 : 2, ldrs, i, CACHE2, data2));
}
+
+ GridTestUtils.runMultiThreaded(cls, "loaders");
}
ScanQuery<Integer, Integer> qry = new ScanQuery<>();
@@ -463,6 +476,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
}
/**
+ * @param cacheMode Cache mode.
+ * @param atomicityMode Cache atomicity mode.
* @throws Exception If failed.
*/
private void scanQueryMultiplePartitions(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
@@ -503,13 +518,17 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
}
}
else {
- cache1 = ignite(1).cache(CACHE1);
- cache2 = ignite(2).cache(CACHE2);
+ // Async put ops.
+ int ldrs = 4;
+
+ List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
- for (int i = 0; i < keys ; i++) {
- cache1.put(i, data1[i]);
- cache2.put(i, data2[i]);
+ for (int i = 0; i < ldrs ; i++) {
+ cls.add(putOperation(1, ldrs, i, CACHE1, data1));
+ cls.add(putOperation(2, ldrs, i, CACHE2, data2));
}
+
+ GridTestUtils.runMultiThreaded(cls, "loaders");
}
@@ -559,6 +578,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
}
/**
+ * @param cacheMode Cache mode.
+ * @param atomicityMode Cache atomicity mode.
* @throws Exception If failed.
*/
private void cacheIterator(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
@@ -598,13 +619,17 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
}
}
else {
- IgniteCache cache1 = ignite(local ? 0 : 1).cache(CACHE1);
- IgniteCache cache2 = ignite(local ? 0 : 2).cache(CACHE2);
+ // Async put ops.
+ int ldrs = 4;
- for (int i = 0; i < keys ; i++) {
- cache1.put(i, data1[i]);
- cache2.put(i, data2[i]);
+ List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
+
+ for (int i = 0; i < ldrs ; i++) {
+ cls.add(putOperation(local ? 0 : 1, ldrs, i, CACHE1, data1));
+ cls.add(putOperation(local ? 0 : 2, ldrs, i, CACHE2, data2));
}
+
+ GridTestUtils.runMultiThreaded(cls, "loaders");
}
@@ -678,13 +703,17 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
}
}
else {
- IgniteCache cache1 = ignite(local ? 0 : 1).cache(CACHE1);
- IgniteCache cache2 = ignite(local ? 0 : 2).cache(CACHE2);
+ // Async put ops.
+ int ldrs = 4;
+
+ List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
- for (int i = 0; i < keys ; i++) {
- cache1.put(i, data1[i]);
- cache2.put(i, data2[i]);
+ for (int i = 0; i < ldrs ; i++) {
+ cls.add(putOperation(local ? 0 : 1, ldrs, i, CACHE1, data1));
+ cls.add(putOperation(local ? 0 : 2, ldrs, i, CACHE2, data2));
}
+
+ GridTestUtils.runMultiThreaded(cls, "loaders");
}
checkData(local ? 0 : 3, CACHE1, data1);
@@ -702,6 +731,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
}
/**
+ * @param cacheMode Cache mode.
+ * @param atomicityMode Cache atomicity mode.
* @throws Exception If failed.
*/
private void createDestroyCaches(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
@@ -735,13 +766,16 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
}
}
else {
- IgniteCache cache1 = ignite(1).cache(CACHE1);
- IgniteCache cache2 = ignite(2).cache(CACHE2);
+ int ldrs = 4;
+
+ List<Callable<?>> cls = new ArrayList<>(ldrs * 2);
- for (int i = 0; i < keys ; i++) {
- cache1.put(i, data1[i]);
- cache2.put(i, data2[i]);
+ for (int i = 0; i < ldrs ; i++) {
+ cls.add(putOperation(1, ldrs, i, CACHE1, data1));
+ cls.add(putOperation(2, ldrs, i, CACHE2, data2));
}
+
+ GridTestUtils.runMultiThreaded(cls, "loaders");
}
checkLocalData(3, CACHE1, data1);
@@ -765,6 +799,34 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
}
/**
+ * @param idx Node index.
+ * @param ldrs Loaders count.
+ * @param ldrIdx Loader index.
+ * @param cacheName Cache name.
+ * @param data Data.
+ * @return Callable for put operation.
+ */
+ private Callable<Void> putOperation(
+ final int idx,
+ final int ldrs,
+ final int ldrIdx,
+ final String cacheName,
+ final Integer[] data) {
+ return new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ IgniteCache cache = ignite(idx).cache(cacheName);
+
+ for (int j = 0, size = data.length; j < size ; j++) {
+ if (j % ldrs == ldrIdx) {
+ cache.put(j, data[j]);
+ }
+ }
+ return null;
+ }
+ };
+ }
+
+ /**
* Creates an array of random integers.
*
* @param cnt Array length.
@@ -782,6 +844,23 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
}
/**
+ * Creates a map with random integers.
+ *
+ * @param cnt Map size.
+ * @return Map with random integers.
+ */
+ private Map<Integer, Integer> generateDataMap(int cnt) {
+ Random rnd = ThreadLocalRandom.current();
+
+ Map<Integer, Integer> data = U.newHashMap(cnt);
+
+ for (int i = 0; i < cnt; i++)
+ data.put(i, rnd.nextInt());
+
+ return data;
+ }
+
+ /**
* @param cnt Sequence length.
* @return Sequence of integers.
*/
@@ -1491,12 +1570,38 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
int[] backups = cacheMode == REPLICATED ? new int[]{Integer.MAX_VALUE} : new int[]{0, 1, 2, 3};
for (int backups0 : backups)
- cacheApiTest(cacheMode, atomicityMode, backups0, false);
+ cacheApiTest(cacheMode, atomicityMode, backups0, false, false, false);
int backups0 = cacheMode == REPLICATED ? Integer.MAX_VALUE :
backups[ThreadLocalRandom.current().nextInt(backups.length)];
- cacheApiTest(cacheMode, atomicityMode, backups0, true);
+ cacheApiTest(cacheMode, atomicityMode, backups0, true, false, false);
+
+ if (cacheMode == PARTITIONED) {
+ // Here the f variable is used as a bit set whose two lowest bits
+ // determine whether a near cache is used on the server (0b10) and/or client (0b01) side.
+ // The case without near cache is already tested at this point.
+ for (int f : new int[]{1, 2, 3}) {
+ cacheApiTest(cacheMode, atomicityMode, backups0, false, nearSrv(f), nearClient(f));
+ cacheApiTest(cacheMode, atomicityMode, backups0, true, nearSrv(f), nearClient(f));
+ }
+ }
+ }
+
+ /**
+ * @param flag Flag.
+ * @return {@code True} if near cache should be used on a client side.
+ */
+ private boolean nearClient(int flag) {
+ return (flag & 0b01) == 0b01;
+ }
+
+ /**
+ * @param flag Flag.
+ * @return {@code True} if near cache should be used on a server side.
+ */
+ private boolean nearSrv(int flag) {
+ return (flag & 0b10) == 0b10;
}
/**
@@ -1504,65 +1609,338 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
* @param atomicityMode Atomicity mode.
* @param backups Number of backups.
* @param heapCache On heap cache flag.
+ * @param nearSrv {@code True} if near cache should be used on a server side.
+ * @param nearClient {@code True} if near cache should be used on a client side.
+ * @throws Exception If failed.
*/
- private void cacheApiTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode, int backups, boolean heapCache) {
- for (int i = 0; i < 2; i++)
- ignite(0).createCache(cacheConfiguration(GROUP1, "cache-" + i, cacheMode, atomicityMode, backups, heapCache));
+ private void cacheApiTest(CacheMode cacheMode,
+ CacheAtomicityMode atomicityMode,
+ int backups,
+ boolean heapCache,
+ boolean nearSrv,
+ boolean nearClient) throws Exception {
+ Ignite srv0 = ignite(0);
+
+ NearCacheConfiguration nearCfg = nearSrv ? new NearCacheConfiguration() : null;
+
+ srv0.createCache(cacheConfiguration(GROUP1, "cache-0", cacheMode, atomicityMode, backups, heapCache)
+ .setNearConfiguration(nearCfg));
+
+ srv0.createCache(cacheConfiguration(GROUP1, "cache-1", cacheMode, atomicityMode, backups, heapCache));
+
+ srv0.createCache(cacheConfiguration(GROUP2, "cache-2", cacheMode, atomicityMode, backups, heapCache)
+ .setNearConfiguration(nearCfg));
+
+ srv0.createCache(cacheConfiguration(null, "cache-3", cacheMode, atomicityMode, backups, heapCache));
+
+ if (nearClient) {
+ Ignite clientNode = ignite(4);
+
+ clientNode.createNearCache("cache-0", new NearCacheConfiguration());
+ clientNode.createNearCache("cache-2", new NearCacheConfiguration());
+ }
try {
- for (Ignite node : Ignition.allGrids()) {
- for (int i = 0; i < 2; i++) {
- IgniteCache cache = node.cache("cache-" + i);
-
- log.info("Test cache [node=" + node.name() +
- ", cache=" + cache.getName() +
- ", mode=" + cacheMode +
- ", atomicity=" + atomicityMode +
- ", backups=" + backups +
- ", heapCache=" + heapCache +
- ']');
-
- cacheApiTest(cache);
- }
+ for (final Ignite node : Ignition.allGrids()) {
+ List<Callable<?>> ops = new ArrayList<>();
+
+ for (int i = 0; i < 4; i++)
+ ops.add(testSet(node.cache("cache-" + i), cacheMode, atomicityMode, backups, heapCache, node));
+
+ // Async operations.
+ GridTestUtils.runMultiThreaded(ops, "cacheApiTest");
}
}
finally {
- for (int i = 0; i < 2; i++)
- ignite(0).destroyCache("cache-" + i);
+ for (int i = 0; i < 4; i++)
+ srv0.destroyCache("cache-" + i);
}
}
/**
* @param cache Cache.
+ * @param cacheMode Cache mode.
+ * @param atomicityMode Atomicity mode.
+ * @param backups Number of backups.
+ * @param heapCache On heap cache flag.
+ * @param node Ignite node.
+ * @return Callable for the test operations.
+ */
+ private Callable<?> testSet(
+ final IgniteCache<Object, Object> cache,
+ final CacheMode cacheMode,
+ final CacheAtomicityMode atomicityMode,
+ final int backups,
+ final boolean heapCache,
+ final Ignite node) {
+ return new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ log.info("Test cache [node=" + node.name() +
+ ", cache=" + cache.getName() +
+ ", mode=" + cacheMode +
+ ", atomicity=" + atomicityMode +
+ ", backups=" + backups +
+ ", heapCache=" + heapCache +
+ ']');
+
+ cacheApiTest(cache);
+
+ return null;
+ }
+ };
+ }
+
+ /**
+ * @param cache Cache.
+ * @throws Exception If failed.
*/
- private void cacheApiTest(IgniteCache cache) {
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ private void cacheApiTest(IgniteCache cache) throws Exception {
+ cachePutAllGetAll(cache);
- for (int i = 0; i < 10; i++) {
- Integer key = rnd.nextInt(10_000);
+ cachePutRemove(cache);
- assertNull(cache.get(key));
- assertFalse(cache.containsKey(key));
+ cachePutGet(cache);
- Integer val = key + 1;
+ cachePutGetAndPut(cache);
- cache.put(key, val);
+ cacheQuery(cache);
- assertEquals(val, cache.get(key));
- assertTrue(cache.containsKey(key));
+ cacheInvokeAll(cache);
- cache.remove(key);
+ cacheInvoke(cache);
- assertNull(cache.get(key));
- assertFalse(cache.containsKey(key));
- }
+ cacheDataStreamer(cache);
+ }
+ /**
+ * @param cache Cache.
+ */
+ private void tearDown(IgniteCache cache) {
cache.clear();
cache.removeAll();
}
/**
+  * Streams {@code keys * loaders} entries into the cache from several concurrent loader threads and
+  * then iterates the cache to check that every streamed key is present exactly once with the
+  * expected value.
+  *
+  * @param cache Cache.
+  * @throws Exception If failed.
+  */
+ private void cacheDataStreamer(final IgniteCache cache) throws Exception {
+     final int keys = 1000;
+     final int loaders = 4;
+
+     final Integer[] data = generateData(keys * loaders);
+
+     // Stream through a client node.
+     final Ignite clientNode = ignite(4);
+
+     final String cacheName = cache.getName();
+
+     List<Callable<?>> cls = new ArrayList<>(loaders);
+
+     for (final int i : sequence(loaders)) {
+         cls.add(new Callable<Void>() {
+             @Override public Void call() throws Exception {
+                 // Each loader owns its streamer; try-with-resources closes it even when
+                 // loading fails half-way, so no streamer outlives this scenario.
+                 try (IgniteDataStreamer ldr = clientNode.dataStreamer(cacheName)) {
+                     ldr.autoFlushFrequency(0);
+
+                     List<IgniteFuture> futs = new ArrayList<>(keys);
+
+                     for (int j = 0, size = keys * loaders; j < size; j++) {
+                         // Loader i only adds the keys that fall into its own residue class.
+                         if (j % loaders == i)
+                             futs.add(ldr.addData(j, data[j]));
+
+                         if (j % (100 * loaders) == 0)
+                             ldr.flush();
+                     }
+
+                     ldr.flush();
+
+                     // Surface any per-entry failure before the streamer is closed.
+                     for (IgniteFuture fut : futs)
+                         fut.get();
+                 }
+
+                 return null;
+             }
+         });
+     }
+
+     GridTestUtils.runMultiThreaded(cls, "loaders");
+
+     Set<Integer> keysSet = sequence(data.length);
+
+     for (Cache.Entry<Integer, Integer> entry : (IgniteCache<Integer, Integer>)cache) {
+         // remove() returning false would mean an unexpected or duplicated key.
+         assertTrue(keysSet.remove(entry.getKey()));
+         assertEquals(data[entry.getKey()], entry.getValue());
+     }
+
+     // Every generated key must have been seen during iteration.
+     assertTrue(keysSet.isEmpty());
+
+     tearDown(cache);
+ }
+
+ /**
+  * Stores a generated batch with {@code putAll}, reads it back with {@code getAll} and compares
+  * the two maps entry by entry.
+  *
+  * @param cache Cache.
+  */
+ private void cachePutAllGetAll(IgniteCache cache) {
+     Map<Integer, Integer> expected = generateDataMap(1000);
+
+     cache.putAll(expected);
+
+     Map loaded = cache.getAll(expected.keySet());
+
+     assertEquals(expected.size(), loaded.size());
+
+     for (Integer key : expected.keySet())
+         assertEquals(expected.get(key), loaded.get(key));
+
+     tearDown(cache);
+ }
+
+ /**
+  * Puts a random entry, removes it and checks that the key is no longer readable.
+  *
+  * @param cache Cache.
+  */
+ private void cachePutRemove(IgniteCache cache) {
+     Random rnd = ThreadLocalRandom.current();
+
+     Integer key = rnd.nextInt();
+     Integer val = rnd.nextInt();
+
+     cache.put(key, val);
+
+     boolean removed = cache.remove(key);
+
+     assertTrue(removed);
+
+     Object afterRmv = cache.get(key);
+
+     assertNull(afterRmv);
+
+     tearDown(cache);
+ }
+
+ /**
+  * Puts a random entry and checks that {@code get} returns the stored value.
+  *
+  * @param cache Cache.
+  */
+ private void cachePutGet(IgniteCache cache) {
+     Random rnd = ThreadLocalRandom.current();
+
+     Integer key = rnd.nextInt();
+     Integer stored = rnd.nextInt();
+
+     cache.put(key, stored);
+
+     assertEquals(stored, cache.get(key));
+
+     tearDown(cache);
+ }
+
+ /**
+  * Checks that {@code getAndPut} returns the previously stored value and that the new value is
+  * visible to a subsequent {@code get}.
+  *
+  * @param cache Cache.
+  */
+ private void cachePutGetAndPut(IgniteCache cache) {
+     Random rnd = ThreadLocalRandom.current();
+
+     Integer key = rnd.nextInt();
+     Integer initVal = rnd.nextInt();
+     Integer newVal = rnd.nextInt();
+
+     cache.put(key, initVal);
+
+     Object prev = cache.getAndPut(key, newVal);
+
+     assertEquals(initVal, prev);
+
+     Object cur = cache.get(key);
+
+     assertEquals(newVal, cur);
+
+     tearDown(cache);
+ }
+
+ /**
+  * Loads a generated batch, runs a scan query whose filter keeps only even keys and checks that
+  * half of the entries come back, each with an even key and the value that was stored for it.
+  *
+  * @param cache Cache.
+  */
+ private void cacheQuery(IgniteCache cache) {
+     Map<Integer, Integer> data = generateDataMap(1000);
+
+     cache.putAll(data);
+
+     ScanQuery<Integer, Integer> qry = new ScanQuery<>(new IgniteBiPredicate<Integer, Integer>() {
+         @Override public boolean apply(Integer key, Integer val) {
+             return key % 2 == 0;
+         }
+     });
+
+     List<Cache.Entry<Integer, Integer>> all = cache.query(qry).getAll();
+
+     // Expected value goes first so that a failure message reports the right sides.
+     assertEquals(data.size() / 2, all.size());
+
+     for (Cache.Entry<Integer, Integer> entry : all) {
+         assertEquals(0, entry.getKey() % 2);
+         assertEquals(data.get(entry.getKey()), entry.getValue());
+     }
+
+     tearDown(cache);
+ }
+
+ /**
+  * Loads a generated batch, runs an entry processor over all keys with {@code invokeAll} and
+  * checks that a result was produced for every key and that each result equals the sum of the
+  * two extra arguments.
+  *
+  * @param cache Cache.
+  */
+ private void cacheInvokeAll(IgniteCache cache) {
+     int keys = 1000;
+     Map<Integer, Integer> data = generateDataMap(keys);
+
+     cache.putAll(data);
+
+     Random rnd = ThreadLocalRandom.current();
+
+     int one = rnd.nextInt();
+     int two = rnd.nextInt();
+
+     Map<Integer, CacheInvokeResult<Integer>> res = cache.invokeAll(data.keySet(), new CacheEntryProcessor<Integer, Integer, Integer>() {
+         @Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments) throws EntryProcessorException {
+             Object expected = ((Map)arguments[0]).get(entry.getKey());
+
+             assertEquals(expected, entry.getValue());
+
+             // Some calculation.
+             return (Integer)arguments[1] + (Integer)arguments[2];
+         }
+     }, data, one, two);
+
+     assertEquals(keys, res.size());
+
+     Integer expSum = one + two;
+
+     // Verify the result of every key rather than a single hard-coded one.
+     // NOTE(review): per the JCache EntryProcessorResult contract get() is expected to rethrow
+     // a failure raised inside the processor - confirm for CacheInvokeResult.
+     for (Integer key : data.keySet()) {
+         CacheInvokeResult<Integer> keyRes = res.get(key);
+
+         assertNotNull("No invoke result for key: " + key, keyRes);
+         assertEquals(expSum, keyRes.get());
+     }
+
+     tearDown(cache);
+ }
+
+ /**
+  * Puts a random entry, runs an entry processor on it with {@code invoke} and checks that the
+  * returned value is the sum of the two extra arguments.
+  *
+  * @param cache Cache.
+  */
+ private void cacheInvoke(IgniteCache cache) {
+     Random rnd = ThreadLocalRandom.current();
+
+     Integer key = rnd.nextInt();
+     Integer val = rnd.nextInt();
+
+     cache.put(key, val);
+
+     int a = rnd.nextInt();
+     int b = rnd.nextInt();
+
+     CacheEntryProcessor<Integer, Integer, Integer> sumProc = new CacheEntryProcessor<Integer, Integer, Integer>() {
+         @Override public Integer process(MutableEntry<Integer, Integer> e, Object... args) throws EntryProcessorException {
+             // First argument carries the value that was stored for the key.
+             assertEquals(args[0], e.getValue());
+
+             // Some calculation.
+             return (Integer)args[1] + (Integer)args[2];
+         }
+     };
+
+     Object res = cache.invoke(key, sumProc, val, a, b);
+
+     assertEquals(a + b, res);
+
+     tearDown(cache);
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testConcurrentOperationsSameKeys() throws Exception {
http://git-wip-us.apache.org/repos/asf/ignite/blob/6eed51a2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java
index 39456c3..b538449 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java
@@ -18,19 +18,33 @@
package org.apache.ignite.internal.processors.cache;
import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.Callable;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.binary.AffinityKey;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
@@ -61,17 +75,17 @@ public class IgniteCacheGroupsSqlTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
startGridsMultiThreaded(3);
}
/** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
+ @Override protected void afterTest() throws Exception {
stopAllGrids();
- super.afterTestsStopped();
+ super.afterTest();
}
/**
@@ -80,8 +94,8 @@ public class IgniteCacheGroupsSqlTest extends GridCommonAbstractTest {
public void testSqlQuery() throws Exception {
Ignite node = ignite(0);
- IgniteCache c1 = node.createCache(cacheConfiguration(GROUP1, "c1"));
- IgniteCache c2 = node.createCache(cacheConfiguration(GROUP1, "c2"));
+ IgniteCache c1 = node.createCache(personCacheConfiguration(GROUP1, "c1"));
+ IgniteCache c2 = node.createCache(personCacheConfiguration(GROUP1, "c2"));
SqlFieldsQuery qry = new SqlFieldsQuery("select name from Person where name=?");
qry.setArgs("p1");
@@ -101,23 +115,157 @@ public class IgniteCacheGroupsSqlTest extends GridCommonAbstractTest {
}
/**
+ * Join of a REPLICATED person cache in {@code GROUP1} with a PARTITIONED account cache in {@code GROUP2};
+ * both caches are TRANSACTIONAL.
+ *
+ * @throws Exception If failed.
+ */
+ public void testJoinQuery1() throws Exception {
+ joinQuery(GROUP1, GROUP2, REPLICATED, PARTITIONED, TRANSACTIONAL, TRANSACTIONAL);
+ }
+
+ /**
+ * Negative case: a REPLICATED and a PARTITIONED cache are requested in the same group ({@code GROUP1}),
+ * which is expected to fail with {@link IgniteCheckedException} and a "Cache mode mismatch" message.
+ *
+ * @throws Exception If failed.
+ */
+ public void testJoinQuery2() throws Exception {
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ joinQuery(GROUP1, GROUP1, REPLICATED, PARTITIONED, TRANSACTIONAL, TRANSACTIONAL);
+ return null;
+ }
+ }, IgniteCheckedException.class, "Cache mode mismatch for caches related to the same group");
+ }
+
+ /**
+ * Join of two PARTITIONED caches in the same group ({@code GROUP1}): a TRANSACTIONAL person cache
+ * and an ATOMIC account cache.
+ *
+ * @throws Exception If failed.
+ */
+ public void testJoinQuery3() throws Exception {
+ joinQuery(GROUP1, GROUP1, PARTITIONED, PARTITIONED, TRANSACTIONAL, ATOMIC);
+ }
+
+ /**
+ * Join of two REPLICATED caches in the same group ({@code GROUP1}): an ATOMIC person cache
+ * and a TRANSACTIONAL account cache.
+ *
+ * @throws Exception If failed.
+ */
+ public void testJoinQuery4() throws Exception {
+ joinQuery(GROUP1, GROUP1, REPLICATED, REPLICATED, ATOMIC, TRANSACTIONAL);
+ }
+
+ /**
+ * Join of a REPLICATED person cache in {@code GROUP1} with a PARTITIONED account cache created with a
+ * {@code null} group name; both caches are TRANSACTIONAL.
+ *
+ * @throws Exception If failed.
+ */
+ public void testJoinQuery5() throws Exception {
+ joinQuery(GROUP1, null, REPLICATED, PARTITIONED, TRANSACTIONAL, TRANSACTIONAL);
+ }
+
+ /**
+ * Join of a PARTITIONED TRANSACTIONAL person cache in {@code GROUP1} with a PARTITIONED ATOMIC account
+ * cache created with a {@code null} group name.
+ *
+ * @throws Exception If failed.
+ */
+ public void testJoinQuery6() throws Exception {
+ joinQuery(GROUP1, null, PARTITIONED, PARTITIONED, TRANSACTIONAL, ATOMIC);
+ }
+
+ /**
+  * Creates a person cache ("pers") and an account cache ("acc") with the given group, cache mode and
+  * atomicity mode, fills them so that every person owns {@code accsPerPerson} accounts, and then runs
+  * an inner join from node 2, expecting one row per account whose person key equals the account's
+  * {@code personId}.
+  *
+  * @param grp1 First cache group.
+  * @param grp2 Second cache group.
+  * @param cm1 First cache mode.
+  * @param cm2 Second cache mode.
+  * @param cam1 First cache atomicity mode.
+  * @param cam2 Second cache atomicity mode.
+  * @throws Exception If failed.
+  */
+ private void joinQuery(String grp1, String grp2, CacheMode cm1,
+     CacheMode cm2, CacheAtomicityMode cam1, CacheAtomicityMode cam2) throws Exception {
+     int keys = 1000;
+     int accsPerPerson = 4;
+
+     Ignite srv0 = ignite(0);
+
+     CacheConfiguration persCfg = personCacheConfiguration(grp1, "pers");
+
+     persCfg.setAffinity(new RendezvousAffinityFunction().setPartitions(10));
+     persCfg.setCacheMode(cm1);
+     persCfg.setAtomicityMode(cam1);
+
+     IgniteCache pers = srv0.createCache(persCfg);
+
+     CacheConfiguration accCfg = accountCacheConfiguration(grp2, "acc");
+
+     accCfg.setAffinity(new RendezvousAffinityFunction().setPartitions(10));
+     accCfg.setCacheMode(cm2);
+     accCfg.setAtomicityMode(cam2);
+
+     IgniteCache acc = srv0.createCache(accCfg);
+
+     // A transaction is opened only when at least one of the caches is transactional;
+     // try-with-resources accepts a null resource and simply skips close() for it.
+     boolean useTx = cam1 == TRANSACTIONAL || cam2 == TRANSACTIONAL;
+
+     try (Transaction tx = useTx ? srv0.transactions().txStart() : null) {
+         for (int i = 0; i < keys; i++) {
+             int rem = i % accsPerPerson;
+
+             // Key of the person that owns account i.
+             int pKey = i - rem;
+
+             // The person is stored once, together with the first of its accounts.
+             if (rem == 0)
+                 pers.put(pKey, new Person("pers-" + pKey));
+
+             acc.put(new AffinityKey(i, pKey), new Account(pKey, "acc-" + i));
+         }
+
+         if (tx != null)
+             tx.commit();
+     }
+
+     SqlFieldsQuery qry = new SqlFieldsQuery(
+         "select p._key as p_key, p.name, a._key as a_key, a.personId, a.attr \n" +
+         "from \"pers\".Person p inner join \"acc\".Account a \n" +
+         "on (p._key = a.personId)");
+
+     IgniteCache<Object, Object> cache = ignite(2).cache("acc");
+
+     List<List<?>> rows = cache.query(qry).getAll();
+
+     assertEquals(keys, rows.size());
+
+     // Column 0 is the person key, column 3 the account's personId.
+     for (List<?> row : rows)
+         assertEquals(row.get(0), row.get(3));
+ }
+
+ /**
+  * Builds a cache configuration whose query entity maps {@code Integer} keys to {@link Person}
+  * values and exposes the {@code name} field.
+  *
+  * @param grpName Group name.
+  * @param cacheName Cache name.
+  * @return Person cache configuration.
+  */
+ private CacheConfiguration personCacheConfiguration(String grpName, String cacheName) {
+     QueryEntity personEntity = new QueryEntity();
+
+     String keyType = Integer.class.getName();
+     String valType = Person.class.getName();
+
+     personEntity.setKeyType(keyType);
+     personEntity.setValueType(valType);
+
+     personEntity.addQueryField("name", String.class.getName(), null);
+
+     return cacheConfiguration(grpName, cacheName, personEntity);
+ }
+
+ /**
+  * Builds a cache configuration whose query entity maps {@code AffinityKey} keys to {@link Account}
+  * values, exposes the {@code personId} and {@code attr} fields and indexes {@code personId}.
+  *
+  * @param grpName Group name.
+  * @param cacheName Cache name.
+  * @return Account cache configuration.
+  */
+ private CacheConfiguration accountCacheConfiguration(String grpName, String cacheName) {
+     QueryEntity accEntity = new QueryEntity();
+
+     String keyType = AffinityKey.class.getName();
+     String valType = Account.class.getName();
+
+     accEntity.setKeyType(keyType);
+     accEntity.setValueType(valType);
+
+     accEntity.addQueryField("personId", Integer.class.getName(), null);
+     accEntity.addQueryField("attr", String.class.getName(), null);
+
+     QueryIndex personIdIdx = new QueryIndex("personId");
+
+     accEntity.setIndexes(F.asList(personIdIdx));
+
+     return cacheConfiguration(grpName, cacheName, accEntity);
+ }
+
+ /**
* @param grpName Group name.
* @param cacheName Cache name.
+ * @param queryEntity Query entity.
* @return Cache configuration.
*/
- private CacheConfiguration cacheConfiguration(String grpName, String cacheName) {
+ private CacheConfiguration cacheConfiguration(String grpName, String cacheName, QueryEntity queryEntity) {
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setGroupName(grpName);
ccfg.setName(cacheName);
- QueryEntity entity = new QueryEntity();
- entity.setKeyType(Integer.class.getName());
- entity.setValueType(Person.class.getName());
- entity.addQueryField("name", String.class.getName(), null);
-
- ccfg.setQueryEntities(F.asList(entity));
+ ccfg.setQueryEntities(F.asList(queryEntity));
return ccfg;
}
@@ -141,4 +289,29 @@ public class IgniteCacheGroupsSqlTest extends GridCommonAbstractTest {
return S.toString(Person.class, this);
}
}
+
+ /**
+ * Value type stored in the account cache: the ID of the owning person plus an attribute string.
+ */
+ private static class Account implements Serializable {
+ /** Person ID; registered as the {@code personId} query field. */
+ Integer personId;
+
+ /** Attribute (some data); registered as the {@code attr} query field. */
+ String attr;
+
+ /**
+ * @param personId Person ID.
+ * @param attr Attribute (some data).
+ */
+ public Account(Integer personId, String attr) {
+ this.personId = personId;
+ this.attr = attr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Account.class, this);
+ }
+ }
}