You are viewing a plain-text version of this content. (The canonical link from the original page is not included in this text version.)
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/25 13:31:20 UTC

[21/51] [abbrv] ignite git commit: ignite-2523 : SingleUpdateResponse implementation.

ignite-2523 : SingleUpdateResponse implementation.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c39410a2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c39410a2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c39410a2

Branch: refs/heads/ignite-2523
Commit: c39410a2dbbab7e8886a407a5661b29ee985adce
Parents: dfdd2f4
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Mon Feb 8 18:05:34 2016 +0300
Committer: Ilya Lantukh <il...@gridgain.com>
Committed: Mon Feb 8 18:05:34 2016 +0300

----------------------------------------------------------------------
 .../GridNearAtomicSingleUpdateResponse.java     | 195 ++++++++-----------
 1 file changed, 84 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c39410a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
index 581c33b..0bea0dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -38,11 +39,10 @@ import org.jetbrains.annotations.Nullable;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage implements GridCacheDeployable, GridNearAtomicUpdateResponse {
 
@@ -70,28 +70,21 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
     @GridToStringInclude
     private GridCacheReturn ret;
 
-    /** Failed keys. */
-    @GridToStringInclude
-    @GridDirectCollection(KeyCacheObject.class)
-    private volatile Collection<KeyCacheObject> failedKeys;
+    private KeyCacheObject key;
 
-    /** Keys that should be remapped. */
-    @GridToStringInclude
-    @GridDirectCollection(KeyCacheObject.class)
-    private List<KeyCacheObject> remapKeys;
+    private boolean failed;
 
-    /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */
-    @GridDirectCollection(int.class)
-    private List<Integer> nearValsIdxs;
+    private boolean remap;
 
-    /** Indexes of keys for which update was skipped (used if originating node has near cache). */
-    @GridDirectCollection(int.class)
-    private List<Integer> nearSkipIdxs;
+    private boolean hasNearVal;
 
-    /** Values generated on primary node which should be put to originating node's near cache. */
-    @GridToStringInclude
-    @GridDirectCollection(CacheObject.class)
-    private List<CacheObject> nearVals;
+    private CacheObject nearVal;
+
+    private boolean nearSkip;
+
+    private long nearTtl = -1;
+
+    private long nearExpireTime = -1;
 
     /** Version generated on primary node to be used for originating node's near cache update. */
     private GridCacheVersion nearVer;
@@ -168,7 +161,10 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
      * @return Collection of failed keys.
      */
     @Override public Collection<KeyCacheObject> failedKeys() {
-        return failedKeys;
+        if (failed && key != null)
+            return Collections.singletonList(key);
+
+        return Collections.emptyList();
     }
 
     /**
@@ -190,14 +186,23 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
      * @param remapKeys Remap keys.
      */
     @Override public void remapKeys(List<KeyCacheObject> remapKeys) {
-        this.remapKeys = remapKeys;
+        assert remapKeys.size() <= 1;
+
+        if (remapKeys.isEmpty())
+            return;
+
+        key = remapKeys.get(0);
+        remap = true;
     }
 
     /**
      * @return Remap keys.
      */
     @Override public Collection<KeyCacheObject> remapKeys() {
-        return remapKeys;
+        if (remap && key != null)
+            return Collections.singletonList(key);
+
+        return Collections.emptyList();
     }
 
     /**
@@ -212,15 +217,13 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
         @Nullable CacheObject val,
         long ttl,
         long expireTime) {
-        if (nearValsIdxs == null) {
-            nearValsIdxs = new ArrayList<>();
-            nearVals = new ArrayList<>();
-        }
 
-        addNearTtl(keyIdx, ttl, expireTime);
+        assert keyIdx == 0;
 
-        nearValsIdxs.add(keyIdx);
-        nearVals.add(val);
+        nearVal = val;
+        hasNearVal = true;
+
+        addNearTtl(keyIdx, ttl, expireTime);
     }
 
     /**
@@ -230,29 +233,11 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
      */
     @Override @SuppressWarnings("ForLoopReplaceableByForEach")
     public void addNearTtl(int keyIdx, long ttl, long expireTime) {
-        if (ttl >= 0) {
-            if (nearTtls == null) {
-                nearTtls = new GridLongList(16);
+        assert keyIdx == 0;
 
-                for (int i = 0; i < keyIdx; i++)
-                    nearTtls.add(-1L);
-            }
-        }
-
-        if (nearTtls != null)
-            nearTtls.add(ttl);
+        nearTtl = ttl >= 0 ? ttl : -1;
 
-        if (expireTime >= 0) {
-            if (nearExpireTimes == null) {
-                nearExpireTimes = new GridLongList(16);
-
-                for (int i = 0; i < keyIdx; i++)
-                    nearExpireTimes.add(-1);
-            }
-        }
-
-        if (nearExpireTimes != null)
-            nearExpireTimes.add(expireTime);
+        nearExpireTime = expireTime >= 0 ? expireTime : -1;
     }
 
     /**
@@ -260,11 +245,8 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
      * @return Expire time for near cache update.
      */
     @Override public long nearExpireTime(int idx) {
-        if (nearExpireTimes != null) {
-            assert idx >= 0 && idx < nearExpireTimes.size();
-
-            return nearExpireTimes.get(idx);
-        }
+        if (idx == 0)
+            return nearExpireTime;
 
         return -1L;
     }
@@ -274,11 +256,8 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
      * @return TTL for near cache update.
      */
     @Override public long nearTtl(int idx) {
-        if (nearTtls != null) {
-            assert idx >= 0 && idx < nearTtls.size();
-
-            return nearTtls.get(idx);
-        }
+        if (idx == 0)
+            return nearTtl;
 
         return -1L;
     }
@@ -301,26 +280,31 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
      * @param keyIdx Index of key for which update was skipped
      */
     @Override public void addSkippedIndex(int keyIdx) {
-        if (nearSkipIdxs == null)
-            nearSkipIdxs = new ArrayList<>();
+        assert keyIdx == 0;
 
-        nearSkipIdxs.add(keyIdx);
-
-        addNearTtl(keyIdx, -1L, -1L);
+        nearSkip = true;
+        nearTtl = -1;
+        nearExpireTime = -1;
     }
 
     /**
      * @return Indexes of keys for which update was skipped
      */
     @Override @Nullable public List<Integer> skippedIndexes() {
-        return nearSkipIdxs;
+        if (nearSkip && key != null)
+            return Collections.singletonList(0);
+
+        return Collections.emptyList();
     }
 
     /**
      * @return Indexes of keys for which values were generated on primary node.
      */
     @Override @Nullable public List<Integer> nearValuesIndexes() {
-        return nearValsIdxs;
+        if (hasNearVal && key != null)
+            return Collections.singletonList(0);
+
+        return Collections.emptyList();
     }
 
     /**
@@ -328,7 +312,12 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
      * @return Value generated on primary node which should be put to originating node's near cache.
      */
     @Override @Nullable public CacheObject nearValue(int idx) {
-        return nearVals.get(idx);
+        assert idx == 0;
+
+        if (hasNearVal)
+            return nearVal;
+
+        return null;
     }
 
     /**
@@ -338,13 +327,10 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
      * @param e Error cause.
      */
     @Override public synchronized void addFailedKey(KeyCacheObject key, Throwable e) {
-        if (failedKeys == null)
-            failedKeys = new ConcurrentLinkedQueue<>();
-
-        failedKeys.add(key);
+        this.key = key;
+        failed = true;
 
-        if (err == null)
-            err = new IgniteCheckedException("Failed to update keys on primary node.");
+        err = new IgniteCheckedException("Failed to update keys on primary node.");
 
         err.addSuppressed(e);
     }
@@ -357,14 +343,13 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
      */
     @Override public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
         if (keys != null) {
-            if (failedKeys == null)
-                failedKeys = new ArrayList<>(keys.size());
+            assert keys.size() <= 1;
 
-            failedKeys.addAll(keys);
+            if (keys.size() == 1)
+                key = F.first(keys);
         }
 
-        if (err == null)
-            err = new IgniteCheckedException("Failed to update keys on primary node.");
+        err = new IgniteCheckedException("Failed to update keys on primary node.");
 
         err.addSuppressed(e);
     }
@@ -378,15 +363,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
      */
     @Override public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e,
         GridCacheContext ctx) {
-        if (failedKeys == null)
-            failedKeys = new ArrayList<>(keys.size());
-
-        failedKeys.addAll(keys);
-
-        if (err == null)
-            err = new IgniteCheckedException("Failed to update keys on primary node.");
-
-        err.addSuppressed(e);
+        addFailedKeys(keys, e);
     }
 
     /** {@inheritDoc}
@@ -399,11 +376,9 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        prepareMarshalCacheObjects(failedKeys, cctx);
+        prepareMarshalCacheObject(key, cctx);
 
-        prepareMarshalCacheObjects(remapKeys, cctx);
-
-        prepareMarshalCacheObjects(nearVals, cctx);
+        prepareMarshalCacheObject(nearVal, cctx);
 
         if (ret != null)
             ret.prepareMarshal(cctx);
@@ -418,11 +393,9 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
-
-        finishUnmarshalCacheObjects(remapKeys, cctx, ldr);
+        finishUnmarshalCacheObject(key, cctx, ldr);
 
-        finishUnmarshalCacheObjects(nearVals, cctx, ldr);
+        finishUnmarshalCacheObject(nearVal, cctx, ldr);
 
         if (ret != null)
             ret.finishUnmarshal(cctx, ldr);
@@ -455,7 +428,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeBoolean("failed", failed))
                     return false;
 
                 writer.incrementState();
@@ -467,31 +440,31 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
+                if (!writer.writeLong("nearExpireTime", nearExpireTime))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT))
+                if (!writer.writeBoolean("nearSkip", nearSkip))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeMessage("nearTtls", nearTtls))
+                if (!writer.writeLong("nearTtl", nearTtl))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("nearVal", nearVal))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT))
+                if (!writer.writeBoolean("hasNearVal", hasNearVal))
                     return false;
 
                 writer.incrementState();
@@ -503,7 +476,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("key", key))
                     return false;
 
                 writer.incrementState();
@@ -539,7 +512,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
                 reader.incrementState();
 
             case 4:
-                failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+                failed = reader.readBoolean("failed");
 
                 if (!reader.isLastRead())
                     return false;
@@ -555,7 +528,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
                 reader.incrementState();
 
             case 6:
-                nearExpireTimes = reader.readMessage("nearExpireTimes");
+                nearExpireTime = reader.readLong("nearExpireTime");
 
                 if (!reader.isLastRead())
                     return false;
@@ -563,7 +536,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
                 reader.incrementState();
 
             case 7:
-                nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT);
+                nearSkip = reader.readBoolean("nearSkip");
 
                 if (!reader.isLastRead())
                     return false;
@@ -571,7 +544,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
                 reader.incrementState();
 
             case 8:
-                nearTtls = reader.readMessage("nearTtls");
+                nearTtl = reader.readLong("nearTtl");
 
                 if (!reader.isLastRead())
                     return false;
@@ -579,7 +552,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
                 reader.incrementState();
 
             case 9:
-                nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
+                nearVal = reader.readMessage("nearVal");
 
                 if (!reader.isLastRead())
                     return false;
@@ -587,7 +560,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
                 reader.incrementState();
 
             case 10:
-                nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT);
+                hasNearVal = reader.readBoolean("hasNearVal");
 
                 if (!reader.isLastRead())
                     return false;
@@ -603,7 +576,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
                 reader.incrementState();
 
             case 12:
-                remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+                key = reader.readMessage("key");
 
                 if (!reader.isLastRead())
                     return false;