You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/07/30 03:02:04 UTC
[1/8] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-104 e6f0ac3c4 -> aa11f6446
IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/03e48ba8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/03e48ba8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/03e48ba8
Branch: refs/heads/ignite-104
Commit: 03e48ba8ccb417cfd1f512f2bd1efe6ceb6c86fa
Parents: e6f0ac3
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Tue Jul 28 22:19:49 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Tue Jul 28 22:19:49 2015 -0700
----------------------------------------------------------------------
.../cache/distributed/dht/atomic/GridDhtAtomicCache.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03e48ba8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index bb036da..7a8cc06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1764,7 +1764,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
dhtFut.addNearWriteEntries(filteredReaders,
entry,
updRes.newValue(),
- null,
+ op == TRANSFORM ? req.entryProcessor(i) : null,
updRes.newTtl(),
updRes.conflictExpireTime());
}
[7/8] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates
Posted by vk...@apache.org.
IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/112c567c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/112c567c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/112c567c
Branch: refs/heads/ignite-104
Commit: 112c567cb780fc04a0d81c9b8b1e1f60cd2fbabf
Parents: 9781ea4
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Wed Jul 29 17:58:33 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Wed Jul 29 17:58:33 2015 -0700
----------------------------------------------------------------------
.../dht/atomic/GridNearAtomicUpdateRequest.java | 48 +++++++-------------
.../atomic/GridNearAtomicUpdateResponse.java | 20 ++------
2 files changed, 20 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/112c567c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 734cf6d..86c5ab8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -666,60 +666,54 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
writer.incrementState();
case 16:
-// if (!writer.writeInt("part", part))
-// return false;
-
- writer.incrementState();
-
- case 17:
if (!writer.writeBoolean("retval", retval))
return false;
writer.incrementState();
- case 18:
+ case 17:
if (!writer.writeBoolean("skipStore", skipStore))
return false;
writer.incrementState();
- case 19:
+ case 18:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 20:
+ case 19:
if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
- case 21:
+ case 20:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 22:
+ case 21:
if (!writer.writeBoolean("topLocked", topLocked))
return false;
writer.incrementState();
- case 23:
+ case 22:
if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
- case 24:
+ case 23:
if (!writer.writeMessage("updateVer", updateVer))
return false;
writer.incrementState();
- case 25:
+ case 24:
if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
return false;
@@ -850,14 +844,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 16:
-// part = reader.readInt("part");
-//
-// if (!reader.isLastRead())
-// return false;
-
- reader.incrementState();
-
- case 17:
retval = reader.readBoolean("retval");
if (!reader.isLastRead())
@@ -865,7 +851,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 18:
+ case 17:
skipStore = reader.readBoolean("skipStore");
if (!reader.isLastRead())
@@ -873,7 +859,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 19:
+ case 18:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -881,7 +867,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 20:
+ case 19:
byte syncModeOrd;
syncModeOrd = reader.readByte("syncMode");
@@ -893,7 +879,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 21:
+ case 20:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -901,7 +887,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 22:
+ case 21:
topLocked = reader.readBoolean("topLocked");
if (!reader.isLastRead())
@@ -909,7 +895,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 23:
+ case 22:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -917,7 +903,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 24:
+ case 23:
updateVer = reader.readMessage("updateVer");
if (!reader.isLastRead())
@@ -925,7 +911,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 25:
+ case 24:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -945,7 +931,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 26;
+ return 25;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/112c567c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 2b30536..8e1bee2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -485,18 +485,12 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
writer.incrementState();
case 12:
-// if (!writer.writeInt("part", part))
-// return false;
-
- writer.incrementState();
-
- case 13:
if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 14:
+ case 13:
if (!writer.writeMessage("ret", ret))
return false;
@@ -591,14 +585,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 12:
-// part = reader.readInt("part");
-//
-// if (!reader.isLastRead())
-// return false;
-
- reader.incrementState();
-
- case 13:
remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -606,7 +592,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
- case 14:
+ case 13:
ret = reader.readMessage("ret");
if (!reader.isLastRead())
@@ -626,7 +612,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 15;
+ return 14;
}
/** {@inheritDoc} */
[5/8] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates
Posted by vk...@apache.org.
IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7c73fc5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7c73fc5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7c73fc5d
Branch: refs/heads/ignite-104
Commit: 7c73fc5d8d81f3cded6bffd4dcf3d1e48ad84d64
Parents: e5c69b8
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Wed Jul 29 17:26:44 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Wed Jul 29 17:26:44 2015 -0700
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 9 ++-------
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 3 ---
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 19 ++-----------------
.../distributed/near/GridNearAtomicCache.java | 4 ++--
4 files changed, 6 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7c73fc5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 02e48df..31606b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1767,7 +1767,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
dhtFut.addNearWriteEntries(filteredReaders,
entry,
updRes.newValue(),
- op == TRANSFORM ? req.entryProcessor(i) : null,
updRes.newTtl(),
updRes.conflictExpireTime());
}
@@ -2034,13 +2033,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (dhtFut != null) {
- EntryProcessor<Object, Object, Object> entryProcessor =
- entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
-
if (!batchRes.readersOnly())
dhtFut.addWriteEntry(entry,
writeVal,
- entryProcessor,
+ entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()),
updRes.newTtl(),
CU.EXPIRE_TIME_CALCULATE,
null);
@@ -2049,7 +2045,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
dhtFut.addNearWriteEntries(filteredReaders,
entry,
writeVal,
- entryProcessor,
updRes.newTtl(),
CU.EXPIRE_TIME_CALCULATE);
}
@@ -2465,7 +2460,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/*event*/true,
/*metrics*/true,
/*primary*/false,
- /*check version*/!req.forceTransformBackups(),
+ /*check version*/op != TRANSFORM || !req.forceTransformBackups(),
req.topologyVersion(),
CU.empty0(),
replicate ? DR_BACKUP : DR_NONE,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7c73fc5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 15ec121..ab0c2e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -270,14 +270,12 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
* @param readers Entry readers.
* @param entry Entry.
* @param val Value.
- * @param entryProcessor Entry processor..
* @param ttl TTL for near cache update (optional).
* @param expireTime Expire time for near cache update (optional).
*/
public void addNearWriteEntries(Iterable<UUID> readers,
GridDhtCacheEntry entry,
@Nullable CacheObject val,
- EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
long expireTime) {
CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
@@ -323,7 +321,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
updateReq.addNearWriteValue(entry.key(),
val,
- entryProcessor,
ttl,
expireTime);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7c73fc5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 7149dec..6340c93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -267,36 +267,21 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/**
* @param key Key to add.
* @param val Value, {@code null} if should be removed.
- * @param entryProcessor Entry processor.
* @param ttl TTL.
* @param expireTime Expire time.
*/
public void addNearWriteValue(KeyCacheObject key,
@Nullable CacheObject val,
- EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
long expireTime)
{
if (nearKeys == null) {
nearKeys = new ArrayList<>();
-
- if (forceTransformBackups) {
- nearEntryProcessors = new ArrayList<>();
- nearEntryProcessorsBytes = new ArrayList<>();
- }
- else
- nearVals = new ArrayList<>();
+ nearVals = new ArrayList<>();
}
nearKeys.add(key);
-
- if (forceTransformBackups) {
- assert entryProcessor != null;
-
- nearEntryProcessors.add(entryProcessor);
- }
- else
- nearVals.add(val);
+ nearVals.add(val);
if (ttl >= 0) {
if (nearTtls == null) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7c73fc5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 0aa1638..707facc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -225,7 +225,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
/*write-through*/false,
/*read-through*/false,
/*retval*/false,
- /**expiry policy*/null,
+ /*expiry policy*/null,
/*event*/true,
/*metrics*/true,
/*primary*/false,
@@ -336,7 +336,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
/*event*/true,
/*metrics*/true,
/*primary*/false,
- /*check version*/!req.forceTransformBackups(),
+ /*check version*/op != TRANSFORM || !req.forceTransformBackups(),
req.topologyVersion(),
CU.empty0(),
DR_NONE,
[8/8] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates
Posted by vk...@apache.org.
IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/aa11f644
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/aa11f644
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/aa11f644
Branch: refs/heads/ignite-104
Commit: aa11f6446f14174e0ca4e67b85b1403ec6ed7016
Parents: 112c567
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Wed Jul 29 18:01:36 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Wed Jul 29 18:01:36 2015 -0700
----------------------------------------------------------------------
.../dht/atomic/GridAtomicMappingKey.java | 86 -------------------
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 87 +++++++++++++++++---
2 files changed, 75 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa11f644/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java
deleted file mode 100644
index 52e3c7f..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.*;
-
-/**
- * Mapping Key.
- */
-class GridAtomicMappingKey {
- /** Node ID. */
- private final UUID nodeId;
-
- /** Partition. */
- private final int part;
-
- /**
- * @param nodeId Node ID.
- * @param part Partition.
- */
- GridAtomicMappingKey(UUID nodeId, int part) {
- assert nodeId != null;
- assert part >= -1 : part;
-
- this.nodeId = nodeId;
- this.part = part;
- }
-
- /**
- * @return Node ID.
- */
- UUID nodeId() {
- return nodeId;
- }
-
- /**
- * @return Partition.
- */
- int partition() {
- return part;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- GridAtomicMappingKey key = (GridAtomicMappingKey)o;
-
- return nodeId.equals(key.nodeId) && part == key.part;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = nodeId.hashCode();
-
- res = 31 * res + part;
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridAtomicMappingKey.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa11f644/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 8595dc7..93c20da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -73,7 +73,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** Mappings. */
@GridToStringInclude
- private ConcurrentMap<GridAtomicMappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
+ private ConcurrentMap<MappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
/** Entries with readers. */
private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
@@ -142,8 +142,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** {@inheritDoc} */
@Override public Collection<? extends ClusterNode> nodes() {
- return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() {
- @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) {
+ return F.view(F.viewReadOnly(mappings.keySet(), new C1<MappingKey, ClusterNode>() {
+ @Override public ClusterNode apply(MappingKey mappingKey) {
return cctx.kernalContext().discovery().node(mappingKey.nodeId());
}
}), F.notNull());
@@ -154,15 +154,15 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
if (log.isDebugEnabled())
log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']');
- Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size());
+ Collection<MappingKey> mappingKeys = new ArrayList<>(mappings.size());
- for (GridAtomicMappingKey mappingKey : mappings.keySet()) {
+ for (MappingKey mappingKey : mappings.keySet()) {
if (mappingKey.nodeId().equals(nodeId))
mappingKeys.add(mappingKey);
}
if (!mappingKeys.isEmpty()) {
- for (GridAtomicMappingKey mappingKey : mappingKeys)
+ for (MappingKey mappingKey : mappingKeys)
mappings.remove(mappingKey);
checkComplete();
@@ -234,7 +234,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
for (ClusterNode node : dhtNodes) {
UUID nodeId = node.id();
- GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);
+ MappingKey mappingKey = new MappingKey(nodeId, part);
if (!nodeId.equals(cctx.localNodeId())) {
GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey);
@@ -287,7 +287,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
int part = cctx.config().isAtomicOrderedUpdates() ? entry.partition() : -1;
for (UUID nodeId : readers) {
- GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);
+ MappingKey mappingKey = new MappingKey(nodeId, part);
GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey);
@@ -345,8 +345,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
*/
public void map() {
if (!mappings.isEmpty()) {
- for (Map.Entry<GridAtomicMappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) {
- GridAtomicMappingKey mappingKey = e.getKey();
+ for (Map.Entry<MappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) {
+ MappingKey mappingKey = e.getKey();
GridDhtAtomicUpdateRequest req = e.getValue();
UUID nodeId = mappingKey.nodeId();
@@ -429,7 +429,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
}
}
- mappings.remove(new GridAtomicMappingKey(nodeId, updateRes.partition()));
+ mappings.remove(new MappingKey(nodeId, updateRes.partition()));
checkComplete();
}
@@ -445,7 +445,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
for (Integer part : res.partitions())
- mappings.remove(new GridAtomicMappingKey(nodeId, part));
+ mappings.remove(new MappingKey(nodeId, part));
checkComplete();
}
@@ -468,4 +468,67 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
return S.toString(GridDhtAtomicUpdateFuture.class, this);
}
+ /**
+ * Mapping Key.
+ */
+ private static class MappingKey {
+ /** Node ID. */
+ private final UUID nodeId;
+
+ /** Partition. */
+ private final int part;
+
+ /**
+ * @param nodeId Node ID.
+ * @param part Partition.
+ */
+ MappingKey(UUID nodeId, int part) {
+ assert nodeId != null;
+ assert part >= -1 : part;
+
+ this.nodeId = nodeId;
+ this.part = part;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Partition.
+ */
+ int partition() {
+ return part;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ MappingKey key = (MappingKey)o;
+
+ return nodeId.equals(key.nodeId) && part == key.part;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = nodeId.hashCode();
+
+ res = 31 * res + part;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MappingKey.class, this);
+ }
+ }
}
[2/8] incubator-ignite git commit: #ignite-gg-10610: Security hole if DataStreamer is used for populating the cache
Posted by vk...@apache.org.
#ignite-gg-10610: Security hole if DataStreamer is used for populating the cache
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5288b2d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5288b2d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5288b2d8
Branch: refs/heads/ignite-104
Commit: 5288b2d8b882bbb86d69e1019821d51803685861
Parents: a127756
Author: ivasilinets <iv...@gridgain.com>
Authored: Wed Jul 29 15:27:31 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Wed Jul 29 15:27:31 2015 +0300
----------------------------------------------------------------------
.../datastreamer/DataStreamerImpl.java | 22 ++++++++++++++++++++
.../datastreamer/DataStreamerUpdateJob.java | 20 +++++++++++++++++-
2 files changed, 41 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5288b2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 605f478..5fae676 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.security.*;
import org.apache.ignite.stream.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
@@ -413,6 +414,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
@Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
A.notEmpty(entries, "entries");
+ checkSecurityPermission(SecurityPermission.CACHE_PUT);
+
enterBusy();
try {
@@ -520,6 +523,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
@Override public IgniteFuture<?> addData(K key, V val) {
A.notNull(key, "key");
+ if (val == null)
+ checkSecurityPermission(SecurityPermission.CACHE_REMOVE);
+ else
+ checkSecurityPermission(SecurityPermission.CACHE_PUT);
+
KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key, true);
CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true);
@@ -980,6 +988,20 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
/**
+ * Check permissions for streaming.
+ *
+ * @param perm Security permission.
+ * @throws org.apache.ignite.plugin.security.SecurityException If permissions are not enough for streaming.
+ */
+ private void checkSecurityPermission(SecurityPermission perm)
+ throws org.apache.ignite.plugin.security.SecurityException{
+ if (!ctx.security().enabled())
+ return;
+
+ ctx.security().authorize(cacheName, perm, null);
+ }
+
+ /**
*
*/
private class Buffer {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5288b2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
index 21ba3ac..9e0703a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.plugin.security.*;
import org.apache.ignite.stream.*;
import org.jetbrains.annotations.*;
@@ -106,8 +107,13 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> {
CacheObject val = e.getValue();
- if (val != null)
+ if (val != null) {
+ checkSecurityPermission(SecurityPermission.CACHE_PUT);
+
val.finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
+ }
+ else
+ checkSecurityPermission(SecurityPermission.CACHE_REMOVE);
}
if (unwrapEntries()) {
@@ -139,4 +145,16 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> {
private boolean unwrapEntries() {
return !(rcvr instanceof DataStreamerCacheUpdaters.InternalUpdater);
}
+
+ /**
+ * @param perm Security permission.
+ * @throws org.apache.ignite.plugin.security.SecurityException If permission is not enough.
+ */
+ private void checkSecurityPermission(SecurityPermission perm)
+ throws org.apache.ignite.plugin.security.SecurityException {
+ if (!ctx.security().enabled())
+ return;
+
+ ctx.security().authorize(cacheName, perm, null);
+ }
}
[3/8] incubator-ignite git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-104
Posted by vk...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-104
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/424ab07c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/424ab07c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/424ab07c
Branch: refs/heads/ignite-104
Commit: 424ab07cb3dff8f7aeafeb1de5af1af8045145e3
Parents: 03e48ba 5288b2d
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Wed Jul 29 12:39:02 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Wed Jul 29 12:39:02 2015 -0700
----------------------------------------------------------------------
.../datastreamer/DataStreamerImpl.java | 22 ++++++++++++++++++++
.../datastreamer/DataStreamerUpdateJob.java | 20 +++++++++++++++++-
2 files changed, 41 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
[4/8] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates
Posted by vk...@apache.org.
IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e5c69b83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e5c69b83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e5c69b83
Branch: refs/heads/ignite-104
Commit: e5c69b831a8f564440bd0960cc2a865cd907525a
Parents: 424ab07
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Wed Jul 29 14:19:24 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Wed Jul 29 14:19:24 2015 -0700
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 9 ++++--
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 33 +++++++++++++++-----
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 7 ++++-
3 files changed, 37 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5c69b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 7a8cc06..02e48df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1178,6 +1178,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
e.printStackTrace();
}
finally {
+ if (dhtFut != null && !remap)
+ dhtFut.map();
+
if (locked != null)
unlockEntries(locked, req.topologyVersion());
@@ -1221,8 +1224,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
else {
// If there are backups, map backup update future.
if (dhtFut != null)
- dhtFut.map();
- // Otherwise, complete the call.
+ dhtFut.onMapped();
+ // Otherwise, complete the call.
else
completionCb.apply(req, res);
}
@@ -2523,7 +2526,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
catch (ClusterTopologyCheckedException ignored) {
U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " +
- req.nodeId());
+ nodeId);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send DHT atomic update response (did node leave grid?) [nodeId=" + nodeId +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5c69b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 3a68263..15ec121 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -90,6 +90,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** */
private boolean waitForExchange;
+ /** */
+ private boolean mapped;
+
/**
* @param cctx Cache context.
* @param completionCb Callback to invoke when future is completed.
@@ -349,37 +352,51 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
GridAtomicMappingKey mappingKey = e.getKey();
GridDhtAtomicUpdateRequest req = e.getValue();
+ UUID nodeId = mappingKey.nodeId();
+ int part = mappingKey.partition();
+
+ assert !nodeId.equals(cctx.localNodeId());
+
try {
if (log.isDebugEnabled())
- log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+ log.debug("Sending DHT atomic update request [nodeId=" + nodeId + ", req=" + req + ']');
- if (mappingKey.partition() >= 0) {
- Object topic = CU.partitionMessageTopic(cctx, mappingKey.partition(), false);
+ if (part >= 0) {
+ Object topic = CU.partitionMessageTopic(cctx, part, false);
- cctx.io().sendOrderedMessage(mappingKey.nodeId(), topic, req, cctx.ioPolicy(),
+ cctx.io().sendOrderedMessage(nodeId, topic, req, cctx.ioPolicy(),
2 * cctx.gridConfig().getNetworkTimeout());
}
else {
- assert mappingKey.partition() == -1;
+ assert part == -1;
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+ cctx.io().send(nodeId, req, cctx.ioPolicy());
}
}
catch (ClusterTopologyCheckedException ignored) {
U.warn(log, "Failed to send update request to backup node because it left grid: " +
- req.nodeId());
+ nodeId);
mappings.remove(mappingKey);
}
catch (IgniteCheckedException ex) {
U.error(log, "Failed to send update request to backup node (did node leave the grid?): "
- + req.nodeId(), ex);
+ + nodeId, ex);
mappings.remove(mappingKey);
}
}
}
+ mapped = true;
+ }
+
+ /**
+ * On mapped callback.
+ */
+ public void onMapped() {
+ assert mapped;
+
checkComplete();
// Send response right away if no ACKs from backup are required.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5c69b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 35c6910..7149dec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -44,7 +44,12 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/** Message index. */
public static final int CACHE_MSG_IDX = nextIndexId();
- /** Node ID. */
+ /**
+ * Node ID.
+ *
+ * @deprecated Not used anymore, but removal will break compatibility.
+ */
+ @Deprecated
private UUID nodeId;
/** Future version. */
[6/8] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates
Posted by vk...@apache.org.
IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9781ea43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9781ea43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9781ea43
Branch: refs/heads/ignite-104
Commit: 9781ea4384a553e5126b8a7320f7070f6a340809
Parents: 7c73fc5
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Wed Jul 29 17:57:49 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Wed Jul 29 17:57:49 2015 -0700
----------------------------------------------------------------------
.../org/apache/ignite/internal/GridTopic.java | 17 +-
.../processors/cache/GridCacheIoManager.java | 3 +-
.../processors/cache/GridCacheUtils.java | 4 +-
.../dht/atomic/GridDhtAtomicCache.java | 29 ++--
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 172 ++++++-------------
.../dht/atomic/GridNearAtomicUpdateRequest.java | 27 +--
.../atomic/GridNearAtomicUpdateResponse.java | 28 +--
8 files changed, 87 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 3cf92f8..e9da40c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -185,11 +185,10 @@ public enum GridTopic {
/**
* @param id1 ID1.
* @param id2 ID2.
- * @param id3 ID3.
* @return Grid message topic with specified IDs.
*/
- public Object topic(int id1, int id2, byte id3) {
- return new T9(this, id1, id2, id3);
+ public Object topic(int id1, int id2) {
+ return new T9(this, id1, id2);
}
/**
@@ -782,9 +781,6 @@ public enum GridTopic {
/** */
private int id2;
- /** */
- private int id3;
-
/**
* No-arg constructor needed for {@link Serializable}.
*/
@@ -796,13 +792,11 @@ public enum GridTopic {
* @param topic Topic.
* @param id1 ID1.
* @param id2 ID2.
- * @param id3 ID3.
*/
- private T9(GridTopic topic, int id1, int id2, byte id3) {
+ private T9(GridTopic topic, int id1, int id2) {
this.topic = topic;
this.id1 = id1;
this.id2 = id2;
- this.id3 = id3;
}
/** {@inheritDoc} */
@@ -811,7 +805,6 @@ public enum GridTopic {
res += 31 * res + id1;
res += 31 * res + id2;
- res += 31 * res + id3;
return res;
}
@@ -821,7 +814,7 @@ public enum GridTopic {
if (obj.getClass() == T9.class) {
T9 that = (T9)obj;
- return topic == that.topic && id1 == that.id1 && id2 == that.id2 && id3 == that.id3;
+ return topic == that.topic && id1 == that.id1 && id2 == that.id2;
}
return false;
@@ -832,7 +825,6 @@ public enum GridTopic {
out.writeByte(topic.ordinal());
out.writeInt(id1);
out.writeInt(id2);
- out.writeByte(id3);
}
/** {@inheritDoc} */
@@ -840,7 +832,6 @@ public enum GridTopic {
topic = fromOrdinal(in.readByte());
id1 = in.readInt();
id2 = in.readInt();
- id3 = in.readByte();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index dec6aef..5858424 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -394,8 +394,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
ctx.cacheId(),
nodeId,
- req.futureVersion(),
- req.partition());
+ req.futureVersion());
res.error(req.classError());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 96df7c5..d82acca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1739,9 +1739,9 @@ public class GridCacheUtils {
* @param part Partition.
* @return Per-partition message topic.
*/
- public static Object partitionMessageTopic(GridCacheContext ctx, int part, boolean nearMsg) {
+ public static Object partitionMessageTopic(GridCacheContext ctx, int part) {
assert part >= 0;
- return TOPIC_CACHE.topic(ctx.cacheId(), part, (byte)(nearMsg ? 1 : 0));
+ return TOPIC_CACHE.topic(ctx.cacheId(), part);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 31606b2..3084e68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -181,15 +181,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
+ ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
+ @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
+ processNearAtomicUpdateRequest(nodeId, req);
+ }
+ });
+
if (ctx.config().isAtomicOrderedUpdates()) {
for (int part = 0; part < ctx.affinity().partitions(); part++) {
- ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part, true), new CI2<UUID, GridNearAtomicUpdateRequest>() {
- @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
- processNearAtomicUpdateRequest(nodeId, req);
- }
- });
-
- ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part, false), new CI2<UUID, GridDhtAtomicUpdateRequest>() {
+ ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part), new CI2<UUID, GridDhtAtomicUpdateRequest>() {
@Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
processDhtAtomicUpdateRequest(nodeId, req);
}
@@ -197,12 +197,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
else {
- ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
- @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
- processNearAtomicUpdateRequest(nodeId, req);
- }
- });
-
ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
@Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
processDhtAtomicUpdateRequest(nodeId, req);
@@ -244,10 +238,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
buf.finish();
if (ctx.config().isAtomicOrderedUpdates()) {
- for (int part = 0; part < ctx.affinity().partitions(); part++) {
- ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part, true));
- ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part, false));
- }
+ for (int part = 0; part < ctx.affinity().partitions(); part++)
+ ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part));
}
}
@@ -1041,8 +1033,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
nodeId,
- req.futureVersion(),
- req.partition());
+ req.futureVersion());
List<KeyCacheObject> keys = req.keys();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index ab0c2e1..8595dc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -359,7 +359,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
log.debug("Sending DHT atomic update request [nodeId=" + nodeId + ", req=" + req + ']');
if (part >= 0) {
- Object topic = CU.partitionMessageTopic(cctx, part, false);
+ Object topic = CU.partitionMessageTopic(cctx, part);
cctx.io().sendOrderedMessage(nodeId, topic, req, cctx.ioPolicy(),
2 * cctx.gridConfig().getNetworkTimeout());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 9b2a5e2..4c8a161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -90,7 +90,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/** Mappings. */
@GridToStringInclude
- private ConcurrentMap<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings;
+ private ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings;
/** Error. */
private volatile CachePartialUpdateCheckedException err;
@@ -246,11 +246,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/** {@inheritDoc} */
@Override public Collection<? extends ClusterNode> nodes() {
- return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() {
- @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) {
- return cctx.kernalContext().discovery().node(mappingKey.nodeId());
- }
- }), F.notNull());
+ return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
}
/**
@@ -287,24 +283,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
return false;
}
- Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size());
- Collection<KeyCacheObject> failedKeys = new ArrayList<>();
-
- for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
- if (e.getKey().nodeId().equals(nodeId)) {
- mappingKeys.add(e.getKey());
-
- failedKeys.addAll(e.getValue().keys());
- }
- }
+ GridNearAtomicUpdateRequest req = mappings.get(nodeId);
- if (!mappingKeys.isEmpty()) {
- if (!failedKeys.isEmpty())
- addFailedKeys(failedKeys, new ClusterTopologyCheckedException("Primary node left grid before " +
- "response is received: " + nodeId));
+ if (req != null) {
+ addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before response is " +
+ "received: " + nodeId));
- for (GridAtomicMappingKey key : mappingKeys)
- mappings.remove(key);
+ mappings.remove(nodeId);
checkComplete();
@@ -544,9 +529,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
}
else {
- GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, res.partition());
-
- GridNearAtomicUpdateRequest req = mappings.get(mappingKey);
+ GridNearAtomicUpdateRequest req = mappings.get(nodeId);
if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft.
updateNear(req, res);
@@ -564,7 +547,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
opRes = ret;
}
- mappings.remove(mappingKey);
+ mappings.remove(nodeId);
}
checkComplete();
@@ -780,11 +763,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
- int part = cctx.affinity().partition(cacheKey);
- ClusterNode primary = cctx.affinity().primary(part, topVer);
-
- if (!ccfg.isAtomicOrderedUpdates())
- part = -1;
+ ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
if (primary == null) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
@@ -810,8 +789,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
subjId,
taskNameHash,
skipStore,
- cctx.kernalContext().clientNode(),
- part);
+ cctx.kernalContext().clientNode());
req.addUpdateEntry(cacheKey,
val,
@@ -827,7 +805,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
// Optimize mapping for single key.
- mapSingle(new GridAtomicMappingKey(primary.id(), part), req);
+ mapSingle(primary.id(), req);
return;
}
@@ -847,18 +825,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (conflictRmvVals != null)
conflictRmvValsIt = conflictRmvVals.iterator();
- Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
// Must do this in synchronized block because we need to atomically remove and add mapping.
// Otherwise checkComplete() may see empty intermediate state.
synchronized (this) {
- if (oldNodeId != null) {
- // TODO: IGNITE-104 - Try to avoid iteration.
- for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
- if (e.getKey().nodeId().equals(oldNodeId))
- mappings.remove(e.getKey());
- }
- }
+ if (oldNodeId != null)
+ removeMapping(oldNodeId);
// For fastMap mode wait for all responses before remapping.
if (remap && fastMap && !mappings.isEmpty()) {
@@ -928,10 +901,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
- T2<Integer, Collection<ClusterNode>> t = mapKey(cacheKey, topVer, fastMap);
-
- int part = ccfg.isAtomicOrderedUpdates() ? t.get1() : -1;
- Collection<ClusterNode> affNodes = t.get2();
+ Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
if (affNodes.isEmpty()) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
@@ -952,9 +922,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
UUID nodeId = affNode.id();
- GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);
-
- GridNearAtomicUpdateRequest mapped = pendingMappings.get(mappingKey);
+ GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
if (mapped == null) {
mapped = new GridNearAtomicUpdateRequest(
@@ -974,12 +942,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
subjId,
taskNameHash,
skipStore,
- cctx.kernalContext().clientNode(),
- part);
+ cctx.kernalContext().clientNode());
- pendingMappings.put(mappingKey, mapped);
+ pendingMappings.put(nodeId, mapped);
- GridNearAtomicUpdateRequest old = mappings.put(mappingKey, mapped);
+ GridNearAtomicUpdateRequest old = mappings.put(nodeId, mapped);
assert old == null || (old != null && remap) :
"Invalid mapping state [old=" + old + ", remap=" + remap + ']';
@@ -997,7 +964,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
if ((single == null || single) && pendingMappings.size() == 1) {
- Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
+ Map.Entry<UUID, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
single = true;
@@ -1020,35 +987,31 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
* @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
* @return Collection of nodes to which key is mapped.
*/
- private T2<Integer, Collection<ClusterNode>> mapKey(
+ private Collection<ClusterNode> mapKey(
KeyCacheObject key,
AffinityTopologyVersion topVer,
boolean fastMap
) {
GridCacheAffinityManager affMgr = cctx.affinity();
- int part = affMgr.partition(key);
-
// If we can send updates in parallel - do it.
- Collection<ClusterNode> nodes = fastMap ?
- cctx.topology().nodes(part, topVer) :
- Collections.singletonList(affMgr.primary(part, topVer));
-
- return new T2<>(part, nodes);
+ return fastMap ?
+ cctx.topology().nodes(affMgr.partition(key), topVer) :
+ Collections.singletonList(affMgr.primary(key, topVer));
}
/**
* Maps future to single node.
*
- * @param mappingKey Mapping key.
+ * @param nodeId Node ID.
* @param req Request.
*/
- private void mapSingle(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req) {
- singleNodeId = mappingKey.nodeId();
+ private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
+ singleNodeId = nodeId;
singleReq = req;
- if (cctx.localNodeId().equals(mappingKey.nodeId())) {
- cache.updateAllAsyncInternal(mappingKey.nodeId(), req,
+ if (cctx.localNodeId().equals(nodeId)) {
+ cache.updateAllAsyncInternal(nodeId, req,
new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
@Override public void apply(GridNearAtomicUpdateRequest req,
GridNearAtomicUpdateResponse res) {
@@ -1063,7 +1026,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (log.isDebugEnabled())
log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- sendRequest(mappingKey, req);
+ cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
onDone(new GridCacheReturn(cctx, true, null, true));
@@ -1079,37 +1042,34 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
*
* @param mappings Mappings to send.
*/
- private void doUpdate(Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings) {
+ private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
UUID locNodeId = cctx.localNodeId();
- Collection<GridNearAtomicUpdateRequest> locUpdates = null;
+ GridNearAtomicUpdateRequest locUpdate = null;
// Send messages to remote nodes first, then run local update.
- for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
- GridAtomicMappingKey mappingKey = e.getKey();
- GridNearAtomicUpdateRequest req = e.getValue();
-
+ for (GridNearAtomicUpdateRequest req : mappings.values()) {
if (locNodeId.equals(req.nodeId())) {
- if (locUpdates == null)
- locUpdates = new ArrayList<>(mappings.size());
+ assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
+ ", req=" + req + ']';
- locUpdates.add(req);
+ locUpdate = req;
}
else {
try {
if (log.isDebugEnabled())
log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- sendRequest(mappingKey, req);
+ cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
}
- catch (IgniteCheckedException ex) {
- addFailedKeys(req.keys(), ex);
+ catch (IgniteCheckedException e) {
+ addFailedKeys(req.keys(), e);
- removeMapping(mappingKey);
+ removeMapping(req.nodeId());
}
if (syncMode == PRIMARY_SYNC && !req.hasPrimary())
- removeMapping(mappingKey);
+ removeMapping(req.nodeId());
}
}
@@ -1117,52 +1077,28 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
// In FULL_ASYNC mode always return (null, true).
opRes = new GridCacheReturn(cctx, true, null, true);
- if (locUpdates != null) {
- for (GridNearAtomicUpdateRequest locUpdate : locUpdates) {
- cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
- new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequest req,
- GridNearAtomicUpdateResponse res) {
- assert res.futureVersion().equals(futVer) : futVer;
+ if (locUpdate != null) {
+ cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
+ new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+ @Override public void apply(GridNearAtomicUpdateRequest req,
+ GridNearAtomicUpdateResponse res) {
+ assert res.futureVersion().equals(futVer) : futVer;
- onResult(res.nodeId(), res);
- }
- });
- }
+ onResult(res.nodeId(), res);
+ }
+ });
}
checkComplete();
}
/**
- * Sends request.
- *
- * @param mappingKey Mapping key.
- * @param req Update request.
- * @throws IgniteCheckedException In case of error.
- */
- private void sendRequest(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req)
- throws IgniteCheckedException {
- if (mappingKey.partition() >= 0) {
- Object topic = CU.partitionMessageTopic(cctx, mappingKey.partition(), true);
-
- cctx.io().sendOrderedMessage(mappingKey.nodeId(), topic, req, cctx.ioPolicy(),
- 2 * cctx.gridConfig().getNetworkTimeout());
- }
- else {
- assert mappingKey.partition() == -1;
-
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
- }
- }
-
- /**
* Removes mapping from future mappings map.
*
- * @param mappingKey Mapping key.
+ * @param nodeId Node ID to remove mapping for.
*/
- private void removeMapping(GridAtomicMappingKey mappingKey) {
- mappings.remove(mappingKey);
+ private void removeMapping(UUID nodeId) {
+ mappings.remove(nodeId);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index b3075c4..734cf6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -135,9 +135,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
/** */
private boolean clientReq;
- /** Partition. */
- private int part;
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -165,7 +162,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
* @param clientReq Client node request flag.
- * @param part Partition.
*/
public GridNearAtomicUpdateRequest(
int cacheId,
@@ -184,8 +180,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
@Nullable UUID subjId,
int taskNameHash,
boolean skipStore,
- boolean clientReq,
- int part
+ boolean clientReq
) {
this.cacheId = cacheId;
this.nodeId = nodeId;
@@ -205,7 +200,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
this.taskNameHash = taskNameHash;
this.skipStore = skipStore;
this.clientReq = clientReq;
- this.part = part;
keys = new ArrayList<>();
}
@@ -321,13 +315,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
}
/**
- * @return Partition.
- */
- public int partition() {
- return part;
- }
-
- /**
* @param key Key to add.
* @param val Optional update value.
* @param conflictTtl Conflict TTL (optional).
@@ -679,8 +666,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
writer.incrementState();
case 16:
- if (!writer.writeInt("part", part))
- return false;
+// if (!writer.writeInt("part", part))
+// return false;
writer.incrementState();
@@ -863,10 +850,10 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 16:
- part = reader.readInt("part");
-
- if (!reader.isLastRead())
- return false;
+// part = reader.readInt("part");
+//
+// if (!reader.isLastRead())
+// return false;
reader.incrementState();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index e2d33d5..2b30536 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -92,9 +92,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** Near expire times. */
private GridLongList nearExpireTimes;
- /** Partition. */
- private int part;
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -106,13 +103,11 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
* @param cacheId Cache ID.
* @param nodeId Node ID this reply should be sent to.
* @param futVer Future version.
- * @param part Partition.
*/
- public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, int part) {
+ public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer) {
this.cacheId = cacheId;
this.nodeId = nodeId;
this.futVer = futVer;
- this.part = part;
}
/** {@inheritDoc} */
@@ -143,7 +138,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/**
* Sets update error.
- * @param err
+ * @param err Exception.
*/
public void error(IgniteCheckedException err){
this.err = err;
@@ -193,13 +188,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
}
/**
- * @return Partition.
- */
- public int partition() {
- return part;
- }
-
- /**
* Adds value to be put in near cache on originating node.
*
* @param keyIdx Key index.
@@ -497,8 +485,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
writer.incrementState();
case 12:
- if (!writer.writeInt("part", part))
- return false;
+// if (!writer.writeInt("part", part))
+// return false;
writer.incrementState();
@@ -603,10 +591,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 12:
- part = reader.readInt("part");
-
- if (!reader.isLastRead())
- return false;
+// part = reader.readInt("part");
+//
+// if (!reader.isLastRead())
+// return false;
reader.incrementState();