You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/01/19 19:59:15 UTC

ignite git commit: IGNITE-10837 WIP

Repository: ignite
Updated Branches:
  refs/heads/ignite-gg-10837 76685e376 -> 85e22a278


IGNITE-10837 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/85e22a27
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/85e22a27
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/85e22a27

Branch: refs/heads/ignite-gg-10837
Commit: 85e22a2785d62081348da3f5b07be2a0e4c110b4
Parents: 76685e3
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Jan 19 21:58:49 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Jan 19 21:58:49 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      | 48 -------------
 .../dht/atomic/GridDhtAtomicCache.java          | 74 ++++++++------------
 .../processors/cache/dr/GridCacheDrInfo.java    |  8 +--
 .../transactions/IgniteTxLocalAdapter.java      | 12 ++--
 4 files changed, 40 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/85e22a27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 8cf7285..fb549eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -65,10 +65,8 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.GridEmptyIterator;
@@ -1082,52 +1080,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         }
     }
 
-    /**
-     * Put entries to cache with conflict resolution.
-     *
-     * @param conflictMap Conflict map.
-     */
-    public void putAllConflict(Map<KeyCacheObject, GridCacheDrInfo> conflictMap) {
-        try {
-            GridCacheGateway<K, V> gate = this.gate;
-
-            CacheOperationContext prev = onEnter(gate, opCtx);
-
-            try {
-                delegate.putAllConflict(conflictMap);
-            }
-            finally {
-                onLeave(gate, prev);
-            }
-        }
-        catch (IgniteCheckedException e) {
-            throw cacheException(e);
-        }
-    }
-
-    /**
-     * Remove entries from cache with conflict resolution.
-     *
-     * @param conflictMap Conflict map.
-     */
-    public void removeAllConflict(Map<KeyCacheObject, GridCacheVersion> conflictMap) {
-        try {
-            GridCacheGateway<K, V> gate = this.gate;
-
-            CacheOperationContext prev = onEnter(gate, opCtx);
-
-            try {
-                delegate.removeAllConflict(conflictMap);
-            }
-            finally {
-                onLeave(gate, prev);
-            }
-        }
-        catch (IgniteCheckedException e) {
-            throw cacheException(e);
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public V getAndPut(K key, V val) {
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/85e22a27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 2c45121..4353232 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -37,7 +37,6 @@ import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -99,6 +98,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityPermission;
@@ -620,21 +620,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> conflictMap, Object... args)
-        throws IgniteCheckedException {
-        updateAllAsync0(null,
-            null,
-            args,
-            conflictMap,
-            null,
-            false,
-            false,
-            null,
-            true,
-            TRANSFORM).get();
-    }
-
-    /** {@inheritDoc} */
     @Override public V getAndRemove(K key) throws IgniteCheckedException {
         return getAndRemoveAsync(key).get();
     }
@@ -943,41 +928,43 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         ctx.checkSecurity(SecurityPermission.CACHE_PUT);
 
-        CacheOperationContext opCtx = ctx.operationContextPerCall();
+        final CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         if (opCtx != null && opCtx.hasDataCenterId()) {
             assert conflictPutMap == null : conflictPutMap;
             assert conflictRmvMap == null : conflictRmvMap;
 
             if (op == GridCacheOperation.TRANSFORM) {
-                Map<KeyCacheObject, GridCacheDrInfo> confMap = new HashMap<>(invokeMap.size());
-
-                for (Map.Entry<? extends K, ? extends EntryProcessor> e : invokeMap.entrySet())
-                    confMap.put(ctx.toCacheKeyObject(e.getKey()), new GridCacheDrInfo((CacheEntryProcessor)e.getValue(),
-                        ctx.versions().next(opCtx.dataCenterId())));
+                assert invokeMap != null : invokeMap;
 
-                conflictPutMap = confMap;
+                conflictPutMap = F.viewReadOnly((Map)invokeMap,
+                    new IgniteClosure<EntryProcessor, GridCacheDrInfo>() {
+                        @Override public GridCacheDrInfo apply(EntryProcessor o) {
+                            return new GridCacheDrInfo(o, ctx.versions().next(opCtx.dataCenterId()));
+                        }
+                    });
 
                 invokeMap = null;
             }
             else if (op == GridCacheOperation.DELETE) {
-                Map<KeyCacheObject, GridCacheVersion> confMap = new HashMap<>(map.size());
-
-                for (K key : map.keySet())
-                    confMap.put(ctx.toCacheKeyObject(key), ctx.versions().next(opCtx.dataCenterId()));
+                assert map != null : map;
 
-                conflictRmvMap = confMap;
+                conflictRmvMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheVersion>() {
+                    @Override public GridCacheVersion apply(V o) {
+                        return ctx.versions().next(opCtx.dataCenterId());
+                    }
+                });
 
                 map = null;
             }
             else {
-                Map<KeyCacheObject, GridCacheDrInfo> confMap = new HashMap<>(map.size());
+                assert map != null : map;
 
-                for (Map.Entry<? extends K, ? extends V> e : map.entrySet())
-                    confMap.put(ctx.toCacheKeyObject(e.getKey()), new GridCacheDrInfo(
-                        ctx.toCacheObject(e.getValue()), ctx.versions().next(opCtx.dataCenterId())));
-
-                conflictPutMap = confMap;
+                conflictPutMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheDrInfo>() {
+                    @Override public GridCacheDrInfo apply(V o) {
+                        return new GridCacheDrInfo(ctx.toCacheObject(o), ctx.versions().next(opCtx.dataCenterId()));
+                    }
+                });
 
                 map = null;
             }
@@ -1048,23 +1035,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         ctx.checkSecurity(SecurityPermission.CACHE_REMOVE);
 
-        CacheOperationContext opCtx = ctx.operationContextPerCall();
+        final CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         UUID subjId = ctx.subjectIdPerCall(null, opCtx);
 
         int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
 
+        Collection<GridCacheVersion> drVers = null;
+
         if (opCtx != null && keys != null && opCtx.hasDataCenterId()) {
             assert conflictMap == null : conflictMap;
 
-            Map<KeyCacheObject, GridCacheVersion> confMap = new HashMap<>(keys.size());
-
-            for (K key : keys)
-                confMap.put(ctx.toCacheKeyObject(key), ctx.versions().next(opCtx.dataCenterId()));
-
-            conflictMap = confMap;
-
-            keys = null;
+            drVers = F.transform(keys, new C1<K, GridCacheVersion>() {
+                @Override public GridCacheVersion apply(K k) {
+                    return ctx.versions().next(opCtx.dataCenterId());
+                }
+            });
         }
 
         final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
@@ -1076,7 +1062,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             null,
             null,
             null,
-            keys != null ? null : conflictMap.values(),
+            drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
             retval,
             rawRetval,
             (filter != null && opCtx != null) ? opCtx.expiry() : null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/85e22a27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
index bd3df02..02bc6b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
@@ -21,7 +21,7 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import org.apache.ignite.cache.CacheEntryProcessor;
+import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -38,7 +38,7 @@ public class GridCacheDrInfo implements Externalizable {
     private CacheObject val;
 
     /** Entry processor. */
-    private CacheEntryProcessor proc;
+    private EntryProcessor proc;
 
     /** DR version. */
     private GridCacheVersion ver;
@@ -79,7 +79,7 @@ public class GridCacheDrInfo implements Externalizable {
      * @param proc Entry processor.
      * @param ver Version.
      */
-    public GridCacheDrInfo(CacheEntryProcessor proc, GridCacheVersion ver) {
+    public GridCacheDrInfo(EntryProcessor proc, GridCacheVersion ver) {
         assert proc != null;
         assert ver != null;
 
@@ -97,7 +97,7 @@ public class GridCacheDrInfo implements Externalizable {
     /**
      * @return Entry processor.
      */
-    public CacheEntryProcessor entryProcessor() {
+    public EntryProcessor entryProcessor() {
         return proc;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/85e22a27/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 0b19f30..32a9e27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -22,13 +22,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -3087,8 +3087,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
         if (opCtx != null && opCtx.hasDataCenterId()) {
             assert drMap == null : drMap;
+            assert map != null || invokeMap != null;
 
-            drMap = (Map<KeyCacheObject, GridCacheDrInfo>)F.viewReadOnly((Map<K, V>) map,
+            drMap = (Map<KeyCacheObject, GridCacheDrInfo>)F.viewReadOnly((Map)(map != null ? map : invokeMap),
                 new IgniteClosure<V, GridCacheDrInfo>() {
                     @Override public GridCacheDrInfo apply(V val) {
                         return new GridCacheDrInfo(cctx.versions().next(opCtx.dataCenterId()));
@@ -3348,10 +3349,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
         CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
 
-        if (drMap == null && opCtx != null && opCtx.hasDataCenterId()) {
-            assert drMap == null : drMap;
-
-            Map<K, GridCacheVersion> confMap = new TreeMap<>();
+        // DrMap used as map. Keys order doesn't matter.
+        if (keys != null && drMap == null && opCtx != null && opCtx.hasDataCenterId()) {
+            Map<K, GridCacheVersion> confMap = new HashMap<>(keys.size());
 
             for (K key : keys)
                 confMap.put(key, cacheCtx.versions().next(opCtx.dataCenterId()));