You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/01/19 19:59:15 UTC
ignite git commit: IGNITE-10837 WIP
Repository: ignite
Updated Branches:
refs/heads/ignite-gg-10837 76685e376 -> 85e22a278
IGNITE-10837 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/85e22a27
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/85e22a27
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/85e22a27
Branch: refs/heads/ignite-gg-10837
Commit: 85e22a2785d62081348da3f5b07be2a0e4c110b4
Parents: 76685e3
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Jan 19 21:58:49 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Jan 19 21:58:49 2016 +0300
----------------------------------------------------------------------
.../processors/cache/IgniteCacheProxy.java | 48 -------------
.../dht/atomic/GridDhtAtomicCache.java | 74 ++++++++------------
.../processors/cache/dr/GridCacheDrInfo.java | 8 +--
.../transactions/IgniteTxLocalAdapter.java | 12 ++--
4 files changed, 40 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/85e22a27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 8cf7285..fb549eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -65,10 +65,8 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.query.CacheQuery;
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.GridEmptyIterator;
@@ -1082,52 +1080,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
}
- /**
- * Put entries to cache with conflict resolution.
- *
- * @param conflictMap Conflict map.
- */
- public void putAllConflict(Map<KeyCacheObject, GridCacheDrInfo> conflictMap) {
- try {
- GridCacheGateway<K, V> gate = this.gate;
-
- CacheOperationContext prev = onEnter(gate, opCtx);
-
- try {
- delegate.putAllConflict(conflictMap);
- }
- finally {
- onLeave(gate, prev);
- }
- }
- catch (IgniteCheckedException e) {
- throw cacheException(e);
- }
- }
-
- /**
- * Remove entries from cache with conflict resolution.
- *
- * @param conflictMap Conflict map.
- */
- public void removeAllConflict(Map<KeyCacheObject, GridCacheVersion> conflictMap) {
- try {
- GridCacheGateway<K, V> gate = this.gate;
-
- CacheOperationContext prev = onEnter(gate, opCtx);
-
- try {
- delegate.removeAllConflict(conflictMap);
- }
- finally {
- onLeave(gate, prev);
- }
- }
- catch (IgniteCheckedException e) {
- throw cacheException(e);
- }
- }
-
/** {@inheritDoc} */
@Override public V getAndPut(K key, V val) {
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/85e22a27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 2c45121..4353232 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -37,7 +37,6 @@ import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -99,6 +98,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityPermission;
@@ -620,21 +620,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/** {@inheritDoc} */
- public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> conflictMap, Object... args)
- throws IgniteCheckedException {
- updateAllAsync0(null,
- null,
- args,
- conflictMap,
- null,
- false,
- false,
- null,
- true,
- TRANSFORM).get();
- }
-
- /** {@inheritDoc} */
@Override public V getAndRemove(K key) throws IgniteCheckedException {
return getAndRemoveAsync(key).get();
}
@@ -943,41 +928,43 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.checkSecurity(SecurityPermission.CACHE_PUT);
- CacheOperationContext opCtx = ctx.operationContextPerCall();
+ final CacheOperationContext opCtx = ctx.operationContextPerCall();
if (opCtx != null && opCtx.hasDataCenterId()) {
assert conflictPutMap == null : conflictPutMap;
assert conflictRmvMap == null : conflictRmvMap;
if (op == GridCacheOperation.TRANSFORM) {
- Map<KeyCacheObject, GridCacheDrInfo> confMap = new HashMap<>(invokeMap.size());
-
- for (Map.Entry<? extends K, ? extends EntryProcessor> e : invokeMap.entrySet())
- confMap.put(ctx.toCacheKeyObject(e.getKey()), new GridCacheDrInfo((CacheEntryProcessor)e.getValue(),
- ctx.versions().next(opCtx.dataCenterId())));
+ assert invokeMap != null : invokeMap;
- conflictPutMap = confMap;
+ conflictPutMap = F.viewReadOnly((Map)invokeMap,
+ new IgniteClosure<EntryProcessor, GridCacheDrInfo>() {
+ @Override public GridCacheDrInfo apply(EntryProcessor o) {
+ return new GridCacheDrInfo(o, ctx.versions().next(opCtx.dataCenterId()));
+ }
+ });
invokeMap = null;
}
else if (op == GridCacheOperation.DELETE) {
- Map<KeyCacheObject, GridCacheVersion> confMap = new HashMap<>(map.size());
-
- for (K key : map.keySet())
- confMap.put(ctx.toCacheKeyObject(key), ctx.versions().next(opCtx.dataCenterId()));
+ assert map != null : map;
- conflictRmvMap = confMap;
+ conflictRmvMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheVersion>() {
+ @Override public GridCacheVersion apply(V o) {
+ return ctx.versions().next(opCtx.dataCenterId());
+ }
+ });
map = null;
}
else {
- Map<KeyCacheObject, GridCacheDrInfo> confMap = new HashMap<>(map.size());
+ assert map != null : map;
- for (Map.Entry<? extends K, ? extends V> e : map.entrySet())
- confMap.put(ctx.toCacheKeyObject(e.getKey()), new GridCacheDrInfo(
- ctx.toCacheObject(e.getValue()), ctx.versions().next(opCtx.dataCenterId())));
-
- conflictPutMap = confMap;
+ conflictPutMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheDrInfo>() {
+ @Override public GridCacheDrInfo apply(V o) {
+ return new GridCacheDrInfo(ctx.toCacheObject(o), ctx.versions().next(opCtx.dataCenterId()));
+ }
+ });
map = null;
}
@@ -1048,23 +1035,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.checkSecurity(SecurityPermission.CACHE_REMOVE);
- CacheOperationContext opCtx = ctx.operationContextPerCall();
+ final CacheOperationContext opCtx = ctx.operationContextPerCall();
UUID subjId = ctx.subjectIdPerCall(null, opCtx);
int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
+ Collection<GridCacheVersion> drVers = null;
+
if (opCtx != null && keys != null && opCtx.hasDataCenterId()) {
assert conflictMap == null : conflictMap;
- Map<KeyCacheObject, GridCacheVersion> confMap = new HashMap<>(keys.size());
-
- for (K key : keys)
- confMap.put(ctx.toCacheKeyObject(key), ctx.versions().next(opCtx.dataCenterId()));
-
- conflictMap = confMap;
-
- keys = null;
+ drVers = F.transform(keys, new C1<K, GridCacheVersion>() {
+ @Override public GridCacheVersion apply(K k) {
+ return ctx.versions().next(opCtx.dataCenterId());
+ }
+ });
}
final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
@@ -1076,7 +1062,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
null,
null,
- keys != null ? null : conflictMap.values(),
+ drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
retval,
rawRetval,
(filter != null && opCtx != null) ? opCtx.expiry() : null,
http://git-wip-us.apache.org/repos/asf/ignite/blob/85e22a27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
index bd3df02..02bc6b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
@@ -21,7 +21,7 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import org.apache.ignite.cache.CacheEntryProcessor;
+import javax.cache.processor.EntryProcessor;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -38,7 +38,7 @@ public class GridCacheDrInfo implements Externalizable {
private CacheObject val;
/** Entry processor. */
- private CacheEntryProcessor proc;
+ private EntryProcessor proc;
/** DR version. */
private GridCacheVersion ver;
@@ -79,7 +79,7 @@ public class GridCacheDrInfo implements Externalizable {
* @param proc Entry processor.
* @param ver Version.
*/
- public GridCacheDrInfo(CacheEntryProcessor proc, GridCacheVersion ver) {
+ public GridCacheDrInfo(EntryProcessor proc, GridCacheVersion ver) {
assert proc != null;
assert ver != null;
@@ -97,7 +97,7 @@ public class GridCacheDrInfo implements Externalizable {
/**
* @return Entry processor.
*/
- public CacheEntryProcessor entryProcessor() {
+ public EntryProcessor entryProcessor() {
return proc;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/85e22a27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 0b19f30..32a9e27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -22,13 +22,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -3087,8 +3087,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (opCtx != null && opCtx.hasDataCenterId()) {
assert drMap == null : drMap;
+ assert map != null || invokeMap != null;
- drMap = (Map<KeyCacheObject, GridCacheDrInfo>)F.viewReadOnly((Map<K, V>) map,
+ drMap = (Map<KeyCacheObject, GridCacheDrInfo>)F.viewReadOnly((Map)(map != null ? map : invokeMap),
new IgniteClosure<V, GridCacheDrInfo>() {
@Override public GridCacheDrInfo apply(V val) {
return new GridCacheDrInfo(cctx.versions().next(opCtx.dataCenterId()));
@@ -3348,10 +3349,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
- if (drMap == null && opCtx != null && opCtx.hasDataCenterId()) {
- assert drMap == null : drMap;
-
- Map<K, GridCacheVersion> confMap = new TreeMap<>();
+ // drMap is used as a plain map, so key order doesn't matter.
+ if (keys != null && drMap == null && opCtx != null && opCtx.hasDataCenterId()) {
+ Map<K, GridCacheVersion> confMap = new HashMap<>(keys.size());
for (K key : keys)
confMap.put(key, cacheCtx.versions().next(opCtx.dataCenterId()));