You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/03 10:47:04 UTC
[1/8] ignite git commit: IGNITE-2532: WIP. Only refactorings for now.
Repository: ignite
Updated Branches:
refs/heads/ignite-2523 [created] 07c23931f
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
new file mode 100644
index 0000000..5de9884
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -0,0 +1,1044 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+
+/**
+ * Lite DHT cache update request sent from near node to primary node.
+ */
+public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
+ implements GridNearAtomicUpdateRequestInterface, GridCacheDeployable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** Target node ID. */
+ @GridDirectTransient
+ private UUID nodeId;
+
+ /** Future version. */
+ private GridCacheVersion futVer;
+
+ /** Fast map flag. */
+ private boolean fastMap;
+
+ /** Update version. Set to non-null if fastMap is {@code true}. */
+ private GridCacheVersion updateVer;
+
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
+ /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
+ private boolean topLocked;
+
+ /** Write synchronization mode. */
+ private CacheWriteSynchronizationMode syncMode;
+
+ /** Update operation. */
+ private GridCacheOperation op;
+
+ /** Keys to update. */
+ @GridToStringInclude
+ @GridDirectCollection(KeyCacheObject.class)
+ private List<KeyCacheObject> keys;
+
+ /** Values to update. */
+ @GridDirectCollection(CacheObject.class)
+ private List<CacheObject> vals;
+
+ /** Entry processors. */
+ @GridDirectTransient
+ private List<EntryProcessor<Object, Object, Object>> entryProcessors;
+
+ /** Entry processors bytes. */
+ @GridDirectCollection(byte[].class)
+ private List<byte[]> entryProcessorsBytes;
+
+ /** Optional arguments for entry processor. */
+ @GridDirectTransient
+ private Object[] invokeArgs;
+
+ /** Entry processor arguments bytes. */
+ private byte[][] invokeArgsBytes;
+
+ /** Conflict versions. */
+ @GridDirectCollection(GridCacheVersion.class)
+ private List<GridCacheVersion> conflictVers;
+
+ /** Conflict TTLs. */
+ private GridLongList conflictTtls;
+
+ /** Conflict expire times. */
+ private GridLongList conflictExpireTimes;
+
+ /** Return value flag. */
+ private boolean retval;
+
+ /** Expiry policy. */
+ @GridDirectTransient
+ private ExpiryPolicy expiryPlc;
+
+ /** Expiry policy bytes. */
+ private byte[] expiryPlcBytes;
+
+ /** Filter. */
+ private CacheEntryPredicate[] filter;
+
+ /** Flag indicating whether request contains primary keys. */
+ private boolean hasPrimary;
+
+ /** Subject ID. */
+ private UUID subjId;
+
+ /** Task name hash. */
+ private int taskNameHash;
+
+ /** Skip write-through to a persistent storage. */
+ private boolean skipStore;
+
+ /** Client node request flag. */
+ private boolean clientReq;
+
+ /** Keep binary flag. */
+ private boolean keepBinary;
+
+ /** Response recorded for this request. */
+ @GridDirectTransient
+ private GridNearAtomicUpdateResponse res;
+
+ /** Initial capacity for inner collections: maximum entries count, capped at 10. */
+ @GridDirectTransient
+ private int initSize;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridNearAtomicSingleUpdateRequest() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param cacheId Cache ID.
+ * @param nodeId Node ID.
+ * @param futVer Future version.
+ * @param fastMap Fast map scheme flag.
+ * @param updateVer Update version set if fast map is performed.
+ * @param topVer Topology version.
+ * @param topLocked Topology locked flag.
+ * @param syncMode Synchronization mode.
+ * @param op Cache update operation.
+ * @param retval Return value required flag.
+ * @param expiryPlc Expiry policy.
+ * @param invokeArgs Optional arguments for entry processor.
+ * @param filter Optional filter for atomic check.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ * @param skipStore Skip write-through to a persistent storage.
+ * @param keepBinary Keep binary flag.
+ * @param clientReq Client node request flag.
+ * @param addDepInfo Deployment info flag.
+ * @param maxEntryCnt Maximum entries count.
+ */
+ public GridNearAtomicSingleUpdateRequest(
+ int cacheId,
+ UUID nodeId,
+ GridCacheVersion futVer,
+ boolean fastMap,
+ @Nullable GridCacheVersion updateVer,
+ @NotNull AffinityTopologyVersion topVer,
+ boolean topLocked,
+ CacheWriteSynchronizationMode syncMode,
+ GridCacheOperation op,
+ boolean retval,
+ @Nullable ExpiryPolicy expiryPlc,
+ @Nullable Object[] invokeArgs,
+ @Nullable CacheEntryPredicate[] filter,
+ @Nullable UUID subjId,
+ int taskNameHash,
+ boolean skipStore,
+ boolean keepBinary,
+ boolean clientReq,
+ boolean addDepInfo,
+ int maxEntryCnt
+ ) {
+ assert futVer != null;
+
+ this.cacheId = cacheId;
+ this.nodeId = nodeId;
+ this.futVer = futVer;
+ this.fastMap = fastMap;
+ this.updateVer = updateVer;
+
+ this.topVer = topVer;
+ this.topLocked = topLocked;
+ this.syncMode = syncMode;
+ this.op = op;
+ this.retval = retval;
+ this.expiryPlc = expiryPlc;
+ this.invokeArgs = invokeArgs;
+ this.filter = filter;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.skipStore = skipStore;
+ this.keepBinary = keepBinary;
+ this.clientReq = clientReq;
+ this.addDepInfo = addDepInfo;
+
+ // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries
+ // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys
+ // participate in request. As such, we know upper bound of all collections in request. If this bound is lower
+ // than 10, we use it.
+ initSize = Math.min(maxEntryCnt, 10);
+
+ keys = new ArrayList<>(initSize);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /**
+ * @return Mapped node ID.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ */
+ public void nodeId(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return Subject ID.
+ */
+ public UUID subjectId() {
+ return subjId;
+ }
+
+ /**
+ * @return Task name hash.
+ */
+ public int taskNameHash() {
+ return taskNameHash;
+ }
+
+ /**
+ * @return Future version.
+ */
+ public GridCacheVersion futureVersion() {
+ return futVer;
+ }
+
+ /**
+ * @return Flag indicating whether this is fast-map update.
+ */
+ public boolean fastMap() {
+ return fastMap;
+ }
+
+ /**
+ * @return Update version for fast-map request.
+ */
+ public GridCacheVersion updateVersion() {
+ return updateVer;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Topology locked flag.
+ */
+ public boolean topologyLocked() {
+ return topLocked;
+ }
+
+ /**
+ * @return {@code True} if request sent from client node.
+ */
+ public boolean clientRequest() {
+ return clientReq;
+ }
+
+ /**
+ * @return Cache write synchronization mode.
+ */
+ public CacheWriteSynchronizationMode writeSynchronizationMode() {
+ return syncMode;
+ }
+
+ /**
+ * @return Expiry policy.
+ */
+ public ExpiryPolicy expiry() {
+ return expiryPlc;
+ }
+
+ /**
+ * @return Return value flag.
+ */
+ public boolean returnValue() {
+ return retval;
+ }
+
+ /**
+ * @return Filter.
+ */
+ @Nullable public CacheEntryPredicate[] filter() {
+ return filter;
+ }
+
+ /**
+ * @return Skip write-through to a persistent storage.
+ */
+ public boolean skipStore() {
+ return skipStore;
+ }
+
+ /**
+ * @return Keep binary flag.
+ */
+ public boolean keepBinary() {
+ return keepBinary;
+ }
+
+ /**
+ * @param key Key to add.
+ * @param val Update value: entry processor for TRANSFORM, otherwise cache object; {@code null} is expected only for DELETE.
+ * @param conflictTtl Conflict TTL; negative values are not recorded.
+ * @param conflictExpireTime Conflict expire time; negative values are not recorded.
+ * @param conflictVer Conflict version (optional).
+ * @param primary If given key is primary on this mapping.
+ */
+ public void addUpdateEntry(KeyCacheObject key,
+ @Nullable Object val,
+ long conflictTtl,
+ long conflictExpireTime,
+ @Nullable GridCacheVersion conflictVer,
+ boolean primary) {
+ EntryProcessor<Object, Object, Object> entryProcessor = null;
+
+ if (op == TRANSFORM) {
+ assert val instanceof EntryProcessor : val;
+
+ entryProcessor = (EntryProcessor<Object, Object, Object>) val;
+ }
+
+ assert val != null || op == DELETE;
+
+ keys.add(key);
+
+ if (entryProcessor != null) {
+ if (entryProcessors == null)
+ entryProcessors = new ArrayList<>(initSize);
+
+ entryProcessors.add(entryProcessor);
+ }
+ else if (val != null) {
+ assert val instanceof CacheObject : val;
+
+ if (vals == null)
+ vals = new ArrayList<>(initSize);
+
+ vals.add((CacheObject)val);
+ }
+
+ hasPrimary |= primary;
+
+ // Do not create the list while there is no conflict; once created, it is padded with nulls to stay aligned with keys.
+ if (conflictVer != null) {
+ if (conflictVers == null) {
+ conflictVers = new ArrayList<>(initSize);
+
+ for (int i = 0; i < keys.size() - 1; i++)
+ conflictVers.add(null);
+ }
+
+ conflictVers.add(conflictVer);
+ }
+ else if (conflictVers != null)
+ conflictVers.add(null);
+
+ if (conflictTtl >= 0) {
+ if (conflictTtls == null) {
+ conflictTtls = new GridLongList(keys.size());
+
+ for (int i = 0; i < keys.size() - 1; i++)
+ conflictTtls.add(CU.TTL_NOT_CHANGED);
+ }
+
+ conflictTtls.add(conflictTtl);
+ }
+
+ if (conflictExpireTime >= 0) {
+ if (conflictExpireTimes == null) {
+ conflictExpireTimes = new GridLongList(keys.size());
+
+ for (int i = 0; i < keys.size() - 1; i++)
+ conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
+ }
+
+ conflictExpireTimes.add(conflictExpireTime);
+ }
+ }
+
+ /**
+ * @return Keys for this update request.
+ */
+ public List<KeyCacheObject> keys() {
+ return keys;
+ }
+
+ /**
+ * @return Entry processors if operation is TRANSFORM, values otherwise.
+ */
+ public List<?> values() {
+ return op == TRANSFORM ? entryProcessors : vals;
+ }
+
+ /**
+ * @return Update operation.
+ */
+ public GridCacheOperation operation() {
+ return op;
+ }
+
+ /**
+ * @return Optional arguments for entry processor.
+ */
+ @Nullable public Object[] invokeArguments() {
+ return invokeArgs;
+ }
+
+ /**
+ * @param idx Key index.
+ * @return Value.
+ */
+ @SuppressWarnings("unchecked")
+ public CacheObject value(int idx) {
+ assert op == UPDATE : op;
+
+ return vals.get(idx);
+ }
+
+ /**
+ * @param idx Key index.
+ * @return Entry processor.
+ */
+ @SuppressWarnings("unchecked")
+ public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
+ assert op == TRANSFORM : op;
+
+ return entryProcessors.get(idx);
+ }
+
+ /**
+ * @param idx Index to get.
+ * @return Value at the given index, or {@code null} if this request holds no values.
+ */
+ public CacheObject writeValue(int idx) {
+ if (vals != null)
+ return vals.get(idx);
+
+ return null;
+ }
+
+ /**
+ * @return Conflict versions.
+ */
+ @Nullable public List<GridCacheVersion> conflictVersions() {
+ return conflictVers;
+ }
+
+ /**
+ * @param idx Index.
+ * @return Conflict version.
+ */
+ @Nullable public GridCacheVersion conflictVersion(int idx) {
+ if (conflictVers != null) {
+ assert idx >= 0 && idx < conflictVers.size();
+
+ return conflictVers.get(idx);
+ }
+
+ return null;
+ }
+
+ /**
+ * @param idx Index.
+ * @return Conflict TTL.
+ */
+ public long conflictTtl(int idx) {
+ if (conflictTtls != null) {
+ assert idx >= 0 && idx < conflictTtls.size();
+
+ return conflictTtls.get(idx);
+ }
+
+ return CU.TTL_NOT_CHANGED;
+ }
+
+ /**
+ * @param idx Index.
+ * @return Conflict expire time.
+ */
+ public long conflictExpireTime(int idx) {
+ if (conflictExpireTimes != null) {
+ assert idx >= 0 && idx < conflictExpireTimes.size();
+
+ return conflictExpireTimes.get(idx);
+ }
+
+ return CU.EXPIRE_TIME_CALCULATE;
+ }
+
+ /**
+ * @return Flag indicating whether this request contains primary keys.
+ */
+ public boolean hasPrimary() {
+ return hasPrimary;
+ }
+
+ /**
+ * @param res Response.
+ * @return {@code True} if current response was {@code null}.
+ */
+ public boolean onResponse(GridNearAtomicUpdateResponse res) {
+ if (this.res == null) {
+ this.res = res;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @return Response.
+ */
+ @Nullable public GridNearAtomicUpdateResponse response() {
+ return res;
+ }
+
+ /** {@inheritDoc}
+ * @param ctx Shared cache context. */
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ prepareMarshalCacheObjects(keys, cctx);
+
+ if (filter != null) {
+ boolean hasFilter = false;
+
+ for (CacheEntryPredicate p : filter) {
+ if (p != null) {
+ hasFilter = true;
+
+ p.prepareMarshal(cctx);
+ }
+ }
+
+ if (!hasFilter)
+ filter = null;
+ }
+
+ if (expiryPlc != null && expiryPlcBytes == null)
+ expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
+
+ if (op == TRANSFORM) {
+ // Force addition of deployment info for entry processors if P2P is enabled globally.
+ if (!addDepInfo && ctx.deploymentEnabled())
+ addDepInfo = true;
+
+ if (entryProcessorsBytes == null)
+ entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+
+ if (invokeArgsBytes == null)
+ invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+ }
+ else
+ prepareMarshalCacheObjects(vals, cctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ finishUnmarshalCacheObjects(keys, cctx, ldr);
+
+ if (op == TRANSFORM) {
+ if (entryProcessors == null)
+ entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+
+ if (invokeArgs == null)
+ invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
+ }
+ else
+ finishUnmarshalCacheObjects(vals, cctx, ldr);
+
+ if (filter != null) {
+ for (CacheEntryPredicate p : filter) {
+ if (p != null)
+ p.finishUnmarshal(cctx, ldr);
+ }
+ }
+
+ if (expiryPlcBytes != null && expiryPlc == null)
+ expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, ldr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeBoolean("clientReq", clientReq))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeMessage("conflictTtls", conflictTtls))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeBoolean("fastMap", fastMap))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeMessage("futVer", futVer))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
+ if (!writer.writeBoolean("hasPrimary", hasPrimary))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
+ if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+ return false;
+
+ writer.incrementState();
+
+ case 14:
+ if (!writer.writeBoolean("keepBinary", keepBinary))
+ return false;
+
+ writer.incrementState();
+
+ case 15:
+ if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 16:
+ if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+ return false;
+
+ writer.incrementState();
+
+ case 17:
+ if (!writer.writeBoolean("retval", retval))
+ return false;
+
+ writer.incrementState();
+
+ case 18:
+ if (!writer.writeBoolean("skipStore", skipStore))
+ return false;
+
+ writer.incrementState();
+
+ case 19:
+ if (!writer.writeUuid("subjId", subjId))
+ return false;
+
+ writer.incrementState();
+
+ case 20:
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+ return false;
+
+ writer.incrementState();
+
+ case 21:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 22:
+ if (!writer.writeBoolean("topLocked", topLocked))
+ return false;
+
+ writer.incrementState();
+
+ case 23:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 24:
+ if (!writer.writeMessage("updateVer", updateVer))
+ return false;
+
+ writer.incrementState();
+
+ case 25:
+ if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ clientReq = reader.readBoolean("clientReq");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ conflictExpireTimes = reader.readMessage("conflictExpireTimes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ conflictTtls = reader.readMessage("conflictTtls");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ fastMap = reader.readBoolean("fastMap");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ futVer = reader.readMessage("futVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
+ hasPrimary = reader.readBoolean("hasPrimary");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
+ invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 14:
+ keepBinary = reader.readBoolean("keepBinary");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 15:
+ keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 16:
+ byte opOrd;
+
+ opOrd = reader.readByte("op");
+
+ if (!reader.isLastRead())
+ return false;
+
+ op = GridCacheOperation.fromOrdinal(opOrd);
+
+ reader.incrementState();
+
+ case 17:
+ retval = reader.readBoolean("retval");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 18:
+ skipStore = reader.readBoolean("skipStore");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 19:
+ subjId = reader.readUuid("subjId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 20:
+ byte syncModeOrd;
+
+ syncModeOrd = reader.readByte("syncMode");
+
+ if (!reader.isLastRead())
+ return false;
+
+ syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
+ reader.incrementState();
+
+ case 21:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 22:
+ topLocked = reader.readBoolean("topLocked");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 23:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 24:
+ updateVer = reader.readMessage("updateVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 25:
+ vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridNearAtomicSingleUpdateRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -23;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 26;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearAtomicSingleUpdateRequest.class, this, "filter", Arrays.toString(filter),
+ "parent", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 519df17..8b1673f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -17,16 +17,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -35,16 +26,12 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
-import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -57,35 +44,26 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
+import javax.cache.expiry.ExpiryPolicy;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
/**
* DHT atomic cache near update future.
*/
-public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
- implements GridCacheAtomicFuture<Object>{
- /** Logger reference. */
- private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Logger. */
- protected static IgniteLogger log;
-
- /** Cache context. */
- private final GridCacheContext cctx;
-
- /** Cache. */
- private GridDhtAtomicCache cache;
-
- /** Update operation. */
- private final GridCacheOperation op;
-
+public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFuture
+ {
/** Keys */
private Collection<?> keys;
@@ -93,9 +71,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<?> vals;
- /** Optional arguments for entry processor. */
- private Object[] invokeArgs;
-
/** Conflict put values. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<GridCacheDrInfo> conflictPutVals;
@@ -104,48 +79,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<GridCacheVersion> conflictRmvVals;
- /** Return value require flag. */
- private final boolean retval;
-
- /** Expiry policy. */
- private final ExpiryPolicy expiryPlc;
-
- /** Optional filter. */
- private final CacheEntryPredicate[] filter;
-
- /** Write synchronization mode. */
- private final CacheWriteSynchronizationMode syncMode;
-
- /** Raw return value flag. */
- private final boolean rawRetval;
-
- /** Fast map flag. */
- private final boolean fastMap;
-
- /** Near cache flag. */
- private final boolean nearEnabled;
-
- /** Subject ID. */
- private final UUID subjId;
-
- /** Task name hash. */
- private final int taskNameHash;
-
- /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
- private boolean topLocked;
-
- /** Skip store flag. */
- private final boolean skipStore;
-
- /** */
- private final boolean keepBinary;
-
- /** Wait for topology future flag. */
- private final boolean waitTopFut;
-
- /** Remap count. */
- private int remapCnt;
-
/** State. */
private final UpdateState state;
@@ -191,66 +124,27 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
int remapCnt,
boolean waitTopFut
) {
- this.rawRetval = rawRetval;
+ super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash,
+ skipStore, keepBinary, remapCnt, waitTopFut);
assert vals == null || vals.size() == keys.size();
assert conflictPutVals == null || conflictPutVals.size() == keys.size();
assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
assert subjId != null;
- this.cctx = cctx;
- this.cache = cache;
- this.syncMode = syncMode;
- this.op = op;
this.keys = keys;
this.vals = vals;
- this.invokeArgs = invokeArgs;
this.conflictPutVals = conflictPutVals;
this.conflictRmvVals = conflictRmvVals;
- this.retval = retval;
- this.expiryPlc = expiryPlc;
- this.filter = filter;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
- this.skipStore = skipStore;
- this.keepBinary = keepBinary;
- this.waitTopFut = waitTopFut;
-
- if (log == null)
- log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
-
- fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
- cctx.config().getAtomicWriteOrderMode() == CLOCK &&
- !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
-
- nearEnabled = CU.isNearEnabled(cctx);
-
- if (!waitTopFut)
- remapCnt = 1;
-
- this.remapCnt = remapCnt;
state = new UpdateState();
}
/** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
@Override public GridCacheVersion version() {
return state.futureVersion();
}
- /**
- * @return {@code True} if this future should block partition map exchange.
- */
- private boolean waitForPartitionExchange() {
- // Wait fast-map near atomic update futures in CLOCK mode.
- return fastMap;
- }
-
/** {@inheritDoc} */
@Override public Collection<?> keys() {
return keys;
@@ -263,16 +157,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
return false;
}
- /** {@inheritDoc} */
- @Override public boolean trackable() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- // No-op.
- }
-
/**
* Performs future mapping.
*/
@@ -345,23 +229,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
/**
- * Updates near cache.
- *
- * @param req Update request.
- * @param res Update response.
- */
- private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
- assert nearEnabled;
-
- if (res.remapKeys() != null || !req.hasPrimary())
- return;
-
- GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
- near.processNearAtomicUpdateResponse(req, res);
- }
-
- /**
* Maps future on ready topology.
*/
private void mapOnTopology() {
@@ -418,35 +285,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
/**
- * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
- */
- private boolean storeFuture() {
- return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
- }
-
- /**
- * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
- * node and send updates in parallel to all participating nodes.
- *
- * @param key Key to map.
- * @param topVer Topology version to map.
- * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
- * @return Collection of nodes to which key is mapped.
- */
- private Collection<ClusterNode> mapKey(
- KeyCacheObject key,
- AffinityTopologyVersion topVer,
- boolean fastMap
- ) {
- GridCacheAffinityManager affMgr = cctx.affinity();
-
- // If we can send updates in parallel - do it.
- return fastMap ?
- cctx.topology().nodes(affMgr.partition(key), topVer) :
- Collections.singletonList(affMgr.primary(key, topVer));
- }
-
- /**
* Maps future to single node.
*
* @param nodeId Node ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 1a7fa88..340dbf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -57,7 +57,8 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPD
/**
* Lite DHT cache update request sent from near node to primary node.
*/
-public class GridNearAtomicUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+public class GridNearAtomicUpdateRequest extends GridCacheMessage
+ implements GridNearAtomicUpdateRequestInterface, GridCacheDeployable {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java
new file mode 100644
index 0000000..9f17756
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Base interface for near atomic update requests.
+ */
+public interface GridNearAtomicUpdateRequestInterface {
+ public List<KeyCacheObject> keys();
+
+ public AffinityTopologyVersion topologyVersion();
+
+ public GridCacheVersion futureVersion();
+
+ public boolean returnValue();
+
+ public int taskNameHash();
+
+ /**
+ * @return Flag indicating whether this is fast-map update.
+ */
+ public boolean fastMap();
+
+ /**
+ * @return Update version for fast-map request.
+ */
+ public GridCacheVersion updateVersion();
+
+ public boolean clientRequest();
+
+ public boolean topologyLocked();
+
+ public ExpiryPolicy expiry();
+
+ public boolean skipStore();
+
+ public GridCacheOperation operation();
+
+ public CacheWriteSynchronizationMode writeSynchronizationMode();
+
+ public UUID subjectId();
+
+ @Nullable public Object[] invokeArguments();
+
+ public boolean keepBinary();
+
+ @Nullable public CacheEntryPredicate[] filter();
+
+ public UUID nodeId();
+
+ public void nodeId(UUID nodeId);
+
+ public boolean hasPrimary();
+
+ @Nullable public List<GridCacheVersion> conflictVersions();
+
+ @Nullable public GridCacheVersion conflictVersion(int idx);
+
+ public long conflictTtl(int idx);
+
+ public long conflictExpireTime(int idx);
+
+ public List<?> values();
+
+ public CacheObject value(int idx);
+
+ public long messageId();
+
+ public EntryProcessor<Object, Object, Object> entryProcessor(int idx);
+
+ public CacheObject writeValue(int idx);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 63c073d..2a91968 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -45,7 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequestInterface;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -127,7 +127,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
* @param res Update response.
*/
public void processNearAtomicUpdateResponse(
- GridNearAtomicUpdateRequest req,
+ GridNearAtomicUpdateRequestInterface req,
GridNearAtomicUpdateResponse res
) {
if (F.size(res.failedKeys()) == req.keys().size())
[4/8] ignite git commit: IGNITE-2532: WIP. Only refactorings for now.
Posted by vo...@apache.org.
IGNITE-2532: WIP. Only refactorings for now.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/29c2aee6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/29c2aee6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/29c2aee6
Branch: refs/heads/ignite-2523
Commit: 29c2aee6b6ffac4f27da3732575f7d82b0e8bedd
Parents: 4665283
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 3 11:09:13 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 3 11:09:13 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAtomicFuture.java | 6 -
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 5 -
.../GridNearAtomicSingleUpdateFuture.java | 1264 ------------------
.../dht/atomic/GridNearAtomicUpdateFuture.java | 1179 ++++++++--------
4 files changed, 572 insertions(+), 1882 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/29c2aee6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 359909e..be24191 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.Collection;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -38,9 +37,4 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
* @return Future or {@code null} if no need to wait.
*/
public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer);
-
- /**
- * @return Future keys.
- */
- public Collection<?> keys();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/29c2aee6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 6891d3b..9fe60c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -201,11 +201,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
return null;
}
- /** {@inheritDoc} */
- @Override public Collection<KeyCacheObject> keys() {
- return keys;
- }
-
/**
* @param entry Entry to map.
* @param val Value to write.
http://git-wip-us.apache.org/repos/asf/ignite/blob/29c2aee6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
deleted file mode 100644
index d633e47..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ /dev/null
@@ -1,1264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
-import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
-import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.GridCacheReturn;
-import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
-import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
-
-import javax.cache.expiry.ExpiryPolicy;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
-
-/**
- * DHT atomic cache single near update future.
- */
-public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
- implements GridCacheAtomicFuture<Object>{
- /** Logger reference. */
- private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Logger. */
- protected static IgniteLogger log;
-
- /** Cache context. */
- private final GridCacheContext cctx;
-
- /** Cache. */
- private GridDhtAtomicCache cache;
-
- /** Update operation. */
- private final GridCacheOperation op;
-
- /** Keys */
- private Collection<?> keys;
-
- /** Values. */
- @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
- private Collection<?> vals;
-
- /** Optional arguments for entry processor. */
- private Object[] invokeArgs;
-
- /** Conflict put values. */
- @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
- private Collection<GridCacheDrInfo> conflictPutVals;
-
- /** Conflict remove values. */
- @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
- private Collection<GridCacheVersion> conflictRmvVals;
-
- /** Return value require flag. */
- private final boolean retval;
-
- /** Expiry policy. */
- private final ExpiryPolicy expiryPlc;
-
- /** Optional filter. */
- private final CacheEntryPredicate[] filter;
-
- /** Write synchronization mode. */
- private final CacheWriteSynchronizationMode syncMode;
-
- /** Raw return value flag. */
- private final boolean rawRetval;
-
- /** Fast map flag. */
- private final boolean fastMap;
-
- /** Near cache flag. */
- private final boolean nearEnabled;
-
- /** Subject ID. */
- private final UUID subjId;
-
- /** Task name hash. */
- private final int taskNameHash;
-
- /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
- private boolean topLocked;
-
- /** Skip store flag. */
- private final boolean skipStore;
-
- /** */
- private final boolean keepBinary;
-
- /** Wait for topology future flag. */
- private final boolean waitTopFut;
-
- /** Remap count. */
- private int remapCnt;
-
- /** State. */
- private final UpdateState state;
-
- /**
- * @param cctx Cache context.
- * @param cache Cache instance.
- * @param syncMode Write synchronization mode.
- * @param op Update operation.
- * @param keys Keys to update.
- * @param vals Values or transform closure.
- * @param invokeArgs Optional arguments for entry processor.
- * @param conflictPutVals Conflict put values (optional).
- * @param conflictRmvVals Conflict remove values (optional).
- * @param retval Return value require flag.
- * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
- * @param expiryPlc Expiry policy explicitly specified for cache operation.
- * @param filter Entry filter.
- * @param subjId Subject ID.
- * @param taskNameHash Task name hash code.
- * @param skipStore Skip store flag.
- * @param keepBinary Keep binary flag.
- * @param remapCnt Maximum number of retries.
- * @param waitTopFut If {@code false} does not wait for affinity change future.
- */
- public GridNearAtomicSingleUpdateFuture(
- GridCacheContext cctx,
- GridDhtAtomicCache cache,
- CacheWriteSynchronizationMode syncMode,
- GridCacheOperation op,
- Collection<?> keys,
- @Nullable Collection<?> vals,
- @Nullable Object[] invokeArgs,
- @Nullable Collection<GridCacheDrInfo> conflictPutVals,
- @Nullable Collection<GridCacheVersion> conflictRmvVals,
- final boolean retval,
- final boolean rawRetval,
- @Nullable ExpiryPolicy expiryPlc,
- final CacheEntryPredicate[] filter,
- UUID subjId,
- int taskNameHash,
- boolean skipStore,
- boolean keepBinary,
- int remapCnt,
- boolean waitTopFut
- ) {
- this.rawRetval = rawRetval;
-
- assert vals == null || vals.size() == keys.size();
- assert conflictPutVals == null || conflictPutVals.size() == keys.size();
- assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
- assert subjId != null;
-
- this.cctx = cctx;
- this.cache = cache;
- this.syncMode = syncMode;
- this.op = op;
- this.keys = keys;
- this.vals = vals;
- this.invokeArgs = invokeArgs;
- this.conflictPutVals = conflictPutVals;
- this.conflictRmvVals = conflictRmvVals;
- this.retval = retval;
- this.expiryPlc = expiryPlc;
- this.filter = filter;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
- this.skipStore = skipStore;
- this.keepBinary = keepBinary;
- this.waitTopFut = waitTopFut;
-
- if (log == null)
- log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
-
- fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
- cctx.config().getAtomicWriteOrderMode() == CLOCK &&
- !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
-
- nearEnabled = CU.isNearEnabled(cctx);
-
- if (!waitTopFut)
- remapCnt = 1;
-
- this.remapCnt = remapCnt;
-
- state = new UpdateState();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return state.futureVersion();
- }
-
- /**
- * @return {@code True} if this future should block partition map exchange.
- */
- private boolean waitForPartitionExchange() {
- // Wait fast-map near atomic update futures in CLOCK mode.
- return fastMap;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<?> keys() {
- return keys;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- state.onNodeLeft(nodeId);
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean trackable() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- // No-op.
- }
-
- /**
- * Performs future mapping.
- */
- public void map() {
- AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
-
- if (topVer == null)
- mapOnTopology();
- else {
- topLocked = true;
-
- // Cannot remap.
- remapCnt = 1;
-
- state.map(topVer, null);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
- if (waitForPartitionExchange()) {
- GridFutureAdapter<Void> fut = state.completeFuture(topVer);
-
- if (fut != null && isDone()) {
- fut.onDone();
-
- return null;
- }
-
- return fut;
- }
-
- return null;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("ConstantConditions")
- @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
- assert res == null || res instanceof GridCacheReturn;
-
- GridCacheReturn ret = (GridCacheReturn)res;
-
- Object retval =
- res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ?
- cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();
-
- if (op == TRANSFORM && retval == null)
- retval = Collections.emptyMap();
-
- if (super.onDone(retval, err)) {
- GridCacheVersion futVer = state.onFutureDone();
-
- if (futVer != null)
- cctx.mvcc().removeAtomicFuture(futVer);
-
- return true;
- }
-
- return false;
- }
-
- /**
- * Response callback.
- *
- * @param nodeId Node ID.
- * @param res Update response.
- */
- public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
- state.onResult(nodeId, res, false);
- }
-
- /**
- * Updates near cache.
- *
- * @param req Update request.
- * @param res Update response.
- */
- private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
- assert nearEnabled;
-
- if (res.remapKeys() != null || !req.hasPrimary())
- return;
-
- GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
- near.processNearAtomicUpdateResponse(req, res);
- }
-
- /**
- * Maps future on ready topology.
- */
- private void mapOnTopology() {
- cache.topology().readLock();
-
- AffinityTopologyVersion topVer = null;
-
- try {
- if (cache.topology().stopping()) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
- cache.name()));
-
- return;
- }
-
- GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
-
- if (fut.isDone()) {
- Throwable err = fut.validateCache(cctx);
-
- if (err != null) {
- onDone(err);
-
- return;
- }
-
- topVer = fut.topologyVersion();
- }
- else {
- if (waitTopFut) {
- assert !topLocked : this;
-
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- mapOnTopology();
- }
- });
- }
- });
- }
- else
- onDone(new GridCacheTryPutFailedException());
-
- return;
- }
- }
- finally {
- cache.topology().readUnlock();
- }
-
- state.map(topVer, null);
- }
-
- /**
- * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
- */
- private boolean storeFuture() {
- return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
- }
-
- /**
- * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
- * node and send updates in parallel to all participating nodes.
- *
- * @param key Key to map.
- * @param topVer Topology version to map.
- * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
- * @return Collection of nodes to which key is mapped.
- */
- private Collection<ClusterNode> mapKey(
- KeyCacheObject key,
- AffinityTopologyVersion topVer,
- boolean fastMap
- ) {
- GridCacheAffinityManager affMgr = cctx.affinity();
-
- // If we can send updates in parallel - do it.
- return fastMap ?
- cctx.topology().nodes(affMgr.partition(key), topVer) :
- Collections.singletonList(affMgr.primary(key, topVer));
- }
-
- /**
- * Maps future to single node.
- *
- * @param nodeId Node ID.
- * @param req Request.
- */
- private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
- if (cctx.localNodeId().equals(nodeId)) {
- cache.updateAllAsyncInternal(nodeId, req,
- new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
- onResult(res.nodeId(), res);
- }
- });
- }
- else {
- try {
- if (log.isDebugEnabled())
- log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
-
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
-
- if (syncMode == FULL_ASYNC)
- onDone(new GridCacheReturn(cctx, true, true, null, true));
- }
- catch (IgniteCheckedException e) {
- state.onSendError(req, e);
- }
- }
- }
-
- /**
- * Sends messages to remote nodes and updates local cache.
- *
- * @param mappings Mappings to send.
- */
- private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
- UUID locNodeId = cctx.localNodeId();
-
- GridNearAtomicUpdateRequest locUpdate = null;
-
- // Send messages to remote nodes first, then run local update.
- for (GridNearAtomicUpdateRequest req : mappings.values()) {
- if (locNodeId.equals(req.nodeId())) {
- assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
- ", req=" + req + ']';
-
- locUpdate = req;
- }
- else {
- try {
- if (log.isDebugEnabled())
- log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
-
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
- }
- catch (IgniteCheckedException e) {
- state.onSendError(req, e);
- }
- }
- }
-
- if (locUpdate != null) {
- cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
- new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
- onResult(res.nodeId(), res);
- }
- });
- }
-
- if (syncMode == FULL_ASYNC)
- onDone(new GridCacheReturn(cctx, true, true, null, true));
- }
-
- /**
- *
- */
- private class UpdateState {
- /** Current topology version. */
- private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
-
- /** */
- private GridCacheVersion updVer;
-
- /** Topology version when got mapping error. */
- private AffinityTopologyVersion mapErrTopVer;
-
- /** Mappings if operations is mapped to more than one node. */
- @GridToStringInclude
- private Map<UUID, GridNearAtomicUpdateRequest> mappings;
-
- /** */
- private int resCnt;
-
- /** Error. */
- private CachePartialUpdateCheckedException err;
-
- /** Future ID. */
- private GridCacheVersion futVer;
-
- /** Completion future for a particular topology version. */
- private GridFutureAdapter<Void> topCompleteFut;
-
- /** Keys to remap. */
- private Collection<KeyCacheObject> remapKeys;
-
- /** Not null is operation is mapped to single node. */
- private GridNearAtomicUpdateRequest singleReq;
-
- /** Operation result. */
- private GridCacheReturn opRes;
-
- /**
- * @return Future version.
- */
- @Nullable synchronized GridCacheVersion futureVersion() {
- return futVer;
- }
-
- /**
- * @param nodeId Left node ID.
- */
- void onNodeLeft(UUID nodeId) {
- GridNearAtomicUpdateResponse res = null;
-
- synchronized (this) {
- GridNearAtomicUpdateRequest req;
-
- if (singleReq != null)
- req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
- else
- req = mappings != null ? mappings.get(nodeId) : null;
-
- if (req != null && req.response() == null) {
- res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- nodeId,
- req.futureVersion(),
- cctx.deploymentEnabled());
-
- ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
- "before response is received: " + nodeId);
-
- e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
-
- res.addFailedKeys(req.keys(), e);
- }
- }
-
- if (res != null)
- onResult(nodeId, res, true);
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- * @param nodeErr {@code True} if response was created on node failure.
- */
- @SuppressWarnings("unchecked")
- void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
- GridNearAtomicUpdateRequest req;
-
- AffinityTopologyVersion remapTopVer = null;
-
- GridCacheReturn opRes0 = null;
- CachePartialUpdateCheckedException err0 = null;
-
- boolean rcvAll;
-
- GridFutureAdapter<?> fut0 = null;
-
- synchronized (this) {
- if (!res.futureVersion().equals(futVer))
- return;
-
- if (singleReq != null) {
- if (!singleReq.nodeId().equals(nodeId))
- return;
-
- req = singleReq;
-
- singleReq = null;
-
- rcvAll = true;
- }
- else {
- req = mappings != null ? mappings.get(nodeId) : null;
-
- if (req != null && req.onResponse(res)) {
- resCnt++;
-
- rcvAll = mappings.size() == resCnt;
- }
- else
- return;
- }
-
- assert req != null && req.topologyVersion().equals(topVer) : req;
-
- if (res.remapKeys() != null) {
- assert !fastMap || cctx.kernalContext().clientNode();
-
- if (remapKeys == null)
- remapKeys = U.newHashSet(res.remapKeys().size());
-
- remapKeys.addAll(res.remapKeys());
-
- if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
- mapErrTopVer = req.topologyVersion();
- }
- else if (res.error() != null) {
- if (res.failedKeys() != null)
- addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
- }
- else {
- if (!req.fastMap() || req.hasPrimary()) {
- GridCacheReturn ret = res.returnValue();
-
- if (op == TRANSFORM) {
- if (ret != null)
- addInvokeResults(ret);
- }
- else
- opRes = ret;
- }
- }
-
- if (rcvAll) {
- if (remapKeys != null) {
- assert mapErrTopVer != null;
-
- remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
- }
- else {
- if (err != null &&
- X.hasCause(err, CachePartialUpdateCheckedException.class) &&
- X.hasCause(err, ClusterTopologyCheckedException.class) &&
- storeFuture() &&
- --remapCnt > 0) {
- ClusterTopologyCheckedException topErr =
- X.cause(err, ClusterTopologyCheckedException.class);
-
- if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
- CachePartialUpdateCheckedException cause =
- X.cause(err, CachePartialUpdateCheckedException.class);
-
- assert cause != null && cause.topologyVersion() != null : err;
-
- remapTopVer =
- new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
-
- err = null;
-
- Collection<Object> failedKeys = cause.failedKeys();
-
- remapKeys = new ArrayList<>(failedKeys.size());
-
- for (Object key : failedKeys)
- remapKeys.add(cctx.toCacheKeyObject(key));
-
- updVer = null;
- }
- }
- }
-
- if (remapTopVer == null) {
- err0 = err;
- opRes0 = opRes;
- }
- else {
- fut0 = topCompleteFut;
-
- topCompleteFut = null;
-
- cctx.mvcc().removeAtomicFuture(futVer);
-
- futVer = null;
- topVer = AffinityTopologyVersion.ZERO;
- }
- }
- }
-
- if (res.error() != null && res.failedKeys() == null) {
- onDone(res.error());
-
- return;
- }
-
- if (rcvAll && nearEnabled) {
- if (mappings != null) {
- for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
- GridNearAtomicUpdateResponse res0 = req0.response();
-
- assert res0 != null : req0;
-
- updateNear(req0, res0);
- }
- }
- else if (!nodeErr)
- updateNear(req, res);
- }
-
- if (remapTopVer != null) {
- if (fut0 != null)
- fut0.onDone();
-
- if (!waitTopFut) {
- onDone(new GridCacheTryPutFailedException());
-
- return;
- }
-
- if (topLocked) {
- assert !F.isEmpty(remapKeys) : remapKeys;
-
- CachePartialUpdateCheckedException e =
- new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
-
- ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
- "Failed to update keys, topology changed while execute atomic update inside transaction.");
-
- cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
-
- e.add(remapKeys, cause);
-
- onDone(e);
-
- return;
- }
-
- IgniteInternalFuture<AffinityTopologyVersion> fut =
- cctx.shared().exchange().affinityReadyFuture(remapTopVer);
-
- if (fut == null)
- fut = new GridFinishedFuture<>(remapTopVer);
-
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- try {
- AffinityTopologyVersion topVer = fut.get();
-
- map(topVer, remapKeys);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
- }
- });
- }
- });
-
- return;
- }
-
- if (rcvAll)
- onDone(opRes0, err0);
- }
-
- /**
- * @param req Request.
- * @param e Error.
- */
- void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
- synchronized (this) {
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- req.nodeId(),
- req.futureVersion(),
- cctx.deploymentEnabled());
-
- res.addFailedKeys(req.keys(), e);
-
- onResult(req.nodeId(), res, true);
- }
- }
-
- /**
- * @param topVer Topology version.
- * @param remapKeys Keys to remap.
- */
- void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
- Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
-
- if (F.isEmpty(topNodes)) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid)."));
-
- return;
- }
-
- Exception err = null;
- GridNearAtomicUpdateRequest singleReq0 = null;
- Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
-
- int size = keys.size();
-
- GridCacheVersion futVer = cctx.versions().next(topVer);
-
- GridCacheVersion updVer;
-
- // Assign version on near node in CLOCK ordering mode even if fastMap is false.
- if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
- updVer = this.updVer;
-
- if (updVer == null) {
- updVer = cctx.versions().next(topVer);
-
- if (log.isDebugEnabled())
- log.debug("Assigned fast-map version for update on near node: " + updVer);
- }
- }
- else
- updVer = null;
-
- try {
- if (size == 1 && !fastMap) {
- assert remapKeys == null || remapKeys.size() == 1;
-
- singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
- }
- else {
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
- topVer,
- futVer,
- updVer,
- remapKeys);
-
- if (pendingMappings.size() == 1)
- singleReq0 = F.firstValue(pendingMappings);
- else {
- if (syncMode == PRIMARY_SYNC) {
- mappings0 = U.newHashMap(pendingMappings.size());
-
- for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
- if (req.hasPrimary())
- mappings0.put(req.nodeId(), req);
- }
- }
- else
- mappings0 = pendingMappings;
-
- assert !mappings0.isEmpty() || size == 0 : GridNearAtomicSingleUpdateFuture.this;
- }
- }
-
- synchronized (this) {
- assert this.futVer == null : this;
- assert this.topVer == AffinityTopologyVersion.ZERO : this;
-
- this.topVer = topVer;
- this.updVer = updVer;
- this.futVer = futVer;
-
- resCnt = 0;
-
- singleReq = singleReq0;
- mappings = mappings0;
-
- this.remapKeys = null;
- }
- }
- catch (Exception e) {
- err = e;
- }
-
- if (err != null) {
- onDone(err);
-
- return;
- }
-
- if (storeFuture()) {
- if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicSingleUpdateFuture.this)) {
- assert isDone() : GridNearAtomicSingleUpdateFuture.this;
-
- return;
- }
- }
-
- // Optimize mapping for single key.
- if (singleReq0 != null)
- mapSingle(singleReq0.nodeId(), singleReq0);
- else {
- assert mappings0 != null;
-
- if (size == 0)
- onDone(new GridCacheReturn(cctx, true, true, null, true));
- else
- doUpdate(mappings0);
- }
- }
-
- /**
- * @param topVer Topology version.
- * @return Future.
- */
- @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) {
- if (this.topVer == AffinityTopologyVersion.ZERO)
- return null;
-
- if (this.topVer.compareTo(topVer) < 0) {
- if (topCompleteFut == null)
- topCompleteFut = new GridFutureAdapter<>();
-
- return topCompleteFut;
- }
-
- return null;
- }
-
- /**
- * @return Future version.
- */
- GridCacheVersion onFutureDone() {
- GridCacheVersion ver0;
-
- GridFutureAdapter<Void> fut0;
-
- synchronized (this) {
- fut0 = topCompleteFut;
-
- topCompleteFut = null;
-
- ver0 = futVer;
-
- futVer = null;
- }
-
- if (fut0 != null)
- fut0.onDone();
-
- return ver0;
- }
-
- /**
- * @param topNodes Cache nodes.
- * @param topVer Topology version.
- * @param futVer Future version.
- * @param updVer Update version.
- * @param remapKeys Keys to remap.
- * @return Mapping.
- * @throws Exception If failed.
- */
- private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
- AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable GridCacheVersion updVer,
- @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
- Iterator<?> it = null;
-
- if (vals != null)
- it = vals.iterator();
-
- Iterator<GridCacheDrInfo> conflictPutValsIt = null;
-
- if (conflictPutVals != null)
- conflictPutValsIt = conflictPutVals.iterator();
-
- Iterator<GridCacheVersion> conflictRmvValsIt = null;
-
- if (conflictRmvVals != null)
- conflictRmvValsIt = conflictRmvVals.iterator();
-
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
-
- // Create mappings first, then send messages.
- for (Object key : keys) {
- if (key == null)
- throw new NullPointerException("Null key.");
-
- Object val;
- GridCacheVersion conflictVer;
- long conflictTtl;
- long conflictExpireTime;
-
- if (vals != null) {
- val = it.next();
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
-
- if (val == null)
- throw new NullPointerException("Null value.");
- }
- else if (conflictPutVals != null) {
- GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
-
- val = conflictPutVal.valueEx();
- conflictVer = conflictPutVal.version();
- conflictTtl = conflictPutVal.ttl();
- conflictExpireTime = conflictPutVal.expireTime();
- }
- else if (conflictRmvVals != null) {
- val = null;
- conflictVer = conflictRmvValsIt.next();
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
- else {
- val = null;
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
-
- if (val == null && op != GridCacheOperation.DELETE)
- continue;
-
- KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
-
- if (remapKeys != null && !remapKeys.contains(cacheKey))
- continue;
-
- if (op != TRANSFORM)
- val = cctx.toCacheObject(val);
-
- Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
-
- if (affNodes.isEmpty())
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid).");
-
- int i = 0;
-
- for (ClusterNode affNode : affNodes) {
- if (affNode == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid).");
-
- UUID nodeId = affNode.id();
-
- GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
-
- if (mapped == null) {
- mapped = new GridNearAtomicUpdateRequest(
- cctx.cacheId(),
- nodeId,
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- keys.size());
-
- pendingMappings.put(nodeId, mapped);
- }
-
- mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
-
- i++;
- }
- }
-
- return pendingMappings;
- }
-
- /**
- * @param topVer Topology version.
- * @param futVer Future version.
- * @param updVer Update version.
- * @return Request.
- * @throws Exception If failed.
- */
- private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable GridCacheVersion updVer) throws Exception {
- Object key = F.first(keys);
-
- Object val;
- GridCacheVersion conflictVer;
- long conflictTtl;
- long conflictExpireTime;
-
- if (vals != null) {
- // Regular PUT.
- val = F.first(vals);
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
- else if (conflictPutVals != null) {
- // Conflict PUT.
- GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
-
- val = conflictPutVal.valueEx();
- conflictVer = conflictPutVal.version();
- conflictTtl = conflictPutVal.ttl();
- conflictExpireTime = conflictPutVal.expireTime();
- }
- else if (conflictRmvVals != null) {
- // Conflict REMOVE.
- val = null;
- conflictVer = F.first(conflictRmvVals);
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
- else {
- // Regular REMOVE.
- val = null;
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
-
- // We still can get here if user pass map with single element.
- if (key == null)
- throw new NullPointerException("Null key.");
-
- if (val == null && op != GridCacheOperation.DELETE)
- throw new NullPointerException("Null value.");
-
- KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
-
- if (op != TRANSFORM)
- val = cctx.toCacheObject(val);
-
- ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
-
- if (primary == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid).");
-
- GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
- cctx.cacheId(),
- primary.id(),
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- 1);
-
- req.addUpdateEntry(cacheKey,
- val,
- conflictTtl,
- conflictExpireTime,
- conflictVer,
- true);
-
- return req;
- }
-
- /**
- * @param ret Result from single node.
- */
- @SuppressWarnings("unchecked")
- private void addInvokeResults(GridCacheReturn ret) {
- assert op == TRANSFORM : op;
- assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
- if (ret.value() != null) {
- if (opRes != null)
- opRes.mergeEntryProcessResults(ret);
- else
- opRes = ret;
- }
- }
-
- /**
- * @param failedKeys Failed keys.
- * @param topVer Topology version for failed update.
- * @param err Error cause.
- */
- private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
- AffinityTopologyVersion topVer,
- Throwable err) {
- CachePartialUpdateCheckedException err0 = this.err;
-
- if (err0 == null)
- err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
-
- Collection<Object> keys = new ArrayList<>(failedKeys.size());
-
- for (KeyCacheObject key : failedKeys)
- keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
-
- err0.add(keys, err, topVer);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized String toString() {
- return S.toString(UpdateState.class, this);
- }
- }
-
- /** {@inheritDoc} */
- public String toString() {
- return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString());
- }
-}
[5/8] ignite git commit: IGNITE-2532: Single update request is
finally wired up. Though, it is not optimized yet.
Posted by vo...@apache.org.
IGNITE-2532: Single update request is finally wired up. Though, it is not optimized yet.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2a1a31d2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2a1a31d2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2a1a31d2
Branch: refs/heads/ignite-2523
Commit: 2a1a31d2d0999fdb8854a05fd7a75a4cd0b159a4
Parents: 29c2aee
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 3 11:36:39 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 3 11:36:39 2016 +0300
----------------------------------------------------------------------
.../GridNearAbstractAtomicUpdateFuture.java | 4 +
.../dht/atomic/GridNearAtomicUpdateFuture.java | 142 +++++++++++++------
.../dht/atomic/GridNearAtomicUpdateRequest.java | 3 +-
.../GridNearAtomicUpdateRequestInterface.java | 4 +
4 files changed, 107 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a1a31d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
index 60e0c5f..f8c6810 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -53,6 +54,9 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
*/
public abstract class GridNearAbstractAtomicUpdateFuture extends GridFutureAdapter<Object>
implements GridCacheAtomicFuture<Object> {
+ /** */
+ public static final IgniteProductVersion SINGLE_PUT_MSG_SINCE = IgniteProductVersion.fromString("1.6.0");
+
/** Logger reference. */
protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a1a31d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 2aa510d..493c765 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -108,7 +108,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
private Collection<KeyCacheObject> remapKeys;
/** Not null is operation is mapped to single node. */
- private GridNearAtomicUpdateRequest singleReq;
+ private GridNearAtomicUpdateRequestInterface singleReq;
/** Operation result. */
private GridCacheReturn opRes;
@@ -195,7 +195,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
GridNearAtomicUpdateResponse res = null;
synchronized (this) {
- GridNearAtomicUpdateRequest req;
+ GridNearAtomicUpdateRequestInterface req;
if (singleReq != null)
req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
@@ -339,11 +339,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
* @param nodeId Node ID.
* @param req Request.
*/
- private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
+ private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequestInterface req) {
if (cctx.localNodeId().equals(nodeId)) {
cache.updateAllAsyncInternal(nodeId, req,
- new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ new CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse>() {
+ @Override public void apply(GridNearAtomicUpdateRequestInterface req, GridNearAtomicUpdateResponse res) {
onResult(res.nodeId(), res);
}
});
@@ -353,7 +353,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
if (log.isDebugEnabled())
log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+ if (req instanceof GridNearAtomicUpdateRequest)
+ cctx.io().send(req.nodeId(), (GridNearAtomicUpdateRequest)req, cctx.ioPolicy());
+ else {
+ assert req instanceof GridNearAtomicSingleUpdateRequest;
+
+ cctx.io().send(req.nodeId(), (GridNearAtomicSingleUpdateRequest)req, cctx.ioPolicy());
+ }
if (syncMode == FULL_ASYNC)
onDone(new GridCacheReturn(cctx, true, true, null, true));
@@ -372,7 +378,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
UUID locNodeId = cctx.localNodeId();
- GridNearAtomicUpdateRequest locUpdate = null;
+ GridNearAtomicUpdateRequestInterface locUpdate = null;
// Send messages to remote nodes first, then run local update.
for (GridNearAtomicUpdateRequest req : mappings.values()) {
@@ -415,7 +421,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
*/
@SuppressWarnings("unchecked")
void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
- GridNearAtomicUpdateRequest req;
+ GridNearAtomicUpdateRequestInterface req;
AffinityTopologyVersion remapTopVer = null;
@@ -545,7 +551,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
if (rcvAll && nearEnabled) {
if (mappings != null) {
- for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
+ for (GridNearAtomicUpdateRequestInterface req0 : mappings.values()) {
GridNearAtomicUpdateResponse res0 = req0.response();
assert res0 != null : req0;
@@ -619,7 +625,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
* @param req Request.
* @param e Error.
*/
- void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
+ void onSendError(GridNearAtomicUpdateRequestInterface req, IgniteCheckedException e) {
synchronized (this) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
req.nodeId(),
@@ -647,7 +653,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
}
Exception err = null;
- GridNearAtomicUpdateRequest singleReq0 = null;
+ GridNearAtomicUpdateRequestInterface singleReq0 = null;
Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
int size = keys.size();
@@ -674,7 +680,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
if (size == 1 && !fastMap) {
assert remapKeys == null || remapKeys.size() == 1;
- singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+ singleReq0 = mapSingleUpdate(topVer, topNodes, futVer, updVer);
}
else {
Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
@@ -799,6 +805,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
* @return Mapping.
* @throws Exception If failed.
*/
+ @SuppressWarnings("ConstantConditions")
private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
AffinityTopologyVersion topVer,
GridCacheVersion futVer,
@@ -926,14 +933,14 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
/**
* @param topVer Topology version.
+ * @param topNodes Topology nodes.
* @param futVer Future version.
* @param updVer Update version.
* @return Request.
* @throws Exception If failed.
*/
- private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable GridCacheVersion updVer) throws Exception {
+ private GridNearAtomicUpdateRequestInterface mapSingleUpdate(AffinityTopologyVersion topVer,
+ Collection<ClusterNode> topNodes, GridCacheVersion futVer, @Nullable GridCacheVersion updVer) throws Exception {
Object key = F.first(keys);
Object val;
@@ -990,36 +997,81 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
"left the grid).");
- GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
- cctx.cacheId(),
- primary.id(),
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- 1);
-
- req.addUpdateEntry(cacheKey,
- val,
- conflictTtl,
- conflictExpireTime,
- conflictVer,
- true);
-
- return req;
+ // Decide whether we will use optimized version of update request.
+ boolean optimize = true;
+
+ for (ClusterNode topNode : topNodes) {
+ if (topNode.version().compareTo(SINGLE_PUT_MSG_SINCE) < 0) {
+ optimize = false;
+
+ break;
+ }
+ }
+
+ if (optimize) {
+ GridNearAtomicSingleUpdateRequest req = new GridNearAtomicSingleUpdateRequest(
+ cctx.cacheId(),
+ primary.id(),
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled(),
+ 1);
+
+ req.addUpdateEntry(cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
+ true);
+
+ return req;
+ }
+ else {
+ GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
+ cctx.cacheId(),
+ primary.id(),
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled(),
+ 1);
+
+ req.addUpdateEntry(cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
+ true);
+
+ return req;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a1a31d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 340dbf6..674a5be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -373,6 +373,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage
* @param conflictVer Conflict version (optional).
* @param primary If given key is primary on this mapping.
*/
+ @SuppressWarnings("unchecked")
public void addUpdateEntry(KeyCacheObject key,
@Nullable Object val,
long conflictTtl,
@@ -384,7 +385,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage
if (op == TRANSFORM) {
assert val instanceof EntryProcessor : val;
- entryProcessor = (EntryProcessor<Object, Object, Object>) val;
+ entryProcessor = (EntryProcessor<Object, Object, Object>)val;
}
assert val != null || op == DELETE;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a1a31d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java
index 9f17756..2ef4bae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestInterface.java
@@ -98,4 +98,8 @@ public interface GridNearAtomicUpdateRequestInterface {
public EntryProcessor<Object, Object, Object> entryProcessor(int idx);
public CacheObject writeValue(int idx);
+
+ @Nullable public GridNearAtomicUpdateResponse response();
+
+ public boolean onResponse(GridNearAtomicUpdateResponse res);
}
[2/8] ignite git commit: IGNITE-2532: WIP. Only refactorings for now.
Posted by vo...@apache.org.
IGNITE-2532: WIP. Only refactorings for now.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/46652832
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/46652832
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/46652832
Branch: refs/heads/ignite-2523
Commit: 466528329942f009c9e591a16b9453b95830b0cb
Parents: e6acce6
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 3 10:24:01 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 3 10:24:01 2016 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 6 +
.../processors/cache/GridCacheIoManager.java | 17 +
.../dht/atomic/GridDhtAtomicCache.java | 42 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 12 +-
.../GridNearAbstractAtomicUpdateFuture.java | 248 ++++
.../GridNearAtomicSingleUpdateFuture.java | 1264 ++++++++++++++++++
.../GridNearAtomicSingleUpdateRequest.java | 1044 +++++++++++++++
.../dht/atomic/GridNearAtomicUpdateFuture.java | 186 +--
.../dht/atomic/GridNearAtomicUpdateRequest.java | 3 +-
.../GridNearAtomicUpdateRequestInterface.java | 101 ++
.../distributed/near/GridNearAtomicCache.java | 4 +-
11 files changed, 2725 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 3c7f378..25e07b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlock
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
@@ -726,6 +727,11 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case -23:
+ msg = new GridNearAtomicSingleUpdateRequest();
+
+ break;
+
// [-3..119] [124] - this
// [120..123] - DR
// [-4..-22] - SQL
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index b297827..57545af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
@@ -586,6 +587,22 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
+ case -23: {
+ GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg;
+
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
+ ctx.cacheId(),
+ nodeId,
+ req.futureVersion(),
+ ctx.deploymentEnabled());
+
+ res.error(req.classError());
+
+ sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ }
+
+ break;
+
default:
throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
+ msg + "]", msg.classError());
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index f6f57ee..cdaa061 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -139,7 +139,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
/** Update reply closure. */
- private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
+ private CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> updateReplyClos;
/** Pending */
private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new ConcurrentHashMap8<>();
@@ -192,9 +192,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+ updateReplyClos = new CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse>() {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ @Override public void apply(GridNearAtomicUpdateRequestInterface req, GridNearAtomicUpdateResponse res) {
if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
assert req.writeSynchronizationMode() != FULL_ASYNC : req;
@@ -256,6 +256,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
+ ctx.io().addHandler(ctx.cacheId(), GridNearAtomicSingleUpdateRequest.class, new CI2<UUID, GridNearAtomicSingleUpdateRequest>() {
+ @Override public void apply(UUID nodeId, GridNearAtomicSingleUpdateRequest req) {
+ processNearAtomicUpdateRequest(nodeId, req);
+ }
+ });
+
ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() {
@Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) {
processNearAtomicUpdateResponse(nodeId, res);
@@ -1304,8 +1310,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
public void updateAllAsyncInternal(
final UUID nodeId,
- final GridNearAtomicUpdateRequest req,
- final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+ final GridNearAtomicUpdateRequestInterface req,
+ final CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb
) {
IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
@@ -1329,8 +1335,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
public void updateAllAsyncInternal0(
UUID nodeId,
- GridNearAtomicUpdateRequest req,
- CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+ GridNearAtomicUpdateRequestInterface req,
+ CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb
) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
ctx.deploymentEnabled());
@@ -1552,12 +1558,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private UpdateBatchResult updateWithBatch(
ClusterNode node,
boolean hasNear,
- GridNearAtomicUpdateRequest req,
+ GridNearAtomicUpdateRequestInterface req,
GridNearAtomicUpdateResponse res,
List<GridDhtCacheEntry> locked,
GridCacheVersion ver,
@Nullable GridDhtAtomicUpdateFuture dhtFut,
- CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb,
boolean replicate,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
@@ -1968,12 +1974,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private UpdateSingleResult updateSingle(
ClusterNode node,
boolean hasNear,
- GridNearAtomicUpdateRequest req,
+ GridNearAtomicUpdateRequestInterface req,
GridNearAtomicUpdateResponse res,
List<GridDhtCacheEntry> locked,
GridCacheVersion ver,
@Nullable GridDhtAtomicUpdateFuture dhtFut,
- CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb,
boolean replicate,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
@@ -2208,8 +2214,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable Collection<KeyCacheObject> rmvKeys,
@Nullable Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
@Nullable GridDhtAtomicUpdateFuture dhtFut,
- CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
- final GridNearAtomicUpdateRequest req,
+ CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb,
+ final GridNearAtomicUpdateRequestInterface req,
final GridNearAtomicUpdateResponse res,
boolean replicate,
UpdateBatchResult batchRes,
@@ -2587,7 +2593,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* will return false.
* @return {@code True} if filter evaluation succeeded.
*/
- private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequest req,
+ private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequestInterface req,
GridNearAtomicUpdateResponse res) {
try {
return ctx.isAllLocked(entry, req.filter());
@@ -2602,7 +2608,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* @param req Request to remap.
*/
- private void remapToNewPrimary(GridNearAtomicUpdateRequest req) {
+ private void remapToNewPrimary(GridNearAtomicUpdateRequestInterface req) {
assert req.writeSynchronizationMode() == FULL_ASYNC : req;
if (log.isDebugEnabled())
@@ -2681,9 +2687,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
@Nullable private GridDhtAtomicUpdateFuture createDhtFuture(
GridCacheVersion writeVer,
- GridNearAtomicUpdateRequest updateReq,
+ GridNearAtomicUpdateRequestInterface updateReq,
GridNearAtomicUpdateResponse updateRes,
- CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb,
boolean force
) {
if (!force) {
@@ -2714,7 +2720,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param nodeId Sender node ID.
* @param req Near atomic update request.
*/
- private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequest req) {
+ private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequestInterface req) {
if (log.isDebugEnabled())
log.debug("Processing near atomic update request [nodeId=" + nodeId + ", req=" + req + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index e31af19..6891d3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -53,8 +53,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
* DHT atomic cache backup update future.
*/
-public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
- implements GridCacheAtomicFuture<Void> {
+public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implements GridCacheAtomicFuture<Void> {
/** */
private static final long serialVersionUID = 0L;
@@ -78,7 +77,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** Completion callback. */
@GridToStringExclude
- private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+ private final CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb;
/** Mappings. */
@GridToStringInclude
@@ -88,7 +87,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
/** Update request. */
- private final GridNearAtomicUpdateRequest updateReq;
+ private final GridNearAtomicUpdateRequestInterface updateReq;
/** Update response. */
private final GridNearAtomicUpdateResponse updateRes;
@@ -111,10 +110,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
*/
public GridDhtAtomicUpdateFuture(
GridCacheContext cctx,
- CI2<GridNearAtomicUpdateRequest,
- GridNearAtomicUpdateResponse> completionCb,
+ CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb,
GridCacheVersion writeVer,
- GridNearAtomicUpdateRequest updateReq,
+ GridNearAtomicUpdateRequestInterface updateReq,
GridNearAtomicUpdateResponse updateRes
) {
this.cctx = cctx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
new file mode 100644
index 0000000..60e0c5f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.expiry.ExpiryPolicy;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+
+/**
+ * Base class for near atomic update futures.
+ */
+public abstract class GridNearAbstractAtomicUpdateFuture extends GridFutureAdapter<Object>
+ implements GridCacheAtomicFuture<Object> {
+ /** Logger reference. */
+ protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ protected static IgniteLogger log;
+
+ /** Optional arguments for entry processor. */
+ protected Object[] invokeArgs;
+
+ /** Cache context. */
+ protected final GridCacheContext cctx;
+
+ /** Cache. */
+ protected final GridDhtAtomicCache cache;
+
+ /** Update operation. */
+ protected final GridCacheOperation op;
+
+ /** Return value require flag. */
+ protected final boolean retval;
+
+ /** Expiry policy. */
+ protected final ExpiryPolicy expiryPlc;
+
+ /** Optional filter. */
+ protected final CacheEntryPredicate[] filter;
+
+ /** Write synchronization mode. */
+ protected final CacheWriteSynchronizationMode syncMode;
+
+ /** Raw return value flag. */
+ protected final boolean rawRetval;
+
+ /** Fast map flag. */
+ protected final boolean fastMap;
+
+ /** Near cache flag. */
+ protected final boolean nearEnabled;
+
+ /** Subject ID. */
+ protected final UUID subjId;
+
+ /** Task name hash. */
+ protected final int taskNameHash;
+
+ /** Skip store flag. */
+ protected final boolean skipStore;
+
+ /** Keep binary flag. */
+ protected final boolean keepBinary;
+
+ /** Wait for topology future flag. */
+ protected final boolean waitTopFut;
+
+ /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
+ protected boolean topLocked;
+
+ /** Remap count. */
+ protected int remapCnt;
+
+ /**
+ * @param cctx Cache context.
+ * @param cache Cache instance.
+ * @param syncMode Write synchronization mode.
+ * @param op Update operation.
+ * @param invokeArgs Optional arguments for entry processor.
+ * @param retval Return value require flag.
+ * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
+ * @param expiryPlc Expiry policy explicitly specified for cache operation.
+ * @param filter Entry filter.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ * @param skipStore Skip store flag.
+ * @param keepBinary Keep binary flag.
+ * @param remapCnt Maximum number of retries.
+ * @param waitTopFut If {@code false} does not wait for affinity change future.
+ */
+ public GridNearAbstractAtomicUpdateFuture(
+ GridCacheContext cctx,
+ GridDhtAtomicCache cache,
+ CacheWriteSynchronizationMode syncMode,
+ GridCacheOperation op,
+ @Nullable Object[] invokeArgs,
+ final boolean retval,
+ final boolean rawRetval,
+ @Nullable ExpiryPolicy expiryPlc,
+ final CacheEntryPredicate[] filter,
+ UUID subjId,
+ int taskNameHash,
+ boolean skipStore,
+ boolean keepBinary,
+ int remapCnt,
+ boolean waitTopFut
+ ) {
+ this.rawRetval = rawRetval;
+
+ assert subjId != null;
+
+ this.cctx = cctx;
+ this.cache = cache;
+ this.syncMode = syncMode;
+ this.op = op;
+ this.invokeArgs = invokeArgs;
+ this.retval = retval;
+ this.expiryPlc = expiryPlc;
+ this.filter = filter;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.skipStore = skipStore;
+ this.keepBinary = keepBinary;
+ this.waitTopFut = waitTopFut;
+
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
+
+ fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+ cctx.config().getAtomicWriteOrderMode() == CLOCK &&
+ !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
+
+ nearEnabled = CU.isNearEnabled(cctx);
+
+ if (!waitTopFut)
+ remapCnt = 1;
+
+ this.remapCnt = remapCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid futureId() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean trackable() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void markNotTrackable() {
+ // No-op.
+ }
+
+ /**
+ * @return {@code True} if this future should block partition map exchange.
+ */
+ protected boolean waitForPartitionExchange() {
+ // Wait for fast-map near atomic update futures in CLOCK mode.
+ return fastMap;
+ }
+
+ /**
+ * Updates near cache.
+ *
+ * @param req Update request.
+ * @param res Update response.
+ */
+ protected void updateNear(GridNearAtomicUpdateRequestInterface req, GridNearAtomicUpdateResponse res) {
+ assert nearEnabled;
+
+ if (res.remapKeys() != null || !req.hasPrimary())
+ return;
+
+ GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
+
+ near.processNearAtomicUpdateResponse(req, res);
+ }
+
+ /**
+ * @return {@code True} if future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
+ */
+ protected boolean storeFuture() {
+ return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
+ }
+
+ /**
+ * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
+ * node and send updates in parallel to all participating nodes.
+ *
+ * @param key Key to map.
+ * @param topVer Topology version to map.
+ * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
+ * @return Collection of nodes to which key is mapped.
+ */
+ protected Collection<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer, boolean fastMap
+ ) {
+ GridCacheAffinityManager affMgr = cctx.affinity();
+
+ // If we can send updates in parallel - do it.
+ return fastMap ?
+ cctx.topology().nodes(affMgr.partition(key), topVer) :
+ Collections.singletonList(affMgr.primary(key, topVer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
new file mode 100644
index 0000000..d633e47
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -0,0 +1,1264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.expiry.ExpiryPolicy;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+
+/**
+ * DHT atomic cache single near update future.
+ */
+public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
+ implements GridCacheAtomicFuture<Object>{
+ /** Logger reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ protected static IgniteLogger log;
+
+ /** Cache context. */
+ private final GridCacheContext cctx;
+
+ /** Cache. */
+ private GridDhtAtomicCache cache;
+
+ /** Update operation. */
+ private final GridCacheOperation op;
+
+ /** Keys to update. */
+ private Collection<?> keys;
+
+ /** Values. */
+ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+ private Collection<?> vals;
+
+ /** Optional arguments for entry processor. */
+ private Object[] invokeArgs;
+
+ /** Conflict put values. */
+ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+ private Collection<GridCacheDrInfo> conflictPutVals;
+
+ /** Conflict remove values. */
+ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+ private Collection<GridCacheVersion> conflictRmvVals;
+
+ /** Return value require flag. */
+ private final boolean retval;
+
+ /** Expiry policy. */
+ private final ExpiryPolicy expiryPlc;
+
+ /** Optional filter. */
+ private final CacheEntryPredicate[] filter;
+
+ /** Write synchronization mode. */
+ private final CacheWriteSynchronizationMode syncMode;
+
+ /** Raw return value flag. */
+ private final boolean rawRetval;
+
+ /** Fast map flag. */
+ private final boolean fastMap;
+
+ /** Near cache flag. */
+ private final boolean nearEnabled;
+
+ /** Subject ID. */
+ private final UUID subjId;
+
+ /** Task name hash. */
+ private final int taskNameHash;
+
+ /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
+ private boolean topLocked;
+
+ /** Skip store flag. */
+ private final boolean skipStore;
+
+ /** Keep binary flag. */
+ private final boolean keepBinary;
+
+ /** Wait for topology future flag. */
+ private final boolean waitTopFut;
+
+ /** Remap count. */
+ private int remapCnt;
+
+ /** State. */
+ private final UpdateState state;
+
+ /**
+ * @param cctx Cache context.
+ * @param cache Cache instance.
+ * @param syncMode Write synchronization mode.
+ * @param op Update operation.
+ * @param keys Keys to update.
+ * @param vals Values or transform closure.
+ * @param invokeArgs Optional arguments for entry processor.
+ * @param conflictPutVals Conflict put values (optional).
+ * @param conflictRmvVals Conflict remove values (optional).
+ * @param retval Return value require flag.
+ * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
+ * @param expiryPlc Expiry policy explicitly specified for cache operation.
+ * @param filter Entry filter.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ * @param skipStore Skip store flag.
+ * @param keepBinary Keep binary flag.
+ * @param remapCnt Maximum number of retries.
+ * @param waitTopFut If {@code false} does not wait for affinity change future.
+ */
+ public GridNearAtomicSingleUpdateFuture(
+ GridCacheContext cctx,
+ GridDhtAtomicCache cache,
+ CacheWriteSynchronizationMode syncMode,
+ GridCacheOperation op,
+ Collection<?> keys,
+ @Nullable Collection<?> vals,
+ @Nullable Object[] invokeArgs,
+ @Nullable Collection<GridCacheDrInfo> conflictPutVals,
+ @Nullable Collection<GridCacheVersion> conflictRmvVals,
+ final boolean retval,
+ final boolean rawRetval,
+ @Nullable ExpiryPolicy expiryPlc,
+ final CacheEntryPredicate[] filter,
+ UUID subjId,
+ int taskNameHash,
+ boolean skipStore,
+ boolean keepBinary,
+ int remapCnt,
+ boolean waitTopFut
+ ) {
+ this.rawRetval = rawRetval;
+
+ assert vals == null || vals.size() == keys.size();
+ assert conflictPutVals == null || conflictPutVals.size() == keys.size();
+ assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
+ assert subjId != null;
+
+ this.cctx = cctx;
+ this.cache = cache;
+ this.syncMode = syncMode;
+ this.op = op;
+ this.keys = keys;
+ this.vals = vals;
+ this.invokeArgs = invokeArgs;
+ this.conflictPutVals = conflictPutVals;
+ this.conflictRmvVals = conflictRmvVals;
+ this.retval = retval;
+ this.expiryPlc = expiryPlc;
+ this.filter = filter;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.skipStore = skipStore;
+ this.keepBinary = keepBinary;
+ this.waitTopFut = waitTopFut;
+
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
+
+ fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+ cctx.config().getAtomicWriteOrderMode() == CLOCK &&
+ !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
+
+ nearEnabled = CU.isNearEnabled(cctx);
+
+ if (!waitTopFut)
+ remapCnt = 1;
+
+ this.remapCnt = remapCnt;
+
+ state = new UpdateState();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid futureId() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion version() {
+ return state.futureVersion();
+ }
+
+ /**
+ * @return {@code True} if this future should block partition map exchange.
+ */
+ private boolean waitForPartitionExchange() {
+ // Wait for fast-map near atomic update futures in CLOCK mode.
+ return fastMap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<?> keys() {
+ return keys;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Forwards the event to the update state and always returns {@code false}.
+  */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+     state.onNodeLeft(nodeId);
+
+     return false;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Always {@code true} for this future.
+  */
+ @Override public boolean trackable() {
+     return true;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Intentionally does nothing.
+  */
+ @Override public void markNotTrackable() {
+     // No-op.
+ }
+
+ /**
+ * Performs future mapping.
+ */
+ public void map() {
+ AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
+
+ if (topVer == null)
+ mapOnTopology();
+ else {
+ topLocked = true;
+
+ // Cannot remap.
+ remapCnt = 1;
+
+ state.map(topVer, null);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Returns {@code null} when this future does not block partition exchange or is already done; in the latter
+  * case the state's completion future, if any, is finished first.
+  */
+ @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+     if (!waitForPartitionExchange())
+         return null;
+
+     GridFutureAdapter<Void> topFut = state.completeFuture(topVer);
+
+     // Nothing to finish eagerly: hand out whatever the state produced (possibly null).
+     if (topFut == null || !isDone())
+         return topFut;
+
+     topFut.onDone();
+
+     return null;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Converts the internal {@link GridCacheReturn} into the value exposed to the caller and, when this call
+  * actually completes the future, removes it from the MVCC manager.
+  */
+ @SuppressWarnings("ConstantConditions")
+ @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+     assert res == null || res instanceof GridCacheReturn;
+
+     GridCacheReturn ret = (GridCacheReturn)res;
+
+     Object userRes;
+
+     if (res == null)
+         userRes = null;
+     else if (rawRetval)
+         userRes = ret;
+     else if (this.retval || op == TRANSFORM)
+         userRes = cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary);
+     else
+         userRes = ret.success();
+
+     // TRANSFORM never completes with null: an empty map is reported instead.
+     if (op == TRANSFORM && userRes == null)
+         userRes = Collections.emptyMap();
+
+     if (!super.onDone(userRes, err))
+         return false;
+
+     GridCacheVersion doneVer = state.onFutureDone();
+
+     if (doneVer != null)
+         cctx.mvcc().removeAtomicFuture(doneVer);
+
+     return true;
+ }
+
+ /**
+  * Response callback; hands the response over to the update state as a regular (non node-failure) response.
+  *
+  * @param nodeId Node ID.
+  * @param res Update response.
+  */
+ public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+     final boolean nodeErr = false;
+
+     state.onResult(nodeId, res, nodeErr);
+ }
+
+ /**
+  * Updates near cache. Nothing is done when the response asks for a remap or the request has no primary keys.
+  *
+  * @param req Update request.
+  * @param res Update response.
+  */
+ private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+     assert nearEnabled;
+
+     boolean applicable = res.remapKeys() == null && req.hasPrimary();
+
+     if (applicable) {
+         GridNearAtomicCache nearCache = (GridNearAtomicCache)cctx.dht().near();
+
+         nearCache.processNearAtomicUpdateResponse(req, res);
+     }
+ }
+
+ /**
+ * Maps future on ready topology.
+ * <p>
+ * Under the topology read lock: fails the future if the cache topology is stopping; if the current topology
+ * future is done, validates the cache and takes its topology version; otherwise either re-runs this method once
+ * the topology future completes (when {@code waitTopFut} is set) or fails the future with
+ * {@link GridCacheTryPutFailedException}.
+ */
+ private void mapOnTopology() {
+ cache.topology().readLock();
+
+ AffinityTopologyVersion topVer = null;
+
+ try {
+ if (cache.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ cache.name()));
+
+ return;
+ }
+
+ GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+
+ if (fut.isDone()) {
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
+
+ return;
+ }
+
+ topVer = fut.topologyVersion();
+ }
+ else {
+ if (waitTopFut) {
+ assert !topLocked : this;
+
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ mapOnTopology(); // Retry from scratch once the topology future is done.
+ }
+ });
+ }
+ });
+ }
+ else
+ onDone(new GridCacheTryPutFailedException());
+
+ return;
+ }
+ }
+ finally {
+ cache.topology().readUnlock();
+ }
+
+ state.map(topVer, null); // Reached only with a validated topology version, after the read lock is released.
+ }
+
+ /**
+  * Checks whether this future has to be registered in the MVCC manager.
+  *
+  * @return {@code True} if future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
+  */
+ private boolean storeFuture() {
+     boolean clockMode = cctx.config().getAtomicWriteOrderMode() == CLOCK;
+
+     return clockMode || syncMode != FULL_ASYNC;
+ }
+
+ /**
+  * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
+  * node and send updates in parallel to all participating nodes.
+  *
+  * @param key Key to map.
+  * @param topVer Topology version to map.
+  * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
+  * @return Collection of nodes to which key is mapped: all nodes of the key's partition for a fast-map update,
+  *      otherwise a single-element list holding the primary node.
+  */
+ private Collection<ClusterNode> mapKey(
+     KeyCacheObject key,
+     AffinityTopologyVersion topVer,
+     boolean fastMap
+ ) {
+     GridCacheAffinityManager aff = cctx.affinity();
+
+     if (fastMap) {
+         // Updates can be sent in parallel to every node of the partition.
+         return cctx.topology().nodes(aff.partition(key), topVer);
+     }
+
+     return Collections.singletonList(aff.primary(key, topVer));
+ }
+
+ /**
+  * Maps future to single node: the update is applied locally when the target is the local node, otherwise the
+  * request is sent over the network. A send failure is reported to the update state.
+  *
+  * @param nodeId Node ID.
+  * @param req Request.
+  */
+ private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
+     boolean loc = cctx.localNodeId().equals(nodeId);
+
+     if (loc) {
+         CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> locCb =
+             new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+                 @Override public void apply(GridNearAtomicUpdateRequest req0, GridNearAtomicUpdateResponse res0) {
+                     onResult(res0.nodeId(), res0);
+                 }
+             };
+
+         cache.updateAllAsyncInternal(nodeId, req, locCb);
+
+         return;
+     }
+
+     try {
+         if (log.isDebugEnabled())
+             log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+
+         cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+
+         // In FULL_ASYNC mode the future completes as soon as the request is sent.
+         if (syncMode == FULL_ASYNC)
+             onDone(new GridCacheReturn(cctx, true, true, null, true));
+     }
+     catch (IgniteCheckedException e) {
+         state.onSendError(req, e);
+     }
+ }
+
+ /**
+  * Sends messages to remote nodes and updates local cache.
+  * <p>
+  * Remote requests go out first; the local mapping (at most one) is applied afterwards. In FULL_ASYNC mode the
+  * future is completed right after that, without waiting for responses.
+  *
+  * @param mappings Mappings to send.
+  */
+ private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
+     UUID locNodeId = cctx.localNodeId();
+
+     GridNearAtomicUpdateRequest locUpdate = null;
+
+     for (GridNearAtomicUpdateRequest req : mappings.values()) {
+         if (locNodeId.equals(req.nodeId())) {
+             assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
+                 ", req=" + req + ']';
+
+             // Remember the local mapping, it is processed after all remote sends.
+             locUpdate = req;
+
+             continue;
+         }
+
+         try {
+             if (log.isDebugEnabled())
+                 log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+
+             cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+         }
+         catch (IgniteCheckedException e) {
+             state.onSendError(req, e);
+         }
+     }
+
+     if (locUpdate != null) {
+         CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> locCb =
+             new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+                 @Override public void apply(GridNearAtomicUpdateRequest req0, GridNearAtomicUpdateResponse res0) {
+                     onResult(res0.nodeId(), res0);
+                 }
+             };
+
+         cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, locCb);
+     }
+
+     if (syncMode == FULL_ASYNC)
+         onDone(new GridCacheReturn(cctx, true, true, null, true));
+ }
+
+ /**
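+ * Mapping state of the enclosing future: topology version, pending requests, received responses and errors.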
+ */
+ private class UpdateState {
+ /** Current topology version. */
+ private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
+
+ /** Update version. */
+ private GridCacheVersion updVer;
+
+ /** Topology version when got mapping error. */
+ private AffinityTopologyVersion mapErrTopVer;
+
+ /** Mappings if operation is mapped to more than one node. */
+ @GridToStringInclude
+ private Map<UUID, GridNearAtomicUpdateRequest> mappings;
+
+ /** Number of responses received for the current mappings. */
+ private int resCnt;
+
+ /** Error. */
+ private CachePartialUpdateCheckedException err;
+
+ /** Future ID. */
+ private GridCacheVersion futVer;
+
+ /** Completion future for a particular topology version. */
+ private GridFutureAdapter<Void> topCompleteFut;
+
+ /** Keys to remap. */
+ private Collection<KeyCacheObject> remapKeys;
+
+ /** Not null if operation is mapped to single node. */
+ private GridNearAtomicUpdateRequest singleReq;
+
+ /** Operation result. */
+ private GridCacheReturn opRes;
+
+ /**
+ * @return Future version.
+ */
+ @Nullable synchronized GridCacheVersion futureVersion() {
+ return futVer;
+ }
+
+ /**
+  * Handles node departure: if a request to the left node is still waiting for a response, a failure response
+  * carrying a {@link ClusterTopologyCheckedException} for all its keys is built and processed as a node error.
+  *
+  * @param nodeId Left node ID.
+  */
+ void onNodeLeft(UUID nodeId) {
+     GridNearAtomicUpdateResponse failRes;
+
+     synchronized (this) {
+         GridNearAtomicUpdateRequest pending = null;
+
+         if (singleReq != null) {
+             if (singleReq.nodeId().equals(nodeId))
+                 pending = singleReq;
+         }
+         else if (mappings != null)
+             pending = mappings.get(nodeId);
+
+         // No request for this node, or it has already been answered.
+         if (pending == null || pending.response() != null)
+             return;
+
+         failRes = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+             nodeId,
+             pending.futureVersion(),
+             cctx.deploymentEnabled());
+
+         ClusterTopologyCheckedException topErr = new ClusterTopologyCheckedException("Primary node left grid " +
+             "before response is received: " + nodeId);
+
+         topErr.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(pending.topologyVersion()));
+
+         failRes.addFailedKeys(pending.keys(), topErr);
+     }
+
+     // Processed outside of the synchronized block.
+     onResult(nodeId, failRes, true);
+ }
+
+ /**
+ * Processes a response from a node: records remap keys, failed keys or the operation result and, once all
+ * expected responses are received, either completes the enclosing future or starts a remap on a newer
+ * topology version.
+ *
+ * @param nodeId Node ID.
+ * @param res Response.
+ * @param nodeErr {@code True} if response was created on node failure.
+ */
+ @SuppressWarnings("unchecked")
+ void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+ GridNearAtomicUpdateRequest req;
+
+ AffinityTopologyVersion remapTopVer = null;
+
+ GridCacheReturn opRes0 = null;
+ CachePartialUpdateCheckedException err0 = null;
+
+ boolean rcvAll;
+
+ GridFutureAdapter<?> fut0 = null;
+
+ synchronized (this) {
+ if (!res.futureVersion().equals(futVer)) // Response does not belong to the current mapping attempt.
+ return;
+
+ if (singleReq != null) {
+ if (!singleReq.nodeId().equals(nodeId))
+ return;
+
+ req = singleReq;
+
+ singleReq = null;
+
+ rcvAll = true;
+ }
+ else {
+ req = mappings != null ? mappings.get(nodeId) : null;
+
+ if (req != null && req.onResponse(res)) {
+ resCnt++;
+
+ rcvAll = mappings.size() == resCnt;
+ }
+ else
+ return;
+ }
+
+ assert req != null && req.topologyVersion().equals(topVer) : req;
+
+ if (res.remapKeys() != null) {
+ assert !fastMap || cctx.kernalContext().clientNode();
+
+ if (remapKeys == null)
+ remapKeys = U.newHashSet(res.remapKeys().size());
+
+ remapKeys.addAll(res.remapKeys());
+
+ if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
+ mapErrTopVer = req.topologyVersion();
+ }
+ else if (res.error() != null) {
+ if (res.failedKeys() != null)
+ addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
+ }
+ else {
+ if (!req.fastMap() || req.hasPrimary()) {
+ GridCacheReturn ret = res.returnValue();
+
+ if (op == TRANSFORM) {
+ if (ret != null)
+ addInvokeResults(ret);
+ }
+ else
+ opRes = ret;
+ }
+ }
+
+ if (rcvAll) {
+ if (remapKeys != null) {
+ assert mapErrTopVer != null;
+
+ remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1); // Next version after the failed one.
+ }
+ else {
+ if (err != null &&
+ X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+ X.hasCause(err, ClusterTopologyCheckedException.class) &&
+ storeFuture() &&
+ --remapCnt > 0) {
+ ClusterTopologyCheckedException topErr =
+ X.cause(err, ClusterTopologyCheckedException.class);
+
+ if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+ CachePartialUpdateCheckedException cause =
+ X.cause(err, CachePartialUpdateCheckedException.class);
+
+ assert cause != null && cause.topologyVersion() != null : err;
+
+ remapTopVer =
+ new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+
+ err = null;
+
+ Collection<Object> failedKeys = cause.failedKeys();
+
+ remapKeys = new ArrayList<>(failedKeys.size());
+
+ for (Object key : failedKeys)
+ remapKeys.add(cctx.toCacheKeyObject(key));
+
+ updVer = null;
+ }
+ }
+ }
+
+ if (remapTopVer == null) {
+ err0 = err;
+ opRes0 = opRes;
+ }
+ else {
+ fut0 = topCompleteFut;
+
+ topCompleteFut = null;
+
+ cctx.mvcc().removeAtomicFuture(futVer);
+
+ futVer = null; // Reset so that map() can register a new mapping attempt.
+ topVer = AffinityTopologyVersion.ZERO;
+ }
+ }
+ }
+
+ if (res.error() != null && res.failedKeys() == null) { // Error without per-key details fails the whole future.
+ onDone(res.error());
+
+ return;
+ }
+
+ if (rcvAll && nearEnabled) {
+ if (mappings != null) {
+ for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
+ GridNearAtomicUpdateResponse res0 = req0.response();
+
+ assert res0 != null : req0;
+
+ updateNear(req0, res0);
+ }
+ }
+ else if (!nodeErr)
+ updateNear(req, res);
+ }
+
+ if (remapTopVer != null) {
+ if (fut0 != null)
+ fut0.onDone();
+
+ if (!waitTopFut) {
+ onDone(new GridCacheTryPutFailedException());
+
+ return;
+ }
+
+ if (topLocked) {
+ assert !F.isEmpty(remapKeys) : remapKeys;
+
+ CachePartialUpdateCheckedException e =
+ new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+ ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+ "Failed to update keys, topology changed while execute atomic update inside transaction.");
+
+ cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+
+ e.add(remapKeys, cause);
+
+ onDone(e);
+
+ return;
+ }
+
+ IgniteInternalFuture<AffinityTopologyVersion> fut =
+ cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+
+ if (fut == null)
+ fut = new GridFinishedFuture<>(remapTopVer);
+
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ try {
+ AffinityTopologyVersion topVer = fut.get();
+
+ map(topVer, remapKeys);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
+ }
+ });
+
+ return;
+ }
+
+ if (rcvAll)
+ onDone(opRes0, err0);
+ }
+
+ /**
+  * Converts a send failure into a synthetic response that marks all keys of the request as failed and
+  * processes it as a node error. The whole conversion runs under this state's monitor.
+  *
+  * @param req Request.
+  * @param e Error.
+  */
+ void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
+     synchronized (this) {
+         GridNearAtomicUpdateResponse errRes = new GridNearAtomicUpdateResponse(
+             cctx.cacheId(),
+             req.nodeId(),
+             req.futureVersion(),
+             cctx.deploymentEnabled()
+         );
+
+         errRes.addFailedKeys(req.keys(), e);
+
+         onResult(req.nodeId(), errRes, true);
+     }
+ }
+
+ /**
+ * Maps the update on the given topology version: builds either a single request or per-node mappings, stores
+ * them in this state, registers the enclosing future in the MVCC manager when required and sends the update.
+ * Any exception raised while building the mappings completes the enclosing future with that error.
+ *
+ * @param topVer Topology version.
+ * @param remapKeys Keys to remap.
+ */
+ void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
+ Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+
+ if (F.isEmpty(topNodes)) {
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+ "left the grid)."));
+
+ return;
+ }
+
+ Exception err = null;
+ GridNearAtomicUpdateRequest singleReq0 = null;
+ Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
+
+ int size = keys.size();
+
+ GridCacheVersion futVer = cctx.versions().next(topVer);
+
+ GridCacheVersion updVer;
+
+ // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+ if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+ updVer = this.updVer;
+
+ if (updVer == null) {
+ updVer = cctx.versions().next(topVer);
+
+ if (log.isDebugEnabled())
+ log.debug("Assigned fast-map version for update on near node: " + updVer);
+ }
+ }
+ else
+ updVer = null;
+
+ try {
+ if (size == 1 && !fastMap) {
+ assert remapKeys == null || remapKeys.size() == 1;
+
+ singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+ }
+ else {
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
+ topVer,
+ futVer,
+ updVer,
+ remapKeys);
+
+ if (pendingMappings.size() == 1)
+ singleReq0 = F.firstValue(pendingMappings);
+ else {
+ if (syncMode == PRIMARY_SYNC) { // Keep only requests that contain primary keys.
+ mappings0 = U.newHashMap(pendingMappings.size());
+
+ for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
+ if (req.hasPrimary())
+ mappings0.put(req.nodeId(), req);
+ }
+ }
+ else
+ mappings0 = pendingMappings;
+
+ assert !mappings0.isEmpty() || size == 0 : GridNearAtomicSingleUpdateFuture.this;
+ }
+ }
+
+ synchronized (this) {
+ assert this.futVer == null : this;
+ assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+ this.topVer = topVer;
+ this.updVer = updVer;
+ this.futVer = futVer;
+
+ resCnt = 0;
+
+ singleReq = singleReq0;
+ mappings = mappings0;
+
+ this.remapKeys = null;
+ }
+ }
+ catch (Exception e) {
+ err = e;
+ }
+
+ if (err != null) {
+ onDone(err);
+
+ return;
+ }
+
+ if (storeFuture()) {
+ if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicSingleUpdateFuture.this)) {
+ assert isDone() : GridNearAtomicSingleUpdateFuture.this;
+
+ return;
+ }
+ }
+
+ // Optimize mapping for single key.
+ if (singleReq0 != null)
+ mapSingle(singleReq0.nodeId(), singleReq0);
+ else {
+ assert mappings0 != null;
+
+ if (size == 0) // Nothing to update: complete right away.
+ onDone(new GridCacheReturn(cctx, true, true, null, true));
+ else
+ doUpdate(mappings0);
+ }
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @return Future.
+ */
+ @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) {
+ if (this.topVer == AffinityTopologyVersion.ZERO)
+ return null;
+
+ if (this.topVer.compareTo(topVer) < 0) {
+ if (topCompleteFut == null)
+ topCompleteFut = new GridFutureAdapter<>();
+
+ return topCompleteFut;
+ }
+
+ return null;
+ }
+
+ /**
+  * Clears the future version and the topology completion future; the latter, if present, is completed
+  * outside of the synchronized section.
+  *
+  * @return Future version that was set before the call, possibly {@code null}.
+  */
+ GridCacheVersion onFutureDone() {
+     GridFutureAdapter<Void> pendingTopFut;
+     GridCacheVersion doneVer;
+
+     synchronized (this) {
+         doneVer = futVer;
+         futVer = null;
+
+         pendingTopFut = topCompleteFut;
+         topCompleteFut = null;
+     }
+
+     if (pendingTopFut != null)
+         pendingTopFut.onDone();
+
+     return doneVer;
+ }
+
+ /**
+ * Builds per-node update requests for all keys (or only for {@code remapKeys} when given). Keys whose value
+ * is {@code null} are skipped unless the operation is DELETE.
+ *
+ * @param topNodes Cache nodes.
+ * @param topVer Topology version.
+ * @param futVer Future version.
+ * @param updVer Update version.
+ * @param remapKeys Keys to remap.
+ * @return Mapping.
+ * @throws Exception If failed.
+ */
+ private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+ AffinityTopologyVersion topVer,
+ GridCacheVersion futVer,
+ @Nullable GridCacheVersion updVer,
+ @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
+ Iterator<?> it = null;
+
+ if (vals != null)
+ it = vals.iterator();
+
+ Iterator<GridCacheDrInfo> conflictPutValsIt = null;
+
+ if (conflictPutVals != null)
+ conflictPutValsIt = conflictPutVals.iterator();
+
+ Iterator<GridCacheVersion> conflictRmvValsIt = null;
+
+ if (conflictRmvVals != null)
+ conflictRmvValsIt = conflictRmvVals.iterator();
+
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
+
+ // Create mappings first, then send messages.
+ for (Object key : keys) {
+ if (key == null)
+ throw new NullPointerException("Null key.");
+
+ Object val;
+ GridCacheVersion conflictVer;
+ long conflictTtl;
+ long conflictExpireTime;
+
+ if (vals != null) {
+ val = it.next();
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+
+ if (val == null)
+ throw new NullPointerException("Null value.");
+ }
+ else if (conflictPutVals != null) {
+ GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
+
+ val = conflictPutVal.valueEx();
+ conflictVer = conflictPutVal.version();
+ conflictTtl = conflictPutVal.ttl();
+ conflictExpireTime = conflictPutVal.expireTime();
+ }
+ else if (conflictRmvVals != null) {
+ val = null;
+ conflictVer = conflictRmvValsIt.next();
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+ else {
+ val = null;
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+
+ if (val == null && op != GridCacheOperation.DELETE)
+ continue;
+
+ KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+
+ if (remapKeys != null && !remapKeys.contains(cacheKey))
+ continue;
+
+ if (op != TRANSFORM)
+ val = cctx.toCacheObject(val);
+
+ Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
+
+ if (affNodes.isEmpty())
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
+
+ int i = 0;
+
+ for (ClusterNode affNode : affNodes) {
+ if (affNode == null)
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
+
+ UUID nodeId = affNode.id();
+
+ GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
+
+ if (mapped == null) {
+ mapped = new GridNearAtomicUpdateRequest(
+ cctx.cacheId(),
+ nodeId,
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled(),
+ keys.size());
+
+ pendingMappings.put(nodeId, mapped);
+ }
+
+ mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0); // Last argument is true only for the first node returned for the key.
+
+ i++;
+ }
+ }
+
+ return pendingMappings;
+ }
+
+ /**
+ * Builds the update request for the first (single) key of the operation, addressed to the key's primary node.
+ *
+ * @param topVer Topology version.
+ * @param futVer Future version.
+ * @param updVer Update version.
+ * @return Request.
+ * @throws Exception If failed.
+ */
+ private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
+ GridCacheVersion futVer,
+ @Nullable GridCacheVersion updVer) throws Exception {
+ Object key = F.first(keys);
+
+ Object val;
+ GridCacheVersion conflictVer;
+ long conflictTtl;
+ long conflictExpireTime;
+
+ if (vals != null) {
+ // Regular PUT.
+ val = F.first(vals);
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+ else if (conflictPutVals != null) {
+ // Conflict PUT.
+ GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
+
+ val = conflictPutVal.valueEx();
+ conflictVer = conflictPutVal.version();
+ conflictTtl = conflictPutVal.ttl();
+ conflictExpireTime = conflictPutVal.expireTime();
+ }
+ else if (conflictRmvVals != null) {
+ // Conflict REMOVE.
+ val = null;
+ conflictVer = F.first(conflictRmvVals);
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+ else {
+ // Regular REMOVE.
+ val = null;
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+
+ // We can still get here if the user passes a map with a single element.
+ if (key == null)
+ throw new NullPointerException("Null key.");
+
+ if (val == null && op != GridCacheOperation.DELETE)
+ throw new NullPointerException("Null value.");
+
+ KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+
+ if (op != TRANSFORM)
+ val = cctx.toCacheObject(val);
+
+ ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
+
+ if (primary == null)
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+ "left the grid).");
+
+ GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
+ cctx.cacheId(),
+ primary.id(),
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled(),
+ 1);
+
+ req.addUpdateEntry(cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
+ true);
+
+ return req;
+ }
+
+ /**
+  * Accumulates entry processor results: the first non-empty result becomes the operation result, later ones
+  * are merged into it. Results with a {@code null} value are ignored.
+  *
+  * @param ret Result from single node.
+  */
+ @SuppressWarnings("unchecked")
+ private void addInvokeResults(GridCacheReturn ret) {
+     assert op == TRANSFORM : op;
+     assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+     if (ret.value() == null)
+         return;
+
+     if (opRes == null)
+         opRes = ret;
+     else
+         opRes.mergeEntryProcessResults(ret);
+ }
+
+ /**
+  * Registers failed keys in the accumulated partial update error, creating that error on first use. Keys are
+  * unwrapped before being added.
+  *
+  * @param failedKeys Failed keys.
+  * @param topVer Topology version for failed update.
+  * @param err Error cause.
+  */
+ private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
+     AffinityTopologyVersion topVer,
+     Throwable err) {
+     if (this.err == null)
+         this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+     CachePartialUpdateCheckedException partialErr = this.err;
+
+     Collection<Object> unwrapped = new ArrayList<>(failedKeys.size());
+
+     for (KeyCacheObject failedKey : failedKeys) {
+         Object userKey = cctx.cacheObjectContext().unwrapBinaryIfNeeded(failedKey, keepBinary, false);
+
+         unwrapped.add(userKey);
+     }
+
+     partialErr.add(unwrapped, err, topVer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized String toString() {
+ return S.toString(UpdateState.class, this);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Includes the parent's string representation.
+  */
+ @Override public String toString() {
+     return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString());
+ }
+}
[6/8] ignite git commit: IGNITE-2532: WIP on single message
optimization.
Posted by vo...@apache.org.
IGNITE-2532: WIP on single message optimization.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/52d20cdc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/52d20cdc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/52d20cdc
Branch: refs/heads/ignite-2523
Commit: 52d20cdcdc886c6ceaed49239e822a1d6deaa7dd
Parents: 2a1a31d
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 3 12:03:31 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 3 12:03:31 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMessage.java | 49 ++++
.../GridNearAtomicSingleUpdateRequest.java | 258 +++++++------------
.../dht/atomic/GridNearAtomicUpdateFuture.java | 6 +-
3 files changed, 137 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/52d20cdc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 83e3aa7..cdf579d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -463,6 +463,24 @@ public abstract class GridCacheMessage implements Message {
}
/**
+ * @param obj Object to marshal (may be {@code null}).
+ * @param ctx Context.
+ * @return Marshalled object bytes or {@code null} if {@code obj} is {@code null}.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable protected byte[] marshal(@Nullable Object obj, GridCacheContext ctx) throws IgniteCheckedException {
+ assert ctx != null;
+
+ if (obj == null)
+ return null;
+
+ if (addDepInfo) // Deployment info is prepared only when the message carries it.
+ prepareObject(obj, ctx);
+
+ return CU.marshal(ctx, obj);
+ }
+
+ /**
* @param col Collection to marshal.
* @param ctx Context.
* @return Marshalled collection.
@@ -539,6 +557,19 @@ public abstract class GridCacheMessage implements Message {
}
/**
+ * @param obj Object.
+ * @param ctx Context.
+ * @param ldr Class loader.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ protected final void finishUnmarshalCacheObject(@Nullable CacheObject obj, GridCacheContext ctx, ClassLoader ldr)
+ throws IgniteCheckedException {
+ if (obj != null)
+ obj.finishUnmarshal(ctx.cacheObjectContext(), ldr);
+ }
+
+ /**
* @param col Collection.
* @param ctx Context.
* @param ldr Class loader.
@@ -584,6 +615,24 @@ public abstract class GridCacheMessage implements Message {
}
/**
+  * @param bytes Byte array to unmarshal (may be {@code null}).
+  * @param ctx Context.
+  * @param ldr Loader.
+  * @return Unmarshalled object or {@code null} if {@code bytes} is {@code null}.
+  * @throws IgniteCheckedException If failed.
+  */
+ @Nullable protected <T> T unmarshal(@Nullable byte[] bytes, GridCacheSharedContext ctx, ClassLoader ldr)
+     throws IgniteCheckedException {
+     assert ldr != null;
+     assert ctx != null;
+
+     if (bytes != null)
+         return ctx.marshaller().unmarshal(bytes, ldr);
+
+     return null;
+ }
+
+ /**
* @param byteCol Collection to unmarshal.
* @param ctx Context.
* @param ldr Loader.
http://git-wip-us.apache.org/repos/asf/ignite/blob/52d20cdc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 5de9884..cee662c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -32,7 +31,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -46,8 +44,8 @@ import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import java.io.Externalizable;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -91,22 +89,19 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
/** Update operation. */
private GridCacheOperation op;
- /** Keys to update. */
+ /** Key to update. */
@GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
- private List<KeyCacheObject> keys;
+ private KeyCacheObject key;
- /** Values to update. */
- @GridDirectCollection(CacheObject.class)
- private List<CacheObject> vals;
+ /** Value to update. */
+ private CacheObject val;
- /** Entry processors. */
+ /** Entry processor. */
@GridDirectTransient
- private List<EntryProcessor<Object, Object, Object>> entryProcessors;
+ private EntryProcessor<Object, Object, Object> entryProc;
- /** Entry processors bytes. */
- @GridDirectCollection(byte[].class)
- private List<byte[]> entryProcessorsBytes;
+ /** Entry processor bytes. */
+ private byte[] entryProcBytes;
/** Optional arguments for entry processor. */
@GridDirectTransient
@@ -115,15 +110,14 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
/** Entry processor arguments bytes. */
private byte[][] invokeArgsBytes;
- /** Conflict versions. */
- @GridDirectCollection(GridCacheVersion.class)
- private List<GridCacheVersion> conflictVers;
+ /** Conflict version. */
+ private GridCacheVersion conflictVer;
- /** Conflict TTLs. */
- private GridLongList conflictTtls;
+ /** Conflict TTL. */
+ private long conflictTtl = CU.TTL_NOT_CHANGED;
- /** Conflict expire times. */
- private GridLongList conflictExpireTimes;
+ /** Conflict expire time. */
+ private long conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
/** Return value flag. */
private boolean retval;
@@ -138,9 +132,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
/** Filter. */
private CacheEntryPredicate[] filter;
- /** Flag indicating whether request contains primary keys. */
- private boolean hasPrimary;
-
/** Subject ID. */
private UUID subjId;
@@ -160,10 +151,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
@GridDirectTransient
private GridNearAtomicUpdateResponse res;
- /** Maximum possible size of inner collections. */
- @GridDirectTransient
- private int initSize;
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -193,7 +180,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
* @param keepBinary Keep binary flag.
* @param clientReq Client node request flag.
* @param addDepInfo Deployment info flag.
- * @param maxEntryCnt Maximum entries count.
*/
public GridNearAtomicSingleUpdateRequest(
int cacheId,
@@ -214,8 +200,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
boolean skipStore,
boolean keepBinary,
boolean clientReq,
- boolean addDepInfo,
- int maxEntryCnt
+ boolean addDepInfo
) {
assert futVer != null;
@@ -239,14 +224,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
this.keepBinary = keepBinary;
this.clientReq = clientReq;
this.addDepInfo = addDepInfo;
-
- // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries
- // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys
- // participate in request. As such, we know upper bound of all collections in request. If this bound is lower
- // than 10, we use it.
- initSize = Math.min(maxEntryCnt, 10);
-
- keys = new ArrayList<>(initSize);
}
/** {@inheritDoc} */
@@ -372,14 +349,13 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
* @param conflictTtl Conflict TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
- * @param primary If given key is primary on this mapping.
*/
+ @SuppressWarnings("unchecked")
public void addUpdateEntry(KeyCacheObject key,
@Nullable Object val,
long conflictTtl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer,
- boolean primary) {
+ @Nullable GridCacheVersion conflictVer) {
EntryProcessor<Object, Object, Object> entryProcessor = null;
if (op == TRANSFORM) {
@@ -390,74 +366,37 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
assert val != null || op == DELETE;
- keys.add(key);
+ this.key = key;
- if (entryProcessor != null) {
- if (entryProcessors == null)
- entryProcessors = new ArrayList<>(initSize);
-
- entryProcessors.add(entryProcessor);
- }
+ if (entryProcessor != null)
+ this.entryProc = entryProcessor;
else if (val != null) {
assert val instanceof CacheObject : val;
- if (vals == null)
- vals = new ArrayList<>(initSize);
-
- vals.add((CacheObject)val);
+ this.val = (CacheObject)val;
}
- hasPrimary |= primary;
+ this.conflictVer = conflictVer;
- // In case there is no conflict, do not create the list.
- if (conflictVer != null) {
- if (conflictVers == null) {
- conflictVers = new ArrayList<>(initSize);
-
- for (int i = 0; i < keys.size() - 1; i++)
- conflictVers.add(null);
- }
-
- conflictVers.add(conflictVer);
- }
- else if (conflictVers != null)
- conflictVers.add(null);
-
- if (conflictTtl >= 0) {
- if (conflictTtls == null) {
- conflictTtls = new GridLongList(keys.size());
-
- for (int i = 0; i < keys.size() - 1; i++)
- conflictTtls.add(CU.TTL_NOT_CHANGED);
- }
-
- conflictTtls.add(conflictTtl);
- }
-
- if (conflictExpireTime >= 0) {
- if (conflictExpireTimes == null) {
- conflictExpireTimes = new GridLongList(keys.size());
-
- for (int i = 0; i < keys.size() - 1; i++)
- conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
- }
+ if (conflictTtl >= 0)
+ this.conflictTtl = conflictTtl;
- conflictExpireTimes.add(conflictExpireTime);
- }
+ if (conflictExpireTime >= 0)
+ this.conflictExpireTime = conflictExpireTime;
}
/**
* @return Keys for this update request.
*/
public List<KeyCacheObject> keys() {
- return keys;
+ return Collections.singletonList(key);
}
/**
* @return Values for this update request.
*/
public List<?> values() {
- return op == TRANSFORM ? entryProcessors : vals;
+ return Collections.singletonList(op == TRANSFORM ? entryProc : val);
}
/**
@@ -480,9 +419,10 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
*/
@SuppressWarnings("unchecked")
public CacheObject value(int idx) {
+ assert idx == 0;
assert op == UPDATE : op;
- return vals.get(idx);
+ return val;
}
/**
@@ -491,9 +431,10 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
*/
@SuppressWarnings("unchecked")
public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
+ assert idx == 0;
assert op == TRANSFORM : op;
- return entryProcessors.get(idx);
+ return entryProc;
}
/**
@@ -501,17 +442,16 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
* @return Write value - either value, or transform closure.
*/
public CacheObject writeValue(int idx) {
- if (vals != null)
- return vals.get(idx);
+ assert idx == 0;
- return null;
+ return val;
}
/**
* @return Conflict versions.
*/
@Nullable public List<GridCacheVersion> conflictVersions() {
- return conflictVers;
+ return conflictVer == null ? null : Collections.singletonList(conflictVer);
}
/**
@@ -519,13 +459,9 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
* @return Conflict version.
*/
@Nullable public GridCacheVersion conflictVersion(int idx) {
- if (conflictVers != null) {
- assert idx >= 0 && idx < conflictVers.size();
-
- return conflictVers.get(idx);
- }
+ assert idx == 0;
- return null;
+ return conflictVer;
}
/**
@@ -533,13 +469,9 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
* @return Conflict TTL.
*/
public long conflictTtl(int idx) {
- if (conflictTtls != null) {
- assert idx >= 0 && idx < conflictTtls.size();
+ assert idx == 0;
- return conflictTtls.get(idx);
- }
-
- return CU.TTL_NOT_CHANGED;
+ return conflictTtl;
}
/**
@@ -547,20 +479,16 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
* @return Conflict expire time.
*/
public long conflictExpireTime(int idx) {
- if (conflictExpireTimes != null) {
- assert idx >= 0 && idx < conflictExpireTimes.size();
+ assert idx == 0;
- return conflictExpireTimes.get(idx);
- }
-
- return CU.EXPIRE_TIME_CALCULATE;
+ return conflictExpireTime;
}
/**
* @return Flag indicating whether this request contains primary keys.
*/
public boolean hasPrimary() {
- return hasPrimary;
+ return true;
}
/**
@@ -591,7 +519,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
GridCacheContext cctx = ctx.cacheContext(cacheId);
- prepareMarshalCacheObjects(keys, cctx);
+ prepareMarshalCacheObject(key, cctx);
if (filter != null) {
boolean hasFilter = false;
@@ -616,14 +544,14 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
if (!addDepInfo && ctx.deploymentEnabled())
addDepInfo = true;
- if (entryProcessorsBytes == null)
- entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+ if (entryProcBytes == null)
+ entryProcBytes = marshal(entryProc, cctx);
if (invokeArgsBytes == null)
invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
}
else
- prepareMarshalCacheObjects(vals, cctx);
+ prepareMarshalCacheObject(val, cctx);
}
/** {@inheritDoc} */
@@ -632,17 +560,17 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
GridCacheContext cctx = ctx.cacheContext(cacheId);
- finishUnmarshalCacheObjects(keys, cctx, ldr);
+ finishUnmarshalCacheObject(key, cctx, ldr);
if (op == TRANSFORM) {
- if (entryProcessors == null)
- entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+ if (entryProc == null)
+ entryProc = unmarshal(entryProcBytes, ctx, ldr);
if (invokeArgs == null)
invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
}
else
- finishUnmarshalCacheObjects(vals, cctx, ldr);
+ finishUnmarshalCacheObject(val, cctx, ldr);
if (filter != null) {
for (CacheEntryPredicate p : filter) {
@@ -682,25 +610,25 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
writer.incrementState();
case 4:
- if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
+ if (!writer.writeLong("conflictExpireTime", conflictExpireTime))
return false;
writer.incrementState();
case 5:
- if (!writer.writeMessage("conflictTtls", conflictTtls))
+ if (!writer.writeLong("conflictTtl", conflictTtl))
return false;
writer.incrementState();
case 6:
- if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("conflictVer", conflictVer))
return false;
writer.incrementState();
case 7:
- if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+ if (!writer.writeByteArray("entryProcBytes", entryProcBytes))
return false;
writer.incrementState();
@@ -730,85 +658,79 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
writer.incrementState();
case 12:
- if (!writer.writeBoolean("hasPrimary", hasPrimary))
+ if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
case 13:
- if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+ if (!writer.writeBoolean("keepBinary", keepBinary))
return false;
writer.incrementState();
case 14:
- if (!writer.writeBoolean("keepBinary", keepBinary))
+ if (!writer.writeMessage("key", key))
return false;
writer.incrementState();
case 15:
- if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 16:
if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
return false;
writer.incrementState();
- case 17:
+ case 16:
if (!writer.writeBoolean("retval", retval))
return false;
writer.incrementState();
- case 18:
+ case 17:
if (!writer.writeBoolean("skipStore", skipStore))
return false;
writer.incrementState();
- case 19:
+ case 18:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 20:
+ case 19:
if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
- case 21:
+ case 20:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 22:
+ case 21:
if (!writer.writeBoolean("topLocked", topLocked))
return false;
writer.incrementState();
- case 23:
+ case 22:
if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
- case 24:
+ case 23:
if (!writer.writeMessage("updateVer", updateVer))
return false;
writer.incrementState();
- case 25:
- if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+ case 24:
+ if (!writer.writeMessage("val", val))
return false;
writer.incrementState();
@@ -838,7 +760,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
case 4:
- conflictExpireTimes = reader.readMessage("conflictExpireTimes");
+ conflictExpireTime = reader.readLong("conflictExpireTime");
if (!reader.isLastRead())
return false;
@@ -846,7 +768,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
case 5:
- conflictTtls = reader.readMessage("conflictTtls");
+ conflictTtl = reader.readLong("conflictTtl");
if (!reader.isLastRead())
return false;
@@ -854,7 +776,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
case 6:
- conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
+ conflictVer = reader.readMessage("conflictVer");
if (!reader.isLastRead())
return false;
@@ -862,7 +784,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
case 7:
- entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+ entryProcBytes = reader.readByteArray("entryProcBytes");
if (!reader.isLastRead())
return false;
@@ -902,7 +824,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
case 12:
- hasPrimary = reader.readBoolean("hasPrimary");
+ invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
if (!reader.isLastRead())
return false;
@@ -910,7 +832,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
case 13:
- invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+ keepBinary = reader.readBoolean("keepBinary");
if (!reader.isLastRead())
return false;
@@ -918,7 +840,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
case 14:
- keepBinary = reader.readBoolean("keepBinary");
+ key = reader.readMessage("key");
if (!reader.isLastRead())
return false;
@@ -926,14 +848,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
case 15:
- keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 16:
byte opOrd;
opOrd = reader.readByte("op");
@@ -945,7 +859,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
- case 17:
+ case 16:
retval = reader.readBoolean("retval");
if (!reader.isLastRead())
@@ -953,7 +867,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
- case 18:
+ case 17:
skipStore = reader.readBoolean("skipStore");
if (!reader.isLastRead())
@@ -961,7 +875,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
- case 19:
+ case 18:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -969,7 +883,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
- case 20:
+ case 19:
byte syncModeOrd;
syncModeOrd = reader.readByte("syncMode");
@@ -981,7 +895,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
- case 21:
+ case 20:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -989,7 +903,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
- case 22:
+ case 21:
topLocked = reader.readBoolean("topLocked");
if (!reader.isLastRead())
@@ -997,7 +911,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
- case 23:
+ case 22:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -1005,7 +919,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
- case 24:
+ case 23:
updateVer = reader.readMessage("updateVer");
if (!reader.isLastRead())
@@ -1013,8 +927,8 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
reader.incrementState();
- case 25:
- vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
+ case 24:
+ val = reader.readMessage("val");
if (!reader.isLastRead())
return false;
@@ -1033,7 +947,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 26;
+ return 25;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/52d20cdc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 493c765..38e93ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -1028,15 +1028,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
skipStore,
keepBinary,
cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- 1);
+ cctx.deploymentEnabled());
req.addUpdateEntry(cacheKey,
val,
conflictTtl,
conflictExpireTime,
- conflictVer,
- true);
+ conflictVer);
return req;
}
[3/8] ignite git commit: IGNITE-2532: WIP. Only refactorings for now.
Posted by vo...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/29c2aee6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 8b1673f..2aa510d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -62,8 +62,8 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
/**
* DHT atomic cache near update future.
*/
-public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFuture
- {
+@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFuture {
/** Keys */
private Collection<?> keys;
@@ -79,8 +79,39 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<GridCacheVersion> conflictRmvVals;
- /** State. */
- private final UpdateState state;
+ /** Current topology version. */
+ private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
+
+ /** */
+ private GridCacheVersion updVer;
+
+ /** Topology version when got mapping error. */
+ private AffinityTopologyVersion mapErrTopVer;
+
+ /** Mappings if operation is mapped to more than one node. */
+ @GridToStringInclude
+ private Map<UUID, GridNearAtomicUpdateRequest> mappings;
+
+ /** */
+ private int resCnt;
+
+ /** Error. */
+ private CachePartialUpdateCheckedException err;
+
+ /** Future ID. */
+ private GridCacheVersion futVer;
+
+ /** Completion future for a particular topology version. */
+ private GridFutureAdapter<Void> topCompleteFut;
+
+ /** Keys to remap. */
+ private Collection<KeyCacheObject> remapKeys;
+
+ /** Not null if operation is mapped to single node. */
+ private GridNearAtomicUpdateRequest singleReq;
+
+ /** Operation result. */
+ private GridCacheReturn opRes;
/**
* @param cctx Cache context.
@@ -130,55 +161,22 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
assert vals == null || vals.size() == keys.size();
assert conflictPutVals == null || conflictPutVals.size() == keys.size();
assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
- assert subjId != null;
this.keys = keys;
this.vals = vals;
this.conflictPutVals = conflictPutVals;
this.conflictRmvVals = conflictRmvVals;
-
- state = new UpdateState();
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return state.futureVersion();
}
/** {@inheritDoc} */
- @Override public Collection<?> keys() {
- return keys;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- state.onNodeLeft(nodeId);
-
- return false;
- }
-
- /**
- * Performs future mapping.
- */
- public void map() {
- AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
-
- if (topVer == null)
- mapOnTopology();
- else {
- topLocked = true;
-
- // Cannot remap.
- remapCnt = 1;
-
- state.map(topVer, null);
- }
+ @Override public synchronized GridCacheVersion version() {
+ return futVer;
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
if (waitForPartitionExchange()) {
- GridFutureAdapter<Void> fut = state.completeFuture(topVer);
+ GridFutureAdapter<Void> fut = completeFuture0(topVer);
if (fut != null && isDone()) {
fut.onDone();
@@ -193,6 +191,39 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
}
/** {@inheritDoc} */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ GridNearAtomicUpdateResponse res = null;
+
+ synchronized (this) {
+ GridNearAtomicUpdateRequest req;
+
+ if (singleReq != null)
+ req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
+ else
+ req = mappings != null ? mappings.get(nodeId) : null;
+
+ if (req != null && req.response() == null) {
+ res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ nodeId,
+ req.futureVersion(),
+ cctx.deploymentEnabled());
+
+ ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
+ "before response is received: " + nodeId);
+
+ e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+
+ res.addFailedKeys(req.keys(), e);
+ }
+ }
+
+ if (res != null)
+ onResult(nodeId, res, true);
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("ConstantConditions")
@Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
assert res == null || res instanceof GridCacheReturn;
@@ -207,7 +238,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
retval = Collections.emptyMap();
if (super.onDone(retval, err)) {
- GridCacheVersion futVer = state.onFutureDone();
+ GridCacheVersion futVer = onFutureDone();
if (futVer != null)
cctx.mvcc().removeAtomicFuture(futVer);
@@ -219,13 +250,31 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
}
/**
+ * Performs future mapping.
+ */
+ public void map() {
+ AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
+
+ if (topVer == null)
+ mapOnTopology();
+ else {
+ topLocked = true;
+
+ // Cannot remap.
+ remapCnt = 1;
+
+ map(topVer, null);
+ }
+ }
+
+ /**
* Response callback.
*
* @param nodeId Node ID.
* @param res Update response.
*/
public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
- state.onResult(nodeId, res, false);
+ onResult(nodeId, res, false);
}
/**
@@ -281,7 +330,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
cache.topology().readUnlock();
}
- state.map(topVer, null);
+ map(topVer, null);
}
/**
@@ -310,7 +359,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
onDone(new GridCacheReturn(cctx, true, true, null, true));
}
catch (IgniteCheckedException e) {
- state.onSendError(req, e);
+ onSendError(req, e);
}
}
}
@@ -341,7 +390,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
}
catch (IgniteCheckedException e) {
- state.onSendError(req, e);
+ onSendError(req, e);
}
}
}
@@ -360,610 +409,422 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
}
/**
- *
+ * @param nodeId Node ID.
+ * @param res Response.
+ * @param nodeErr {@code True} if response was created on node failure.
*/
- private class UpdateState {
- /** Current topology version. */
- private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
-
- /** */
- private GridCacheVersion updVer;
-
- /** Topology version when got mapping error. */
- private AffinityTopologyVersion mapErrTopVer;
-
- /** Mappings if operations is mapped to more than one node. */
- @GridToStringInclude
- private Map<UUID, GridNearAtomicUpdateRequest> mappings;
-
- /** */
- private int resCnt;
-
- /** Error. */
- private CachePartialUpdateCheckedException err;
+ @SuppressWarnings("unchecked")
+ void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+ GridNearAtomicUpdateRequest req;
- /** Future ID. */
- private GridCacheVersion futVer;
+ AffinityTopologyVersion remapTopVer = null;
- /** Completion future for a particular topology version. */
- private GridFutureAdapter<Void> topCompleteFut;
+ GridCacheReturn opRes0 = null;
+ CachePartialUpdateCheckedException err0 = null;
- /** Keys to remap. */
- private Collection<KeyCacheObject> remapKeys;
+ boolean rcvAll;
- /** Not null is operation is mapped to single node. */
- private GridNearAtomicUpdateRequest singleReq;
+ GridFutureAdapter<?> fut0 = null;
- /** Operation result. */
- private GridCacheReturn opRes;
-
- /**
- * @return Future version.
- */
- @Nullable synchronized GridCacheVersion futureVersion() {
- return futVer;
- }
-
- /**
- * @param nodeId Left node ID.
- */
- void onNodeLeft(UUID nodeId) {
- GridNearAtomicUpdateResponse res = null;
-
- synchronized (this) {
- GridNearAtomicUpdateRequest req;
-
- if (singleReq != null)
- req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
- else
- req = mappings != null ? mappings.get(nodeId) : null;
+ synchronized (this) {
+ if (!res.futureVersion().equals(futVer))
+ return;
- if (req != null && req.response() == null) {
- res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- nodeId,
- req.futureVersion(),
- cctx.deploymentEnabled());
+ if (singleReq != null) {
+ if (!singleReq.nodeId().equals(nodeId))
+ return;
- ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
- "before response is received: " + nodeId);
+ req = singleReq;
- e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+ singleReq = null;
- res.addFailedKeys(req.keys(), e);
- }
+ rcvAll = true;
}
+ else {
+ req = mappings != null ? mappings.get(nodeId) : null;
- if (res != null)
- onResult(nodeId, res, true);
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- * @param nodeErr {@code True} if response was created on node failure.
- */
- @SuppressWarnings("unchecked")
- void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
- GridNearAtomicUpdateRequest req;
-
- AffinityTopologyVersion remapTopVer = null;
-
- GridCacheReturn opRes0 = null;
- CachePartialUpdateCheckedException err0 = null;
-
- boolean rcvAll;
-
- GridFutureAdapter<?> fut0 = null;
+ if (req != null && req.onResponse(res)) {
+ resCnt++;
- synchronized (this) {
- if (!res.futureVersion().equals(futVer))
+ rcvAll = mappings.size() == resCnt;
+ }
+ else
return;
+ }
- if (singleReq != null) {
- if (!singleReq.nodeId().equals(nodeId))
- return;
+ assert req != null && req.topologyVersion().equals(topVer) : req;
- req = singleReq;
+ if (res.remapKeys() != null) {
+ assert !fastMap || cctx.kernalContext().clientNode();
- singleReq = null;
+ if (remapKeys == null)
+ remapKeys = U.newHashSet(res.remapKeys().size());
- rcvAll = true;
- }
- else {
- req = mappings != null ? mappings.get(nodeId) : null;
+ remapKeys.addAll(res.remapKeys());
- if (req != null && req.onResponse(res)) {
- resCnt++;
+ if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
+ mapErrTopVer = req.topologyVersion();
+ }
+ else if (res.error() != null) {
+ if (res.failedKeys() != null)
+ addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
+ }
+ else {
+ if (!req.fastMap() || req.hasPrimary()) {
+ GridCacheReturn ret = res.returnValue();
- rcvAll = mappings.size() == resCnt;
+ if (op == TRANSFORM) {
+ if (ret != null)
+ addInvokeResults(ret);
}
else
- return;
+ opRes = ret;
}
+ }
- assert req != null && req.topologyVersion().equals(topVer) : req;
-
- if (res.remapKeys() != null) {
- assert !fastMap || cctx.kernalContext().clientNode();
-
- if (remapKeys == null)
- remapKeys = U.newHashSet(res.remapKeys().size());
-
- remapKeys.addAll(res.remapKeys());
+ if (rcvAll) {
+ if (remapKeys != null) {
+ assert mapErrTopVer != null;
- if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
- mapErrTopVer = req.topologyVersion();
- }
- else if (res.error() != null) {
- if (res.failedKeys() != null)
- addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
+ remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
}
else {
- if (!req.fastMap() || req.hasPrimary()) {
- GridCacheReturn ret = res.returnValue();
-
- if (op == TRANSFORM) {
- if (ret != null)
- addInvokeResults(ret);
- }
- else
- opRes = ret;
- }
- }
-
- if (rcvAll) {
- if (remapKeys != null) {
- assert mapErrTopVer != null;
+ if (err != null &&
+ X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+ X.hasCause(err, ClusterTopologyCheckedException.class) &&
+ storeFuture() &&
+ --remapCnt > 0) {
+ ClusterTopologyCheckedException topErr =
+ X.cause(err, ClusterTopologyCheckedException.class);
- remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
- }
- else {
- if (err != null &&
- X.hasCause(err, CachePartialUpdateCheckedException.class) &&
- X.hasCause(err, ClusterTopologyCheckedException.class) &&
- storeFuture() &&
- --remapCnt > 0) {
- ClusterTopologyCheckedException topErr =
- X.cause(err, ClusterTopologyCheckedException.class);
+ if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+ CachePartialUpdateCheckedException cause =
+ X.cause(err, CachePartialUpdateCheckedException.class);
- if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
- CachePartialUpdateCheckedException cause =
- X.cause(err, CachePartialUpdateCheckedException.class);
+ assert cause != null && cause.topologyVersion() != null : err;
- assert cause != null && cause.topologyVersion() != null : err;
+ remapTopVer =
+ new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
- remapTopVer =
- new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+ err = null;
- err = null;
+ Collection<Object> failedKeys = cause.failedKeys();
- Collection<Object> failedKeys = cause.failedKeys();
+ remapKeys = new ArrayList<>(failedKeys.size());
- remapKeys = new ArrayList<>(failedKeys.size());
+ for (Object key : failedKeys)
+ remapKeys.add(cctx.toCacheKeyObject(key));
- for (Object key : failedKeys)
- remapKeys.add(cctx.toCacheKeyObject(key));
-
- updVer = null;
- }
+ updVer = null;
}
}
+ }
- if (remapTopVer == null) {
- err0 = err;
- opRes0 = opRes;
- }
- else {
- fut0 = topCompleteFut;
+ if (remapTopVer == null) {
+ err0 = err;
+ opRes0 = opRes;
+ }
+ else {
+ fut0 = topCompleteFut;
- topCompleteFut = null;
+ topCompleteFut = null;
- cctx.mvcc().removeAtomicFuture(futVer);
+ cctx.mvcc().removeAtomicFuture(futVer);
- futVer = null;
- topVer = AffinityTopologyVersion.ZERO;
- }
+ futVer = null;
+ topVer = AffinityTopologyVersion.ZERO;
}
}
+ }
- if (res.error() != null && res.failedKeys() == null) {
- onDone(res.error());
+ if (res.error() != null && res.failedKeys() == null) {
+ onDone(res.error());
- return;
- }
+ return;
+ }
- if (rcvAll && nearEnabled) {
- if (mappings != null) {
- for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
- GridNearAtomicUpdateResponse res0 = req0.response();
+ if (rcvAll && nearEnabled) {
+ if (mappings != null) {
+ for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
+ GridNearAtomicUpdateResponse res0 = req0.response();
- assert res0 != null : req0;
+ assert res0 != null : req0;
- updateNear(req0, res0);
- }
+ updateNear(req0, res0);
}
- else if (!nodeErr)
- updateNear(req, res);
}
+ else if (!nodeErr)
+ updateNear(req, res);
+ }
- if (remapTopVer != null) {
- if (fut0 != null)
- fut0.onDone();
+ if (remapTopVer != null) {
+ if (fut0 != null)
+ fut0.onDone();
- if (!waitTopFut) {
- onDone(new GridCacheTryPutFailedException());
+ if (!waitTopFut) {
+ onDone(new GridCacheTryPutFailedException());
- return;
- }
+ return;
+ }
- if (topLocked) {
- assert !F.isEmpty(remapKeys) : remapKeys;
+ if (topLocked) {
+ assert !F.isEmpty(remapKeys) : remapKeys;
- CachePartialUpdateCheckedException e =
- new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+ CachePartialUpdateCheckedException e =
+ new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
- ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
- "Failed to update keys, topology changed while execute atomic update inside transaction.");
+ ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+ "Failed to update keys, topology changed while execute atomic update inside transaction.");
- cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+ cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
- e.add(remapKeys, cause);
+ e.add(remapKeys, cause);
- onDone(e);
+ onDone(e);
- return;
- }
+ return;
+ }
- IgniteInternalFuture<AffinityTopologyVersion> fut =
- cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+ IgniteInternalFuture<AffinityTopologyVersion> fut =
+ cctx.shared().exchange().affinityReadyFuture(remapTopVer);
- if (fut == null)
- fut = new GridFinishedFuture<>(remapTopVer);
+ if (fut == null)
+ fut = new GridFinishedFuture<>(remapTopVer);
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- try {
- AffinityTopologyVersion topVer = fut.get();
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ try {
+ AffinityTopologyVersion topVer = fut.get();
- map(topVer, remapKeys);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
+ map(topVer, remapKeys);
}
- });
- }
- });
-
- return;
- }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
+ }
+ });
- if (rcvAll)
- onDone(opRes0, err0);
+ return;
}
- /**
- * @param req Request.
- * @param e Error.
- */
- void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
- synchronized (this) {
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- req.nodeId(),
- req.futureVersion(),
- cctx.deploymentEnabled());
+ if (rcvAll)
+ onDone(opRes0, err0);
+ }
- res.addFailedKeys(req.keys(), e);
+ /**
+ * @param req Request.
+ * @param e Error.
+ */
+ void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
+ synchronized (this) {
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ req.nodeId(),
+ req.futureVersion(),
+ cctx.deploymentEnabled());
- onResult(req.nodeId(), res, true);
- }
+ res.addFailedKeys(req.keys(), e);
+
+ onResult(req.nodeId(), res, true);
}
+ }
- /**
- * @param topVer Topology version.
- * @param remapKeys Keys to remap.
- */
- void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
- Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+ /**
+ * @param topVer Topology version.
+ * @param remapKeys Keys to remap.
+ */
+ void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
+ Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
- if (F.isEmpty(topNodes)) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid)."));
+ if (F.isEmpty(topNodes)) {
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+ "left the grid)."));
- return;
- }
+ return;
+ }
- Exception err = null;
- GridNearAtomicUpdateRequest singleReq0 = null;
- Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
+ Exception err = null;
+ GridNearAtomicUpdateRequest singleReq0 = null;
+ Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
- int size = keys.size();
+ int size = keys.size();
- GridCacheVersion futVer = cctx.versions().next(topVer);
+ GridCacheVersion futVer = cctx.versions().next(topVer);
- GridCacheVersion updVer;
+ GridCacheVersion updVer;
- // Assign version on near node in CLOCK ordering mode even if fastMap is false.
- if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
- updVer = this.updVer;
+ // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+ if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+ updVer = this.updVer;
- if (updVer == null) {
- updVer = cctx.versions().next(topVer);
+ if (updVer == null) {
+ updVer = cctx.versions().next(topVer);
- if (log.isDebugEnabled())
- log.debug("Assigned fast-map version for update on near node: " + updVer);
- }
+ if (log.isDebugEnabled())
+ log.debug("Assigned fast-map version for update on near node: " + updVer);
}
- else
- updVer = null;
+ }
+ else
+ updVer = null;
- try {
- if (size == 1 && !fastMap) {
- assert remapKeys == null || remapKeys.size() == 1;
+ try {
+ if (size == 1 && !fastMap) {
+ assert remapKeys == null || remapKeys.size() == 1;
- singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
- }
+ singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+ }
+ else {
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
+ topVer,
+ futVer,
+ updVer,
+ remapKeys);
+
+ if (pendingMappings.size() == 1)
+ singleReq0 = F.firstValue(pendingMappings);
else {
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
- topVer,
- futVer,
- updVer,
- remapKeys);
+ if (syncMode == PRIMARY_SYNC) {
+ mappings0 = U.newHashMap(pendingMappings.size());
- if (pendingMappings.size() == 1)
- singleReq0 = F.firstValue(pendingMappings);
- else {
- if (syncMode == PRIMARY_SYNC) {
- mappings0 = U.newHashMap(pendingMappings.size());
-
- for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
- if (req.hasPrimary())
- mappings0.put(req.nodeId(), req);
- }
+ for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
+ if (req.hasPrimary())
+ mappings0.put(req.nodeId(), req);
}
- else
- mappings0 = pendingMappings;
-
- assert !mappings0.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
}
+ else
+ mappings0 = pendingMappings;
+
+ assert !mappings0.isEmpty() || size == 0 : this;
}
+ }
- synchronized (this) {
- assert this.futVer == null : this;
- assert this.topVer == AffinityTopologyVersion.ZERO : this;
+ synchronized (this) {
+ assert this.futVer == null : this;
+ assert this.topVer == AffinityTopologyVersion.ZERO : this;
- this.topVer = topVer;
- this.updVer = updVer;
- this.futVer = futVer;
+ this.topVer = topVer;
+ this.updVer = updVer;
+ this.futVer = futVer;
- resCnt = 0;
+ resCnt = 0;
- singleReq = singleReq0;
- mappings = mappings0;
+ singleReq = singleReq0;
+ mappings = mappings0;
- this.remapKeys = null;
- }
- }
- catch (Exception e) {
- err = e;
+ this.remapKeys = null;
}
+ }
+ catch (Exception e) {
+ err = e;
+ }
- if (err != null) {
- onDone(err);
+ if (err != null) {
+ onDone(err);
- return;
- }
+ return;
+ }
- if (storeFuture()) {
- if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) {
- assert isDone() : GridNearAtomicUpdateFuture.this;
+ if (storeFuture()) {
+ if (!cctx.mvcc().addAtomicFuture(futVer, this)) {
+ assert isDone() : this;
- return;
- }
+ return;
}
+ }
- // Optimize mapping for single key.
- if (singleReq0 != null)
- mapSingle(singleReq0.nodeId(), singleReq0);
- else {
- assert mappings0 != null;
+ // Optimize mapping for single key.
+ if (singleReq0 != null)
+ mapSingle(singleReq0.nodeId(), singleReq0);
+ else {
+ assert mappings0 != null;
- if (size == 0)
- onDone(new GridCacheReturn(cctx, true, true, null, true));
- else
- doUpdate(mappings0);
- }
+ if (size == 0)
+ onDone(new GridCacheReturn(cctx, true, true, null, true));
+ else
+ doUpdate(mappings0);
}
+ }
- /**
- * @param topVer Topology version.
- * @return Future.
- */
- @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) {
- if (this.topVer == AffinityTopologyVersion.ZERO)
- return null;
-
- if (this.topVer.compareTo(topVer) < 0) {
- if (topCompleteFut == null)
- topCompleteFut = new GridFutureAdapter<>();
+ /**
+ * @param topVer Topology version.
+ * @return Future.
+ */
+ @Nullable private synchronized GridFutureAdapter<Void> completeFuture0(AffinityTopologyVersion topVer) {
+ if (this.topVer == AffinityTopologyVersion.ZERO)
+ return null;
- return topCompleteFut;
- }
+ if (this.topVer.compareTo(topVer) < 0) {
+ if (topCompleteFut == null)
+ topCompleteFut = new GridFutureAdapter<>();
- return null;
+ return topCompleteFut;
}
- /**
- * @return Future version.
- */
- GridCacheVersion onFutureDone() {
- GridCacheVersion ver0;
-
- GridFutureAdapter<Void> fut0;
+ return null;
+ }
- synchronized (this) {
- fut0 = topCompleteFut;
+ /**
+ * @return Future version.
+ */
+ private GridCacheVersion onFutureDone() {
+ GridCacheVersion ver0;
- topCompleteFut = null;
+ GridFutureAdapter<Void> fut0;
- ver0 = futVer;
+ synchronized (this) {
+ fut0 = topCompleteFut;
- futVer = null;
- }
+ topCompleteFut = null;
- if (fut0 != null)
- fut0.onDone();
+ ver0 = futVer;
- return ver0;
+ futVer = null;
}
- /**
- * @param topNodes Cache nodes.
- * @param topVer Topology version.
- * @param futVer Future version.
- * @param updVer Update version.
- * @param remapKeys Keys to remap.
- * @return Mapping.
- * @throws Exception If failed.
- */
- private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
- AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable GridCacheVersion updVer,
- @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
- Iterator<?> it = null;
-
- if (vals != null)
- it = vals.iterator();
-
- Iterator<GridCacheDrInfo> conflictPutValsIt = null;
-
- if (conflictPutVals != null)
- conflictPutValsIt = conflictPutVals.iterator();
-
- Iterator<GridCacheVersion> conflictRmvValsIt = null;
-
- if (conflictRmvVals != null)
- conflictRmvValsIt = conflictRmvVals.iterator();
-
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
-
- // Create mappings first, then send messages.
- for (Object key : keys) {
- if (key == null)
- throw new NullPointerException("Null key.");
-
- Object val;
- GridCacheVersion conflictVer;
- long conflictTtl;
- long conflictExpireTime;
-
- if (vals != null) {
- val = it.next();
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
-
- if (val == null)
- throw new NullPointerException("Null value.");
- }
- else if (conflictPutVals != null) {
- GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
-
- val = conflictPutVal.valueEx();
- conflictVer = conflictPutVal.version();
- conflictTtl = conflictPutVal.ttl();
- conflictExpireTime = conflictPutVal.expireTime();
- }
- else if (conflictRmvVals != null) {
- val = null;
- conflictVer = conflictRmvValsIt.next();
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
- else {
- val = null;
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
-
- if (val == null && op != GridCacheOperation.DELETE)
- continue;
-
- KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+ if (fut0 != null)
+ fut0.onDone();
- if (remapKeys != null && !remapKeys.contains(cacheKey))
- continue;
+ return ver0;
+ }
- if (op != TRANSFORM)
- val = cctx.toCacheObject(val);
+ /**
+ * @param topNodes Cache nodes.
+ * @param topVer Topology version.
+ * @param futVer Future version.
+ * @param updVer Update version.
+ * @param remapKeys Keys to remap.
+ * @return Mapping.
+ * @throws Exception If failed.
+ */
+ private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+ AffinityTopologyVersion topVer,
+ GridCacheVersion futVer,
+ @Nullable GridCacheVersion updVer,
+ @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
+ Iterator<?> it = null;
- Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
+ if (vals != null)
+ it = vals.iterator();
- if (affNodes.isEmpty())
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid).");
+ Iterator<GridCacheDrInfo> conflictPutValsIt = null;
- int i = 0;
-
- for (ClusterNode affNode : affNodes) {
- if (affNode == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid).");
-
- UUID nodeId = affNode.id();
-
- GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
-
- if (mapped == null) {
- mapped = new GridNearAtomicUpdateRequest(
- cctx.cacheId(),
- nodeId,
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- keys.size());
-
- pendingMappings.put(nodeId, mapped);
- }
+ if (conflictPutVals != null)
+ conflictPutValsIt = conflictPutVals.iterator();
- mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
+ Iterator<GridCacheVersion> conflictRmvValsIt = null;
- i++;
- }
- }
+ if (conflictRmvVals != null)
+ conflictRmvValsIt = conflictRmvVals.iterator();
- return pendingMappings;
- }
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
- /**
- * @param topVer Topology version.
- * @param futVer Future version.
- * @param updVer Update version.
- * @return Request.
- * @throws Exception If failed.
- */
- private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable GridCacheVersion updVer) throws Exception {
- Object key = F.first(keys);
+ // Create mappings first, then send messages.
+ for (Object key : keys) {
+ if (key == null)
+ throw new NullPointerException("Null key.");
Object val;
GridCacheVersion conflictVer;
@@ -971,127 +832,231 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
long conflictExpireTime;
if (vals != null) {
- // Regular PUT.
- val = F.first(vals);
+ val = it.next();
conflictVer = null;
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+
+ if (val == null)
+ throw new NullPointerException("Null value.");
}
else if (conflictPutVals != null) {
- // Conflict PUT.
- GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
+ GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
val = conflictPutVal.valueEx();
conflictVer = conflictPutVal.version();
- conflictTtl = conflictPutVal.ttl();
+ conflictTtl = conflictPutVal.ttl();
conflictExpireTime = conflictPutVal.expireTime();
}
else if (conflictRmvVals != null) {
- // Conflict REMOVE.
val = null;
- conflictVer = F.first(conflictRmvVals);
+ conflictVer = conflictRmvValsIt.next();
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
}
else {
- // Regular REMOVE.
val = null;
conflictVer = null;
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
}
- // We still can get here if user pass map with single element.
- if (key == null)
- throw new NullPointerException("Null key.");
-
if (val == null && op != GridCacheOperation.DELETE)
- throw new NullPointerException("Null value.");
+ continue;
KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+ if (remapKeys != null && !remapKeys.contains(cacheKey))
+ continue;
+
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
- ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
-
- if (primary == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid).");
-
- GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
- cctx.cacheId(),
- primary.id(),
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- 1);
-
- req.addUpdateEntry(cacheKey,
- val,
- conflictTtl,
- conflictExpireTime,
- conflictVer,
- true);
-
- return req;
- }
+ Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
- /**
- * @param ret Result from single node.
- */
- @SuppressWarnings("unchecked")
- private void addInvokeResults(GridCacheReturn ret) {
- assert op == TRANSFORM : op;
- assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
- if (ret.value() != null) {
- if (opRes != null)
- opRes.mergeEntryProcessResults(ret);
- else
- opRes = ret;
- }
- }
+ if (affNodes.isEmpty())
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
+
+ int i = 0;
- /**
- * @param failedKeys Failed keys.
- * @param topVer Topology version for failed update.
- * @param err Error cause.
- */
- private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
- AffinityTopologyVersion topVer,
- Throwable err) {
- CachePartialUpdateCheckedException err0 = this.err;
+ for (ClusterNode affNode : affNodes) {
+ if (affNode == null)
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
- if (err0 == null)
- err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+ UUID nodeId = affNode.id();
- Collection<Object> keys = new ArrayList<>(failedKeys.size());
+ GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
+
+ if (mapped == null) {
+ mapped = new GridNearAtomicUpdateRequest(
+ cctx.cacheId(),
+ nodeId,
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled(),
+ keys.size());
+
+ pendingMappings.put(nodeId, mapped);
+ }
- for (KeyCacheObject key : failedKeys)
- keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+ mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
- err0.add(keys, err, topVer);
+ i++;
+ }
}
- /** {@inheritDoc} */
- @Override public synchronized String toString() {
- return S.toString(UpdateState.class, this);
+ return pendingMappings;
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @param futVer Future version.
+ * @param updVer Update version.
+ * @return Request.
+ * @throws Exception If failed.
+ */
+ private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
+ GridCacheVersion futVer,
+ @Nullable GridCacheVersion updVer) throws Exception {
+ Object key = F.first(keys);
+
+ Object val;
+ GridCacheVersion conflictVer;
+ long conflictTtl;
+ long conflictExpireTime;
+
+ if (vals != null) {
+ // Regular PUT.
+ val = F.first(vals);
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+ else if (conflictPutVals != null) {
+ // Conflict PUT.
+ GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
+
+ val = conflictPutVal.valueEx();
+ conflictVer = conflictPutVal.version();
+ conflictTtl = conflictPutVal.ttl();
+ conflictExpireTime = conflictPutVal.expireTime();
+ }
+ else if (conflictRmvVals != null) {
+ // Conflict REMOVE.
+ val = null;
+ conflictVer = F.first(conflictRmvVals);
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
}
+ else {
+ // Regular REMOVE.
+ val = null;
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+
+ // We still can get here if user pass map with single element.
+ if (key == null)
+ throw new NullPointerException("Null key.");
+
+ if (val == null && op != GridCacheOperation.DELETE)
+ throw new NullPointerException("Null value.");
+
+ KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+
+ if (op != TRANSFORM)
+ val = cctx.toCacheObject(val);
+
+ ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
+
+ if (primary == null)
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+ "left the grid).");
+
+ GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
+ cctx.cacheId(),
+ primary.id(),
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled(),
+ 1);
+
+ req.addUpdateEntry(cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
+ true);
+
+ return req;
+ }
+
+ /**
+ * @param ret Result from single node.
+ */
+ @SuppressWarnings("unchecked")
+ private void addInvokeResults(GridCacheReturn ret) {
+ assert op == TRANSFORM : op;
+ assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+ if (ret.value() != null) {
+ if (opRes != null)
+ opRes.mergeEntryProcessResults(ret);
+ else
+ opRes = ret;
+ }
+ }
+
+ /**
+ * @param failedKeys Failed keys.
+ * @param topVer Topology version for failed update.
+ * @param err Error cause.
+ */
+ private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
+ AffinityTopologyVersion topVer,
+ Throwable err) {
+ CachePartialUpdateCheckedException err0 = this.err;
+
+ if (err0 == null)
+ err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+ Collection<Object> keys = new ArrayList<>(failedKeys.size());
+
+ for (KeyCacheObject key : failedKeys)
+ keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+
+ err0.add(keys, err, topVer);
}
/** {@inheritDoc} */
[7/8] ignite git commit: IGNITE-2532: WIP on single message optimization.
Posted by vo...@apache.org.
IGNITE-2532: WIP on single message optimization.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/89c80744
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/89c80744
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/89c80744
Branch: refs/heads/ignite-2523
Commit: 89c8074452b4bc209eb03d758e534f0ef8365d46
Parents: 52d20cd
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 3 12:08:18 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 3 12:08:18 2016 +0300
----------------------------------------------------------------------
.../GridNearAtomicSingleUpdateRequest.java | 81 ++++++++++----------
.../dht/atomic/GridNearAtomicUpdateFuture.java | 15 ++--
2 files changed, 45 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c80744/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index cee662c..9ef0b6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -161,6 +161,11 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
/**
* Constructor.
*
+ * @param key Key.
+ * @param val Value.
+ * @param conflictTtl Conflict TTL (optional).
+ * @param conflictExpireTime Conflict expire time (optional).
+ * @param conflictVer Conflict version (optional).
* @param cacheId Cache ID.
* @param nodeId Node ID.
* @param futVer Future version.
@@ -181,7 +186,13 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
* @param clientReq Client node request flag.
* @param addDepInfo Deployment info flag.
*/
+ @SuppressWarnings("unchecked")
public GridNearAtomicSingleUpdateRequest(
+ KeyCacheObject key,
+ @Nullable Object val,
+ long conflictTtl,
+ long conflictExpireTime,
+ @Nullable GridCacheVersion conflictVer,
int cacheId,
UUID nodeId,
GridCacheVersion futVer,
@@ -204,6 +215,8 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
) {
assert futVer != null;
+ this.key = key;
+
this.cacheId = cacheId;
this.nodeId = nodeId;
this.futVer = futVer;
@@ -224,6 +237,32 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
this.keepBinary = keepBinary;
this.clientReq = clientReq;
this.addDepInfo = addDepInfo;
+
+ EntryProcessor<Object, Object, Object> entryProc = null;
+
+ if (op == TRANSFORM) {
+ assert val instanceof EntryProcessor : val;
+
+ entryProc = (EntryProcessor<Object, Object, Object>) val;
+ }
+
+ assert val != null || op == DELETE;
+
+ if (entryProc != null)
+ this.entryProc = entryProc;
+ else if (val != null) {
+ assert val instanceof CacheObject : val;
+
+ this.val = (CacheObject)val;
+ }
+
+ this.conflictVer = conflictVer;
+
+ if (conflictTtl >= 0)
+ this.conflictTtl = conflictTtl;
+
+ if (conflictExpireTime >= 0)
+ this.conflictExpireTime = conflictExpireTime;
}
/** {@inheritDoc} */
@@ -344,48 +383,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
}
/**
- * @param key Key to add.
- * @param val Optional update value.
- * @param conflictTtl Conflict TTL (optional).
- * @param conflictExpireTime Conflict expire time (optional).
- * @param conflictVer Conflict version (optional).
- */
- @SuppressWarnings("unchecked")
- public void addUpdateEntry(KeyCacheObject key,
- @Nullable Object val,
- long conflictTtl,
- long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer) {
- EntryProcessor<Object, Object, Object> entryProcessor = null;
-
- if (op == TRANSFORM) {
- assert val instanceof EntryProcessor : val;
-
- entryProcessor = (EntryProcessor<Object, Object, Object>) val;
- }
-
- assert val != null || op == DELETE;
-
- this.key = key;
-
- if (entryProcessor != null)
- this.entryProc = entryProcessor;
- else if (val != null) {
- assert val instanceof CacheObject : val;
-
- this.val = (CacheObject)val;
- }
-
- this.conflictVer = conflictVer;
-
- if (conflictTtl >= 0)
- this.conflictTtl = conflictTtl;
-
- if (conflictExpireTime >= 0)
- this.conflictExpireTime = conflictExpireTime;
- }
-
- /**
* @return Keys for this update request.
*/
public List<KeyCacheObject> keys() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c80744/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 38e93ec..c8550f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -1009,7 +1009,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
}
if (optimize) {
- GridNearAtomicSingleUpdateRequest req = new GridNearAtomicSingleUpdateRequest(
+ return new GridNearAtomicSingleUpdateRequest(
+ cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
cctx.cacheId(),
primary.id(),
futVer,
@@ -1029,14 +1034,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
keepBinary,
cctx.kernalContext().clientNode(),
cctx.deploymentEnabled());
-
- req.addUpdateEntry(cacheKey,
- val,
- conflictTtl,
- conflictExpireTime,
- conflictVer);
-
- return req;
}
else {
GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
[8/8] ignite git commit: IGNITE-2532: Reverting changes to GridNearAtomicUpdateFuture.
Posted by vo...@apache.org.
IGNITE-2532: Reverting changes to GridNearAtomicUpdateFuture.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/07c23931
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/07c23931
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/07c23931
Branch: refs/heads/ignite-2523
Commit: 07c23931f9758497db50bf0851af5d6c0fb8eaa4
Parents: 89c8074
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 3 12:45:42 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 3 12:45:42 2016 +0300
----------------------------------------------------------------------
.../GridNearAbstractAtomicUpdateFuture.java | 252 ----
.../dht/atomic/GridNearAtomicUpdateFuture.java | 1400 ++++++++++--------
2 files changed, 798 insertions(+), 854 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/07c23931/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
deleted file mode 100644
index f8c6810..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
-import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
-import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteProductVersion;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
-
-import javax.cache.expiry.ExpiryPolicy;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
-
-/**
- * Base class for near atomic update futures.
- */
-public abstract class GridNearAbstractAtomicUpdateFuture extends GridFutureAdapter<Object>
- implements GridCacheAtomicFuture<Object> {
- /** */
- public static final IgniteProductVersion SINGLE_PUT_MSG_SINCE = IgniteProductVersion.fromString("1.6.0");
-
- /** Logger reference. */
- protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Logger. */
- protected static IgniteLogger log;
-
- /** Optional arguments for entry processor. */
- protected Object[] invokeArgs;
-
- /** Cache context. */
- protected final GridCacheContext cctx;
-
- /** Cache. */
- protected final GridDhtAtomicCache cache;
-
- /** Update operation. */
- protected final GridCacheOperation op;
-
- /** Return value require flag. */
- protected final boolean retval;
-
- /** Expiry policy. */
- protected final ExpiryPolicy expiryPlc;
-
- /** Optional filter. */
- protected final CacheEntryPredicate[] filter;
-
- /** Write synchronization mode. */
- protected final CacheWriteSynchronizationMode syncMode;
-
- /** Raw return value flag. */
- protected final boolean rawRetval;
-
- /** Fast map flag. */
- protected final boolean fastMap;
-
- /** Near cache flag. */
- protected final boolean nearEnabled;
-
- /** Subject ID. */
- protected final UUID subjId;
-
- /** Task name hash. */
- protected final int taskNameHash;
-
- /** Skip store flag. */
- protected final boolean skipStore;
-
- /** */
- protected final boolean keepBinary;
-
- /** Wait for topology future flag. */
- protected final boolean waitTopFut;
-
- /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
- protected boolean topLocked;
-
- /** Remap count. */
- protected int remapCnt;
-
- /**
- * @param cctx Cache context.
- * @param cache Cache instance.
- * @param syncMode Write synchronization mode.
- * @param op Update operation.
- * @param invokeArgs Optional arguments for entry processor.
- * @param retval Return value require flag.
- * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
- * @param expiryPlc Expiry policy explicitly specified for cache operation.
- * @param filter Entry filter.
- * @param subjId Subject ID.
- * @param taskNameHash Task name hash code.
- * @param skipStore Skip store flag.
- * @param keepBinary Keep binary flag.
- * @param remapCnt Maximum number of retries.
- * @param waitTopFut If {@code false} does not wait for affinity change future.
- */
- public GridNearAbstractAtomicUpdateFuture(
- GridCacheContext cctx,
- GridDhtAtomicCache cache,
- CacheWriteSynchronizationMode syncMode,
- GridCacheOperation op,
- @Nullable Object[] invokeArgs,
- final boolean retval,
- final boolean rawRetval,
- @Nullable ExpiryPolicy expiryPlc,
- final CacheEntryPredicate[] filter,
- UUID subjId,
- int taskNameHash,
- boolean skipStore,
- boolean keepBinary,
- int remapCnt,
- boolean waitTopFut
- ) {
- this.rawRetval = rawRetval;
-
- assert subjId != null;
-
- this.cctx = cctx;
- this.cache = cache;
- this.syncMode = syncMode;
- this.op = op;
- this.invokeArgs = invokeArgs;
- this.retval = retval;
- this.expiryPlc = expiryPlc;
- this.filter = filter;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
- this.skipStore = skipStore;
- this.keepBinary = keepBinary;
- this.waitTopFut = waitTopFut;
-
- if (log == null)
- log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
-
- fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
- cctx.config().getAtomicWriteOrderMode() == CLOCK &&
- !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
-
- nearEnabled = CU.isNearEnabled(cctx);
-
- if (!waitTopFut)
- remapCnt = 1;
-
- this.remapCnt = remapCnt;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public boolean trackable() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- // No-op.
- }
-
- /**
- * @return {@code True} if this future should block partition map exchange.
- */
- protected boolean waitForPartitionExchange() {
- // Wait fast-map near atomic update futures in CLOCK mode.
- return fastMap;
- }
-
- /**
- * Updates near cache.
- *
- * @param req Update request.
- * @param res Update response.
- */
- protected void updateNear(GridNearAtomicUpdateRequestInterface req, GridNearAtomicUpdateResponse res) {
- assert nearEnabled;
-
- if (res.remapKeys() != null || !req.hasPrimary())
- return;
-
- GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
- near.processNearAtomicUpdateResponse(req, res);
- }
-
- /**
- * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
- */
- protected boolean storeFuture() {
- return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
- }
-
- /**
- * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
- * node and send updates in parallel to all participating nodes.
- *
- * @param key Key to map.
- * @param topVer Topology version to map.
- * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
- * @return Collection of nodes to which key is mapped.
- */
- protected Collection<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer, boolean fastMap
- ) {
- GridCacheAffinityManager affMgr = cctx.affinity();
-
- // If we can send updates in parallel - do it.
- return fastMap ?
- cctx.topology().nodes(affMgr.partition(key), topVer) :
- Collections.singletonList(affMgr.primary(key, topVer));
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/07c23931/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index c8550f3..149d277 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -17,7 +17,16 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -26,12 +35,16 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -44,26 +57,38 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
-import javax.cache.expiry.ExpiryPolicy;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.UUID;
-
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
/**
* DHT atomic cache near update future.
*/
-@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFuture {
+public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implements GridCacheAtomicFuture<Object>{
+ /** Version where single-put optimization appeared. */
+ public static final IgniteProductVersion SINGLE_PUT_MSG_SINCE = IgniteProductVersion.fromString("1.6.0");
+
+ /** Logger reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ protected static IgniteLogger log;
+
+ /** Cache context. */
+ private final GridCacheContext cctx;
+
+ /** Cache. */
+ private GridDhtAtomicCache cache;
+
+ /** Update operation. */
+ private final GridCacheOperation op;
+
/** Keys */
private Collection<?> keys;
@@ -71,6 +96,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<?> vals;
+ /** Optional arguments for entry processor. */
+ private Object[] invokeArgs;
+
/** Conflict put values. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<GridCacheDrInfo> conflictPutVals;
@@ -79,39 +107,50 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<GridCacheVersion> conflictRmvVals;
- /** Current topology version. */
- private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
+ /** Return value require flag. */
+ private final boolean retval;
- /** */
- private GridCacheVersion updVer;
+ /** Expiry policy. */
+ private final ExpiryPolicy expiryPlc;
- /** Topology version when got mapping error. */
- private AffinityTopologyVersion mapErrTopVer;
+ /** Optional filter. */
+ private final CacheEntryPredicate[] filter;
- /** Mappings if operations is mapped to more than one node. */
- @GridToStringInclude
- private Map<UUID, GridNearAtomicUpdateRequest> mappings;
+ /** Write synchronization mode. */
+ private final CacheWriteSynchronizationMode syncMode;
- /** */
- private int resCnt;
+ /** Raw return value flag. */
+ private final boolean rawRetval;
+
+ /** Fast map flag. */
+ private final boolean fastMap;
+
+ /** Near cache flag. */
+ private final boolean nearEnabled;
- /** Error. */
- private CachePartialUpdateCheckedException err;
+ /** Subject ID. */
+ private final UUID subjId;
- /** Future ID. */
- private GridCacheVersion futVer;
+ /** Task name hash. */
+ private final int taskNameHash;
- /** Completion future for a particular topology version. */
- private GridFutureAdapter<Void> topCompleteFut;
+ /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
+ private boolean topLocked;
+
+ /** Skip store flag. */
+ private final boolean skipStore;
+
+ /** Keep binary flag. */
+ private final boolean keepBinary;
- /** Keys to remap. */
- private Collection<KeyCacheObject> remapKeys;
+ /** Wait for topology future flag. */
+ private final boolean waitTopFut;
- /** Not null is operation is mapped to single node. */
- private GridNearAtomicUpdateRequestInterface singleReq;
+ /** Remap count. */
+ private int remapCnt;
- /** Operation result. */
- private GridCacheReturn opRes;
+ /** State. */
+ private final UpdateState state;
/**
* @param cctx Cache context.
@@ -155,72 +194,116 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
int remapCnt,
boolean waitTopFut
) {
- super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash,
- skipStore, keepBinary, remapCnt, waitTopFut);
+ this.rawRetval = rawRetval;
assert vals == null || vals.size() == keys.size();
assert conflictPutVals == null || conflictPutVals.size() == keys.size();
assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
+ assert subjId != null;
+ this.cctx = cctx;
+ this.cache = cache;
+ this.syncMode = syncMode;
+ this.op = op;
this.keys = keys;
this.vals = vals;
+ this.invokeArgs = invokeArgs;
this.conflictPutVals = conflictPutVals;
this.conflictRmvVals = conflictRmvVals;
+ this.retval = retval;
+ this.expiryPlc = expiryPlc;
+ this.filter = filter;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.skipStore = skipStore;
+ this.keepBinary = keepBinary;
+ this.waitTopFut = waitTopFut;
+
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
+
+ fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+ cctx.config().getAtomicWriteOrderMode() == CLOCK &&
+ !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
+
+ nearEnabled = CU.isNearEnabled(cctx);
+
+ if (!waitTopFut)
+ remapCnt = 1;
+
+ this.remapCnt = remapCnt;
+
+ state = new UpdateState();
}
/** {@inheritDoc} */
- @Override public synchronized GridCacheVersion version() {
- return futVer;
+ @Override public IgniteUuid futureId() {
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
- if (waitForPartitionExchange()) {
- GridFutureAdapter<Void> fut = completeFuture0(topVer);
+ @Override public GridCacheVersion version() {
+ return state.futureVersion();
+ }
- if (fut != null && isDone()) {
- fut.onDone();
+ /**
+ * @return {@code True} if this future should block partition map exchange.
+ */
+ private boolean waitForPartitionExchange() {
+ // Wait fast-map near atomic update futures in CLOCK mode.
+ return fastMap;
+ }
- return null;
- }
+ /** {@inheritDoc} */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ state.onNodeLeft(nodeId);
- return fut;
- }
+ return false;
+ }
- return null;
+ /** {@inheritDoc} */
+ @Override public boolean trackable() {
+ return true;
}
/** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- GridNearAtomicUpdateResponse res = null;
+ @Override public void markNotTrackable() {
+ // No-op.
+ }
- synchronized (this) {
- GridNearAtomicUpdateRequestInterface req;
+ /**
+ * Performs future mapping.
+ */
+ public void map() {
+ AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
- if (singleReq != null)
- req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
- else
- req = mappings != null ? mappings.get(nodeId) : null;
+ if (topVer == null)
+ mapOnTopology();
+ else {
+ topLocked = true;
- if (req != null && req.response() == null) {
- res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- nodeId,
- req.futureVersion(),
- cctx.deploymentEnabled());
+ // Cannot remap.
+ remapCnt = 1;
- ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
- "before response is received: " + nodeId);
+ state.map(topVer, null);
+ }
+ }
- e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+ if (waitForPartitionExchange()) {
+ GridFutureAdapter<Void> fut = state.completeFuture(topVer);
- res.addFailedKeys(req.keys(), e);
+ if (fut != null && isDone()) {
+ fut.onDone();
+
+ return null;
}
- }
- if (res != null)
- onResult(nodeId, res, true);
+ return fut;
+ }
- return false;
+ return null;
}
/** {@inheritDoc} */
@@ -238,7 +321,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
retval = Collections.emptyMap();
if (super.onDone(retval, err)) {
- GridCacheVersion futVer = onFutureDone();
+ GridCacheVersion futVer = state.onFutureDone();
if (futVer != null)
cctx.mvcc().removeAtomicFuture(futVer);
@@ -250,31 +333,30 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
}
/**
- * Performs future mapping.
+ * Response callback.
+ *
+ * @param nodeId Node ID.
+ * @param res Update response.
*/
- public void map() {
- AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
-
- if (topVer == null)
- mapOnTopology();
- else {
- topLocked = true;
-
- // Cannot remap.
- remapCnt = 1;
-
- map(topVer, null);
- }
+ public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+ state.onResult(nodeId, res, false);
}
/**
- * Response callback.
+ * Updates near cache.
*
- * @param nodeId Node ID.
+ * @param req Update request.
* @param res Update response.
*/
- public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
- onResult(nodeId, res, false);
+ private void updateNear(GridNearAtomicUpdateRequestInterface req, GridNearAtomicUpdateResponse res) {
+ assert nearEnabled;
+
+ if (res.remapKeys() != null || !req.hasPrimary())
+ return;
+
+ GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
+
+ near.processNearAtomicUpdateResponse(req, res);
}
/**
@@ -330,7 +412,36 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
cache.topology().readUnlock();
}
- map(topVer, null);
+ state.map(topVer, null);
+ }
+
+ /**
+ * @return {@code True} if future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
+ */
+ private boolean storeFuture() {
+ return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
+ }
+
+ /**
+ * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
+ * node and send updates in parallel to all participating nodes.
+ *
+ * @param key Key to map.
+ * @param topVer Topology version to map.
+ * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
+ * @return Collection of nodes to which key is mapped.
+ */
+ private Collection<ClusterNode> mapKey(
+ KeyCacheObject key,
+ AffinityTopologyVersion topVer,
+ boolean fastMap
+ ) {
+ GridCacheAffinityManager affMgr = cctx.affinity();
+
+ // If we can send updates in parallel - do it.
+ return fastMap ?
+ cctx.topology().nodes(affMgr.partition(key), topVer) :
+ Collections.singletonList(affMgr.primary(key, topVer));
}
/**
@@ -343,7 +454,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
if (cctx.localNodeId().equals(nodeId)) {
cache.updateAllAsyncInternal(nodeId, req,
new CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequestInterface req, GridNearAtomicUpdateResponse res) {
+ @Override public void apply(GridNearAtomicUpdateRequestInterface req,
+ GridNearAtomicUpdateResponse res) {
onResult(res.nodeId(), res);
}
});
@@ -365,7 +477,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
onDone(new GridCacheReturn(cctx, true, true, null, true));
}
catch (IgniteCheckedException e) {
- onSendError(req, e);
+ state.onSendError(req, e);
}
}
}
@@ -378,7 +490,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
UUID locNodeId = cctx.localNodeId();
- GridNearAtomicUpdateRequestInterface locUpdate = null;
+ GridNearAtomicUpdateRequest locUpdate = null;
// Send messages to remote nodes first, then run local update.
for (GridNearAtomicUpdateRequest req : mappings.values()) {
@@ -396,7 +508,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
}
catch (IgniteCheckedException e) {
- onSendError(req, e);
+ state.onSendError(req, e);
}
}
}
@@ -415,423 +527,611 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
}
/**
- * @param nodeId Node ID.
- * @param res Response.
- * @param nodeErr {@code True} if response was created on node failure.
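+ * Mutable state of the update: topology and future versions, node mappings, response count, keys to remap, error and result.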
*/
- @SuppressWarnings("unchecked")
- void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
- GridNearAtomicUpdateRequestInterface req;
+ private class UpdateState {
+ /** Current topology version. */
+ private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
- AffinityTopologyVersion remapTopVer = null;
+ /** */
+ private GridCacheVersion updVer;
- GridCacheReturn opRes0 = null;
- CachePartialUpdateCheckedException err0 = null;
+ /** Topology version when got mapping error. */
+ private AffinityTopologyVersion mapErrTopVer;
- boolean rcvAll;
+ /** Mappings if operation is mapped to more than one node. */
+ @GridToStringInclude
+ private Map<UUID, GridNearAtomicUpdateRequest> mappings;
- GridFutureAdapter<?> fut0 = null;
+ /** Number of responses received for the current mappings. */
+ private int resCnt;
- synchronized (this) {
- if (!res.futureVersion().equals(futVer))
- return;
+ /** Error. */
+ private CachePartialUpdateCheckedException err;
- if (singleReq != null) {
- if (!singleReq.nodeId().equals(nodeId))
- return;
+ /** Future ID. */
+ private GridCacheVersion futVer;
- req = singleReq;
+ /** Completion future for a particular topology version. */
+ private GridFutureAdapter<Void> topCompleteFut;
- singleReq = null;
+ /** Keys to remap. */
+ private Collection<KeyCacheObject> remapKeys;
- rcvAll = true;
- }
- else {
- req = mappings != null ? mappings.get(nodeId) : null;
+ /** Not null if operation is mapped to single node. */
+ private GridNearAtomicUpdateRequestInterface singleReq;
- if (req != null && req.onResponse(res)) {
- resCnt++;
+ /** Operation result. */
+ private GridCacheReturn opRes;
- rcvAll = mappings.size() == resCnt;
- }
+ /**
+ * @return Future version.
+ */
+ @Nullable synchronized GridCacheVersion futureVersion() {
+ return futVer;
+ }
+
+ /**
+ * @param nodeId Left node ID.
+ */
+ void onNodeLeft(UUID nodeId) {
+ GridNearAtomicUpdateResponse res = null;
+
+ synchronized (this) {
+ GridNearAtomicUpdateRequestInterface req;
+
+ if (singleReq != null)
+ req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
else
- return;
+ req = mappings != null ? mappings.get(nodeId) : null;
+
+ if (req != null && req.response() == null) {
+ res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ nodeId,
+ req.futureVersion(),
+ cctx.deploymentEnabled());
+
+ ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
+ "before response is received: " + nodeId);
+
+ e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+
+ res.addFailedKeys(req.keys(), e);
+ }
}
- assert req != null && req.topologyVersion().equals(topVer) : req;
+ if (res != null)
+ onResult(nodeId, res, true);
+ }
- if (res.remapKeys() != null) {
- assert !fastMap || cctx.kernalContext().clientNode();
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ * @param nodeErr {@code True} if response was created on node failure.
+ */
+ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+ void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+ GridNearAtomicUpdateRequestInterface req;
- if (remapKeys == null)
- remapKeys = U.newHashSet(res.remapKeys().size());
+ AffinityTopologyVersion remapTopVer = null;
- remapKeys.addAll(res.remapKeys());
+ GridCacheReturn opRes0 = null;
+ CachePartialUpdateCheckedException err0 = null;
- if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
- mapErrTopVer = req.topologyVersion();
- }
- else if (res.error() != null) {
- if (res.failedKeys() != null)
- addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
- }
- else {
- if (!req.fastMap() || req.hasPrimary()) {
- GridCacheReturn ret = res.returnValue();
+ boolean rcvAll;
+
+ GridFutureAdapter<?> fut0 = null;
+
+ synchronized (this) {
+ if (!res.futureVersion().equals(futVer))
+ return;
+
+ if (singleReq != null) {
+ if (!singleReq.nodeId().equals(nodeId))
+ return;
+
+ req = singleReq;
+
+ singleReq = null;
+
+ rcvAll = true;
+ }
+ else {
+ req = mappings != null ? mappings.get(nodeId) : null;
+
+ if (req != null && req.onResponse(res)) {
+ resCnt++;
- if (op == TRANSFORM) {
- if (ret != null)
- addInvokeResults(ret);
+ rcvAll = mappings.size() == resCnt;
}
else
- opRes = ret;
+ return;
}
- }
- if (rcvAll) {
- if (remapKeys != null) {
- assert mapErrTopVer != null;
+ assert req != null && req.topologyVersion().equals(topVer) : req;
+
+ if (res.remapKeys() != null) {
+ assert !fastMap || cctx.kernalContext().clientNode();
+
+ if (remapKeys == null)
+ remapKeys = U.newHashSet(res.remapKeys().size());
- remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
+ remapKeys.addAll(res.remapKeys());
+
+ if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
+ mapErrTopVer = req.topologyVersion();
+ }
+ else if (res.error() != null) {
+ if (res.failedKeys() != null)
+ addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
}
else {
- if (err != null &&
- X.hasCause(err, CachePartialUpdateCheckedException.class) &&
- X.hasCause(err, ClusterTopologyCheckedException.class) &&
- storeFuture() &&
- --remapCnt > 0) {
- ClusterTopologyCheckedException topErr =
- X.cause(err, ClusterTopologyCheckedException.class);
+ if (!req.fastMap() || req.hasPrimary()) {
+ GridCacheReturn ret = res.returnValue();
+
+ if (op == TRANSFORM) {
+ if (ret != null)
+ addInvokeResults(ret);
+ }
+ else
+ opRes = ret;
+ }
+ }
+
+ if (rcvAll) {
+ if (remapKeys != null) {
+ assert mapErrTopVer != null;
+
+ remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
+ }
+ else {
+ if (err != null &&
+ X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+ X.hasCause(err, ClusterTopologyCheckedException.class) &&
+ storeFuture() &&
+ --remapCnt > 0) {
+ ClusterTopologyCheckedException topErr =
+ X.cause(err, ClusterTopologyCheckedException.class);
- if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
- CachePartialUpdateCheckedException cause =
- X.cause(err, CachePartialUpdateCheckedException.class);
+ if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+ CachePartialUpdateCheckedException cause =
+ X.cause(err, CachePartialUpdateCheckedException.class);
- assert cause != null && cause.topologyVersion() != null : err;
+ assert cause != null && cause.topologyVersion() != null : err;
- remapTopVer =
- new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+ remapTopVer =
+ new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
- err = null;
+ err = null;
- Collection<Object> failedKeys = cause.failedKeys();
+ Collection<Object> failedKeys = cause.failedKeys();
- remapKeys = new ArrayList<>(failedKeys.size());
+ remapKeys = new ArrayList<>(failedKeys.size());
- for (Object key : failedKeys)
- remapKeys.add(cctx.toCacheKeyObject(key));
+ for (Object key : failedKeys)
+ remapKeys.add(cctx.toCacheKeyObject(key));
- updVer = null;
+ updVer = null;
+ }
}
}
- }
- if (remapTopVer == null) {
- err0 = err;
- opRes0 = opRes;
- }
- else {
- fut0 = topCompleteFut;
+ if (remapTopVer == null) {
+ err0 = err;
+ opRes0 = opRes;
+ }
+ else {
+ fut0 = topCompleteFut;
- topCompleteFut = null;
+ topCompleteFut = null;
- cctx.mvcc().removeAtomicFuture(futVer);
+ cctx.mvcc().removeAtomicFuture(futVer);
- futVer = null;
- topVer = AffinityTopologyVersion.ZERO;
+ futVer = null;
+ topVer = AffinityTopologyVersion.ZERO;
+ }
}
}
- }
- if (res.error() != null && res.failedKeys() == null) {
- onDone(res.error());
+ if (res.error() != null && res.failedKeys() == null) {
+ onDone(res.error());
- return;
- }
+ return;
+ }
- if (rcvAll && nearEnabled) {
- if (mappings != null) {
- for (GridNearAtomicUpdateRequestInterface req0 : mappings.values()) {
- GridNearAtomicUpdateResponse res0 = req0.response();
+ if (rcvAll && nearEnabled) {
+ if (mappings != null) {
+ for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
+ GridNearAtomicUpdateResponse res0 = req0.response();
- assert res0 != null : req0;
+ assert res0 != null : req0;
- updateNear(req0, res0);
+ updateNear(req0, res0);
+ }
}
+ else if (!nodeErr)
+ updateNear(req, res);
}
- else if (!nodeErr)
- updateNear(req, res);
- }
- if (remapTopVer != null) {
- if (fut0 != null)
- fut0.onDone();
+ if (remapTopVer != null) {
+ if (fut0 != null)
+ fut0.onDone();
- if (!waitTopFut) {
- onDone(new GridCacheTryPutFailedException());
+ if (!waitTopFut) {
+ onDone(new GridCacheTryPutFailedException());
- return;
- }
+ return;
+ }
- if (topLocked) {
- assert !F.isEmpty(remapKeys) : remapKeys;
+ if (topLocked) {
+ assert !F.isEmpty(remapKeys) : remapKeys;
- CachePartialUpdateCheckedException e =
- new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+ CachePartialUpdateCheckedException e =
+ new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
- ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
- "Failed to update keys, topology changed while execute atomic update inside transaction.");
+ ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+ "Failed to update keys, topology changed while execute atomic update inside transaction.");
- cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+ cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
- e.add(remapKeys, cause);
+ e.add(remapKeys, cause);
- onDone(e);
+ onDone(e);
- return;
- }
+ return;
+ }
- IgniteInternalFuture<AffinityTopologyVersion> fut =
- cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+ IgniteInternalFuture<AffinityTopologyVersion> fut =
+ cctx.shared().exchange().affinityReadyFuture(remapTopVer);
- if (fut == null)
- fut = new GridFinishedFuture<>(remapTopVer);
+ if (fut == null)
+ fut = new GridFinishedFuture<>(remapTopVer);
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- try {
- AffinityTopologyVersion topVer = fut.get();
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ try {
+ AffinityTopologyVersion topVer = fut.get();
- map(topVer, remapKeys);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
+ map(topVer, remapKeys);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
}
- }
- });
- }
- });
+ });
+ }
+ });
- return;
- }
+ return;
+ }
- if (rcvAll)
- onDone(opRes0, err0);
- }
+ if (rcvAll)
+ onDone(opRes0, err0);
+ }
- /**
- * @param req Request.
- * @param e Error.
- */
- void onSendError(GridNearAtomicUpdateRequestInterface req, IgniteCheckedException e) {
- synchronized (this) {
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- req.nodeId(),
- req.futureVersion(),
- cctx.deploymentEnabled());
+ /**
+ * @param req Request.
+ * @param e Error.
+ */
+ void onSendError(GridNearAtomicUpdateRequestInterface req, IgniteCheckedException e) {
+ synchronized (this) {
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ req.nodeId(),
+ req.futureVersion(),
+ cctx.deploymentEnabled());
- res.addFailedKeys(req.keys(), e);
+ res.addFailedKeys(req.keys(), e);
- onResult(req.nodeId(), res, true);
+ onResult(req.nodeId(), res, true);
+ }
}
- }
- /**
- * @param topVer Topology version.
- * @param remapKeys Keys to remap.
- */
- void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
- Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+ /**
+ * @param topVer Topology version.
+ * @param remapKeys Keys to remap.
+ */
+ void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
+ Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
- if (F.isEmpty(topNodes)) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid)."));
+ if (F.isEmpty(topNodes)) {
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+ "left the grid)."));
- return;
- }
+ return;
+ }
- Exception err = null;
- GridNearAtomicUpdateRequestInterface singleReq0 = null;
- Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
+ Exception err = null;
+ GridNearAtomicUpdateRequestInterface singleReq0 = null;
+ Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
- int size = keys.size();
+ int size = keys.size();
- GridCacheVersion futVer = cctx.versions().next(topVer);
+ GridCacheVersion futVer = cctx.versions().next(topVer);
- GridCacheVersion updVer;
+ GridCacheVersion updVer;
- // Assign version on near node in CLOCK ordering mode even if fastMap is false.
- if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
- updVer = this.updVer;
+ // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+ if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+ updVer = this.updVer;
- if (updVer == null) {
- updVer = cctx.versions().next(topVer);
+ if (updVer == null) {
+ updVer = cctx.versions().next(topVer);
- if (log.isDebugEnabled())
- log.debug("Assigned fast-map version for update on near node: " + updVer);
+ if (log.isDebugEnabled())
+ log.debug("Assigned fast-map version for update on near node: " + updVer);
+ }
}
- }
- else
- updVer = null;
-
- try {
- if (size == 1 && !fastMap) {
- assert remapKeys == null || remapKeys.size() == 1;
+ else
+ updVer = null;
- singleReq0 = mapSingleUpdate(topVer, topNodes, futVer, updVer);
- }
- else {
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
- topVer,
- futVer,
- updVer,
- remapKeys);
+ try {
+ if (size == 1 && !fastMap) {
+ assert remapKeys == null || remapKeys.size() == 1;
- if (pendingMappings.size() == 1)
- singleReq0 = F.firstValue(pendingMappings);
+ singleReq0 = mapSingleUpdate(topVer, topNodes, futVer, updVer);
+ }
else {
- if (syncMode == PRIMARY_SYNC) {
- mappings0 = U.newHashMap(pendingMappings.size());
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
+ topVer,
+ futVer,
+ updVer,
+ remapKeys);
- for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
- if (req.hasPrimary())
- mappings0.put(req.nodeId(), req);
+ if (pendingMappings.size() == 1)
+ singleReq0 = F.firstValue(pendingMappings);
+ else {
+ if (syncMode == PRIMARY_SYNC) {
+ mappings0 = U.newHashMap(pendingMappings.size());
+
+ for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
+ if (req.hasPrimary())
+ mappings0.put(req.nodeId(), req);
+ }
}
- }
- else
- mappings0 = pendingMappings;
+ else
+ mappings0 = pendingMappings;
- assert !mappings0.isEmpty() || size == 0 : this;
+ assert !mappings0.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
+ }
}
- }
- synchronized (this) {
- assert this.futVer == null : this;
- assert this.topVer == AffinityTopologyVersion.ZERO : this;
+ synchronized (this) {
+ assert this.futVer == null : this;
+ assert this.topVer == AffinityTopologyVersion.ZERO : this;
- this.topVer = topVer;
- this.updVer = updVer;
- this.futVer = futVer;
+ this.topVer = topVer;
+ this.updVer = updVer;
+ this.futVer = futVer;
- resCnt = 0;
+ resCnt = 0;
- singleReq = singleReq0;
- mappings = mappings0;
+ singleReq = singleReq0;
+ mappings = mappings0;
- this.remapKeys = null;
+ this.remapKeys = null;
+ }
+ }
+ catch (Exception e) {
+ err = e;
}
- }
- catch (Exception e) {
- err = e;
- }
- if (err != null) {
- onDone(err);
+ if (err != null) {
+ onDone(err);
- return;
- }
+ return;
+ }
- if (storeFuture()) {
- if (!cctx.mvcc().addAtomicFuture(futVer, this)) {
- assert isDone() : this;
+ if (storeFuture()) {
+ if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) {
+ assert isDone() : GridNearAtomicUpdateFuture.this;
- return;
+ return;
+ }
}
- }
- // Optimize mapping for single key.
- if (singleReq0 != null)
- mapSingle(singleReq0.nodeId(), singleReq0);
- else {
- assert mappings0 != null;
+ // Optimize mapping for single key.
+ if (singleReq0 != null)
+ mapSingle(singleReq0.nodeId(), singleReq0);
+ else {
+ assert mappings0 != null;
- if (size == 0)
- onDone(new GridCacheReturn(cctx, true, true, null, true));
- else
- doUpdate(mappings0);
+ if (size == 0)
+ onDone(new GridCacheReturn(cctx, true, true, null, true));
+ else
+ doUpdate(mappings0);
+ }
}
- }
- /**
- * @param topVer Topology version.
- * @return Future.
- */
- @Nullable private synchronized GridFutureAdapter<Void> completeFuture0(AffinityTopologyVersion topVer) {
- if (this.topVer == AffinityTopologyVersion.ZERO)
- return null;
+ /**
+ * @param topVer Topology version.
+ * @return Future.
+ */
+ @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) {
+ if (this.topVer == AffinityTopologyVersion.ZERO)
+ return null;
- if (this.topVer.compareTo(topVer) < 0) {
- if (topCompleteFut == null)
- topCompleteFut = new GridFutureAdapter<>();
+ if (this.topVer.compareTo(topVer) < 0) {
+ if (topCompleteFut == null)
+ topCompleteFut = new GridFutureAdapter<>();
- return topCompleteFut;
+ return topCompleteFut;
+ }
+
+ return null;
}
- return null;
- }
+ /**
+ * @return Future version.
+ */
+ GridCacheVersion onFutureDone() {
+ GridCacheVersion ver0;
- /**
- * @return Future version.
- */
- private GridCacheVersion onFutureDone() {
- GridCacheVersion ver0;
+ GridFutureAdapter<Void> fut0;
+
+ synchronized (this) {
+ fut0 = topCompleteFut;
- GridFutureAdapter<Void> fut0;
+ topCompleteFut = null;
- synchronized (this) {
- fut0 = topCompleteFut;
+ ver0 = futVer;
- topCompleteFut = null;
+ futVer = null;
+ }
- ver0 = futVer;
+ if (fut0 != null)
+ fut0.onDone();
- futVer = null;
+ return ver0;
}
- if (fut0 != null)
- fut0.onDone();
+ /**
+ * @param topNodes Cache nodes.
+ * @param topVer Topology version.
+ * @param futVer Future version.
+ * @param updVer Update version.
+ * @param remapKeys Keys to remap.
+ * @return Mapping.
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("ConstantConditions")
+ private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+ AffinityTopologyVersion topVer,
+ GridCacheVersion futVer,
+ @Nullable GridCacheVersion updVer,
+ @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
+ Iterator<?> it = null;
+
+ if (vals != null)
+ it = vals.iterator();
+
+ Iterator<GridCacheDrInfo> conflictPutValsIt = null;
+
+ if (conflictPutVals != null)
+ conflictPutValsIt = conflictPutVals.iterator();
+
+ Iterator<GridCacheVersion> conflictRmvValsIt = null;
+
+ if (conflictRmvVals != null)
+ conflictRmvValsIt = conflictRmvVals.iterator();
+
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
+
+ // Create mappings first, then send messages.
+ for (Object key : keys) {
+ if (key == null)
+ throw new NullPointerException("Null key.");
+
+ Object val;
+ GridCacheVersion conflictVer;
+ long conflictTtl;
+ long conflictExpireTime;
+
+ if (vals != null) {
+ val = it.next();
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+
+ if (val == null)
+ throw new NullPointerException("Null value.");
+ }
+ else if (conflictPutVals != null) {
+ GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
- return ver0;
- }
+ val = conflictPutVal.valueEx();
+ conflictVer = conflictPutVal.version();
+ conflictTtl = conflictPutVal.ttl();
+ conflictExpireTime = conflictPutVal.expireTime();
+ }
+ else if (conflictRmvVals != null) {
+ val = null;
+ conflictVer = conflictRmvValsIt.next();
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+ else {
+ val = null;
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
- /**
- * @param topNodes Cache nodes.
- * @param topVer Topology version.
- * @param futVer Future version.
- * @param updVer Update version.
- * @param remapKeys Keys to remap.
- * @return Mapping.
- * @throws Exception If failed.
- */
- @SuppressWarnings("ConstantConditions")
- private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
- AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable GridCacheVersion updVer,
- @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
- Iterator<?> it = null;
+ if (val == null && op != GridCacheOperation.DELETE)
+ continue;
- if (vals != null)
- it = vals.iterator();
+ KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
- Iterator<GridCacheDrInfo> conflictPutValsIt = null;
+ if (remapKeys != null && !remapKeys.contains(cacheKey))
+ continue;
- if (conflictPutVals != null)
- conflictPutValsIt = conflictPutVals.iterator();
+ if (op != TRANSFORM)
+ val = cctx.toCacheObject(val);
- Iterator<GridCacheVersion> conflictRmvValsIt = null;
+ Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
- if (conflictRmvVals != null)
- conflictRmvValsIt = conflictRmvVals.iterator();
+ if (affNodes.isEmpty())
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
+ int i = 0;
+
+ for (ClusterNode affNode : affNodes) {
+ if (affNode == null)
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
+
+ UUID nodeId = affNode.id();
+
+ GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
+
+ if (mapped == null) {
+ mapped = new GridNearAtomicUpdateRequest(
+ cctx.cacheId(),
+ nodeId,
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled(),
+ keys.size());
+
+ pendingMappings.put(nodeId, mapped);
+ }
- // Create mappings first, then send messages.
- for (Object key : keys) {
- if (key == null)
- throw new NullPointerException("Null key.");
+ mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
+
+ i++;
+ }
+ }
+
+ return pendingMappings;
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @param futVer Future version.
+ * @param updVer Update version.
+ * @return Request.
+ * @throws Exception If failed.
+ */
+ private GridNearAtomicUpdateRequestInterface mapSingleUpdate(AffinityTopologyVersion topVer,
+ Collection<ClusterNode> topNodes, GridCacheVersion futVer, @Nullable GridCacheVersion updVer)
+ throws Exception {
+ Object key = F.first(keys);
Object val;
GridCacheVersion conflictVer;
@@ -839,273 +1139,169 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
long conflictExpireTime;
if (vals != null) {
- val = it.next();
+ // Regular PUT.
+ val = F.first(vals);
conflictVer = null;
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
-
- if (val == null)
- throw new NullPointerException("Null value.");
}
else if (conflictPutVals != null) {
- GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
+ // Conflict PUT.
+ GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
val = conflictPutVal.valueEx();
conflictVer = conflictPutVal.version();
- conflictTtl = conflictPutVal.ttl();
+ conflictTtl = conflictPutVal.ttl();
conflictExpireTime = conflictPutVal.expireTime();
}
else if (conflictRmvVals != null) {
+ // Conflict REMOVE.
val = null;
- conflictVer = conflictRmvValsIt.next();
+ conflictVer = F.first(conflictRmvVals);
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
}
else {
+ // Regular REMOVE.
val = null;
conflictVer = null;
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
}
+ // We can still get here if the user passes a map with a single element.
+ if (key == null)
+ throw new NullPointerException("Null key.");
+
if (val == null && op != GridCacheOperation.DELETE)
- continue;
+ throw new NullPointerException("Null value.");
KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
- if (remapKeys != null && !remapKeys.contains(cacheKey))
- continue;
-
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
- Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
+ ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
- if (affNodes.isEmpty())
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid).");
+ if (primary == null)
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+ "left the grid).");
- int i = 0;
+ // Decide whether we will use the optimized version of the update request.
+ boolean optimize = true;
- for (ClusterNode affNode : affNodes) {
- if (affNode == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid).");
+ for (ClusterNode topNode : topNodes) {
+ if (topNode.version().compareTo(SINGLE_PUT_MSG_SINCE) < 0) {
+ optimize = false;
- UUID nodeId = affNode.id();
-
- GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
-
- if (mapped == null) {
- mapped = new GridNearAtomicUpdateRequest(
- cctx.cacheId(),
- nodeId,
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- keys.size());
-
- pendingMappings.put(nodeId, mapped);
+ break;
}
+ }
- mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
-
- i++;
+ if (optimize) {
+ return new GridNearAtomicSingleUpdateRequest(
+ cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
+ cctx.cacheId(),
+ primary.id(),
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled());
+ }
+ else {
+ GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
+ cctx.cacheId(),
+ primary.id(),
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled(),
+ 1);
+
+ req.addUpdateEntry(cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
+ true);
+
+ return req;
}
}
- return pendingMappings;
- }
-
- /**
- * @param topVer Topology version.
- * @param topNodes Topology nodes.
- * @param futVer Future version.
- * @param updVer Update version.
- * @return Request.
- * @throws Exception If failed.
- */
- private GridNearAtomicUpdateRequestInterface mapSingleUpdate(AffinityTopologyVersion topVer,
- Collection<ClusterNode> topNodes, GridCacheVersion futVer, @Nullable GridCacheVersion updVer) throws Exception {
- Object key = F.first(keys);
-
- Object val;
- GridCacheVersion conflictVer;
- long conflictTtl;
- long conflictExpireTime;
-
- if (vals != null) {
- // Regular PUT.
- val = F.first(vals);
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
- else if (conflictPutVals != null) {
- // Conflict PUT.
- GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
-
- val = conflictPutVal.valueEx();
- conflictVer = conflictPutVal.version();
- conflictTtl = conflictPutVal.ttl();
- conflictExpireTime = conflictPutVal.expireTime();
- }
- else if (conflictRmvVals != null) {
- // Conflict REMOVE.
- val = null;
- conflictVer = F.first(conflictRmvVals);
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
- else {
- // Regular REMOVE.
- val = null;
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ /**
+ * @param ret Result from single node.
+ */
+ @SuppressWarnings("unchecked")
+ private void addInvokeResults(GridCacheReturn ret) {
+ assert op == TRANSFORM : op;
+ assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+ if (ret.value() != null) {
+ if (opRes != null)
+ opRes.mergeEntryProcessResults(ret);
+ else
+ opRes = ret;
+ }
}
- // We still can get here if user pass map with single element.
- if (key == null)
- throw new NullPointerException("Null key.");
+ /**
+ * @param failedKeys Failed keys.
+ * @param topVer Topology version for failed update.
+ * @param err Error cause.
+ */
+ private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
+ AffinityTopologyVersion topVer,
+ Throwable err) {
+ CachePartialUpdateCheckedException err0 = this.err;
- if (val == null && op != GridCacheOperation.DELETE)
- throw new NullPointerException("Null value.");
+ if (err0 == null)
+ err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
- KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+ Collection<Object> keys = new ArrayList<>(failedKeys.size());
- if (op != TRANSFORM)
- val = cctx.toCacheObject(val);
+ for (KeyCacheObject key : failedKeys)
+ keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
- ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
-
- if (primary == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid).");
-
- // Decide whether we will use optimzied version of update request.
- boolean optimize = true;
-
- for (ClusterNode topNode : topNodes) {
- if (topNode.version().compareTo(SINGLE_PUT_MSG_SINCE) < 0) {
- optimize = false;
-
- break;
- }
+ err0.add(keys, err, topVer);
}
- if (optimize) {
- return new GridNearAtomicSingleUpdateRequest(
- cacheKey,
- val,
- conflictTtl,
- conflictExpireTime,
- conflictVer,
- cctx.cacheId(),
- primary.id(),
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled());
- }
- else {
- GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
- cctx.cacheId(),
- primary.id(),
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- 1);
-
- req.addUpdateEntry(cacheKey,
- val,
- conflictTtl,
- conflictExpireTime,
- conflictVer,
- true);
-
- return req;
- }
- }
-
- /**
- * @param ret Result from single node.
- */
- @SuppressWarnings("unchecked")
- private void addInvokeResults(GridCacheReturn ret) {
- assert op == TRANSFORM : op;
- assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
- if (ret.value() != null) {
- if (opRes != null)
- opRes.mergeEntryProcessResults(ret);
- else
- opRes = ret;
+ /** {@inheritDoc} */
+ @Override public synchronized String toString() {
+ return S.toString(UpdateState.class, this);
}
}
- /**
- * @param failedKeys Failed keys.
- * @param topVer Topology version for failed update.
- * @param err Error cause.
- */
- private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
- AffinityTopologyVersion topVer,
- Throwable err) {
- CachePartialUpdateCheckedException err0 = this.err;
-
- if (err0 == null)
- err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
-
- Collection<Object> keys = new ArrayList<>(failedKeys.size());
-
- for (KeyCacheObject key : failedKeys)
- keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
-
- err0.add(keys, err, topVer);
- }
-
/** {@inheritDoc} */
public String toString() {
return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString());