You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/03 14:50:37 UTC
[2/2] ignite git commit: ignite-2042 Added special queue/set key
classes to make collocation work with BinaryMarshaller. Also fixed issue with
'invoke' result with binary marshaller.
ignite-2042 Added special queue/set key classes to make collocation work with BinaryMarshaller. Also fixed issue with 'invoke' result with binary marshaller.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/50f6c013
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/50f6c013
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/50f6c013
Branch: refs/heads/ignite-1.5
Commit: 50f6c0131fd761f6231e7c2632a010c093000e70
Parents: 86ec37e
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 3 16:50:00 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 3 16:50:00 2015 +0300
----------------------------------------------------------------------
.../internal/portable/BinaryReaderExImpl.java | 2 +-
.../internal/portable/PortableContext.java | 34 +-
.../processors/cache/GridCacheContext.java | 30 ++
.../CacheDataStructuresManager.java | 31 +-
.../dht/atomic/GridDhtAtomicCache.java | 20 +-
.../CacheObjectBinaryProcessorImpl.java | 8 +
.../cache/query/GridCacheQueryManager.java | 12 +-
.../transactions/IgniteTxLocalAdapter.java | 11 +-
.../datastructures/CollocatedQueueItemKey.java | 75 ++++
.../datastructures/CollocatedSetItemKey.java | 87 +++++
.../datastructures/DataStructuresProcessor.java | 7 +-
.../GridAtomicCacheQueueImpl.java | 8 +-
.../datastructures/GridCacheQueueAdapter.java | 30 +-
.../datastructures/GridCacheQueueItemKey.java | 9 +-
.../datastructures/GridCacheSetImpl.java | 37 +-
.../datastructures/GridCacheSetItemKey.java | 21 +-
.../GridTransactionalCacheQueueImpl.java | 2 +-
.../processors/datastructures/QueueItemKey.java | 27 ++
.../processors/datastructures/SetItemKey.java | 36 ++
.../cache/IgniteCacheInvokeAbstractTest.java | 369 ++++++++++++++-----
...eAbstractDataStructuresFailoverSelfTest.java | 7 +-
.../GridCacheQueueApiSelfAbstractTest.java | 18 +-
.../GridCacheSetFailoverAbstractSelfTest.java | 6 +-
.../GridCachePartitionedQueueApiSelfTest.java | 5 +
...dCachePartitionedQueueEntryMoveSelfTest.java | 2 +-
.../IgnitePartitionedQueueNoBackupsTest.java | 92 +++++
.../GridCacheReplicatedQueueApiSelfTest.java | 5 +
.../GridCacheWriteBehindStoreAbstractTest.java | 2 +-
.../IgniteCacheDataStructuresSelfTestSuite.java | 3 +
29 files changed, 778 insertions(+), 218 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java
index ddbf6ba..91b67f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java
@@ -246,7 +246,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
dataStart = start + DFLT_HDR_LEN;
}
- idMapper = userType ? ctx.userTypeIdMapper(typeId) : null;
+ idMapper = userType ? ctx.userTypeIdMapper(typeId) : BinaryInternalIdMapper.defaultInstance();
schema = PortableUtils.hasSchema(flags) ? getOrCreateSchema() : null;
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
index 1482df9..fd6c41d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
@@ -66,6 +66,8 @@ import org.apache.ignite.binary.BinarySerializer;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.cache.portable.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.datastructures.CollocatedQueueItemKey;
+import org.apache.ignite.internal.processors.datastructures.CollocatedSetItemKey;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.lang.GridMapEntry;
import org.apache.ignite.internal.util.typedef.F;
@@ -233,7 +235,8 @@ public class PortableContext implements Externalizable {
/**
* @param marsh Portable marshaller.
- * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+ * @param cfg Configuration.
+ * @throws BinaryObjectException In case of error.
*/
public void configure(BinaryMarshaller marsh, IgniteConfiguration cfg) throws BinaryObjectException {
if (marsh == null)
@@ -265,7 +268,7 @@ public class PortableContext implements Externalizable {
* @param globalIdMapper ID mapper.
* @param globalSerializer Serializer.
* @param typeCfgs Type configurations.
- * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+ * @throws BinaryObjectException In case of error.
*/
private void configure(
BinaryIdMapper globalIdMapper,
@@ -313,9 +316,8 @@ public class PortableContext implements Externalizable {
}
}
- for (TypeDescriptor desc : descs.descriptors()) {
+ for (TypeDescriptor desc : descs.descriptors())
registerUserType(desc.clsName, desc.idMapper, desc.serializer, desc.affKeyFieldName, desc.isEnum);
- }
BinaryInternalIdMapper dfltMapper = BinaryInternalIdMapper.create(globalIdMapper);
@@ -327,6 +329,20 @@ public class PortableContext implements Externalizable {
affKeyFieldNames.putIfAbsent(typeId, entry.getValue());
}
+
+ addSystemClassAffinityKey(CollocatedSetItemKey.class);
+ addSystemClassAffinityKey(CollocatedQueueItemKey.class);
+ }
+
+ /**
+ * @param cls Class.
+ */
+ private void addSystemClassAffinityKey(Class<?> cls) {
+ String fieldName = affinityFieldName(cls);
+
+ assert fieldName != null : cls;
+
+ affKeyFieldNames.putIfAbsent(cls.getName().hashCode(), affinityFieldName(cls));
}
/**
@@ -400,7 +416,7 @@ public class PortableContext implements Externalizable {
/**
* @param cls Class.
* @return Class descriptor.
- * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+ * @throws BinaryObjectException In case of error.
*/
public PortableClassDescriptor descriptorForClass(Class<?> cls, boolean deserialize)
throws BinaryObjectException {
@@ -722,7 +738,7 @@ public class PortableContext implements Externalizable {
* @param serializer Serializer.
* @param affKeyFieldName Affinity key field name.
* @param isEnum If enum.
- * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+ * @throws BinaryObjectException In case of error.
*/
@SuppressWarnings("ErrorNotRethrown")
public void registerUserType(String clsName,
@@ -808,7 +824,7 @@ public class PortableContext implements Externalizable {
/**
* @param typeId Type ID.
* @return Meta data.
- * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+ * @throws BinaryObjectException In case of error.
*/
@Nullable public BinaryType metadata(int typeId) throws BinaryObjectException {
return metaHnd != null ? metaHnd.metadata(typeId) : null;
@@ -964,7 +980,7 @@ public class PortableContext implements Externalizable {
* @param affKeyFieldName Affinity key field name.
* @param isEnum Enum flag.
* @param canOverride Whether this descriptor can be override.
- * @throws org.apache.ignite.binary.BinaryObjectException If failed.
+ * @throws BinaryObjectException If failed.
*/
private void add(String clsName,
BinaryIdMapper idMapper,
@@ -1044,7 +1060,7 @@ public class PortableContext implements Externalizable {
* Override portable class descriptor.
*
* @param other Other descriptor.
- * @throws org.apache.ignite.binary.BinaryObjectException If failed.
+ * @throws BinaryObjectException If failed.
*/
private void override(TypeDescriptor other) throws BinaryObjectException {
assert clsName.equals(other.clsName);
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 5b4f22c..d689ba6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -36,6 +36,7 @@ import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.expiry.EternalExpiryPolicy;
import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheInterceptor;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.swapspace.GridSwapSpaceManager;
+import org.apache.ignite.internal.portable.BinaryMarshaller;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -1682,6 +1684,13 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * @return {@code True} if {@link BinaryMarshaller is configured}.
+ */
+ public boolean binaryMarshaller() {
+ return marshaller() instanceof BinaryMarshaller;
+ }
+
+ /**
* @return Keep portable flag.
*/
public boolean keepPortable() {
@@ -1752,6 +1761,27 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * @param resMap Invoke results map.
+ * @param keepBinary Keep binary flag.
+ * @return Unwrapped results.
+ */
+ public Map unwrapInvokeResult(@Nullable Map<Object, EntryProcessorResult> resMap, final boolean keepBinary) {
+ return F.viewReadOnly(resMap, new C1<EntryProcessorResult, EntryProcessorResult>() {
+ @Override public EntryProcessorResult apply(EntryProcessorResult res) {
+ if (res instanceof CacheInvokeResult) {
+ CacheInvokeResult invokeRes = (CacheInvokeResult)res;
+
+ if (invokeRes.result() != null)
+ res = CacheInvokeResult.fromResult(unwrapPortableIfNeeded(invokeRes.result(),
+ keepBinary, false));
+ }
+
+ return res;
+ }
+ });
+ }
+
+ /**
* @return Cache object context.
*/
public CacheObjectContext cacheObjectContext() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index ec787f8..6ec29b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -54,9 +54,9 @@ import org.apache.ignite.internal.processors.datastructures.GridCacheQueueProxy;
import org.apache.ignite.internal.processors.datastructures.GridCacheSetHeader;
import org.apache.ignite.internal.processors.datastructures.GridCacheSetHeaderKey;
import org.apache.ignite.internal.processors.datastructures.GridCacheSetImpl;
-import org.apache.ignite.internal.processors.datastructures.GridCacheSetItemKey;
import org.apache.ignite.internal.processors.datastructures.GridCacheSetProxy;
import org.apache.ignite.internal.processors.datastructures.GridTransactionalCacheQueueImpl;
+import org.apache.ignite.internal.processors.datastructures.SetItemKey;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -79,7 +79,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
private final ConcurrentMap<IgniteUuid, GridCacheSetProxy> setsMap;
/** Set keys used for set iteration. */
- private ConcurrentMap<IgniteUuid, GridConcurrentHashSet<GridCacheSetItemKey>> setDataMap =
+ private ConcurrentMap<IgniteUuid, GridConcurrentHashSet<SetItemKey>> setDataMap =
new ConcurrentHashMap8<>();
/** Queues map. */
@@ -311,12 +311,13 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
*
* @param key Key.
* @param rmv {@code True} if entry was removed.
+ * @param keepPortable Keep portable flag.
*/
public void onEntryUpdated(KeyCacheObject key, boolean rmv, boolean keepPortable) {
Object key0 = cctx.cacheObjectContext().unwrapPortableIfNeeded(key, keepPortable, false);
- if (key0 instanceof GridCacheSetItemKey)
- onSetItemUpdated((GridCacheSetItemKey)key0, rmv);
+ if (key0 instanceof SetItemKey)
+ onSetItemUpdated((SetItemKey)key0, rmv);
}
/**
@@ -327,11 +328,11 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
public void onPartitionEvicted(int part) {
GridCacheAffinityManager aff = cctx.affinity();
- for (GridConcurrentHashSet<GridCacheSetItemKey> set : setDataMap.values()) {
- Iterator<GridCacheSetItemKey> iter = set.iterator();
+ for (GridConcurrentHashSet<SetItemKey> set : setDataMap.values()) {
+ Iterator<SetItemKey> iter = set.iterator();
while (iter.hasNext()) {
- GridCacheSetItemKey key = iter.next();
+ SetItemKey key = iter.next();
if (aff.partition(key) == part)
iter.remove();
@@ -415,7 +416,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
* @param id Set ID.
* @return Data for given set.
*/
- @Nullable public GridConcurrentHashSet<GridCacheSetItemKey> setData(IgniteUuid id) {
+ @Nullable public GridConcurrentHashSet<SetItemKey> setData(IgniteUuid id) {
return setDataMap.get(id);
}
@@ -436,7 +437,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
cctx.preloader().syncFuture().get();
}
- GridConcurrentHashSet<GridCacheSetItemKey> set = setDataMap.get(setId);
+ GridConcurrentHashSet<SetItemKey> set = setDataMap.get(setId);
if (set == null)
return;
@@ -445,9 +446,9 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
final int BATCH_SIZE = 100;
- Collection<GridCacheSetItemKey> keys = new ArrayList<>(BATCH_SIZE);
+ Collection<SetItemKey> keys = new ArrayList<>(BATCH_SIZE);
- for (GridCacheSetItemKey key : set) {
+ for (SetItemKey key : set) {
if (!loc && !aff.primary(cctx.localNode(), key, topVer))
continue;
@@ -555,14 +556,14 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
* @param key Set item key.
* @param rmv {@code True} if item was removed.
*/
- private void onSetItemUpdated(GridCacheSetItemKey key, boolean rmv) {
- GridConcurrentHashSet<GridCacheSetItemKey> set = setDataMap.get(key.setId());
+ private void onSetItemUpdated(SetItemKey key, boolean rmv) {
+ GridConcurrentHashSet<SetItemKey> set = setDataMap.get(key.setId());
if (set == null) {
if (rmv)
return;
- GridConcurrentHashSet<GridCacheSetItemKey> old = setDataMap.putIfAbsent(key.setId(),
+ GridConcurrentHashSet<SetItemKey> old = setDataMap.putIfAbsent(key.setId(),
set = new GridConcurrentHashSet<>());
if (old != null)
@@ -592,7 +593,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
- private void retryRemoveAll(final IgniteInternalCache cache, final Collection<GridCacheSetItemKey> keys)
+ private void retryRemoveAll(final IgniteInternalCache cache, final Collection<SetItemKey> keys)
throws IgniteCheckedException {
DataStructuresProcessor.retry(log, new Callable<Void>() {
@Override public Void call() throws Exception {
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d8ab62a..c5ec258 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -824,25 +824,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return resFut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, Map<K, EntryProcessorResult<T>>>() {
@Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) throws IgniteCheckedException {
- Map<K, EntryProcessorResult<T>> resMap = fut.get();
+ Map<Object, EntryProcessorResult> resMap = (Map)fut.get();
- if (resMap != null) {
- return F.viewReadOnly(resMap, new C1<EntryProcessorResult<T>, EntryProcessorResult<T>>() {
- @Override public EntryProcessorResult<T> apply(EntryProcessorResult<T> res) {
- if (res instanceof CacheInvokeResult) {
- CacheInvokeResult invokeRes = (CacheInvokeResult)res;
-
- if (invokeRes.result() != null)
- res = CacheInvokeResult.fromResult((T)ctx.unwrapPortableIfNeeded(invokeRes.result(),
- keepBinary, false));
- }
-
- return res;
- }
- });
- }
-
- return null;
+ return ctx.unwrapInvokeResult(resMap, keepBinary);
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
index 220a45a..d172bca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
@@ -602,6 +602,14 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
if (affKeyFieldName != null)
return po.field(affKeyFieldName);
}
+ else if (po instanceof BinaryObjectEx) {
+ int id = ((BinaryObjectEx)po).typeId();
+
+ String affKeyFieldName = portableCtx.affinityKeyFieldName(id);
+
+ if (affKeyFieldName != null)
+ return po.field(affKeyFieldName);
+ }
}
catch (BinaryObjectException e) {
U.error(log, "Failed to get affinity field from portable object: " + po, e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index bef587a..bb5d230 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -47,8 +47,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.datastructures.GridCacheSetItemKey;
import org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicate;
+import org.apache.ignite.internal.processors.datastructures.SetItemKey;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
@@ -761,21 +761,21 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
IgniteUuid id = filter.setId();
- Collection<GridCacheSetItemKey> data = cctx.dataStructures().setData(id);
+ Collection<SetItemKey> data = cctx.dataStructures().setData(id);
if (data == null)
data = Collections.emptyList();
final GridIterator<IgniteBiTuple<K, V>> it = F.iterator(
data,
- new C1<GridCacheSetItemKey, IgniteBiTuple<K, V>>() {
- @Override public IgniteBiTuple<K, V> apply(GridCacheSetItemKey e) {
+ new C1<SetItemKey, IgniteBiTuple<K, V>>() {
+ @Override public IgniteBiTuple<K, V> apply(SetItemKey e) {
return new IgniteBiTuple<>((K)e.item(), (V)Boolean.TRUE);
}
},
true,
- new P1<GridCacheSetItemKey>() {
- @Override public boolean apply(GridCacheSetItemKey e) {
+ new P1<SetItemKey>() {
+ @Override public boolean apply(SetItemKey e) {
return filter.apply(e, null);
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index f13cff4..33c0fa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -3220,8 +3220,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
try {
txFut.get();
- return new GridCacheReturn(cacheCtx, true, keepBinary,
- implicitRes.value(), implicitRes.success());
+ Object res = implicitRes.value();
+
+ if (implicitRes.invokeResult()) {
+ assert res == null || res instanceof Map : implicitRes;
+
+ res = cacheCtx.unwrapInvokeResult((Map)res, keepBinary);
+ }
+
+ return new GridCacheReturn(cacheCtx, true, keepBinary, res, implicitRes.success());
}
catch (IgniteCheckedException | RuntimeException e) {
rollbackAsync();
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedQueueItemKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedQueueItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedQueueItemKey.java
new file mode 100644
index 0000000..8eb9fa0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedQueueItemKey.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ *
+ */
+public class CollocatedQueueItemKey implements QueueItemKey {
+ /** */
+ private IgniteUuid queueId;
+
+ /** */
+ @AffinityKeyMapped
+ private int queueNameHash;
+
+ /** */
+ private long idx;
+
+ /**
+ * @param queueId Queue unique ID.
+ * @param queueName Queue name.
+ * @param idx Item index.
+ */
+ public CollocatedQueueItemKey(IgniteUuid queueId, String queueName, long idx) {
+ this.queueId = queueId;
+ this.queueNameHash = queueName.hashCode();
+ this.idx = idx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ CollocatedQueueItemKey itemKey = (CollocatedQueueItemKey)o;
+
+ return idx == itemKey.idx && queueId.equals(itemKey.queueId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = queueId.hashCode();
+
+ res = 31 * res + (int)(idx ^ (idx >>> 32));
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CollocatedQueueItemKey.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedSetItemKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedSetItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedSetItemKey.java
new file mode 100644
index 0000000..94cffd4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedSetItemKey.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ *
+ */
+public class CollocatedSetItemKey implements SetItemKey {
+ /** */
+ private IgniteUuid setId;
+
+ /** */
+ @GridToStringInclude
+ private Object item;
+
+ /** */
+ @AffinityKeyMapped
+ private int setNameHash;
+
+ /**
+ * @param setName Set name.
+ * @param setId Set unique ID.
+ * @param item Set item.
+ */
+ public CollocatedSetItemKey(String setName, IgniteUuid setId, Object item) {
+ this.setNameHash = setName.hashCode();
+ this.setId = setId;
+ this.item = item;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid setId() {
+ return setId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object item() {
+ return item;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = setId.hashCode();
+
+ res = 31 * res + item.hashCode();
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ CollocatedSetItemKey that = (CollocatedSetItemKey)o;
+
+ return setId.equals(that.setId) && item.equals(that.item);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CollocatedSetItemKey.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 998bd92..9ed9350 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -113,12 +113,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
/** Initial capacity. */
private static final int INITIAL_CAPACITY = 10;
- /** */
- private static final int MAX_UPDATE_RETRIES = 100;
-
- /** */
- private static final long RETRY_DELAY = 1;
-
/** Initialization latch. */
private final CountDownLatch initLatch = new CountDownLatch(1);
@@ -986,6 +980,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
hdr.id(),
name,
hdr.collocated(),
+ cctx.binaryMarshaller(),
hdr.head(),
hdr.tail(),
0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
index b433887..58d3efe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
@@ -55,7 +55,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
checkRemoved(idx);
- GridCacheQueueItemKey key = itemKey(idx);
+ QueueItemKey key = itemKey(idx);
cache.getAndPut(key, item);
@@ -78,7 +78,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
checkRemoved(idx);
- GridCacheQueueItemKey key = itemKey(idx);
+ QueueItemKey key = itemKey(idx);
T data = (T)cache.getAndRemove(key);
@@ -115,7 +115,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
checkRemoved(idx);
- Map<GridCacheQueueItemKey, T> putMap = new HashMap<>();
+ Map<QueueItemKey, T> putMap = new HashMap<>();
for (T item : items) {
putMap.put(itemKey(idx), item);
@@ -140,7 +140,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
if (idx != null) {
checkRemoved(idx);
- GridCacheQueueItemKey key = itemKey(idx);
+ QueueItemKey key = itemKey(idx);
if (cache.remove(key))
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index df1bd88..ca0250d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -58,9 +58,6 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
protected static final long QUEUE_REMOVED_IDX = Long.MIN_VALUE;
/** */
- protected static final long RETRY_DELAY = 1;
-
- /** */
private static final int DFLT_CLEAR_BATCH_SIZE = 100;
/** Logger. */
@@ -98,6 +95,9 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
@GridToStringExclude
private final Semaphore writeSem;
+ /** */
+ private final boolean binaryMarsh;
+
/**
* @param queueName Queue name.
* @param hdr Queue hdr.
@@ -112,6 +112,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
collocated = hdr.collocated();
queueKey = new GridCacheQueueHeaderKey(queueName);
cache = cctx.kernalContext().cache().internalCache(cctx.name());
+ binaryMarsh = cctx.binaryMarshaller();
log = cctx.logger(getClass());
@@ -369,7 +370,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
checkRemoved(t.get1());
- removeKeys(cache, id, queueName, collocated, t.get1(), t.get2(), batchSize);
+ removeKeys(cache, id, queueName, collocated, binaryMarsh, t.get1(), t.get2(), batchSize);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -407,6 +408,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
* @param id Queue unique ID.
* @param name Queue name.
* @param collocated Collocation flag.
+ * @param binaryMarsh {@code True} if binary marshaller is configured.
* @param startIdx Start item index.
* @param endIdx End item index.
* @param batchSize Batch size.
@@ -418,14 +420,15 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
IgniteUuid id,
String name,
boolean collocated,
+ boolean binaryMarsh,
long startIdx,
long endIdx,
int batchSize)
throws IgniteCheckedException {
- Set<GridCacheQueueItemKey> keys = new HashSet<>(batchSize > 0 ? batchSize : 10);
+ Set<QueueItemKey> keys = new HashSet<>(batchSize > 0 ? batchSize : 10);
for (long idx = startIdx; idx < endIdx; idx++) {
- keys.add(itemKey(id, name, collocated, idx));
+ keys.add(itemKey(id, name, collocated, binaryMarsh, idx));
if (batchSize > 0 && keys.size() == batchSize) {
cache.removeAll(keys);
@@ -536,8 +539,8 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
* @param idx Item index.
* @return Item key.
*/
- protected GridCacheQueueItemKey itemKey(Long idx) {
- return itemKey(id, queueName, collocated(), idx);
+ protected QueueItemKey itemKey(Long idx) {
+ return itemKey(id, queueName, collocated(), binaryMarsh, idx);
}
/** {@inheritDoc} */
@@ -558,11 +561,18 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
* @param id Queue unique ID.
* @param queueName Queue name.
* @param collocated Collocation flag.
+ * @param binaryMarsh {@code True} if binary marshaller is configured.
* @param idx Item index.
* @return Item key.
*/
- private static GridCacheQueueItemKey itemKey(IgniteUuid id, String queueName, boolean collocated, long idx) {
- return collocated ? new CollocatedItemKey(id, queueName, idx) : new GridCacheQueueItemKey(id, queueName, idx);
+ private static QueueItemKey itemKey(IgniteUuid id,
+ String queueName,
+ boolean collocated,
+ boolean binaryMarsh,
+ long idx) {
+ return collocated ?
+ (binaryMarsh ? new CollocatedQueueItemKey(id, queueName, idx) : new CollocatedItemKey(id, queueName, idx)) :
+ new GridCacheQueueItemKey(id, queueName, idx);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueItemKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueItemKey.java
index c4cb7b1..df47e73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueItemKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueItemKey.java
@@ -21,7 +21,6 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -29,7 +28,7 @@ import org.apache.ignite.lang.IgniteUuid;
/**
* Queue item key.
*/
-class GridCacheQueueItemKey implements Externalizable, GridCacheInternal {
+class GridCacheQueueItemKey implements Externalizable, QueueItemKey {
/** */
private static final long serialVersionUID = 0L;
@@ -110,11 +109,11 @@ class GridCacheQueueItemKey implements Externalizable, GridCacheInternal {
/** {@inheritDoc} */
@Override public int hashCode() {
- int result = queueId.hashCode();
+ int res = queueId.hashCode();
- result = 31 * result + (int)(idx ^ (idx >>> 32));
+ res = 31 * res + (int)(idx ^ (idx >>> 32));
- return result;
+ return res;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index 62eab61..f25e361 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -66,7 +66,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
private final GridCacheContext ctx;
/** Cache. */
- private final IgniteInternalCache<GridCacheSetItemKey, Boolean> cache;
+ private final IgniteInternalCache<SetItemKey, Boolean> cache;
/** Logger. */
private final IgniteLogger log;
@@ -86,6 +86,9 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
/** Removed flag. */
private volatile boolean rmvd;
+ /** */
+ private final boolean binaryMarsh;
+
/**
* @param ctx Cache context.
* @param name Set name.
@@ -97,6 +100,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
this.name = name;
id = hdr.id();
collocated = hdr.collocated();
+ binaryMarsh = ctx.binaryMarshaller();
cache = ctx.cache();
@@ -140,7 +144,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
onAccess();
if (ctx.isLocal() || ctx.isReplicated()) {
- GridConcurrentHashSet<GridCacheSetItemKey> set = ctx.dataStructures().setData(id);
+ GridConcurrentHashSet<SetItemKey> set = ctx.dataStructures().setData(id);
return set != null ? set.size() : 0;
}
@@ -171,7 +175,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
@Override public boolean isEmpty() {
onAccess();
- GridConcurrentHashSet<GridCacheSetItemKey> set = ctx.dataStructures().setData(id);
+ GridConcurrentHashSet<SetItemKey> set = ctx.dataStructures().setData(id);
return (set == null || set.isEmpty()) && size() == 0;
}
@@ -180,7 +184,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
@Override public boolean contains(Object o) {
onAccess();
- final GridCacheSetItemKey key = itemKey(o);
+ final SetItemKey key = itemKey(o);
return retry(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
@@ -193,7 +197,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
@Override public boolean add(T o) {
onAccess();
- final GridCacheSetItemKey key = itemKey(o);
+ final SetItemKey key = itemKey(o);
return retry(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
@@ -206,7 +210,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
@Override public boolean remove(Object o) {
onAccess();
- final GridCacheSetItemKey key = itemKey(o);
+ final SetItemKey key = itemKey(o);
return retry(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
@@ -231,7 +235,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
boolean add = false;
- Map<GridCacheSetItemKey, Boolean> addKeys = null;
+ Map<SetItemKey, Boolean> addKeys = null;
for (T obj : c) {
if (add) {
@@ -247,7 +251,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
}
}
else
- add |= add(obj);
+ add = add(obj);
}
if (!F.isEmpty(addKeys))
@@ -262,7 +266,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
boolean rmv = false;
- Set<GridCacheSetItemKey> rmvKeys = null;
+ Set<SetItemKey> rmvKeys = null;
for (Object obj : c) {
if (rmv) {
@@ -278,7 +282,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
}
}
else
- rmv |= remove(obj);
+ rmv = remove(obj);
}
if (!F.isEmpty(rmvKeys))
@@ -295,7 +299,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
try (GridCloseableIterator<T> iter = iterator0()) {
boolean rmv = false;
- Set<GridCacheSetItemKey> rmvKeys = null;
+ Set<SetItemKey> rmvKeys = null;
for (T val : iter) {
if (!c.contains(val)) {
@@ -331,7 +335,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
onAccess();
try (GridCloseableIterator<T> iter = iterator0()) {
- Collection<GridCacheSetItemKey> rmvKeys = new ArrayList<>(BATCH_SIZE);
+ Collection<SetItemKey> rmvKeys = new ArrayList<>(BATCH_SIZE);
for (T val : iter) {
rmvKeys.add(itemKey(val));
@@ -425,7 +429,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
/**
* @param keys Keys to remove.
*/
- private void retryRemoveAll(final Collection<GridCacheSetItemKey> keys) {
+ private void retryRemoveAll(final Collection<SetItemKey> keys) {
retry(new Callable<Void>() {
@Override public Void call() throws Exception {
cache.removeAll(keys);
@@ -438,7 +442,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
/**
* @param keys Keys to remove.
*/
- private void retryPutAll(final Map<GridCacheSetItemKey, Boolean> keys) {
+ private void retryPutAll(final Map<SetItemKey, Boolean> keys) {
retry(new Callable<Void>() {
@Override public Void call() throws Exception {
cache.putAll(keys);
@@ -523,8 +527,9 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
* @param item Set item.
* @return Item key.
*/
- private GridCacheSetItemKey itemKey(Object item) {
- return collocated ? new CollocatedItemKey(name, id, item) : new GridCacheSetItemKey(id, item);
+ private SetItemKey itemKey(Object item) {
+ return collocated ? (binaryMarsh ? new CollocatedSetItemKey(name, id, item) : new CollocatedItemKey(name, id, item))
+ : new GridCacheSetItemKey(id, item);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java
index d025dce..8b47b3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java
@@ -21,7 +21,6 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -30,7 +29,7 @@ import org.apache.ignite.lang.IgniteUuid;
/**
* Set item key.
*/
-public class GridCacheSetItemKey implements GridCacheInternal, Externalizable {
+public class GridCacheSetItemKey implements SetItemKey, Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -57,27 +56,23 @@ public class GridCacheSetItemKey implements GridCacheInternal, Externalizable {
this.item = item;
}
- /**
- * @return Set UUID.
- */
- public IgniteUuid setId() {
+ /** {@inheritDoc} */
+ @Override public IgniteUuid setId() {
return setId;
}
- /**
- * @return Set item.
- */
- public Object item() {
+ /** {@inheritDoc} */
+ @Override public Object item() {
return item;
}
/** {@inheritDoc} */
@Override public int hashCode() {
- int result = setId.hashCode();
+ int res = setId.hashCode();
- result = 31 * result + item.hashCode();
+ res = 31 * res + item.hashCode();
- return result;
+ return res;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index 4880324..32e94d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -143,7 +143,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
if (idx != null) {
checkRemoved(idx);
- Map<GridCacheQueueItemKey, T> putMap = new HashMap<>();
+ Map<QueueItemKey, T> putMap = new HashMap<>();
for (T item : items) {
putMap.put(itemKey(idx), item);
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/QueueItemKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/QueueItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/QueueItemKey.java
new file mode 100644
index 0000000..fe0cef3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/QueueItemKey.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+
+/**
+ *
+ */
+public interface QueueItemKey extends GridCacheInternal {
+ // No-op.
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/SetItemKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/SetItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/SetItemKey.java
new file mode 100644
index 0000000..759945a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/SetItemKey.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ *
+ */
+public interface SetItemKey extends GridCacheInternal {
+ /**
+ * @return Set UUID.
+ */
+ public IgniteUuid setId();
+
+ /**
+ * @return Set item.
+ */
+ public Object item();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
index b881d90..51a70b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -139,6 +140,31 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
tx = startTx(txMode);
+ TestValue testVal = cache.invoke(key, new UserClassValueProcessor());
+
+ if (tx != null)
+ tx.commit();
+
+ assertEquals("63", testVal.value());
+
+ checkValue(key, 63);
+
+ tx = startTx(txMode);
+
+ Collection<TestValue> testValCol = cache.invoke(key, new CollectionReturnProcessor());
+
+ if (tx != null)
+ tx.commit();
+
+ assertEquals(10, testValCol.size());
+
+ for (TestValue val : testValCol)
+ assertEquals("64", val.value());
+
+ checkValue(key, 63);
+
+ tx = startTx(txMode);
+
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.invoke(key, new ExceptionProcessor(63));
@@ -237,166 +263,226 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
IncrementProcessor incProcessor = new IncrementProcessor();
- Transaction tx = startTx(txMode);
+ {
+ Transaction tx = startTx(txMode);
- Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, incProcessor);
+ Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, incProcessor);
- if (tx != null)
- tx.commit();
+ if (tx != null)
+ tx.commit();
- Map<Object, Object> exp = new HashMap<>();
+ Map<Object, Object> exp = new HashMap<>();
- for (Integer key : keys)
- exp.put(key, -1);
+ for (Integer key : keys)
+ exp.put(key, -1);
- checkResult(resMap, exp);
+ checkResult(resMap, exp);
- for (Integer key : keys)
- checkValue(key, 1);
+ for (Integer key : keys)
+ checkValue(key, 1);
+ }
- tx = startTx(txMode);
+ {
+ Transaction tx = startTx(txMode);
- resMap = cache.invokeAll(keys, incProcessor);
+ Map<Integer, EntryProcessorResult<TestValue>> resMap = cache.invokeAll(keys, new UserClassValueProcessor());
- if (tx != null)
- tx.commit();
+ if (tx != null)
+ tx.commit();
- exp = new HashMap<>();
+ Map<Object, Object> exp = new HashMap<>();
- for (Integer key : keys)
- exp.put(key, 1);
+ for (Integer key : keys)
+ exp.put(key, new TestValue("1"));
- checkResult(resMap, exp);
+ checkResult(resMap, exp);
- for (Integer key : keys)
- checkValue(key, 2);
+ for (Integer key : keys)
+ checkValue(key, 1);
+ }
- tx = startTx(txMode);
+ {
+ Transaction tx = startTx(txMode);
- resMap = cache.invokeAll(keys, new ArgumentsSumProcessor(), 10, 20, 30);
+ Map<Integer, EntryProcessorResult<Collection<TestValue>>> resMap =
+ cache.invokeAll(keys, new CollectionReturnProcessor());
- if (tx != null)
- tx.commit();
+ if (tx != null)
+ tx.commit();
- for (Integer key : keys)
- exp.put(key, 3);
+ Map<Object, Object> exp = new HashMap<>();
- checkResult(resMap, exp);
+ for (Integer key : keys) {
+ List<TestValue> expCol = new ArrayList<>();
- for (Integer key : keys)
- checkValue(key, 62);
+ for (int i = 0; i < 10; i++)
+ expCol.add(new TestValue("2"));
- tx = startTx(txMode);
+ exp.put(key, expCol);
+ }
- resMap = cache.invokeAll(keys, new ExceptionProcessor(null));
+ checkResult(resMap, exp);
- if (tx != null)
- tx.commit();
+ for (Integer key : keys)
+ checkValue(key, 1);
+ }
- for (Integer key : keys) {
- final EntryProcessorResult<Integer> res = resMap.get(key);
+ {
+ Transaction tx = startTx(txMode);
- assertNotNull("No result for " + key);
+ Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, incProcessor);
- GridTestUtils.assertThrows(log, new Callable<Void>() {
- @Override public Void call() throws Exception {
- res.get();
+ if (tx != null)
+ tx.commit();
- return null;
- }
- }, EntryProcessorException.class, "Test processor exception.");
+ Map<Object, Object> exp = new HashMap<>();
+
+ for (Integer key : keys)
+ exp.put(key, 1);
+
+ checkResult(resMap, exp);
+
+ for (Integer key : keys)
+ checkValue(key, 2);
}
- for (Integer key : keys)
- checkValue(key, 62);
+ {
+ Transaction tx = startTx(txMode);
- tx = startTx(txMode);
+ Map<Integer, EntryProcessorResult<Integer>> resMap =
+ cache.invokeAll(keys, new ArgumentsSumProcessor(), 10, 20, 30);
- Map<Integer, EntryProcessor<Integer, Integer, Integer>> invokeMap = new HashMap<>();
+ if (tx != null)
+ tx.commit();
+
+ Map<Object, Object> exp = new HashMap<>();
+
+ for (Integer key : keys)
+ exp.put(key, 3);
+
+ checkResult(resMap, exp);
+
+ for (Integer key : keys)
+ checkValue(key, 62);
+ }
+
+ {
+ Transaction tx = startTx(txMode);
- for (Integer key : keys) {
- switch (key % 4) {
- case 0: invokeMap.put(key, new IncrementProcessor()); break;
+ Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, new ExceptionProcessor(null));
- case 1: invokeMap.put(key, new RemoveProcessor(62)); break;
+ if (tx != null)
+ tx.commit();
- case 2: invokeMap.put(key, new ArgumentsSumProcessor()); break;
+ for (Integer key : keys) {
+ final EntryProcessorResult<Integer> res = resMap.get(key);
- case 3: invokeMap.put(key, new ExceptionProcessor(62)); break;
+ assertNotNull("No result for " + key);
- default:
- fail();
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ res.get();
+
+ return null;
+ }
+ }, EntryProcessorException.class, "Test processor exception.");
}
+
+ for (Integer key : keys)
+ checkValue(key, 62);
}
- resMap = cache.invokeAll(invokeMap, 10, 20, 30);
+ {
+ Transaction tx = startTx(txMode);
- if (tx != null)
- tx.commit();
+ Map<Integer, EntryProcessor<Integer, Integer, Integer>> invokeMap = new HashMap<>();
- for (Integer key : keys) {
- final EntryProcessorResult<Integer> res = resMap.get(key);
+ for (Integer key : keys) {
+ switch (key % 4) {
+ case 0: invokeMap.put(key, new IncrementProcessor()); break;
- switch (key % 4) {
- case 0: {
- assertNotNull("No result for " + key, res);
+ case 1: invokeMap.put(key, new RemoveProcessor(62)); break;
- assertEquals(62, (int)res.get());
+ case 2: invokeMap.put(key, new ArgumentsSumProcessor()); break;
- checkValue(key, 63);
+ case 3: invokeMap.put(key, new ExceptionProcessor(62)); break;
- break;
+ default:
+ fail();
}
+ }
- case 1: {
- assertNull(res);
+ Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(invokeMap, 10, 20, 30);
- checkValue(key, null);
+ if (tx != null)
+ tx.commit();
- break;
- }
+ for (Integer key : keys) {
+ final EntryProcessorResult<Integer> res = resMap.get(key);
- case 2: {
- assertNotNull("No result for " + key, res);
+ switch (key % 4) {
+ case 0: {
+ assertNotNull("No result for " + key, res);
- assertEquals(3, (int)res.get());
+ assertEquals(62, (int)res.get());
- checkValue(key, 122);
+ checkValue(key, 63);
- break;
- }
+ break;
+ }
+
+ case 1: {
+ assertNull(res);
+
+ checkValue(key, null);
+
+ break;
+ }
+
+ case 2: {
+ assertNotNull("No result for " + key, res);
+
+ assertEquals(3, (int)res.get());
+
+ checkValue(key, 122);
- case 3: {
- assertNotNull("No result for " + key, res);
+ break;
+ }
- GridTestUtils.assertThrows(log, new Callable<Void>() {
- @Override public Void call() throws Exception {
- res.get();
+ case 3: {
+ assertNotNull("No result for " + key, res);
- return null;
- }
- }, EntryProcessorException.class, "Test processor exception.");
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ res.get();
- checkValue(key, 62);
+ return null;
+ }
+ }, EntryProcessorException.class, "Test processor exception.");
- break;
+ checkValue(key, 62);
+
+ break;
+ }
}
}
}
cache.invokeAll(keys, new IncrementProcessor());
- tx = startTx(txMode);
+ {
+ Transaction tx = startTx(txMode);
- resMap = cache.invokeAll(keys, new RemoveProcessor(null));
+ Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, new RemoveProcessor(null));
- if (tx != null)
- tx.commit();
+ if (tx != null)
+ tx.commit();
- assertEquals("Unexpected results: " + resMap, 0, resMap.size());
+ assertEquals("Unexpected results: " + resMap, 0, resMap.size());
- for (Integer key : keys)
- checkValue(key, null);
+ for (Integer key : keys)
+ checkValue(key, null);
+ }
IgniteCache<Integer, Integer> asyncCache = cache.withAsync();
@@ -406,9 +492,9 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
IgniteFuture<Map<Integer, EntryProcessorResult<Integer>>> fut = asyncCache.future();
- resMap = fut.get();
+ Map<Integer, EntryProcessorResult<Integer>> resMap = fut.get();
- exp = new HashMap<>();
+ Map<Object, Object> exp = new HashMap<>();
for (Integer key : keys)
exp.put(key, -1);
@@ -418,7 +504,7 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
for (Integer key : keys)
checkValue(key, 1);
- invokeMap = new HashMap<>();
+ Map<Integer, EntryProcessor<Integer, Integer, Integer>> invokeMap = new HashMap<>();
for (Integer key : keys)
invokeMap.put(key, incProcessor);
@@ -442,15 +528,16 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
* @param resMap Result map.
* @param exp Expected results.
*/
- private void checkResult(Map<Integer, EntryProcessorResult<Integer>> resMap, Map<Object, Object> exp) {
+ @SuppressWarnings("unchecked")
+ private void checkResult(Map resMap, Map<Object, Object> exp) {
assertNotNull(resMap);
assertEquals(exp.size(), resMap.size());
for (Map.Entry<Object, Object> expVal : exp.entrySet()) {
- EntryProcessorResult<Integer> res = resMap.get(expVal.getKey());
+ EntryProcessorResult<?> res = (EntryProcessorResult)resMap.get(expVal.getKey());
- assertNotNull("No result for " + expVal.getKey());
+ assertNotNull("No result for " + expVal.getKey(), res);
assertEquals("Unexpected result for " + expVal.getKey(), res.get(), expVal.getValue());
}
@@ -557,6 +644,44 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
/**
*
*/
+ protected static class UserClassValueProcessor implements EntryProcessor<Integer, Integer, TestValue> {
+ /** {@inheritDoc} */
+ @Override public TestValue process(MutableEntry<Integer, Integer> e, Object... arguments)
+ throws EntryProcessorException {
+ return new TestValue(String.valueOf(e.getValue()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(UserClassValueProcessor.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ protected static class CollectionReturnProcessor implements
+ EntryProcessor<Integer, Integer, Collection<TestValue>> {
+ /** {@inheritDoc} */
+ @Override public Collection<TestValue> process(MutableEntry<Integer, Integer> e, Object... arguments)
+ throws EntryProcessorException {
+ List<TestValue> vals = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++)
+ vals.add(new TestValue(String.valueOf(e.getValue() + 1)));
+
+ return vals;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CollectionReturnProcessor.class, this);
+ }
+ }
+
+ /**
+ *
+ */
protected static class IncrementProcessor implements EntryProcessor<Integer, Integer, Integer> {
/** {@inheritDoc} */
@Override public Integer process(MutableEntry<Integer, Integer> e,
@@ -656,4 +781,50 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT
return S.toString(ExceptionProcessor.class, this);
}
}
+
+ /**
+ *
+ */
+ static class TestValue {
+ /** */
+ private String val;
+
+ /**
+ * @param val Value.
+ */
+ public TestValue(String val) {
+ this.val = val;
+ }
+
+ /**
+ * @return Value.
+ */
+ public String value() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestValue testVal = (TestValue) o;
+
+ return val.equals(testVal.val);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TestValue.class, this);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 2751de1..ef96d9f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -170,11 +170,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
while (U.currentTimeMillis() < stopTime)
assertEquals(10, atomic.get());
}
- catch (IgniteException e) {
- if (X.hasCause(e, ClusterTopologyServerNotFoundException.class))
- return;
-
- throw e;
+ catch (IgniteException ignore) {
+ return; // Test that client does not hang.
}
fail();
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
index cf638df..6366f09 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
@@ -244,15 +244,27 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
}
/**
- * JUnit.
- *
* @throws Exception If failed.
*/
public void testIterator() throws Exception {
+ checkIterator(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testIteratorCollocated() throws Exception {
+ checkIterator(true);
+ }
+
+ /**
+ * @param collocated Collocated flag.
+ */
+ private void checkIterator(boolean collocated) {
// Random queue name.
String queueName = UUID.randomUUID().toString();
- IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false));
+ IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(collocated));
for (int i = 0; i < 100; i++)
assert queue.add(Integer.toString(i));
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java
index 74c9a4f..ca57205 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java
@@ -31,7 +31,7 @@ import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import org.apache.ignite.internal.processors.datastructures.GridCacheSetItemKey;
+import org.apache.ignite.internal.processors.datastructures.SetItemKey;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.testframework.GridTestUtils;
@@ -183,8 +183,8 @@ public abstract class GridCacheSetFailoverAbstractSelfTest extends IgniteCollect
if (entry.hasValue()) {
cnt++;
- if (entry.key() instanceof GridCacheSetItemKey) {
- GridCacheSetItemKey setItem = (GridCacheSetItemKey)entry.key();
+ if (entry.key() instanceof SetItemKey) {
+ SetItemKey setItem = (SetItemKey)entry.key();
if (setIds.add(setItem.setId()))
log.info("Unexpected set item [setId=" + setItem.setId() +
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueApiSelfTest.java
index 2420153..de2fa07 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueApiSelfTest.java
@@ -31,6 +31,11 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED;
*/
public class GridCachePartitionedQueueApiSelfTest extends GridCacheQueueApiSelfAbstractTest {
/** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
@Override protected CacheMode collectionCacheMode() {
return PARTITIONED;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
index 1d225a6..db11291 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java
@@ -90,7 +90,7 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection
* @throws Exception If failed.
*/
public void testQueue() throws Exception {
- final String queueName = "queue-test-name";
+ final String queueName = "q";
System.out.println(U.filler(20, '\n'));
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedQueueNoBackupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedQueueNoBackupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedQueueNoBackupsTest.java
new file mode 100644
index 0000000..880c638
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedQueueNoBackupsTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures.partitioned;
+
+import java.util.Iterator;
+import java.util.UUID;
+import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ *
+ */
+public class IgnitePartitionedQueueNoBackupsTest extends GridCachePartitionedQueueApiSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheMode collectionCacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMemoryMode collectionMemoryMode() {
+ return ONHEAP_TIERED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode collectionCacheAtomicityMode() {
+ return ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CollectionConfiguration collectionConfiguration() {
+ CollectionConfiguration colCfg = super.collectionConfiguration();
+
+ colCfg.setBackups(0);
+
+ return colCfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCollocation() throws Exception {
+ IgniteQueue<Integer> queue = grid(0).queue("queue", 0, config(true));
+
+ for (int i = 0; i < 1000; i++)
+ assertTrue(queue.add(i));
+
+ assertEquals(1000, queue.size());
+
+ GridCacheContext cctx = GridTestUtils.getFieldValue(queue, "cctx");
+
+ UUID setNodeId = null;
+
+ for (int i = 0; i < gridCount(); i++) {
+ IgniteKernal grid = (IgniteKernal)grid(i);
+
+ Iterator<GridCacheEntryEx> entries =
+ grid.context().cache().internalCache(cctx.name()).map().allEntries0().iterator();
+
+ if (entries.hasNext()) {
+ if (setNodeId == null)
+ setNodeId = grid.localNode().id();
+ else
+ fail("For collocated queue all items should be stored on single node.");
+ }
+ }
+ }}
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedQueueApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedQueueApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedQueueApiSelfTest.java
index 1aea6d9..bad37a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedQueueApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedQueueApiSelfTest.java
@@ -31,6 +31,11 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
*/
public class GridCacheReplicatedQueueApiSelfTest extends GridCacheQueueApiSelfAbstractTest {
/** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
@Override protected CacheMode collectionCacheMode() {
return REPLICATED;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
index 4a5141e..e9674f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java
@@ -114,7 +114,7 @@ public abstract class GridCacheWriteBehindStoreAbstractTest extends GridCommonAb
Map<Integer, String> map = store.getMap();
- assert map.isEmpty();
+ assert map.isEmpty() : map;
Transaction tx = grid().transactions().txStart(OPTIMISTIC, REPEATABLE_READ);