You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/16 15:45:49 UTC
[1/3] ignite git commit: ignite-4705
Repository: ignite
Updated Branches:
refs/heads/ignite-4705 7287a9368 -> eef1d3108
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 5dfea79..9da6b2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -177,6 +177,8 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
int cacheId,
UUID nodeId,
long futId,
+ UUID nearNodeId,
+ long nearFutId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
@@ -188,7 +190,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
boolean keepBinary,
boolean skipStore
) {
- super(cacheId, nodeId);
+ super(cacheId, nodeId, nearNodeId, nearFutId);
this.futId = futId;
this.writeVer = writeVer;
@@ -610,139 +612,139 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
}
switch (writer.state()) {
- case 3:
+ case 6:
if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
return false;
writer.incrementState();
- case 4:
+ case 7:
if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 5:
+ case 8:
if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
- case 6:
+ case 9:
if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
- case 7:
+ case 10:
if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups))
return false;
writer.incrementState();
- case 8:
+ case 11:
if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
- case 9:
+ case 12:
if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
- case 10:
+ case 13:
if (!writer.writeBoolean("keepBinary", keepBinary))
return false;
writer.incrementState();
- case 11:
+ case 14:
if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 12:
+ case 15:
if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
- case 13:
+ case 16:
if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
return false;
writer.incrementState();
- case 14:
+ case 17:
if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 15:
+ case 18:
if (!writer.writeMessage("nearTtls", nearTtls))
return false;
writer.incrementState();
- case 16:
+ case 19:
if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 17:
+ case 20:
if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 18:
+ case 21:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 19:
+ case 22:
if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
- case 20:
+ case 23:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 21:
+ case 24:
if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
- case 22:
+ case 25:
if (!writer.writeMessage("ttls", ttls))
return false;
writer.incrementState();
- case 23:
+ case 26:
if (!writer.writeMessage("updateCntrs", updateCntrs))
return false;
writer.incrementState();
- case 24:
+ case 27:
if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 25:
+ case 28:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -764,7 +766,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
return false;
switch (reader.state()) {
- case 3:
+ case 6:
conflictExpireTimes = reader.readMessage("conflictExpireTimes");
if (!reader.isLastRead())
@@ -772,7 +774,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 4:
+ case 7:
conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -780,7 +782,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 5:
+ case 8:
entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
if (!reader.isLastRead())
@@ -788,7 +790,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 6:
+ case 9:
flags = reader.readByte("flags");
if (!reader.isLastRead())
@@ -796,7 +798,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 7:
+ case 10:
forceTransformBackups = reader.readBoolean("forceTransformBackups");
if (!reader.isLastRead())
@@ -804,7 +806,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 8:
+ case 11:
futId = reader.readLong("futId");
if (!reader.isLastRead())
@@ -812,7 +814,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 9:
+ case 12:
invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
if (!reader.isLastRead())
@@ -820,7 +822,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 10:
+ case 13:
keepBinary = reader.readBoolean("keepBinary");
if (!reader.isLastRead())
@@ -828,7 +830,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 11:
+ case 14:
keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -836,7 +838,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 12:
+ case 15:
nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
if (!reader.isLastRead())
@@ -844,7 +846,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 13:
+ case 16:
nearExpireTimes = reader.readMessage("nearExpireTimes");
if (!reader.isLastRead())
@@ -852,7 +854,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 14:
+ case 17:
nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -860,7 +862,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 15:
+ case 18:
nearTtls = reader.readMessage("nearTtls");
if (!reader.isLastRead())
@@ -868,7 +870,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 16:
+ case 19:
nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -876,7 +878,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 17:
+ case 20:
prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -884,7 +886,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 18:
+ case 21:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -892,7 +894,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 19:
+ case 22:
byte syncModeOrd;
syncModeOrd = reader.readByte("syncMode");
@@ -904,7 +906,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 20:
+ case 23:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -912,7 +914,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 21:
+ case 24:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -920,7 +922,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 22:
+ case 25:
ttls = reader.readMessage("ttls");
if (!reader.isLastRead())
@@ -928,7 +930,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 23:
+ case 26:
updateCntrs = reader.readMessage("updateCntrs");
if (!reader.isLastRead())
@@ -936,7 +938,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 24:
+ case 27:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -944,7 +946,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 25:
+ case 28:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -970,7 +972,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 26;
+ return 29;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index 76d28c9..c803d1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -156,6 +156,13 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
nearEvicted.add(key);
}
+ /**
+ * @param nearEvicted Evicted near cache keys.
+ */
+ void nearEvicted(List<KeyCacheObject> nearEvicted) {
+ this.nearEvicted = nearEvicted;
+ }
+
/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 42f4bc3..82f171d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -31,7 +31,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -299,7 +298,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
*/
public abstract void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr);
- public abstract void onResult(UUID nodeId, GridNearAtomicDhtResponse res);
+ public abstract void onResult(UUID nodeId, GridDhtAtomicNearResponse res);
/**
* @param req Request.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicDhtResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicDhtResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicDhtResponse.java
deleted file mode 100644
index fc99a5b..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicDhtResponse.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheReturn;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * TODO IGNITE-4705: no need send mapping if it == affinity.
- */
-public class GridNearAtomicDhtResponse extends GridCacheMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Message index. */
- public static final int CACHE_MSG_IDX = nextIndexId();
-
- /** */
- private static final int HAS_RESULT_MASK = 0x1;
-
- /** */
- private static final int RESULT_SUCCESS_MASK = 0x2;
-
- /** */
- private long futId;
-
- /** */
- @GridDirectCollection(UUID.class)
- private List<UUID> mapping;
-
- /** */
- private byte flags;
-
- /**
- *
- */
- public GridNearAtomicDhtResponse() {
- // No-op.
- }
-
- /**
- * @param futId Future ID.
- * @param mapping Update mapping.
- */
- public GridNearAtomicDhtResponse(long futId, List<UUID> mapping) {
- this.futId = futId;
- this.mapping = mapping;
- }
-
- /**
- * @param success Success flag.
- */
- public void setResult(boolean success) {
- setFlag(true, HAS_RESULT_MASK);
-
- setFlag(success, RESULT_SUCCESS_MASK);
- }
-
- /**
- * @return Operation result.
- */
- public GridCacheReturn result() {
- assert hasResult();
-
- return new GridCacheReturn(true, isFlag(RESULT_SUCCESS_MASK));
- }
-
- /**
- * @return {@code True} if response contains operation result.
- */
- public boolean hasResult() {
- return isFlag(HAS_RESULT_MASK);
- }
-
- /**
- * @return Update mapping.
- */
- public List<UUID> mapping() {
- return mapping;
- }
-
- /**
- * @param flag Set or clear.
- * @param mask Mask.
- */
- private void setFlag(boolean flag, int mask) {
- flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
- }
-
- /**
- * Reads flag mask.
- *
- * @param mask Mask to read.
- * @return Flag value.
- */
- private boolean isFlag(int mask) {
- return (flags & mask) != 0;
- }
-
- /**
- * @return Future ID.
- */
- public long futureId() {
- return futId;
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return -45;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 6;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 3:
- if (!writer.writeByte("flags", flags))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeLong("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 3:
- flags = reader.readByte("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- futId = reader.readLong("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridNearAtomicDhtResponse.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 8c3a364..2016c98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -50,6 +51,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -69,6 +71,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
private GridNearAtomicAbstractUpdateRequest req;
/** */
+ private Set<UUID> rcvd;
+
+ /** */
private Set<UUID> mapping;
/**
@@ -130,6 +135,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
GridNearAtomicUpdateResponse res = null;
GridNearAtomicAbstractUpdateRequest req;
+ GridCacheReturn opRes0 = null;
synchronized (mux) {
req = this.req != null && this.req.nodeId().equals(nodeId) ? this.req : null;
@@ -147,6 +153,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
res.addFailedKeys(req.keys(), e);
}
+ else {
+ if (mapping != null && mapping.remove(nodeId)) {
+ if (mapping.isEmpty() && opRes != null)
+ opRes0 = opRes;
+ }
+ }
}
if (res != null) {
@@ -158,6 +170,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
onResult(nodeId, res, true);
}
+ else if (opRes0 != null)
+ onDone(opRes0);
return false;
}
@@ -193,16 +207,44 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
return false;
}
+ /**
+ * @param nodeIds DHT nodes.
+ */
+ private void initMapping(List<UUID> nodeIds) {
+ mapping = U.newHashSet(nodeIds.size());
+
+ for (UUID dhtNodeId : nodeIds) {
+ if (cctx.discovery().node(dhtNodeId) != null)
+ mapping.add(dhtNodeId);
+ }
+ }
+
/** {@inheritDoc} */
- @Override public void onResult(UUID nodeId, GridNearAtomicDhtResponse res) {
+ @Override public void onResult(UUID nodeId, GridDhtAtomicNearResponse res) {
GridCacheReturn opRes0 = null;
synchronized (mux) {
if (futId == null || futId != res.futureId())
return;
- if (mapping == null)
- mapping = new HashSet<>(res.mapping());
+ if (res.mapping() != null) {
+ // Mapping is sent from dht nodes.
+ if (mapping == null)
+ initMapping(res.mapping());
+ }
+ else {
+ // Mapping and result are sent from primary.
+ if (mapping == null) {
+ if (rcvd == null)
+ rcvd = new HashSet<>();
+
+ rcvd.add(nodeId);
+
+ return; // Need wait for response from primary.
+ }
+ else
+ mapping.remove(nodeId);
+ }
mapping.remove(nodeId);
@@ -250,6 +292,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
mapErrTopVer = req.topologyVersion();
}
else if (res.error() != null) {
+ // TODO IGNITE-4705: assert only 1 key?
if (res.failedKeys() != null) {
if (err == null)
err = new CachePartialUpdateCheckedException(
@@ -280,6 +323,18 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
else
opRes = ret;
+
+ if (res.mapping() != null) {
+ initMapping(res.mapping());
+
+ if (rcvd != null)
+ mapping.removeAll(rcvd);
+ }
+ else
+ mapping = Collections.emptySet();
+
+ if (!mapping.isEmpty())
+ return;
}
if (remapKey) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index c075f09..7b573b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -256,7 +256,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
- @Override public void onResult(UUID nodeId, GridNearAtomicDhtResponse res) {
+ @Override public void onResult(UUID nodeId, GridDhtAtomicNearResponse res) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index b089193..f6c2a2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -105,6 +105,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** Near expire times. */
private GridLongList nearExpireTimes;
+ /** */
+ @GridDirectCollection(UUID.class)
+ private List<UUID> mapping;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -125,6 +129,20 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
this.addDepInfo = addDepInfo;
}
+ /**
+ * @return Update mapping.
+ */
+ public List<UUID> mapping() {
+ return mapping;
+ }
+
+ /**
+ * @param mapping Mapping.
+ */
+ public void mapping(List<UUID> mapping) {
+ this.mapping = mapping;
+ }
+
/** {@inheritDoc} */
@Override public int lookupIndex() {
return CACHE_MSG_IDX;
@@ -472,48 +490,54 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
writer.incrementState();
case 6:
- if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
+ if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
return false;
writer.incrementState();
case 7:
- if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT))
+ if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
return false;
writer.incrementState();
case 8:
- if (!writer.writeMessage("nearTtls", nearTtls))
+ if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT))
return false;
writer.incrementState();
case 9:
- if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("nearTtls", nearTtls))
return false;
writer.incrementState();
case 10:
- if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT))
+ if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 11:
- if (!writer.writeMessage("nearVer", nearVer))
+ if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT))
return false;
writer.incrementState();
case 12:
- if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("nearVer", nearVer))
return false;
writer.incrementState();
case 13:
+ if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 14:
if (!writer.writeMessage("ret", ret))
return false;
@@ -560,7 +584,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 6:
- nearExpireTimes = reader.readMessage("nearExpireTimes");
+ mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
if (!reader.isLastRead())
return false;
@@ -568,7 +592,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 7:
- nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT);
+ nearExpireTimes = reader.readMessage("nearExpireTimes");
if (!reader.isLastRead())
return false;
@@ -576,7 +600,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 8:
- nearTtls = reader.readMessage("nearTtls");
+ nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT);
if (!reader.isLastRead())
return false;
@@ -584,7 +608,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 9:
- nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
+ nearTtls = reader.readMessage("nearTtls");
if (!reader.isLastRead())
return false;
@@ -592,7 +616,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 10:
- nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT);
+ nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -600,7 +624,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 11:
- nearVer = reader.readMessage("nearVer");
+ nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT);
if (!reader.isLastRead())
return false;
@@ -608,7 +632,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 12:
- remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+ nearVer = reader.readMessage("nearVer");
if (!reader.isLastRead())
return false;
@@ -616,6 +640,14 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 13:
+ remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 14:
ret = reader.readMessage("ret");
if (!reader.isLastRead())
@@ -635,7 +667,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 14;
+ return 15;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
new file mode 100644
index 0000000..106612c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class UpdateErrors implements Message {
+ /** Failed keys. */
+ @GridToStringInclude
+ @GridDirectCollection(KeyCacheObject.class)
+ private List<KeyCacheObject> failedKeys;
+
+ /** Update error. */
+ @GridDirectTransient
+ private IgniteCheckedException err;
+
+ /** Serialized update error. */
+ private byte[] errBytes;
+
+ /**
+ *
+ */
+ public UpdateErrors() {
+ // No-op.
+ }
+
+ /**
+ * @param err Error.
+ */
+ public void onError(IgniteCheckedException err){
+ this.err = err;
+ }
+
+ /**
+ * @return Error.
+ */
+ public IgniteCheckedException error() {
+ return err;
+ }
+
+ /**
+ * @return Failed keys.
+ */
+ public Collection<KeyCacheObject> failedKeys() {
+ return failedKeys;
+ }
+
+ /**
+ * Adds key to collection of failed keys.
+ *
+ * @param key Key to add.
+ * @param e Error cause.
+ */
+ public void addFailedKey(KeyCacheObject key, Throwable e) {
+ if (failedKeys == null)
+ failedKeys = new ArrayList<>();
+
+ failedKeys.add(key);
+
+ if (err == null)
+ err = new IgniteCheckedException("Failed to update keys.");
+
+ err.addSuppressed(e);
+ }
+
+ /** {@inheritDoc} */
+ void prepareMarshal(GridCacheMessage msg, GridCacheContext cctx) throws IgniteCheckedException {
+ msg.prepareMarshalCacheObjects(failedKeys, cctx);
+
+ if (errBytes == null)
+ errBytes = U.marshal(cctx.marshaller(), err);
+ }
+
+ /** {@inheritDoc} */
+ void finishUnmarshal(GridCacheMessage msg, GridCacheContext cctx, ClassLoader ldr) throws IgniteCheckedException {
+ msg.finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
+
+ if (errBytes != null && err == null)
+ err = U.unmarshal(cctx.marshaller(), errBytes, U.resolveClassLoader(ldr, cctx.gridConfig()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeByteArray("errBytes", errBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ errBytes = reader.readByteArray("errBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(UpdateErrors.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -46;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 41632ef..f8ae661 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.io.Externalizable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -43,7 +44,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
@@ -299,11 +300,12 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
* @param nodeId Sender node ID.
* @param req Dht atomic update request.
* @param res Dht atomic update response.
+ * @return Evicted near keys (if any).
*/
- public void processDhtAtomicUpdateRequest(
+ @Nullable public List<KeyCacheObject> processDhtAtomicUpdateRequest(
UUID nodeId,
GridDhtAtomicAbstractUpdateRequest req,
- GridDhtAtomicUpdateResponse res
+ GridDhtAtomicNearResponse res
) {
GridCacheVersion ver = req.writeVersion();
@@ -313,6 +315,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+ List<KeyCacheObject> nearEvicted = null;
+
for (int i = 0; i < req.nearSize(); i++) {
KeyCacheObject key = req.nearKey(i);
@@ -322,7 +326,10 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
GridCacheEntryEx entry = peekEx(key);
if (entry == null) {
- res.addNearEvicted(key);
+ if (nearEvicted == null)
+ nearEvicted = new ArrayList<>();
+
+ nearEvicted.add(key);
break;
}
@@ -388,6 +395,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
res.addFailedKey(key, new IgniteCheckedException("Failed to update near cache key: " + key, e));
}
}
+
+ return nearEvicted;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
index 40e563c..8d15e5e 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java
@@ -22,7 +22,7 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.yardstick.cache.model.SampleValue;
/**
- * Ignite benchmark that performs invoke operations.
+ * Ignite benchmark that performs getAndPut operations.
*/
public class IgniteGetAndPutBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
index 49ae985..0a3794c 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java
@@ -27,7 +27,7 @@ import org.apache.ignite.yardstick.cache.model.SampleValue;
import org.yardstickframework.BenchmarkConfiguration;
/**
- * Ignite benchmark that performs invoke operations.
+ * Ignite benchmark that performs getAndPut operations.
*/
public class IgniteGetAndPutTxBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
/** */
[2/3] ignite git commit: ignite-4705
Posted by sb...@apache.org.
ignite-4705
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c5c5eb5a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c5c5eb5a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c5c5eb5a
Branch: refs/heads/ignite-4705
Commit: c5c5eb5ade63a64ccf8c193db77978c4e297fc19
Parents: 7287a93
Author: sboikov <sb...@gridgain.com>
Authored: Thu Feb 16 15:07:54 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Feb 16 15:23:12 2017 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 30 +-
.../communication/GridIoMessageFactory.java | 10 +-
.../processors/cache/GridCacheMessage.java | 4 +-
.../cache/GridDeferredAckMessageSender.java | 11 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 49 ++-
.../GridDhtAtomicAbstractUpdateRequest.java | 135 +++++++-
.../dht/atomic/GridDhtAtomicCache.java | 320 ++++++++++++-------
.../dht/atomic/GridDhtAtomicNearResponse.java | 268 ++++++++++++++++
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 24 +-
.../GridDhtAtomicSingleUpdateRequest.java | 56 ++--
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 8 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 98 +++---
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 7 +
.../GridNearAtomicAbstractUpdateFuture.java | 3 +-
.../dht/atomic/GridNearAtomicDhtResponse.java | 222 -------------
.../GridNearAtomicSingleUpdateFuture.java | 61 +++-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 2 +-
.../atomic/GridNearAtomicUpdateResponse.java | 62 +++-
.../distributed/dht/atomic/UpdateErrors.java | 187 +++++++++++
.../distributed/near/GridNearAtomicCache.java | 17 +-
.../cache/IgniteGetAndPutBenchmark.java | 2 +-
.../cache/IgniteGetAndPutTxBenchmark.java | 2 +-
22 files changed, 1081 insertions(+), 497 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 16b1e01..6636bf2 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -46,17 +46,18 @@ import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicDhtResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -181,19 +182,20 @@ public class MessageCodeGenerator {
// gen.generateAll(true);
-// gen.generateAndWrite(GridNearAtomicDhtResponse.class);
-// gen.generateAndWrite(GridNearAtomicAbstractUpdateRequest.class);
-// gen.generateAndWrite(GridDhtAtomicAbstractUpdateRequest.class);
-// gen.generateAndWrite(GridNearAtomicAbstractSingleUpdateRequest.class);
-// gen.generateAndWrite(GridDhtAtomicDeferredUpdateResponse.class);
-// gen.generateAndWrite(GridDhtAtomicSingleUpdateRequest.class);
-// gen.generateAndWrite(GridDhtAtomicUpdateRequest.class);
-// gen.generateAndWrite(GridDhtAtomicUpdateResponse.class);
-// gen.generateAndWrite(GridNearAtomicFullUpdateRequest.class);
-// gen.generateAndWrite(GridNearAtomicSingleUpdateFilterRequest.class);
-// gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class);
-// gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class);
-// gen.generateAndWrite(GridNearAtomicUpdateResponse.class);
+ gen.generateAndWrite(UpdateErrors.class);
+ gen.generateAndWrite(GridDhtAtomicNearResponse.class);
+ //gen.generateAndWrite(GridNearAtomicAbstractUpdateRequest.class);
+ gen.generateAndWrite(GridDhtAtomicAbstractUpdateRequest.class);
+ gen.generateAndWrite(GridNearAtomicAbstractSingleUpdateRequest.class);
+ gen.generateAndWrite(GridDhtAtomicDeferredUpdateResponse.class);
+ gen.generateAndWrite(GridDhtAtomicSingleUpdateRequest.class);
+ gen.generateAndWrite(GridDhtAtomicUpdateRequest.class);
+ gen.generateAndWrite(GridDhtAtomicUpdateResponse.class);
+ gen.generateAndWrite(GridNearAtomicFullUpdateRequest.class);
+ gen.generateAndWrite(GridNearAtomicSingleUpdateFilterRequest.class);
+ gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class);
+ gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class);
+ gen.generateAndWrite(GridNearAtomicUpdateResponse.class);
// gen.generateAndWrite(GridMessageCollection.class);
// gen.generateAndWrite(DataStreamerEntry.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 769a615..5ed46ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -67,15 +67,16 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicDhtResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -174,8 +175,13 @@ public class GridIoMessageFactory implements MessageFactory {
Message msg = null;
switch (type) {
+ case -46:
+ msg = new UpdateErrors();
+
+ break;
+
case -45:
- msg = new GridNearAtomicDhtResponse();
+ msg = new GridDhtAtomicNearResponse();
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 0646d5a..b9fb56a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -501,7 +501,7 @@ public abstract class GridCacheMessage implements Message {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- protected final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col,
+ public final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col,
GridCacheContext ctx) throws IgniteCheckedException {
if (col == null)
return;
@@ -553,7 +553,7 @@ public abstract class GridCacheMessage implements Message {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- protected final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col,
+ public final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col,
GridCacheContext ctx,
ClassLoader ldr)
throws IgniteCheckedException
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
index 8df883a..37ecc79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
@@ -21,7 +21,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -41,16 +40,16 @@ public abstract class GridDeferredAckMessageSender<T> {
private GridTimeoutProcessor time;
/** Closure processor. */
- public GridClosureProcessor closure;
+ public GridClosureProcessor c;
/**
* @param time Time.
- * @param closure Closure.
+ * @param c Closure.
*/
public GridDeferredAckMessageSender(GridTimeoutProcessor time,
- GridClosureProcessor closure) {
+ GridClosureProcessor c) {
this.time = time;
- this.closure = closure;
+ this.c = c;
}
/**
@@ -151,7 +150,7 @@ public abstract class GridDeferredAckMessageSender<T> {
/** {@inheritDoc} */
@Override public void onTimeout() {
if (guard.compareAndSet(false, true)) {
- closure.runLocalSafe(new Runnable() {
+ c.runLocalSafe(new Runnable() {
@Override public void run() {
writeLock().lock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 10d1c4b..d494d98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -93,9 +93,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/** Continuous query closures. */
private Collection<CI1<Boolean>> cntQryClsrs;
- /** */
- private final boolean waitForExchange;
-
/** Response count. */
private volatile int resCnt;
@@ -113,14 +110,12 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
GridNearAtomicAbstractUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes) {
this.cctx = cctx;
-
- futId = cctx.mvcc().atomicFutureId();
this.updateReq = updateReq;
this.completionCb = completionCb;
this.updateRes = updateRes;
this.writeVer = writeVer;
- waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()));
+ futId = cctx.mvcc().atomicFutureId();
if (log == null) {
msgLog = cctx.shared().atomicMessageLogger();
@@ -130,6 +125,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/** {@inheritDoc} */
@Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+ boolean waitForExchange = !updateReq.topologyLocked();
+
if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0)
return this;
@@ -160,7 +157,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
* @param updateCntr Partition update counter.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- final void addWriteEntry(GridDhtCacheEntry entry,
+ final void addWriteEntry(
+ UUID nearNodeId,
+ GridDhtCacheEntry entry,
@Nullable CacheObject val,
EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
@@ -190,7 +189,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
if (updateReq == null) {
updateReq = createRequest(
- node,
+ node.id(),
+ nearNodeId,
futId,
writeVer,
syncMode,
@@ -236,7 +236,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
* @param ttl TTL for near cache update (optional).
* @param expireTime Expire time for near cache update (optional).
*/
- final void addNearWriteEntries(Collection<UUID> readers,
+ final void addNearWriteEntries(
+ UUID nearNodeId,
+ Collection<UUID> readers,
GridDhtCacheEntry entry,
@Nullable CacheObject val,
EntryProcessor<Object, Object, Object> entryProcessor,
@@ -259,7 +261,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
continue;
updateReq = createRequest(
- node,
+ node.id(),
+ nearNodeId,
futId,
writeVer,
syncMode,
@@ -352,9 +355,21 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
* Sends requests to remote nodes.
*/
final void map() {
+ boolean fullSync = updateReq.writeSynchronizationMode() == FULL_SYNC;
+
if (!F.isEmpty(mappings)) {
+ List<UUID> dhtNodes = null;
+
+ if (fullSync) {
+ dhtNodes = new ArrayList<>(mappings.size());
+
+ dhtNodes.addAll(mappings.keySet());
+ }
+
for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
try {
+ req.dhtNodes(dhtNodes);
+
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
if (msgLog.isDebugEnabled()) {
@@ -383,7 +398,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
// Send response right away if no ACKs from backup is required.
// Backups will send ACKs anyway, future will be completed after all backups have replied.
- if (updateReq.writeSynchronizationMode() != FULL_SYNC)
+ if (!fullSync)
completionCb.apply(updateReq, updateRes);
}
@@ -400,7 +415,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
}
/**
- * @param node Node.
+ * @param nodeId Node ID.
+ * @param nodeId Near node ID.
* @param futId Future ID.
* @param writeVer Update version.
* @param syncMode Write synchronization mode.
@@ -411,7 +427,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
* @return Request.
*/
protected abstract GridDhtAtomicAbstractUpdateRequest createRequest(
- ClusterNode node,
+ UUID nodeId,
+ UUID nearNodeId,
long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@@ -449,9 +466,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
for (CI1<Boolean> clsr : cntQryClsrs)
clsr.apply(suc);
}
-
- if (updateReq.writeSynchronizationMode() == FULL_SYNC)
- completionCb.apply(updateReq, updateRes);
+//
+// if (updateReq.writeSynchronizationMode() == FULL_SYNC)
+// completionCb.apply(updateReq, updateRes);
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 7aa440d..3edbf8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -18,10 +18,13 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.List;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
@@ -29,6 +32,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -46,6 +52,16 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
@GridDirectTransient
private boolean onRes;
+ /** */
+ private UUID nearNodeId;
+
+ /** */
+ private long nearFutId;
+
+ /** */
+ @GridDirectCollection(UUID.class)
+ private List<UUID> dhtNodes;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -58,10 +74,35 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
+ * @param nearNodeId Near node ID.
+ * @param nearFutId Future ID on near node.
*/
- protected GridDhtAtomicAbstractUpdateRequest(int cacheId, UUID nodeId) {
+ protected GridDhtAtomicAbstractUpdateRequest(int cacheId, UUID nodeId, UUID nearNodeId, long nearFutId) {
this.cacheId = cacheId;
this.nodeId = nodeId;
+ this.nearNodeId = nearNodeId;
+ this.nearFutId = nearFutId;
+ }
+
+ /**
+ * @return Near node ID.
+ */
+ public UUID nearNodeId() {
+ return nearNodeId;
+ }
+
+ /**
+ * @param dhtNodes DHT nodes.
+ */
+ public void dhtNodes(List<UUID> dhtNodes) {
+ this.dhtNodes = dhtNodes;
+ }
+
+ /**
+ * @return DHT nodes.
+ */
+ public List<UUID> dhtNodes() {
+ return dhtNodes;
}
/** {@inheritDoc} */
@@ -166,11 +207,18 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
public abstract int taskNameHash();
/**
- * @return Version assigned on primary node.
+ * @return Future ID on primary node.
*/
public abstract long futureId();
/**
+ * @return Future ID on near node.
+ */
+ public final long nearFutureId() {
+ return nearFutId;
+ }
+
+ /**
* @return Write version.
*/
public abstract GridCacheVersion writeVersion();
@@ -284,4 +332,87 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
* @return Optional arguments for entry processor.
*/
@Nullable public abstract Object[] invokeArguments();
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 6;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeCollection("dhtNodes", dhtNodes, MessageCollectionItemType.UUID))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeLong("nearFutId", nearFutId))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeUuid("nearNodeId", nearNodeId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ dhtNodes = reader.readCollection("dhtNodes", MessageCollectionItemType.UUID);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ nearFutId = reader.readLong("nearFutId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ nearNodeId = reader.readUuid("nearNodeId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridDhtAtomicAbstractUpdateRequest.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 87ac54b..3b81ee7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -55,7 +55,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
@@ -93,14 +92,13 @@ import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -112,6 +110,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_AC
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
@@ -211,17 +210,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
- if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
- assert req.writeSynchronizationMode() != FULL_ASYNC : req;
-
- // Always send reply in CLOCK ordering mode.
- sendNearUpdateReply(res.nodeId(), res);
-
- return;
- }
-
- // Request should be for primary keys only in PRIMARY ordering mode.
- assert req.hasPrimary() : req;
+// if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
+// assert req.writeSynchronizationMode() != FULL_ASYNC : req;
+//
+// // Always send reply in CLOCK ordering mode.
+// sendNearUpdateReply(res.nodeId(), res);
+//
+// return;
+// }
+//
+// // Request should be for primary keys only in PRIMARY ordering mode.
+// assert req.hasPrimary() : req;
if (req.writeSynchronizationMode() != FULL_ASYNC)
sendNearUpdateReply(res.nodeId(), res);
@@ -422,14 +421,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- ctx.io().addHandler(ctx.cacheId(), GridNearAtomicDhtResponse.class, new CI2<UUID, GridNearAtomicDhtResponse>() {
- @Override public void apply(UUID uuid, GridNearAtomicDhtResponse msg) {
-
+ ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicNearResponse.class, new CI2<UUID, GridDhtAtomicNearResponse>() {
+ @Override public void apply(UUID uuid, GridDhtAtomicNearResponse msg) {
+ processDhtAtomicNearResponse(uuid, msg);
}
@Override public String toString() {
- return "GridDhtAtomicDeferredUpdateResponse handler " +
- "[msgIdx=" + GridNearAtomicDhtResponse.CACHE_MSG_IDX + ']';
+ return "GridDhtAtomicNearResponse handler " +
+ "[msgIdx=" + GridDhtAtomicNearResponse.CACHE_MSG_IDX + ']';
}
});
@@ -1819,12 +1818,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- // Do not check topology version for CLOCK versioning since
- // partition exchange will wait for near update future (if future is on server node).
- // Also do not check topology version if topology was locked on near node by
+ // Do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
- if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
- !needRemap(req.topologyVersion(), top.topologyVersion())) {
+ if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null) {
@@ -1836,19 +1832,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean hasNear = ctx.discovery().cacheNearNode(node, name());
- GridCacheVersion ver = req.updateVersion();
-
- if (ver == null) {
- // Assign next version for update inside entries lock.
- ver = ctx.versions().next(top.topologyVersion());
+ // Assign next version for update inside entries lock.
+ GridCacheVersion ver = ctx.versions().next(top.topologyVersion());
- if (hasNear)
- res.nearVersion(ver);
+ if (hasNear)
+ res.nearVersion(ver);
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Assigned update version [futId=" + req.futureId() +
- ", writeVer=" + ver + ']');
- }
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Assigned update version [futId=" + req.futureId() +
+ ", writeVer=" + ver + ']');
}
assert ver != null : "Got null version for update request: " + req;
@@ -2413,7 +2405,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* Updates locked entries one-by-one.
*
- * @param node Originating node.
+ * @param nearNode Originating node.
* @param hasNear {@code True} if originating node has near cache.
* @param req Update request.
* @param res Update response.
@@ -2429,7 +2421,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @throws GridCacheEntryRemovedException Should be never thrown.
*/
private UpdateSingleResult updateSingle(
- ClusterNode node,
+ ClusterNode nearNode,
boolean hasNear,
GridNearAtomicAbstractUpdateRequest req,
GridNearAtomicUpdateResponse res,
@@ -2473,9 +2465,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
- boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(), entry.partition(),
- req.topologyVersion());
-
Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
Collection<UUID> readers = null;
@@ -2483,38 +2472,37 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (checkReaders) {
readers = entry.readers();
- filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
+ filteredReaders = F.view(entry.readers(), F.notEqualTo(nearNode.id()));
}
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
- node.id(),
+ nearNode.id(),
locNodeId,
op,
writeVal,
req.invokeArguments(),
- (primary || (ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()))
- && writeThrough() && !req.skipStore(),
+ writeThrough() && !req.skipStore(),
!req.skipStore(),
sndPrevVal || req.returnValue(),
req.keepBinary(),
expiry,
- true,
- true,
- primary,
- ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
+ /*event*/true,
+ /*metrics*/true,
+ /*primary*/true,
+ /*verCheck*/false,
topVer,
req.filter(),
- replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
+ replicate ? DR_PRIMARY : DR_NONE,
newConflictTtl,
newConflictExpireTime,
newConflictVer,
- true,
+ /*conflictResolve*/true,
intercept,
req.subjectId(),
taskName,
- null,
- null,
+ /*prevVal*/null,
+ /*updateCntr*/null,
dhtFut);
if (dhtFut == null && !F.isEmpty(filteredReaders)) {
@@ -2535,7 +2523,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
EntryProcessor<Object, Object, Object> entryProcessor = null;
if (!readersOnly) {
- dhtFut.addWriteEntry(entry,
+ dhtFut.addWriteEntry(
+ nearNode.id(),
+ entry,
updRes.newValue(),
entryProcessor,
updRes.newTtl(),
@@ -2547,7 +2537,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (!F.isEmpty(filteredReaders))
- dhtFut.addNearWriteEntries(filteredReaders,
+ dhtFut.addNearWriteEntries(
+ nearNode.id(),
+ filteredReaders,
entry,
updRes.newValue(),
entryProcessor,
@@ -2562,8 +2554,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (hasNear) {
- if (primary && updRes.sendToDht()) {
- if (!ctx.affinity().partitionBelongs(node, entry.partition(), topVer)) {
+ if (updRes.sendToDht()) {
+ if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
// If put the same value as in request then do not need to send it back.
if (op == TRANSFORM || writeVal != updRes.newValue()) {
res.addNearValue(i,
@@ -2575,13 +2567,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime());
if (updRes.newValue() != null) {
- IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
+ IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
assert f == null : f;
}
}
- else if (F.contains(readers, node.id())) // Reader became primary or backup.
- entry.removeReader(node.id(), req.messageId());
+ else if (F.contains(readers, nearNode.id())) // Reader became primary or backup.
+ entry.removeReader(nearNode.id(), req.messageId());
else
res.addSkippedIndex(i);
}
@@ -2603,7 +2595,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) {
if (retVal == null)
- retVal = new GridCacheReturn(node.isLocal());
+ retVal = new GridCacheReturn(nearNode.isLocal());
retVal.addEntryProcessResult(ctx,
k,
@@ -2619,7 +2611,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
CacheObject ret = updRes.oldValue();
retVal = new GridCacheReturn(ctx,
- node.isLocal(),
+ nearNode.isLocal(),
req.keepBinary(),
req.returnValue() ? ret : null,
updRes.success());
@@ -2639,7 +2631,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param firstEntryIdx Index of the first entry in the request keys collection.
* @param entries Entries to update.
* @param ver Version to set.
- * @param node Originating node.
+ * @param nearNode Originating node.
* @param writeVals Write values.
* @param putMap Values to put.
* @param rmvKeys Keys to remove.
@@ -2661,7 +2653,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final int firstEntryIdx,
final List<GridDhtCacheEntry> entries,
final GridCacheVersion ver,
- final ClusterNode node,
+ final ClusterNode nearNode,
@Nullable final List<CacheObject> writeVals,
@Nullable final Map<KeyCacheObject, CacheObject> putMap,
@Nullable final Collection<KeyCacheObject> rmvKeys,
@@ -2765,12 +2757,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (checkReaders) {
readers = entry.readers();
- filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
+ filteredReaders = F.view(entry.readers(), F.notEqualTo(nearNode.id()));
}
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
- node.id(),
+ nearNode.id(),
locNodeId,
op,
writeVal,
@@ -2831,7 +2823,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
if (!batchRes.readersOnly()) {
- dhtFut.addWriteEntry(entry,
+ dhtFut.addWriteEntry(
+ nearNode.id(),
+ entry,
writeVal,
entryProcessor,
updRes.newTtl(),
@@ -2843,7 +2837,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (!F.isEmpty(filteredReaders))
- dhtFut.addNearWriteEntries(filteredReaders,
+ dhtFut.addNearWriteEntries(
+ nearNode.id(),
+ filteredReaders,
entry,
writeVal,
entryProcessor,
@@ -2853,7 +2849,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (hasNear) {
if (primary) {
- if (!ctx.affinity().partitionBelongs(node, entry.partition(), topVer)) {
+ if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
int idx = firstEntryIdx + i;
if (req.operation() == TRANSFORM) {
@@ -2866,13 +2862,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
if (writeVal != null || entry.hasValue()) {
- IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
+ IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
assert f == null : f;
}
}
- else if (readers.contains(node.id())) // Reader became primary or backup.
- entry.removeReader(node.id(), req.messageId());
+ else if (readers.contains(nearNode.id())) // Reader became primary or backup.
+ entry.removeReader(nearNode.id(), req.messageId());
else
res.addSkippedIndex(firstEntryIdx + i);
}
@@ -3148,31 +3144,35 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
boolean force
) {
- if (!force) {
- if (updateReq.fastMap())
- return null;
-
- AffinityTopologyVersion topVer = updateReq.topologyVersion();
-
- Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer);
-
- // We are on primary node for some key.
- assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer +
- ctx.kernalContext().discovery().discoCache(topVer) + ']';
-
- if (nodes.size() == 1) {
- if (log.isDebugEnabled())
- log.debug("Partitioned cache topology has only one node, will not create DHT atomic update future " +
- "[topVer=" + topVer + ", updateReq=" + updateReq + ']');
-
- return null;
- }
- }
-
if (updateReq.size() == 1)
return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
else
return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+// if (!force) {
+// if (updateReq.fastMap())
+// return null;
+//
+// AffinityTopologyVersion topVer = updateReq.topologyVersion();
+//
+// Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer);
+//
+// // We are on primary node for some key.
+// assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer +
+// ctx.kernalContext().discovery().discoCache(topVer) + ']';
+//
+// if (nodes.size() == 1) {
+// if (log.isDebugEnabled())
+// log.debug("Partitioned cache topology has only one node, will not create DHT atomic update future " +
+// "[topVer=" + topVer + ", updateReq=" + updateReq + ']');
+//
+// return null;
+// }
+// }
+//
+// if (updateReq.size() == 1)
+// return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+// else
+// return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
}
/**
@@ -3225,9 +3225,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheVersion ver = req.writeVersion();
- // Always send update reply.
- GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureId(),
- ctx.deploymentEnabled());
+ GridDhtAtomicNearResponse nearRes = ctx.config().getWriteSynchronizationMode() == FULL_SYNC ?
+ new GridDhtAtomicNearResponse(req.nearFutureId(), req.dhtNodes()) : null;
Boolean replicate = ctx.isDrEnabled();
@@ -3311,39 +3310,113 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Ignore.
}
catch (IgniteCheckedException e) {
- res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e));
+ if (nearRes != null)
+ nearRes.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e));
+
+ U.error(log, "Failed to update key on backup node: " + key, e);
}
}
- if (isNearEnabled(cacheCfg))
- ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res);
+ GridDhtAtomicUpdateResponse dhtRes = null;
- try {
- if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) {
- ctx.io().send(nodeId, res, ctx.ioPolicy());
+ if (isNearEnabled(cacheCfg)) {
+ List<KeyCacheObject> nearEvicted =
+ ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes);
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Sent DHT atomic update response [futId=" + req.futureId() +
- ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
- }
- }
+ dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureId(), ctx.deploymentEnabled());
+
+ dhtRes.nearEvicted(nearEvicted);
+ }
+
+ final boolean RES_AFTER_ACK = false;
+
+ if (nearRes != null) {
+ if (RES_AFTER_ACK)
+ sendDhtNearResponse(nodeId, req, nearRes);
else {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureId() +
- ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
- }
+ sendDhtNearResponse(null, req, nearRes);
- // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response.
sendDeferredUpdateResponse(nodeId, req.futureId());
}
}
+ else
+ sendDeferredUpdateResponse(nodeId, req.futureId());
+
+ if (dhtRes != null)
+ sendDhtPrimaryResponse(nodeId, req, dhtRes);
+ }
+
+ /**
+ * @param nodeId Primary node ID.
+ * @param req Request.
+ * @param dhtRes Response to send.
+ */
+ private void sendDhtPrimaryResponse(UUID nodeId,
+ GridDhtAtomicAbstractUpdateRequest req,
+ GridDhtAtomicUpdateResponse dhtRes) {
+ try {
+ ctx.io().send(nodeId, dhtRes, ctx.ioPolicy());
+
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Sent DHT response [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", writeVer=" + req.writeVersion() +
+ ", node=" + nodeId + ']');
+ }
+ }
catch (ClusterTopologyCheckedException ignored) {
- U.warn(msgLog, "Failed to send DHT atomic update response, node left [futId=" + req.futureId() +
- ", node=" + req.nodeId() + ']');
+ U.warn(msgLog, "Failed to send DHT response, node left [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", node=" + nodeId + ']');
}
catch (IgniteCheckedException e) {
- U.error(msgLog, "Failed to send DHT atomic update response [futId=" + req.futureId() +
- ", node=" + nodeId + ", res=" + res + ']', e);
+ U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", node=" + nodeId +
+ ", res=" + dhtRes + ']', e);
+ }
+ }
+
+ /**
+ * @param req Request.
+ * @param nearRes Response to send.
+ */
+ private void sendDhtNearResponse(final UUID primaryId,
+ final GridDhtAtomicAbstractUpdateRequest req,
+ GridDhtAtomicNearResponse nearRes) {
+ try {
+ ClusterNode node = ctx.discovery().node(req.nearNodeId());
+
+ if (node == null)
+ throw new ClusterTopologyCheckedException("Node left: " + req.nearNodeId());
+
+ if (primaryId != null) {
+ ctx.gridIO().send(node, TOPIC_CACHE, nearRes, ctx.ioPolicy(), new IgniteInClosure<IgniteException>() {
+ @Override public void apply(IgniteException e) {
+ sendDeferredUpdateResponse(primaryId, req.futureId());
+ }
+ });
+ }
+ else
+ ctx.gridIO().send(node, TOPIC_CACHE, nearRes, ctx.ioPolicy());
+
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Sent DHT near response [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", writeVer=" + req.writeVersion() +
+ ", node=" + req.nearNodeId() + ']');
+ }
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ U.warn(msgLog, "Failed to send DHT near response, node left [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", node=" + req.nearNodeId() + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", node=" + req.nearNodeId() +
+ ", res=" + nearRes + ']', e);
}
}
@@ -3359,8 +3432,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param nodeId Node ID.
* @param res Response.
*/
- private void processNearAtomicDhtResponse(UUID nodeId, GridNearAtomicDhtResponse res) {
+ private void processDhtAtomicNearResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
+ GridNearAtomicAbstractUpdateFuture updateFut =
+ (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
+ if (updateFut != null) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Received DHT atomic near response [futId=" + res.futureId() +
+ ", node=" + nodeId + ']');
+ }
+
+ updateFut.onResult(nodeId, res);
+ }
+ else {
+ U.warn(msgLog, "Failed to find update future DHT atomic near response [futId=" + res.futureId() +
+ ", node=" + nodeId +
+ ", res=" + res + ']');
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
new file mode 100644
index 0000000..628e1dc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * TODO IGNITE-4705: no not send mapping if it == affinity?
+ */
+public class GridDhtAtomicNearResponse extends GridCacheMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** */
+ private static final int HAS_RESULT_MASK = 0x1;
+
+ /** */
+ private static final int RESULT_SUCCESS_MASK = 0x2;
+
+ /** */
+ private long futId;
+
+ /** */
+ @GridDirectCollection(UUID.class)
+ private List<UUID> mapping;
+
+ /** */
+ private byte flags;
+
+ /** */
+ private UpdateErrors errors;
+
+ /**
+ *
+ */
+ public GridDhtAtomicNearResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param mapping Update mapping.
+ */
+ public GridDhtAtomicNearResponse(long futId, List<UUID> mapping) {
+ this.futId = futId;
+ this.mapping = mapping;
+ }
+
+ /**
+ * @param key Key.
+ * @param e Error.
+ */
+ public void addFailedKey(KeyCacheObject key, Throwable e) {
+ if (errors == null)
+ errors = new UpdateErrors();
+
+ errors.addFailedKey(key, e);
+ }
+
+ /**
+ * @param success Success flag.
+ */
+ public void setResult(boolean success) {
+ setFlag(true, HAS_RESULT_MASK);
+
+ setFlag(success, RESULT_SUCCESS_MASK);
+ }
+
+ /**
+ * @return Operation result.
+ */
+ public GridCacheReturn result() {
+ assert hasResult();
+
+ return new GridCacheReturn(true, isFlag(RESULT_SUCCESS_MASK));
+ }
+
+ /**
+ * @return {@code True} if response contains operation result.
+ */
+ public boolean hasResult() {
+ return isFlag(HAS_RESULT_MASK);
+ }
+
+ /**
+ * @return Update mapping.
+ */
+ public List<UUID> mapping() {
+ return mapping;
+ }
+
+ /**
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ private void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ private boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -45;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 7;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (errors != null)
+ errors.prepareMarshal(this, ctx.cacheContext(cacheId));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (errors != null)
+ errors.finishUnmarshal(this, ctx.cacheContext(cacheId), ldr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeMessage("errors", errors))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ errors = reader.readMessage("errors");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridDhtAtomicNearResponse.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 0c8e482..671034c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -33,7 +33,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -44,9 +43,6 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
/** */
private static final long serialVersionUID = 0L;
- /** */
- private static final IgniteProductVersion SINGLE_UPDATE_REQUEST = IgniteProductVersion.fromString("1.7.4");
-
/** Future keys. */
private KeyCacheObject key;
@@ -97,7 +93,8 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
/** {@inheritDoc} */
@Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
- ClusterNode node,
+ UUID nodeId,
+ UUID nearNodeId,
long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@@ -106,11 +103,13 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
long conflictExpireTime,
@Nullable GridCacheVersion conflictVer
) {
- if (canUseSingleRequest(node, ttl, conflictExpireTime, conflictVer)) {
+ if (canUseSingleRequest(ttl, conflictExpireTime, conflictVer)) {
return new GridDhtAtomicSingleUpdateRequest(
cctx.cacheId(),
- node.id(),
+ nodeId,
futId,
+ nearNodeId,
+ updateReq.futureId(),
writeVer,
syncMode,
topVer,
@@ -123,8 +122,10 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
else {
return new GridDhtAtomicUpdateRequest(
cctx.cacheId(),
- node.id(),
+ nodeId,
futId,
+ nearNodeId,
+ updateReq.futureId(),
writeVer,
syncMode,
topVer,
@@ -167,18 +168,15 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
}
/**
- * @param node Target node
* @param ttl TTL.
* @param conflictExpireTime Conflict expire time.
* @param conflictVer Conflict version.
* @return {@code True} if it is possible to use {@link GridDhtAtomicSingleUpdateRequest}.
*/
- private boolean canUseSingleRequest(ClusterNode node,
- long ttl,
+ private boolean canUseSingleRequest(long ttl,
long conflictExpireTime,
@Nullable GridCacheVersion conflictVer) {
- return node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0 &&
- (ttl == CU.TTL_NOT_CHANGED) &&
+ return (ttl == CU.TTL_NOT_CHANGED) &&
(conflictExpireTime == CU.EXPIRE_TIME_CALCULATE) &&
conflictVer == null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index 127c2be..e46c843 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -51,7 +51,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
/** Near cache key flag. */
private static final int NEAR_FLAG_MASK = 0x80;
- /** Future version. */
+ /** Future ID on primary. */
protected long futId;
/** Write version. */
@@ -116,6 +116,8 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
int cacheId,
UUID nodeId,
long futId,
+ UUID nearNodeId,
+ long nearFutId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
@@ -125,7 +127,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
boolean keepBinary,
boolean skipStore
) {
- super(cacheId, nodeId);
+ super(cacheId, nodeId, nearNodeId, nearFutId);
this.futId = futId;
this.writeVer = writeVer;
this.syncMode = syncMode;
@@ -423,73 +425,73 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
}
switch (writer.state()) {
- case 3:
+ case 6:
if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
- case 4:
+ case 7:
if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
- case 5:
+ case 8:
if (!writer.writeMessage("key", key))
return false;
writer.incrementState();
- case 6:
+ case 9:
if (!writer.writeInt("partId", partId))
return false;
writer.incrementState();
- case 7:
+ case 10:
if (!writer.writeMessage("prevVal", prevVal))
return false;
writer.incrementState();
- case 8:
+ case 11:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 9:
+ case 12:
if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
- case 10:
+ case 13:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 11:
+ case 14:
if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
- case 12:
+ case 15:
if (!writer.writeLong("updateCntr", updateCntr))
return false;
writer.incrementState();
- case 13:
+ case 16:
if (!writer.writeMessage("val", val))
return false;
writer.incrementState();
- case 14:
+ case 17:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -511,7 +513,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
return false;
switch (reader.state()) {
- case 3:
+ case 6:
flags = reader.readByte("flags");
if (!reader.isLastRead())
@@ -519,7 +521,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 4:
+ case 7:
futId = reader.readLong("futId");
if (!reader.isLastRead())
@@ -527,7 +529,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 5:
+ case 8:
key = reader.readMessage("key");
if (!reader.isLastRead())
@@ -535,7 +537,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 6:
+ case 9:
partId = reader.readInt("partId");
if (!reader.isLastRead())
@@ -543,7 +545,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 7:
+ case 10:
prevVal = reader.readMessage("prevVal");
if (!reader.isLastRead())
@@ -551,7 +553,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 8:
+ case 11:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -559,7 +561,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 9:
+ case 12:
byte syncModeOrd;
syncModeOrd = reader.readByte("syncMode");
@@ -571,7 +573,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 10:
+ case 13:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -579,7 +581,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 11:
+ case 14:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -587,7 +589,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 12:
+ case 15:
updateCntr = reader.readLong("updateCntr");
if (!reader.isLastRead())
@@ -595,7 +597,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 13:
+ case 16:
val = reader.readMessage("val");
if (!reader.isLastRead())
@@ -603,7 +605,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 14:
+ case 17:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -653,7 +655,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 15;
+ return 18;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 7cb75fa..ea6a1b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -122,7 +122,9 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
}
/** {@inheritDoc} */
- @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(ClusterNode node,
+ @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
+ UUID nodeId,
+ UUID nearNodeId,
long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@@ -133,8 +135,10 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
) {
return new GridDhtAtomicUpdateRequest(
cctx.cacheId(),
- node.id(),
+ nodeId,
futId,
+ nearNodeId,
+ updateReq.futureId(),
writeVer,
syncMode,
topVer,
[3/3] ignite git commit: ignite-4705
Posted by sb...@apache.org.
ignite-4705
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eef1d310
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eef1d310
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eef1d310
Branch: refs/heads/ignite-4705
Commit: eef1d3108178f17293175c0e6eb04707c89ca876
Parents: c5c5eb5
Author: sboikov <sb...@gridgain.com>
Authored: Thu Feb 16 17:36:24 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Feb 16 17:36:24 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheIoManager.java | 4 ++
.../processors/cache/GridCacheMessage.java | 2 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 22 ++++--
.../GridDhtAtomicAbstractUpdateRequest.java | 74 +++++++++++++++++++-
.../dht/atomic/GridDhtAtomicCache.java | 10 +--
.../GridDhtAtomicDeferredUpdateResponse.java | 7 +-
.../dht/atomic/GridDhtAtomicNearResponse.java | 38 +++++-----
.../GridDhtAtomicSingleUpdateRequest.java | 48 ++-----------
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 52 ++------------
.../distributed/dht/atomic/UpdateErrors.java | 6 ++
10 files changed, 137 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index f9952b3..0f7371d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
@@ -361,6 +362,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (depEnabled)
cctx.deploy().ignoreOwnership(true);
+ if (!cacheMsg.partitionExchangeMessage())
+ log.info("Cache message: " + cacheMsg);
+
unmarshall(nodeId, cacheMsg);
if (cacheMsg.classError() != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index b9fb56a..3ec5323 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -50,7 +50,7 @@ public abstract class GridCacheMessage implements Message {
private static final long serialVersionUID = 0L;
/** Maximum number of cache lookup indexes. */
- public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 5;
+ public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 6;
/** Cache message index field name. */
public static final String CACHE_MSG_INDEX_FIELD_NAME = "CACHE_MSG_IDX";
http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index d494d98..1c83163 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -353,8 +354,10 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/**
* Sends requests to remote nodes.
+ *
+ * @param ret Cache operation return value.
*/
- final void map() {
+ final void map(GridCacheReturn ret) {
boolean fullSync = updateReq.writeSynchronizationMode() == FULL_SYNC;
if (!F.isEmpty(mappings)) {
@@ -369,6 +372,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
try {
req.dhtNodes(dhtNodes);
+ req.setResult(ret.success());
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
@@ -392,14 +396,18 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
registerResponse(req.nodeId());
}
}
- }
- else
- onDone();
- // Send response right away if no ACKs from backup is required.
- // Backups will send ACKs anyway, future will be completed after all backups have replied.
- if (!fullSync)
+ // Send response right away if no ACKs from backup is required.
+ // Backups will send ACKs anyway, future will be completed after all backups have replied.
+ if (!fullSync)
+ completionCb.apply(updateReq, updateRes);
+ }
+ else {
+ // Reply.
completionCb.apply(updateReq, updateRes);
+
+ onDone();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 3edbf8c..30c07e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -41,6 +41,21 @@ import org.jetbrains.annotations.Nullable;
*
*/
public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+ /** Skip store flag bit mask. */
+ public static final int DHT_ATOMIC_SKIP_STORE_FLAG_MASK = 0x01;
+
+ /** Keep binary flag. */
+ public static final int DHT_ATOMIC_KEEP_BINARY_FLAG_MASK = 0x02;
+
+ /** Near cache key flag. */
+ public static final int DHT_ATOMIC_NEAR_FLAG_MASK = 0x04;
+
+ /** */
+ public static final int DHT_ATOMIC_HAS_RESULT_MASK = 0x08;
+
+ /** */
+ public static final int DHT_ATOMIC_RESULT_SUCCESS_MASK = 0x10;
+
/** Message index. */
public static final int CACHE_MSG_IDX = nextIndexId();
@@ -58,6 +73,9 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
/** */
private long nearFutId;
+ /** Additional flags. */
+ protected byte flags;
+
/** */
@GridDirectCollection(UUID.class)
private List<UUID> dhtNodes;
@@ -85,6 +103,15 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
}
/**
+ * @param success Success flag.
+ */
+ public void setResult(boolean success) {
+ setFlag(true, DHT_ATOMIC_HAS_RESULT_MASK);
+
+ setFlag(success, DHT_ATOMIC_RESULT_SUCCESS_MASK);
+ }
+
+ /**
* @return Near node ID.
*/
public UUID nearNodeId() {
@@ -118,6 +145,13 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
}
/**
+ * @return Flags.
+ */
+ public final byte flags() {
+ return flags;
+ }
+
+ /**
* @return Keep binary flag.
*/
public abstract boolean keepBinary();
@@ -333,9 +367,29 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
*/
@Nullable public abstract Object[] invokeArguments();
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ protected final void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reags flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ protected final boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 6;
+ return 7;
}
/** {@inheritDoc} */
@@ -360,12 +414,18 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
writer.incrementState();
case 4:
- if (!writer.writeLong("nearFutId", nearFutId))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
case 5:
+ if (!writer.writeLong("nearFutId", nearFutId))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
@@ -396,7 +456,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
reader.incrementState();
case 4:
- nearFutId = reader.readLong("nearFutId");
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
@@ -404,6 +464,14 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
reader.incrementState();
case 5:
+ nearFutId = reader.readLong("nearFutId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 3b81ee7..2f6e320 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -421,7 +421,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicNearResponse.class, new CI2<UUID, GridDhtAtomicNearResponse>() {
+ ctx.io().addHandler(ctx.cacheId(),
+ GridDhtAtomicNearResponse.class,
+ new CI2<UUID, GridDhtAtomicNearResponse>() {
@Override public void apply(UUID uuid, GridDhtAtomicNearResponse msg) {
processDhtAtomicNearResponse(uuid, msg);
}
@@ -1969,7 +1971,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
else {
// If there are backups, map backup update future.
if (dhtFut != null)
- dhtFut.map();
+ dhtFut.map(res.returnValue());
// Otherwise, complete the call.
else
completionCb.apply(req, res);
@@ -3226,9 +3228,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheVersion ver = req.writeVersion();
GridDhtAtomicNearResponse nearRes = ctx.config().getWriteSynchronizationMode() == FULL_SYNC ?
- new GridDhtAtomicNearResponse(req.nearFutureId(), req.dhtNodes()) : null;
+ new GridDhtAtomicNearResponse(ctx.cacheId(), req.nearFutureId(), req.dhtNodes(), req.flags()) : null;
- Boolean replicate = ctx.isDrEnabled();
+ boolean replicate = ctx.isDrEnabled();
boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
index b662476..bd2bae0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
@@ -25,7 +25,6 @@ import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -42,7 +41,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
public static final int CACHE_MSG_IDX = nextIndexId();
/** ACK future versions. */
- @GridDirectCollection(GridCacheVersion.class)
+ @GridDirectCollection(Long.class)
private Collection<Long> futIds;
/** {@inheritDoc} */
@@ -105,7 +104,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
switch (writer.state()) {
case 3:
- if (!writer.writeCollection("futIds", futIds, MessageCollectionItemType.MSG))
+ if (!writer.writeCollection("futIds", futIds, MessageCollectionItemType.LONG))
return false;
writer.incrementState();
@@ -127,7 +126,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
switch (reader.state()) {
case 3:
- futIds = reader.readCollection("futIds", MessageCollectionItemType.MSG);
+ futIds = reader.readCollection("futIds", MessageCollectionItemType.LONG);
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
index 628e1dc..4110b5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -26,10 +26,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.*;
+
/**
* TODO IGNITE-4705: no not send mapping if it == affinity?
*/
@@ -41,12 +44,6 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
public static final int CACHE_MSG_IDX = nextIndexId();
/** */
- private static final int HAS_RESULT_MASK = 0x1;
-
- /** */
- private static final int RESULT_SUCCESS_MASK = 0x2;
-
- /** */
private long futId;
/** */
@@ -67,12 +64,16 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
}
/**
+ * @param cacheId Cache ID.
* @param futId Future ID.
* @param mapping Update mapping.
+ * @param flags Flags.
*/
- public GridDhtAtomicNearResponse(long futId, List<UUID> mapping) {
+ public GridDhtAtomicNearResponse(int cacheId, long futId, List<UUID> mapping, byte flags) {
+ this.cacheId = cacheId;
this.futId = futId;
this.mapping = mapping;
+ this.flags = flags;
}
/**
@@ -87,28 +88,19 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
}
/**
- * @param success Success flag.
- */
- public void setResult(boolean success) {
- setFlag(true, HAS_RESULT_MASK);
-
- setFlag(success, RESULT_SUCCESS_MASK);
- }
-
- /**
* @return Operation result.
*/
public GridCacheReturn result() {
assert hasResult();
- return new GridCacheReturn(true, isFlag(RESULT_SUCCESS_MASK));
+ return new GridCacheReturn(true, isFlag(DHT_ATOMIC_RESULT_SUCCESS_MASK));
}
/**
* @return {@code True} if response contains operation result.
*/
public boolean hasResult() {
- return isFlag(HAS_RESULT_MASK);
+ return isFlag(DHT_ATOMIC_HAS_RESULT_MASK);
}
/**
@@ -144,6 +136,11 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
}
/** {@inheritDoc} */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /** {@inheritDoc} */
@Override public byte directType() {
return -45;
}
@@ -265,4 +262,9 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
return reader.afterMessageRead(GridDhtAtomicNearResponse.class);
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtAtomicNearResponse.class, this);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index e46c843..678f3f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -48,9 +48,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
/** */
private static final long serialVersionUID = 0L;
- /** Near cache key flag. */
- private static final int NEAR_FLAG_MASK = 0x80;
-
/** Future ID on primary. */
protected long futId;
@@ -69,9 +66,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
/** Task name hash. */
protected int taskNameHash;
- /** Additional flags. */
- protected byte flags;
-
/** Key to update. */
@GridToStringInclude
protected KeyCacheObject key;
@@ -226,7 +220,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public boolean skipStore() {
- return isFlag(SKIP_STORE_FLAG_MASK);
+ return isFlag(DHT_ATOMIC_SKIP_STORE_FLAG_MASK);
}
/** {@inheritDoc} */
@@ -364,21 +358,21 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public boolean keepBinary() {
- return isFlag(KEEP_BINARY_FLAG_MASK);
+ return isFlag(DHT_ATOMIC_KEEP_BINARY_FLAG_MASK);
}
/**
*
*/
private boolean near() {
- return isFlag(NEAR_FLAG_MASK);
+ return isFlag(DHT_ATOMIC_NEAR_FLAG_MASK);
}
/**
*
*/
private void near(boolean near) {
- setFlag(near, NEAR_FLAG_MASK);
+ setFlag(near, DHT_ATOMIC_NEAR_FLAG_MASK);
}
/** {@inheritDoc} */
@@ -425,12 +419,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
}
switch (writer.state()) {
- case 6:
- if (!writer.writeByte("flags", flags))
- return false;
-
- writer.incrementState();
-
case 7:
if (!writer.writeLong("futId", futId))
return false;
@@ -513,14 +501,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
return false;
switch (reader.state()) {
- case 6:
- flags = reader.readByte("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 7:
futId = reader.readLong("futId");
@@ -658,26 +638,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
return 18;
}
- /**
- * Sets flag mask.
- *
- * @param flag Set or clear.
- * @param mask Mask.
- */
- private void setFlag(boolean flag, int mask) {
- flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
- }
-
- /**
- * Reags flag mask.
- *
- * @param mask Mask to read.
- * @return Flag value.
- */
- private boolean isFlag(int mask) {
- return (flags & mask) != 0;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtAtomicSingleUpdateRequest.class, this, "super", super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 9da6b2e..7a210ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -148,9 +148,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
/** Keep binary flag. */
private boolean keepBinary;
- /** Additional flags. */
- private byte flags;
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -522,7 +519,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
/** {@inheritDoc} */
@Override public boolean skipStore() {
- return isFlag(SKIP_STORE_FLAG_MASK);
+ return isFlag(DHT_ATOMIC_SKIP_STORE_FLAG_MASK);
}
/** {@inheritDoc} */
@@ -612,26 +609,20 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
}
switch (writer.state()) {
- case 6:
- if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
- return false;
-
- writer.incrementState();
-
case 7:
- if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
return false;
writer.incrementState();
case 8:
- if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+ if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 9:
- if (!writer.writeByte("flags", flags))
+ if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
@@ -766,16 +757,8 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
return false;
switch (reader.state()) {
- case 6:
- conflictExpireTimes = reader.readMessage("conflictExpireTimes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 7:
- conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
+ conflictExpireTimes = reader.readMessage("conflictExpireTimes");
if (!reader.isLastRead())
return false;
@@ -783,7 +766,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
case 8:
- entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+ conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -791,7 +774,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
case 9:
- flags = reader.readByte("flags");
+ entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
if (!reader.isLastRead())
return false;
@@ -975,27 +958,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
return 29;
}
- /**
- * Sets flag mask.
- *
- * @param flag Set or clear.
- * @param mask Mask.
- */
- private void setFlag(boolean flag, int mask) {
- flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
- }
-
- /**
- * Reags flag mask.
- *
- * @param mask Mask to read.
- * @return Flag value.
- */
- private boolean isFlag(int mask) {
- return (flags & mask) != 0;
- }
-
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtAtomicUpdateRequest.class, this, "super", super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
index 106612c..4d12198 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -184,4 +185,9 @@ public class UpdateErrors implements Message {
@Override public void onAckReceived() {
// No-op.
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(UpdateErrors.class, this);
+ }
}