You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/25 13:31:20 UTC
[21/51] [abbrv] ignite git commit: ignite-2523 : SingleUpdateResponse implementation.
ignite-2523 : SingleUpdateResponse implementation.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c39410a2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c39410a2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c39410a2
Branch: refs/heads/ignite-2523
Commit: c39410a2dbbab7e8886a407a5661b29ee985adce
Parents: dfdd2f4
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Mon Feb 8 18:05:34 2016 +0300
Committer: Ilya Lantukh <il...@gridgain.com>
Committed: Mon Feb 8 18:05:34 2016 +0300
----------------------------------------------------------------------
.../GridNearAtomicSingleUpdateResponse.java | 195 ++++++++-----------
1 file changed, 84 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c39410a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
index 581c33b..0bea0dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -38,11 +39,10 @@ import org.jetbrains.annotations.Nullable;
import java.io.Externalizable;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage implements GridCacheDeployable, GridNearAtomicUpdateResponse {
@@ -70,28 +70,21 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
@GridToStringInclude
private GridCacheReturn ret;
- /** Failed keys. */
- @GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
- private volatile Collection<KeyCacheObject> failedKeys;
+ private KeyCacheObject key;
- /** Keys that should be remapped. */
- @GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
- private List<KeyCacheObject> remapKeys;
+ private boolean failed;
- /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */
- @GridDirectCollection(int.class)
- private List<Integer> nearValsIdxs;
+ private boolean remap;
- /** Indexes of keys for which update was skipped (used if originating node has near cache). */
- @GridDirectCollection(int.class)
- private List<Integer> nearSkipIdxs;
+ private boolean hasNearVal;
- /** Values generated on primary node which should be put to originating node's near cache. */
- @GridToStringInclude
- @GridDirectCollection(CacheObject.class)
- private List<CacheObject> nearVals;
+ private CacheObject nearVal;
+
+ private boolean nearSkip;
+
+ private long nearTtl = -1;
+
+ private long nearExpireTime = -1;
/** Version generated on primary node to be used for originating node's near cache update. */
private GridCacheVersion nearVer;
@@ -168,7 +161,10 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
* @return Collection of failed keys.
*/
@Override public Collection<KeyCacheObject> failedKeys() {
- return failedKeys;
+ if (failed && key != null)
+ return Collections.singletonList(key);
+
+ return Collections.emptyList();
}
/**
@@ -190,14 +186,23 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
* @param remapKeys Remap keys.
*/
@Override public void remapKeys(List<KeyCacheObject> remapKeys) {
- this.remapKeys = remapKeys;
+ assert remapKeys.size() <= 1;
+
+ if (remapKeys.isEmpty())
+ return;
+
+ key = remapKeys.get(0);
+ remap = true;
}
/**
* @return Remap keys.
*/
@Override public Collection<KeyCacheObject> remapKeys() {
- return remapKeys;
+ if (remap && key != null)
+ return Collections.singletonList(key);
+
+ return Collections.emptyList();
}
/**
@@ -212,15 +217,13 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
@Nullable CacheObject val,
long ttl,
long expireTime) {
- if (nearValsIdxs == null) {
- nearValsIdxs = new ArrayList<>();
- nearVals = new ArrayList<>();
- }
- addNearTtl(keyIdx, ttl, expireTime);
+ assert keyIdx == 0;
- nearValsIdxs.add(keyIdx);
- nearVals.add(val);
+ nearVal = val;
+ hasNearVal = true;
+
+ addNearTtl(keyIdx, ttl, expireTime);
}
/**
@@ -230,29 +233,11 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
*/
@Override @SuppressWarnings("ForLoopReplaceableByForEach")
public void addNearTtl(int keyIdx, long ttl, long expireTime) {
- if (ttl >= 0) {
- if (nearTtls == null) {
- nearTtls = new GridLongList(16);
+ assert keyIdx == 0;
- for (int i = 0; i < keyIdx; i++)
- nearTtls.add(-1L);
- }
- }
-
- if (nearTtls != null)
- nearTtls.add(ttl);
+ nearTtl = ttl >= 0 ? ttl : -1;
- if (expireTime >= 0) {
- if (nearExpireTimes == null) {
- nearExpireTimes = new GridLongList(16);
-
- for (int i = 0; i < keyIdx; i++)
- nearExpireTimes.add(-1);
- }
- }
-
- if (nearExpireTimes != null)
- nearExpireTimes.add(expireTime);
+ nearExpireTime = expireTime >= 0 ? expireTime : -1;
}
/**
@@ -260,11 +245,8 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
* @return Expire time for near cache update.
*/
@Override public long nearExpireTime(int idx) {
- if (nearExpireTimes != null) {
- assert idx >= 0 && idx < nearExpireTimes.size();
-
- return nearExpireTimes.get(idx);
- }
+ if (idx == 0)
+ return nearExpireTime;
return -1L;
}
@@ -274,11 +256,8 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
* @return TTL for near cache update.
*/
@Override public long nearTtl(int idx) {
- if (nearTtls != null) {
- assert idx >= 0 && idx < nearTtls.size();
-
- return nearTtls.get(idx);
- }
+ if (idx == 0)
+ return nearTtl;
return -1L;
}
@@ -301,26 +280,31 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
* @param keyIdx Index of key for which update was skipped
*/
@Override public void addSkippedIndex(int keyIdx) {
- if (nearSkipIdxs == null)
- nearSkipIdxs = new ArrayList<>();
+ assert keyIdx == 0;
- nearSkipIdxs.add(keyIdx);
-
- addNearTtl(keyIdx, -1L, -1L);
+ nearSkip = true;
+ nearTtl = -1;
+ nearExpireTime = -1;
}
/**
* @return Indexes of keys for which update was skipped
*/
@Override @Nullable public List<Integer> skippedIndexes() {
- return nearSkipIdxs;
+ if (nearSkip && key != null)
+ return Collections.singletonList(0);
+
+ return Collections.emptyList();
}
/**
* @return Indexes of keys for which values were generated on primary node.
*/
@Override @Nullable public List<Integer> nearValuesIndexes() {
- return nearValsIdxs;
+ if (hasNearVal && key != null)
+ return Collections.singletonList(0);
+
+ return Collections.emptyList();
}
/**
@@ -328,7 +312,12 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
* @return Value generated on primary node which should be put to originating node's near cache.
*/
@Override @Nullable public CacheObject nearValue(int idx) {
- return nearVals.get(idx);
+ assert idx == 0;
+
+ if (hasNearVal)
+ return nearVal;
+
+ return null;
}
/**
@@ -338,13 +327,10 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
* @param e Error cause.
*/
@Override public synchronized void addFailedKey(KeyCacheObject key, Throwable e) {
- if (failedKeys == null)
- failedKeys = new ConcurrentLinkedQueue<>();
-
- failedKeys.add(key);
+ this.key = key;
+ failed = true;
- if (err == null)
- err = new IgniteCheckedException("Failed to update keys on primary node.");
+ err = new IgniteCheckedException("Failed to update keys on primary node.");
err.addSuppressed(e);
}
@@ -357,14 +343,13 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
*/
@Override public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
if (keys != null) {
- if (failedKeys == null)
- failedKeys = new ArrayList<>(keys.size());
+ assert keys.size() <= 1;
- failedKeys.addAll(keys);
+ if (keys.size() == 1)
+ key = F.first(keys);
}
- if (err == null)
- err = new IgniteCheckedException("Failed to update keys on primary node.");
+ err = new IgniteCheckedException("Failed to update keys on primary node.");
err.addSuppressed(e);
}
@@ -378,15 +363,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
*/
@Override public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e,
GridCacheContext ctx) {
- if (failedKeys == null)
- failedKeys = new ArrayList<>(keys.size());
-
- failedKeys.addAll(keys);
-
- if (err == null)
- err = new IgniteCheckedException("Failed to update keys on primary node.");
-
- err.addSuppressed(e);
+ addFailedKeys(keys, e);
}
/** {@inheritDoc}
@@ -399,11 +376,9 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
GridCacheContext cctx = ctx.cacheContext(cacheId);
- prepareMarshalCacheObjects(failedKeys, cctx);
+ prepareMarshalCacheObject(key, cctx);
- prepareMarshalCacheObjects(remapKeys, cctx);
-
- prepareMarshalCacheObjects(nearVals, cctx);
+ prepareMarshalCacheObject(nearVal, cctx);
if (ret != null)
ret.prepareMarshal(cctx);
@@ -418,11 +393,9 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
GridCacheContext cctx = ctx.cacheContext(cacheId);
- finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
-
- finishUnmarshalCacheObjects(remapKeys, cctx, ldr);
+ finishUnmarshalCacheObject(key, cctx, ldr);
- finishUnmarshalCacheObjects(nearVals, cctx, ldr);
+ finishUnmarshalCacheObject(nearVal, cctx, ldr);
if (ret != null)
ret.finishUnmarshal(cctx, ldr);
@@ -455,7 +428,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
writer.incrementState();
case 4:
- if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+ if (!writer.writeBoolean("failed", failed))
return false;
writer.incrementState();
@@ -467,31 +440,31 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
writer.incrementState();
case 6:
- if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
+ if (!writer.writeLong("nearExpireTime", nearExpireTime))
return false;
writer.incrementState();
case 7:
- if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT))
+ if (!writer.writeBoolean("nearSkip", nearSkip))
return false;
writer.incrementState();
case 8:
- if (!writer.writeMessage("nearTtls", nearTtls))
+ if (!writer.writeLong("nearTtl", nearTtl))
return false;
writer.incrementState();
case 9:
- if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("nearVal", nearVal))
return false;
writer.incrementState();
case 10:
- if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT))
+ if (!writer.writeBoolean("hasNearVal", hasNearVal))
return false;
writer.incrementState();
@@ -503,7 +476,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
writer.incrementState();
case 12:
- if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("key", key))
return false;
writer.incrementState();
@@ -539,7 +512,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
reader.incrementState();
case 4:
- failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+ failed = reader.readBoolean("failed");
if (!reader.isLastRead())
return false;
@@ -555,7 +528,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
reader.incrementState();
case 6:
- nearExpireTimes = reader.readMessage("nearExpireTimes");
+ nearExpireTime = reader.readLong("nearExpireTime");
if (!reader.isLastRead())
return false;
@@ -563,7 +536,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
reader.incrementState();
case 7:
- nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT);
+ nearSkip = reader.readBoolean("nearSkip");
if (!reader.isLastRead())
return false;
@@ -571,7 +544,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
reader.incrementState();
case 8:
- nearTtls = reader.readMessage("nearTtls");
+ nearTtl = reader.readLong("nearTtl");
if (!reader.isLastRead())
return false;
@@ -579,7 +552,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
reader.incrementState();
case 9:
- nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
+ nearVal = reader.readMessage("nearVal");
if (!reader.isLastRead())
return false;
@@ -587,7 +560,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
reader.incrementState();
case 10:
- nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT);
+ hasNearVal = reader.readBoolean("hasNearVal");
if (!reader.isLastRead())
return false;
@@ -603,7 +576,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme
reader.incrementState();
case 12:
- remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+ key = reader.readMessage("key");
if (!reader.isLastRead())
return false;