You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/11/22 11:40:21 UTC
[32/50] [abbrv] ignite git commit: IGNITE-2523 "single put" NEAR update request
http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
deleted file mode 100644
index 5057fbb..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ /dev/null
@@ -1,1092 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import java.io.Externalizable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
-
-/**
- * Lite DHT cache update request sent from near node to primary node.
- */
-public class GridNearAtomicUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Message index. */
- public static final int CACHE_MSG_IDX = nextIndexId();
-
- /** Target node ID. */
- @GridDirectTransient
- private UUID nodeId;
-
- /** Future version. */
- private GridCacheVersion futVer;
-
- /** Fast map flag. */
- private boolean fastMap;
-
- /** Update version. Set to non-null if fastMap is {@code true}. */
- private GridCacheVersion updateVer;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
- /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
- private boolean topLocked;
-
- /** Write synchronization mode. */
- private CacheWriteSynchronizationMode syncMode;
-
- /** Update operation. */
- private GridCacheOperation op;
-
- /** Keys to update. */
- @GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
- private List<KeyCacheObject> keys;
-
- /** Values to update. */
- @GridDirectCollection(CacheObject.class)
- private List<CacheObject> vals;
-
- /** Partitions of keys. */
- @GridDirectCollection(int.class)
- private List<Integer> partIds;
-
- /** Entry processors. */
- @GridDirectTransient
- private List<EntryProcessor<Object, Object, Object>> entryProcessors;
-
- /** Entry processors bytes. */
- @GridDirectCollection(byte[].class)
- private List<byte[]> entryProcessorsBytes;
-
- /** Optional arguments for entry processor. */
- @GridDirectTransient
- private Object[] invokeArgs;
-
- /** Entry processor arguments bytes. */
- private byte[][] invokeArgsBytes;
-
- /** Conflict versions. */
- @GridDirectCollection(GridCacheVersion.class)
- private List<GridCacheVersion> conflictVers;
-
- /** Conflict TTLs. */
- private GridLongList conflictTtls;
-
- /** Conflict expire times. */
- private GridLongList conflictExpireTimes;
-
- /** Return value flag. */
- private boolean retval;
-
- /** Expiry policy. */
- @GridDirectTransient
- private ExpiryPolicy expiryPlc;
-
- /** Expiry policy bytes. */
- private byte[] expiryPlcBytes;
-
- /** Filter. */
- private CacheEntryPredicate[] filter;
-
- /** Flag indicating whether request contains primary keys. */
- private boolean hasPrimary;
-
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name hash. */
- private int taskNameHash;
-
- /** Skip write-through to a persistent storage. */
- private boolean skipStore;
-
- /** */
- private boolean clientReq;
-
- /** Keep binary flag. */
- private boolean keepBinary;
-
- /** */
- @GridDirectTransient
- private GridNearAtomicUpdateResponse res;
-
- /** Maximum possible size of inner collections. */
- @GridDirectTransient
- private int initSize;
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public GridNearAtomicUpdateRequest() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param cacheId Cache ID.
- * @param nodeId Node ID.
- * @param futVer Future version.
- * @param fastMap Fast map scheme flag.
- * @param updateVer Update version set if fast map is performed.
- * @param topVer Topology version.
- * @param topLocked Topology locked flag.
- * @param syncMode Synchronization mode.
- * @param op Cache update operation.
- * @param retval Return value required flag.
- * @param expiryPlc Expiry policy.
- * @param invokeArgs Optional arguments for entry processor.
- * @param filter Optional filter for atomic check.
- * @param subjId Subject ID.
- * @param taskNameHash Task name hash code.
- * @param skipStore Skip write-through to a persistent storage.
- * @param keepBinary Keep binary flag.
- * @param clientReq Client node request flag.
- * @param addDepInfo Deployment info flag.
- * @param maxEntryCnt Maximum entries count.
- */
- public GridNearAtomicUpdateRequest(
- int cacheId,
- UUID nodeId,
- GridCacheVersion futVer,
- boolean fastMap,
- @Nullable GridCacheVersion updateVer,
- @NotNull AffinityTopologyVersion topVer,
- boolean topLocked,
- CacheWriteSynchronizationMode syncMode,
- GridCacheOperation op,
- boolean retval,
- @Nullable ExpiryPolicy expiryPlc,
- @Nullable Object[] invokeArgs,
- @Nullable CacheEntryPredicate[] filter,
- @Nullable UUID subjId,
- int taskNameHash,
- boolean skipStore,
- boolean keepBinary,
- boolean clientReq,
- boolean addDepInfo,
- int maxEntryCnt
- ) {
- assert futVer != null;
-
- this.cacheId = cacheId;
- this.nodeId = nodeId;
- this.futVer = futVer;
- this.fastMap = fastMap;
- this.updateVer = updateVer;
-
- this.topVer = topVer;
- this.topLocked = topLocked;
- this.syncMode = syncMode;
- this.op = op;
- this.retval = retval;
- this.expiryPlc = expiryPlc;
- this.invokeArgs = invokeArgs;
- this.filter = filter;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
- this.skipStore = skipStore;
- this.keepBinary = keepBinary;
- this.clientReq = clientReq;
- this.addDepInfo = addDepInfo;
-
- // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries
- // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys
- // participate in request. As such, we know upper bound of all collections in request. If this bound is lower
- // than 10, we use it.
- initSize = Math.min(maxEntryCnt, 10);
-
- keys = new ArrayList<>(initSize);
-
- partIds = new ArrayList<>(initSize);
- }
-
- /** {@inheritDoc} */
- @Override public int lookupIndex() {
- return CACHE_MSG_IDX;
- }
-
- /**
- * @return Mapped node ID.
- */
- public UUID nodeId() {
- return nodeId;
- }
-
- /**
- * @param nodeId Node ID.
- */
- public void nodeId(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /**
- * @return Subject ID.
- */
- public UUID subjectId() {
- return subjId;
- }
-
- /**
- * @return Task name hash.
- */
- public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
- * @return Future version.
- */
- public GridCacheVersion futureVersion() {
- return futVer;
- }
-
- /**
- * @return Flag indicating whether this is fast-map udpate.
- */
- public boolean fastMap() {
- return fastMap;
- }
-
- /**
- * @return Update version for fast-map request.
- */
- public GridCacheVersion updateVersion() {
- return updateVer;
- }
-
- /**
- * @return Topology version.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
- * @return Topology locked flag.
- */
- public boolean topologyLocked() {
- return topLocked;
- }
-
- /**
- * @return {@code True} if request sent from client node.
- */
- public boolean clientRequest() {
- return clientReq;
- }
-
- /**
- * @return Cache write synchronization mode.
- */
- public CacheWriteSynchronizationMode writeSynchronizationMode() {
- return syncMode;
- }
-
- /**
- * @return Expiry policy.
- */
- public ExpiryPolicy expiry() {
- return expiryPlc;
- }
-
- /**
- * @return Return value flag.
- */
- public boolean returnValue() {
- return retval;
- }
-
- /**
- * @return Filter.
- */
- @Nullable public CacheEntryPredicate[] filter() {
- return filter;
- }
-
- /**
- * @return Skip write-through to a persistent storage.
- */
- public boolean skipStore() {
- return skipStore;
- }
-
- /**
- * @return Keep binary flag.
- */
- public boolean keepBinary() {
- return keepBinary;
- }
-
- /**
- * @param key Key to add.
- * @param val Optional update value.
- * @param conflictTtl Conflict TTL (optional).
- * @param conflictExpireTime Conflict expire time (optional).
- * @param conflictVer Conflict version (optional).
- * @param primary If given key is primary on this mapping.
- */
- public void addUpdateEntry(KeyCacheObject key,
- @Nullable Object val,
- long conflictTtl,
- long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer,
- boolean primary) {
- EntryProcessor<Object, Object, Object> entryProcessor = null;
-
- if (op == TRANSFORM) {
- assert val instanceof EntryProcessor : val;
-
- entryProcessor = (EntryProcessor<Object, Object, Object>)val;
- }
-
- assert val != null || op == DELETE;
-
- keys.add(key);
- partIds.add(key.partition());
-
- if (entryProcessor != null) {
- if (entryProcessors == null)
- entryProcessors = new ArrayList<>(initSize);
-
- entryProcessors.add(entryProcessor);
- }
- else if (val != null) {
- assert val instanceof CacheObject : val;
-
- if (vals == null)
- vals = new ArrayList<>(initSize);
-
- vals.add((CacheObject)val);
- }
-
- hasPrimary |= primary;
-
- // In case there is no conflict, do not create the list.
- if (conflictVer != null) {
- if (conflictVers == null) {
- conflictVers = new ArrayList<>(initSize);
-
- for (int i = 0; i < keys.size() - 1; i++)
- conflictVers.add(null);
- }
-
- conflictVers.add(conflictVer);
- }
- else if (conflictVers != null)
- conflictVers.add(null);
-
- if (conflictTtl >= 0) {
- if (conflictTtls == null) {
- conflictTtls = new GridLongList(keys.size());
-
- for (int i = 0; i < keys.size() - 1; i++)
- conflictTtls.add(CU.TTL_NOT_CHANGED);
- }
-
- conflictTtls.add(conflictTtl);
- }
-
- if (conflictExpireTime >= 0) {
- if (conflictExpireTimes == null) {
- conflictExpireTimes = new GridLongList(keys.size());
-
- for (int i = 0; i < keys.size() - 1; i++)
- conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
- }
-
- conflictExpireTimes.add(conflictExpireTime);
- }
- }
-
- /**
- * @return Keys for this update request.
- */
- public List<KeyCacheObject> keys() {
- return keys;
- }
-
- /**
- * @return Values for this update request.
- */
- public List<?> values() {
- return op == TRANSFORM ? entryProcessors : vals;
- }
-
- /**
- * @return Update operation.
- */
- public GridCacheOperation operation() {
- return op;
- }
-
- /**
- * @return Optional arguments for entry processor.
- */
- @Nullable public Object[] invokeArguments() {
- return invokeArgs;
- }
-
- /**
- * @param idx Key index.
- * @return Value.
- */
- @SuppressWarnings("unchecked")
- public CacheObject value(int idx) {
- assert op == UPDATE : op;
-
- return vals.get(idx);
- }
-
- /**
- * @param idx Key index.
- * @return Entry processor.
- */
- @SuppressWarnings("unchecked")
- public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
- assert op == TRANSFORM : op;
-
- return entryProcessors.get(idx);
- }
-
- /**
- * @param idx Index to get.
- * @return Write value - either value, or transform closure.
- */
- public CacheObject writeValue(int idx) {
- if (vals != null)
- return vals.get(idx);
-
- return null;
- }
-
- /**
- * @return Conflict versions.
- */
- @Nullable public List<GridCacheVersion> conflictVersions() {
- return conflictVers;
- }
-
- /**
- * @param idx Index.
- * @return Conflict version.
- */
- @Nullable public GridCacheVersion conflictVersion(int idx) {
- if (conflictVers != null) {
- assert idx >= 0 && idx < conflictVers.size();
-
- return conflictVers.get(idx);
- }
-
- return null;
- }
-
- /**
- * @param idx Index.
- * @return Conflict TTL.
- */
- public long conflictTtl(int idx) {
- if (conflictTtls != null) {
- assert idx >= 0 && idx < conflictTtls.size();
-
- return conflictTtls.get(idx);
- }
-
- return CU.TTL_NOT_CHANGED;
- }
-
- /**
- * @param idx Index.
- * @return Conflict expire time.
- */
- public long conflictExpireTime(int idx) {
- if (conflictExpireTimes != null) {
- assert idx >= 0 && idx < conflictExpireTimes.size();
-
- return conflictExpireTimes.get(idx);
- }
-
- return CU.EXPIRE_TIME_CALCULATE;
- }
-
- /**
- * @return Flag indicating whether this request contains primary keys.
- */
- public boolean hasPrimary() {
- return hasPrimary;
- }
-
- /**
- * @param res Response.
- * @return {@code True} if current response was {@code null}.
- */
- public boolean onResponse(GridNearAtomicUpdateResponse res) {
- if (this.res == null) {
- this.res = res;
-
- return true;
- }
-
- return false;
- }
-
- /**
- * @return Response.
- */
- @Nullable public GridNearAtomicUpdateResponse response() {
- return res;
- }
-
- /** {@inheritDoc}
- * @param ctx*/
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
- prepareMarshalCacheObjects(keys, cctx);
-
- if (filter != null) {
- boolean hasFilter = false;
-
- for (CacheEntryPredicate p : filter) {
- if (p != null) {
- hasFilter = true;
-
- p.prepareMarshal(cctx);
- }
- }
-
- if (!hasFilter)
- filter = null;
- }
-
- if (expiryPlc != null && expiryPlcBytes == null)
- expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
-
- if (op == TRANSFORM) {
- // force addition of deployment info for entry processors if P2P is enabled globally.
- if (!addDepInfo && ctx.deploymentEnabled())
- addDepInfo = true;
-
- if (entryProcessorsBytes == null)
- entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
-
- if (invokeArgsBytes == null)
- invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
- }
- else
- prepareMarshalCacheObjects(vals, cctx);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
- finishUnmarshalCacheObjects(keys, cctx, ldr);
-
- if (op == TRANSFORM) {
- if (entryProcessors == null)
- entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
-
- if (invokeArgs == null)
- invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
- }
- else
- finishUnmarshalCacheObjects(vals, cctx, ldr);
-
- if (filter != null) {
- for (CacheEntryPredicate p : filter) {
- if (p != null)
- p.finishUnmarshal(cctx, ldr);
- }
- }
-
- if (expiryPlcBytes != null && expiryPlc == null)
- expiryPlc = U.unmarshal(ctx, expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
-
- if (partIds != null && !partIds.isEmpty()) {
- assert partIds.size() == keys.size();
-
- for (int i = 0; i < keys.size(); i++)
- keys.get(i).partition(partIds.get(i));
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return addDepInfo;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
- return ctx.atomicMessageLogger();
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 3:
- if (!writer.writeBoolean("clientReq", clientReq))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeMessage("conflictTtls", conflictTtls))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeBoolean("fastMap", fastMap))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeMessage("futVer", futVer))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeBoolean("hasPrimary", hasPrimary))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeBoolean("keepBinary", keepBinary))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 16:
- if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 17:
- if (!writer.writeBoolean("retval", retval))
- return false;
-
- writer.incrementState();
-
- case 18:
- if (!writer.writeBoolean("skipStore", skipStore))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeUuid("subjId", subjId))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 21:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 22:
- if (!writer.writeBoolean("topLocked", topLocked))
- return false;
-
- writer.incrementState();
-
- case 23:
- if (!writer.writeMessage("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeMessage("updateVer", updateVer))
- return false;
-
- writer.incrementState();
-
- case 25:
- if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 26:
- if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 3:
- clientReq = reader.readBoolean("clientReq");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- conflictExpireTimes = reader.readMessage("conflictExpireTimes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- conflictTtls = reader.readMessage("conflictTtls");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- fastMap = reader.readBoolean("fastMap");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- futVer = reader.readMessage("futVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- hasPrimary = reader.readBoolean("hasPrimary");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 14:
- keepBinary = reader.readBoolean("keepBinary");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 16:
- byte opOrd;
-
- opOrd = reader.readByte("op");
-
- if (!reader.isLastRead())
- return false;
-
- op = GridCacheOperation.fromOrdinal(opOrd);
-
- reader.incrementState();
-
- case 17:
- retval = reader.readBoolean("retval");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 18:
- skipStore = reader.readBoolean("skipStore");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- subjId = reader.readUuid("subjId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
- byte syncModeOrd;
-
- syncModeOrd = reader.readByte("syncMode");
-
- if (!reader.isLastRead())
- return false;
-
- syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
- reader.incrementState();
-
- case 21:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 22:
- topLocked = reader.readBoolean("topLocked");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 23:
- topVer = reader.readMessage("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 24:
- updateVer = reader.readMessage("updateVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 25:
- vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 26:
- partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridNearAtomicUpdateRequest.class);
- }
-
- /**
- * Cleanup values.
- *
- * @param clearKeys If {@code true} clears keys.
- */
- public void cleanup(boolean clearKeys) {
- vals = null;
- entryProcessors = null;
- entryProcessorsBytes = null;
- invokeArgs = null;
- invokeArgsBytes = null;
-
- if (clearKeys)
- keys = null;
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 40;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 27;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearAtomicUpdateRequest.class, this, "filter", Arrays.toString(filter),
- "parent", super.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 09aec81..a9245f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -661,6 +662,15 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
return true;
}
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest req,
+ AffinityTopologyVersion topVer) {
+ if (!needForceKeys())
+ return null;
+
+ return request0(req.keys(), topVer);
+ }
+
/**
* @param keys Keys to request.
* @return Future for request.
@@ -670,6 +680,16 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
if (!needForceKeys())
return null;
+ return request0(keys, topVer);
+ }
+
+ /**
+ * @param keys Keys to request.
+ * @param topVer Topology version.
+ * @return Future for request.
+ */
+ @SuppressWarnings({"unchecked", "RedundantCast"})
+ private GridDhtFuture<Object> request0(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 3e0e392..b5b2c72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -44,7 +44,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -126,10 +126,10 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
* @param res Update response.
*/
public void processNearAtomicUpdateResponse(
- GridNearAtomicUpdateRequest req,
+ GridNearAtomicAbstractUpdateRequest req,
GridNearAtomicUpdateResponse res
) {
- if (F.size(res.failedKeys()) == req.keys().size())
+ if (F.size(res.failedKeys()) == req.size())
return;
/*
@@ -152,11 +152,11 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
- for (int i = 0; i < req.keys().size(); i++) {
+ for (int i = 0; i < req.size(); i++) {
if (F.contains(skipped, i))
continue;
- KeyCacheObject key = req.keys().get(i);
+ KeyCacheObject key = req.key(i);
if (F.contains(failed, key))
continue;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 0d291cc..71d236f 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -75,13 +75,11 @@ org.apache.ignite.cache.eviction.sorted.SortedEvictionPolicy$HolderComparator
org.apache.ignite.cache.query.CacheQueryEntryEvent
org.apache.ignite.cache.query.ContinuousQuery
org.apache.ignite.cache.query.Query
-org.apache.ignite.cache.query.QueryCancelledException
org.apache.ignite.cache.query.ScanQuery
org.apache.ignite.cache.query.SpiQuery
org.apache.ignite.cache.query.SqlFieldsQuery
org.apache.ignite.cache.query.SqlQuery
org.apache.ignite.cache.query.TextQuery
-org.apache.ignite.cache.store.jdbc.CacheAbstractJdbcStore$2
org.apache.ignite.cache.store.jdbc.CacheAbstractJdbcStore$EntryMapping$1
org.apache.ignite.cache.store.jdbc.CacheAbstractJdbcStore$EntryMapping$2
org.apache.ignite.cache.store.jdbc.CacheAbstractJdbcStore$TypeKind
@@ -368,7 +366,6 @@ org.apache.ignite.internal.processors.cache.CacheWeakQueryIteratorsHolder$WeakQu
org.apache.ignite.internal.processors.cache.CacheWeakQueryIteratorsHolder$WeakReferenceCloseableIterator
org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch
org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest
-org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy
org.apache.ignite.internal.processors.cache.GridCacheAdapter
org.apache.ignite.internal.processors.cache.GridCacheAdapter$10
org.apache.ignite.internal.processors.cache.GridCacheAdapter$11
@@ -376,9 +373,9 @@ org.apache.ignite.internal.processors.cache.GridCacheAdapter$12
org.apache.ignite.internal.processors.cache.GridCacheAdapter$13
org.apache.ignite.internal.processors.cache.GridCacheAdapter$14
org.apache.ignite.internal.processors.cache.GridCacheAdapter$15
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$16
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$16$1
org.apache.ignite.internal.processors.cache.GridCacheAdapter$17
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$18$1
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$18
org.apache.ignite.internal.processors.cache.GridCacheAdapter$2
org.apache.ignite.internal.processors.cache.GridCacheAdapter$26$1
org.apache.ignite.internal.processors.cache.GridCacheAdapter$28
@@ -536,7 +533,6 @@ org.apache.ignite.internal.processors.cache.GridCacheUtils$8
org.apache.ignite.internal.processors.cache.GridCacheUtils$9
org.apache.ignite.internal.processors.cache.GridCacheValueCollection
org.apache.ignite.internal.processors.cache.GridCacheValueCollection$1
-org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender$DeferredAckMessageBuffer
org.apache.ignite.internal.processors.cache.IgniteCacheProxy
org.apache.ignite.internal.processors.cache.IgniteCacheProxy$1
org.apache.ignite.internal.processors.cache.IgniteCacheProxy$10
@@ -549,13 +545,12 @@ org.apache.ignite.internal.processors.cache.IgniteCacheProxy$8
org.apache.ignite.internal.processors.cache.IgniteCacheProxy$9
org.apache.ignite.internal.processors.cache.KeyCacheObject
org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl
-org.apache.ignite.internal.processors.cache.QueryCursorImpl$State
org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityProxy
org.apache.ignite.internal.processors.cache.binary.BinaryMetadataKey
org.apache.ignite.internal.processors.cache.binary.CacheDefaultBinaryAffinityKeyMapper
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl$1
+org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl$4
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl$5
-org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl$6
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl$MetaDataEntryFilter
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl$MetaDataPredicate
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl$MetadataProcessor
@@ -635,7 +630,6 @@ org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal$2
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter$1
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter$2
-org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture$1
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture$2
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture$3
@@ -676,24 +670,26 @@ org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomic
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$27
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$28
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$29
+org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$3
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$30
-org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$31
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$4
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$5
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$6
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$7
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$8
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$9
+org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$DeferredResponseBuffer
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse
-org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateFuture$1
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFuture$1
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFuture$2
+org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFuture$3
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateFuture$1
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateFuture$2
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateFuture$3
-org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest
+org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateFuture$4
+org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse
org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache
org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache$2
@@ -764,7 +760,6 @@ org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture$
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture$2
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture$3
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture$4
-org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture$LockTimeoutObject$1
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture$MiniFuture$1
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse
@@ -777,7 +772,6 @@ org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticS
org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFuture$1
org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFuture$2
org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFuture$3
-org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFuture$4
org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFuture$MiniFuture$1
org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFutureAdapter$1
org.apache.ignite.internal.processors.cache.distributed.near.GridNearPessimisticTxPrepareFuture$1
@@ -827,23 +821,22 @@ org.apache.ignite.internal.processors.cache.query.GridCacheDistributedQueryManag
org.apache.ignite.internal.processors.cache.query.GridCacheDistributedQueryManager$7
org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter$1
org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter$ScanQueryFallbackClosableIterator
-org.apache.ignite.internal.processors.cache.query.GridCacheQueryDetailMetricsAdapter
org.apache.ignite.internal.processors.cache.query.GridCacheQueryFutureAdapter$1
org.apache.ignite.internal.processors.cache.query.GridCacheQueryFutureAdapter$2
-org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$10
+org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$1$1
+org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$1$2
org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$11
+org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$12
org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$13
org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$14
-org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$15
-org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$16
-org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$17$1
-org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$3$1
-org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$3$2
+org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$15$1
+org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$2
+org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$3
org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$4
org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$5
org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$6
-org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$7
org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$8
+org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$9
org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$CacheSqlIndexMetadata
org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$CacheSqlMetadata
org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager$CachedResult$QueueIterator
@@ -903,8 +896,6 @@ org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$12
org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$13
org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$14
org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$15
-org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$16
-org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$17
org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$2
org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$3
org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$4
@@ -938,9 +929,9 @@ org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter$Po
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter$PostLockClosure1$4
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter$PostLockClosure2
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter$PostMissClosure
+org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$2
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$3
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$4
-org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$5
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$CommitListener
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$CommittedVersion
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$NodeFailureTimeoutObject$1
@@ -1014,6 +1005,7 @@ org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$1
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$4
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5
+org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$6
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$Buffer$1
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$Buffer$2
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$DataStreamerPda
@@ -1092,7 +1084,6 @@ org.apache.ignite.internal.processors.datastructures.GridCacheSetProxy
org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicate
org.apache.ignite.internal.processors.dr.GridDrType
org.apache.ignite.internal.processors.dr.IgniteDrDataStreamerCacheUpdater
-org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo
org.apache.ignite.internal.processors.hadoop.HadoopFileBlock
org.apache.ignite.internal.processors.hadoop.HadoopInputSplit
org.apache.ignite.internal.processors.hadoop.HadoopJobId
@@ -1128,6 +1119,7 @@ org.apache.ignite.internal.processors.igfs.IgfsFragmentizerManager$IdentityHashS
org.apache.ignite.internal.processors.igfs.IgfsFragmentizerRequest
org.apache.ignite.internal.processors.igfs.IgfsFragmentizerResponse
org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse
+org.apache.ignite.internal.processors.igfs.IgfsImpl$12$1
org.apache.ignite.internal.processors.igfs.IgfsImpl$IgfsGlobalSpaceTask
org.apache.ignite.internal.processors.igfs.IgfsImpl$IgfsGlobalSpaceTask$1
org.apache.ignite.internal.processors.igfs.IgfsInputStreamDescriptor
@@ -1163,7 +1155,6 @@ org.apache.ignite.internal.processors.igfs.client.IgfsClientSummaryCallable
org.apache.ignite.internal.processors.igfs.client.IgfsClientUpdateCallable
org.apache.ignite.internal.processors.igfs.client.meta.IgfsClientMetaIdsForPathCallable
org.apache.ignite.internal.processors.igfs.client.meta.IgfsClientMetaInfoForPathCallable
-org.apache.ignite.internal.processors.igfs.client.meta.IgfsClientMetaUnlockCallable
org.apache.ignite.internal.processors.igfs.data.IgfsDataPutProcessor
org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryCreateProcessor
org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryListingAddProcessor
@@ -1181,8 +1172,6 @@ org.apache.ignite.internal.processors.igfs.meta.IgfsMetaUpdateTimesProcessor
org.apache.ignite.internal.processors.job.GridJobProcessor$5
org.apache.ignite.internal.processors.job.GridJobWorker$3
org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor$SnapshotReducer
-org.apache.ignite.internal.processors.odbc.OdbcProtocolVersion
-org.apache.ignite.internal.processors.odbc.escape.OdbcEscapeType
org.apache.ignite.internal.processors.platform.PlatformAbstractConfigurationClosure
org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate
org.apache.ignite.internal.processors.platform.PlatformEventFilterListener
@@ -1191,7 +1180,7 @@ org.apache.ignite.internal.processors.platform.PlatformExtendedException
org.apache.ignite.internal.processors.platform.PlatformJavaObjectFactoryProxy
org.apache.ignite.internal.processors.platform.PlatformNativeException
org.apache.ignite.internal.processors.platform.PlatformNoCallbackException
-org.apache.ignite.internal.processors.platform.cache.PlatformCache$5
+org.apache.ignite.internal.processors.platform.cache.PlatformCache$1
org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter
org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilterImpl
org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryProcessor
@@ -1222,8 +1211,6 @@ org.apache.ignite.internal.processors.platform.cpp.PlatformCppConfigurationClosu
org.apache.ignite.internal.processors.platform.datastreamer.PlatformStreamReceiver
org.apache.ignite.internal.processors.platform.datastreamer.PlatformStreamReceiverImpl
org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetCacheStore$1
-org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetCacheStore$10
-org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetCacheStore$11
org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetCacheStore$2
org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetCacheStore$3
org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetCacheStore$4
@@ -1231,7 +1218,6 @@ org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetCacheStore$5
org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetCacheStore$6
org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetCacheStore$7
org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetCacheStore$8
-org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetCacheStore$9
org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetConfigurationClosure
org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetService
org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetServiceImpl
@@ -1246,24 +1232,19 @@ org.apache.ignite.internal.processors.platform.transactions.PlatformTransactions
org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils$1
org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils$FutureListenable$1
org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils$InternalFutureListenable$1
-org.apache.ignite.internal.processors.platform.websession.PlatformDotNetSessionLockProcessor
-org.apache.ignite.internal.processors.platform.websession.PlatformDotNetSessionSetAndUnlockProcessor
org.apache.ignite.internal.processors.query.GridQueryFieldMetadata
org.apache.ignite.internal.processors.query.GridQueryIndexType
+org.apache.ignite.internal.processors.query.GridQueryProcessor$2
org.apache.ignite.internal.processors.query.GridQueryProcessor$3
org.apache.ignite.internal.processors.query.GridQueryProcessor$4
org.apache.ignite.internal.processors.query.GridQueryProcessor$5
org.apache.ignite.internal.processors.query.GridQueryProcessor$6
-org.apache.ignite.internal.processors.query.GridQueryProcessor$7
-org.apache.ignite.internal.processors.query.GridQueryProcessor$8
org.apache.ignite.internal.processors.query.GridQueryProcessor$IndexType
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryRequest
-org.apache.ignite.internal.processors.resource.GridResourceIoc$AnnotationSet
-org.apache.ignite.internal.processors.resource.GridResourceIoc$ResourceAnnotation
org.apache.ignite.internal.processors.rest.GridRestCommand
org.apache.ignite.internal.processors.rest.GridRestProcessor$2$1
org.apache.ignite.internal.processors.rest.GridRestProcessor$3
@@ -1351,7 +1332,7 @@ org.apache.ignite.internal.processors.service.ServiceDescriptorImpl
org.apache.ignite.internal.processors.task.GridTaskProcessor$1
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey
org.apache.ignite.internal.processors.task.GridTaskWorker$3
-org.apache.ignite.internal.processors.task.GridTaskWorker$5
+org.apache.ignite.internal.processors.task.GridTaskWorker$4
org.apache.ignite.internal.processors.task.GridTaskWorker$State
org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException
org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException
@@ -1375,7 +1356,6 @@ org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap
org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet
org.apache.ignite.internal.util.GridBoundedLinkedHashMap
org.apache.ignite.internal.util.GridBoundedLinkedHashSet
-org.apache.ignite.internal.util.GridBoundedPriorityQueue
org.apache.ignite.internal.util.GridByteArrayList
org.apache.ignite.internal.util.GridCloseableIteratorAdapter
org.apache.ignite.internal.util.GridCloseableIteratorAdapterEx
@@ -1427,14 +1407,14 @@ org.apache.ignite.internal.util.IgniteUtils$11
org.apache.ignite.internal.util.IgniteUtils$12
org.apache.ignite.internal.util.IgniteUtils$13
org.apache.ignite.internal.util.IgniteUtils$14
-org.apache.ignite.internal.util.IgniteUtils$15
-org.apache.ignite.internal.util.IgniteUtils$17
+org.apache.ignite.internal.util.IgniteUtils$16
+org.apache.ignite.internal.util.IgniteUtils$2
+org.apache.ignite.internal.util.IgniteUtils$22
org.apache.ignite.internal.util.IgniteUtils$23
org.apache.ignite.internal.util.IgniteUtils$24
org.apache.ignite.internal.util.IgniteUtils$25
org.apache.ignite.internal.util.IgniteUtils$26
org.apache.ignite.internal.util.IgniteUtils$27
-org.apache.ignite.internal.util.IgniteUtils$28
org.apache.ignite.internal.util.IgniteUtils$3
org.apache.ignite.internal.util.IgniteUtils$4
org.apache.ignite.internal.util.IgniteUtils$5
@@ -1667,19 +1647,12 @@ org.apache.ignite.internal.visor.cache.VisorCachePartitionsTask
org.apache.ignite.internal.visor.cache.VisorCachePartitionsTask$VisorCachePartitionsJob
org.apache.ignite.internal.visor.cache.VisorCacheQueryConfiguration
org.apache.ignite.internal.visor.cache.VisorCacheQueryConfigurationV2
-org.apache.ignite.internal.visor.cache.VisorCacheQueryDetailMetrics
-org.apache.ignite.internal.visor.cache.VisorCacheQueryDetailMetricsCollectorTask
-org.apache.ignite.internal.visor.cache.VisorCacheQueryDetailMetricsCollectorTask$VisorCacheQueryDetailMetricsCollectorJob
org.apache.ignite.internal.visor.cache.VisorCacheQueryMetrics
org.apache.ignite.internal.visor.cache.VisorCacheRebalanceConfiguration
org.apache.ignite.internal.visor.cache.VisorCacheRebalanceTask
org.apache.ignite.internal.visor.cache.VisorCacheRebalanceTask$VisorCachesRebalanceJob
org.apache.ignite.internal.visor.cache.VisorCacheResetMetricsTask
org.apache.ignite.internal.visor.cache.VisorCacheResetMetricsTask$VisorCacheResetMetricsJob
-org.apache.ignite.internal.visor.cache.VisorCacheResetQueryDetailMetricsTask
-org.apache.ignite.internal.visor.cache.VisorCacheResetQueryDetailMetricsTask$VisorCacheResetQueryDetailMetricsJob
-org.apache.ignite.internal.visor.cache.VisorCacheResetQueryMetricsTask
-org.apache.ignite.internal.visor.cache.VisorCacheResetQueryMetricsTask$VisorCacheResetQueryMetricsJob
org.apache.ignite.internal.visor.cache.VisorCacheStartTask
org.apache.ignite.internal.visor.cache.VisorCacheStartTask$VisorCacheStartArg
org.apache.ignite.internal.visor.cache.VisorCacheStartTask$VisorCacheStartJob
@@ -1709,7 +1682,6 @@ org.apache.ignite.internal.visor.debug.VisorThreadLockInfo
org.apache.ignite.internal.visor.debug.VisorThreadMonitorInfo
org.apache.ignite.internal.visor.event.VisorGridDeploymentEvent
org.apache.ignite.internal.visor.event.VisorGridDiscoveryEvent
-org.apache.ignite.internal.visor.event.VisorGridDiscoveryEventV2
org.apache.ignite.internal.visor.event.VisorGridEvent
org.apache.ignite.internal.visor.event.VisorGridEventsLost
org.apache.ignite.internal.visor.event.VisorGridJobEvent
@@ -1797,7 +1769,6 @@ org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException
org.apache.ignite.internal.visor.util.VisorEventMapper
org.apache.ignite.internal.visor.util.VisorExceptionWrapper
org.apache.ignite.internal.visor.util.VisorTaskUtils$4
-org.apache.ignite.internal.visor.util.VisorTaskUtils$5
org.apache.ignite.internal.websession.WebSessionAttributeProcessor
org.apache.ignite.internal.websession.WebSessionEntity
org.apache.ignite.lang.IgniteBiClosure
http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java
new file mode 100644
index 0000000..8822115
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAtomicSingleMessageCountSelfTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Tests which near update request types are sent for single-key put, invoke (transform) and filtered updates in ATOMIC mode.
+ */
+public class CacheAtomicSingleMessageCountSelfTest extends GridCommonAbstractTest {
+ /** VM ip finder for TCP discovery. */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Starting grid index. */
+ private int idx;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Adds a PARTITIONED cache with one backup, FULL_SYNC writes and PRIMARY atomic write order,
+     * a TCP discovery SPI forced into server mode, and the message-counting communication SPI.
+     * The first configuration produced by this test instance is switched to client mode.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        // Cache under test.
+        CacheConfiguration cacheCfg = new CacheConfiguration();
+
+        cacheCfg.setAtomicWriteOrderMode(PRIMARY);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        cacheCfg.setBackups(1);
+        cacheCfg.setCacheMode(PARTITIONED);
+
+        igniteCfg.setCacheConfiguration(cacheCfg);
+
+        // Discovery over the shared VM IP finder.
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+        disco.setForceServerMode(true);
+
+        igniteCfg.setDiscoverySpi(disco);
+
+        // Communication SPI that counts outgoing cache messages.
+        igniteCfg.setCommunicationSpi(new TestCommunicationSpi());
+
+        // Only the very first configuration requested from this instance is a client.
+        boolean firstNode = idx == 0;
+
+        idx++;
+
+        if (firstNode)
+            igniteCfg.setClientMode(true);
+
+        return igniteCfg;
+    }
+
+    /**
+     * Performs single-key {@code put} operations through grid 0 and checks that each of them is
+     * counted as a {@link GridNearAtomicSingleUpdateRequest}, while none of the other registered
+     * near update request types is counted.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleMessage() throws Exception {
+        startGrids(2);
+
+        try {
+            awaitPartitionMapExchange();
+
+            TestCommunicationSpi spi = (TestCommunicationSpi)grid(0).configuration().getCommunicationSpi();
+
+            // Drop any previous counters, then track all four near update request types.
+            spi.resetCount();
+
+            spi.registerMessage(GridNearAtomicFullUpdateRequest.class);
+            spi.registerMessage(GridNearAtomicSingleUpdateRequest.class);
+            spi.registerMessage(GridNearAtomicSingleUpdateInvokeRequest.class);
+            spi.registerMessage(GridNearAtomicSingleUpdateFilterRequest.class);
+
+            final int puts = 15;
+
+            for (int key = 0; key != puts; key++)
+                jcache(0).put(key, key);
+
+            // Exactly one plain single-update request per put, nothing else.
+            assertEquals(0, spi.messageCount(GridNearAtomicFullUpdateRequest.class));
+            assertEquals(puts, spi.messageCount(GridNearAtomicSingleUpdateRequest.class));
+            assertEquals(0, spi.messageCount(GridNearAtomicSingleUpdateInvokeRequest.class));
+            assertEquals(0, spi.messageCount(GridNearAtomicSingleUpdateFilterRequest.class));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Invokes an entry processor on single keys through grid 0 and checks that each invocation is
+     * counted as a {@link GridNearAtomicSingleUpdateInvokeRequest}, while none of the other
+     * registered near update request types is counted. Counting is restricted to messages that
+     * carry the ID of the unnamed test cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleTransformMessage() throws Exception {
+        startGrids(2);
+
+        try {
+            // Resolved inside the try block so that the started grids are stopped even if the lookup fails.
+            int cacheId = ((IgniteKernal)grid(0)).internalCache(null).context().cacheId();
+
+            awaitPartitionMapExchange();
+
+            TestCommunicationSpi commSpi = (TestCommunicationSpi)grid(0).configuration().getCommunicationSpi();
+
+            // resetCount() also clears the filter, so the filter has to be installed after it.
+            commSpi.resetCount();
+            commSpi.filterCacheId(cacheId);
+
+            commSpi.registerMessage(GridNearAtomicFullUpdateRequest.class);
+            commSpi.registerMessage(GridNearAtomicSingleUpdateRequest.class);
+            commSpi.registerMessage(GridNearAtomicSingleUpdateInvokeRequest.class);
+            commSpi.registerMessage(GridNearAtomicSingleUpdateFilterRequest.class);
+
+            int putCnt = 15;
+
+            for (int i = 0; i < putCnt; i++) {
+                jcache(0).invoke(i, new CacheEntryProcessor<Object, Object, Object>() {
+                    @Override public Object process(MutableEntry<Object, Object> entry,
+                        Object... objects) throws EntryProcessorException {
+                        return 2;
+                    }
+                });
+            }
+
+            // Exactly one single-invoke request per invocation, nothing else.
+            assertEquals(0, commSpi.messageCount(GridNearAtomicFullUpdateRequest.class));
+            assertEquals(0, commSpi.messageCount(GridNearAtomicSingleUpdateRequest.class));
+            assertEquals(putCnt, commSpi.messageCount(GridNearAtomicSingleUpdateInvokeRequest.class));
+            assertEquals(0, commSpi.messageCount(GridNearAtomicSingleUpdateFilterRequest.class));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Performs single-key {@code putIfAbsent} operations through grid 0 and checks that each of
+     * them is counted as a {@link GridNearAtomicSingleUpdateFilterRequest}, while none of the
+     * other registered near update request types is counted.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleFilterMessage() throws Exception {
+        startGrids(2);
+
+        try {
+            awaitPartitionMapExchange();
+
+            TestCommunicationSpi spi = (TestCommunicationSpi)grid(0).configuration().getCommunicationSpi();
+
+            // Drop any previous counters, then track all four near update request types.
+            spi.resetCount();
+
+            spi.registerMessage(GridNearAtomicFullUpdateRequest.class);
+            spi.registerMessage(GridNearAtomicSingleUpdateRequest.class);
+            spi.registerMessage(GridNearAtomicSingleUpdateInvokeRequest.class);
+            spi.registerMessage(GridNearAtomicSingleUpdateFilterRequest.class);
+
+            final int ops = 15;
+
+            for (int key = 0; key != ops; key++)
+                jcache(0).putIfAbsent(key, key);
+
+            // Exactly one single-filter request per operation, nothing else.
+            assertEquals(0, spi.messageCount(GridNearAtomicFullUpdateRequest.class));
+            assertEquals(0, spi.messageCount(GridNearAtomicSingleUpdateRequest.class));
+            assertEquals(0, spi.messageCount(GridNearAtomicSingleUpdateInvokeRequest.class));
+            assertEquals(ops, spi.messageCount(GridNearAtomicSingleUpdateFilterRequest.class));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Communication SPI that counts outgoing cache messages of registered classes,
+     * optionally restricted to messages carrying one particular cache ID.
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /**
+         * Counters keyed by exact message class. A concurrent map is used because the test
+         * registers and clears counters while {@link #sendMessage} looks them up; the volatile
+         * filter below indicates that this SPI is expected to be touched by several threads.
+         */
+        private final Map<Class<?>, AtomicInteger> cntMap = new ConcurrentHashMap<>();
+
+        /** Cache ID to count messages for, or {@code null} to count messages of any cache. */
+        private volatile Integer filterCacheId;
+
+        /**
+         * Increments the counter registered for the class of the wrapped message, if the wrapped
+         * message is a cache message that passes the cache ID filter, and then delegates the send
+         * to the parent SPI.
+         * <p>
+         * {@inheritDoc}
+         */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
+            throws IgniteSpiException {
+            Object payload = ((GridIoMessage)msg).message();
+
+            if (payload instanceof GridCacheMessage) {
+                int msgCacheId = ((GridCacheMessage)payload).cacheId();
+
+                // Read the volatile filter exactly once: a concurrent resetCount() must not be able
+                // to null it between the null check and the unboxing comparison.
+                Integer filterId = filterCacheId;
+
+                if (filterId == null || filterId == msgCacheId) {
+                    // Only classes that were registered explicitly have a counter.
+                    AtomicInteger cntr = cntMap.get(payload.getClass());
+
+                    if (cntr != null)
+                        cntr.incrementAndGet();
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+
+        /**
+         * Registers message class for counting. An already registered class keeps its current count.
+         *
+         * @param cls Class to count.
+         */
+        void registerMessage(Class<?> cls) {
+            AtomicInteger cntr = cntMap.get(cls);
+
+            if (cntr == null)
+                cntMap.put(cls, new AtomicInteger());
+        }
+
+        /**
+         * @param cls Message type to get count.
+         * @return Number of counted messages of given class, or {@code 0} if the class is not registered.
+         */
+        int messageCount(Class<?> cls) {
+            AtomicInteger cntr = cntMap.get(cls);
+
+            return cntr == null ? 0 : cntr.get();
+        }
+
+        /**
+         * Removes all registered counters and clears the cache ID filter. Message classes have to
+         * be registered again before they are counted.
+         */
+        void resetCount() {
+            cntMap.clear();
+            filterCacheId = null;
+        }
+
+        /**
+         * Restricts counting to messages that carry the given cache ID.
+         *
+         * @param cacheId Cache ID.
+         */
+        void filterCacheId(int cacheId) {
+            filterCacheId = cacheId;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
index 0e17102..a6d612a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
@@ -28,7 +28,8 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
@@ -137,11 +138,13 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
TestCommunicationSpi commSpi = (TestCommunicationSpi)grid(0).configuration().getCommunicationSpi();
- commSpi.registerMessage(GridNearAtomicUpdateRequest.class);
+ commSpi.registerMessage(GridNearAtomicSingleUpdateRequest.class);
+ commSpi.registerMessage(GridNearAtomicFullUpdateRequest.class);
commSpi.registerMessage(GridDhtAtomicUpdateRequest.class);
int putCnt = 15;
+ int expNearSingleCnt = 0;
int expNearCnt = 0;
int expDhtCnt = 0;
@@ -160,20 +163,22 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
if (affinity.isPrimary(locNode, i))
expDhtCnt++;
else
- expNearCnt ++;
+ expNearSingleCnt++;
}
jcache(0).put(i, i);
}
- assertEquals(expNearCnt, commSpi.messageCount(GridNearAtomicUpdateRequest.class));
+ assertEquals(expNearCnt, commSpi.messageCount(GridNearAtomicFullUpdateRequest.class));
+ assertEquals(expNearSingleCnt, commSpi.messageCount(GridNearAtomicSingleUpdateRequest.class));
assertEquals(expDhtCnt, commSpi.messageCount(GridDhtAtomicUpdateRequest.class));
if (writeOrderMode == CLOCK) {
for (int i = 1; i < 4; i++) {
commSpi = (TestCommunicationSpi)grid(i).configuration().getCommunicationSpi();
- assertEquals(0, commSpi.messageCount(GridNearAtomicUpdateRequest.class));
+ assertEquals(0, commSpi.messageCount(GridNearAtomicSingleUpdateRequest.class));
+ assertEquals(0, commSpi.messageCount(GridNearAtomicFullUpdateRequest.class));
assertEquals(0, commSpi.messageCount(GridDhtAtomicUpdateRequest.class));
}
}
@@ -181,7 +186,8 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
for (int i = 1; i < 4; i++) {
commSpi = (TestCommunicationSpi)grid(i).configuration().getCommunicationSpi();
- assertEquals(0, commSpi.messageCount(GridNearAtomicUpdateRequest.class));
+ assertEquals(0, commSpi.messageCount(GridNearAtomicSingleUpdateRequest.class));
+ assertEquals(0, commSpi.messageCount(GridNearAtomicFullUpdateRequest.class));
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java
index cdb7907..281397a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
/**
* Stopped node when client operations are executing.
@@ -31,28 +31,28 @@ public class IgniteCacheAtomicStopBusySelfTest extends IgniteCacheAbstractStopBu
/** {@inheritDoc} */
@Override public void testPut() throws Exception {
- bannedMsg.set(GridNearAtomicUpdateRequest.class);
+ bannedMsg.set(GridNearAtomicSingleUpdateRequest.class);
super.testPut();
}
/** {@inheritDoc} */
@Override public void testPutBatch() throws Exception {
- bannedMsg.set(GridNearAtomicUpdateRequest.class);
+ bannedMsg.set(GridNearAtomicSingleUpdateRequest.class);
super.testPut();
}
/** {@inheritDoc} */
@Override public void testPutAsync() throws Exception {
- bannedMsg.set(GridNearAtomicUpdateRequest.class);
+ bannedMsg.set(GridNearAtomicSingleUpdateRequest.class);
super.testPut();
}
/** {@inheritDoc} */
@Override public void testRemove() throws Exception {
- bannedMsg.set(GridNearAtomicUpdateRequest.class);
+ bannedMsg.set(GridNearAtomicSingleUpdateRequest.class);
super.testPut();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
index f9608e1..fb2d0de 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
@@ -148,7 +148,7 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
* @throws Exception If failed.
*/
public void testResponseMessageOnUnmarshallingFailed() throws Exception {
- // GridNearAtomicUpdateRequest unmarshalling failed test.
+ // GridNearAtomicFullUpdateRequest unmarshalling failed test.
readCnt.set(1);
failAtomicPut(++key);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index 49c3289..b4ef11a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -61,7 +61,7 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContex
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
@@ -230,8 +230,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
// Block messages requests for both nodes.
- spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id());
- spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id());
+ spi.blockMessages(GridNearAtomicFullUpdateRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearAtomicFullUpdateRequest.class, ignite1.localNode().id());
final IgniteCache<Integer, Integer> cache = ignite2.cache(null);
@@ -272,7 +272,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
map.put(i, i + 1);
// Block messages requests for single node.
- spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearAtomicFullUpdateRequest.class, ignite0.localNode().id());
putFut = GridTestUtils.runAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
@@ -361,11 +361,11 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi();
// Block messages requests for both nodes.
- spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id());
- spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id());
- spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite2.localNode().id());
+ spi.blockMessages(GridNearAtomicFullUpdateRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearAtomicFullUpdateRequest.class, ignite1.localNode().id());
+ spi.blockMessages(GridNearAtomicFullUpdateRequest.class, ignite2.localNode().id());
- spi.record(GridNearAtomicUpdateRequest.class);
+ spi.record(GridNearAtomicFullUpdateRequest.class);
final IgniteCache<Integer, Integer> cache = ignite3.cache(null);
@@ -402,7 +402,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
assertEquals(3, msgs.size());
for (Object msg : msgs)
- assertTrue(((GridNearAtomicUpdateRequest)msg).clientRequest());
+ assertTrue(((GridNearAtomicFullUpdateRequest)msg).clientRequest());
map.put(primaryKey(ignite0.cache(null)), 3);
map.put(primaryKey(ignite1.cache(null)), 4);
@@ -459,8 +459,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
// Block messages requests for both nodes.
- spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id());
- spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id());
+ spi.blockMessages(GridNearAtomicFullUpdateRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearAtomicFullUpdateRequest.class, ignite1.localNode().id());
final IgniteCache<Integer, Integer> cache = ignite2.cache(null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index 74d2d09..0899423 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@ -478,7 +478,7 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
Object origMsg = msg.message();
return delay &&
- ((origMsg instanceof GridNearAtomicUpdateRequest) || (origMsg instanceof GridDhtAtomicUpdateRequest));
+ ((origMsg instanceof GridNearAtomicAbstractUpdateRequest) || (origMsg instanceof GridDhtAtomicUpdateRequest));
}
}
}
\ No newline at end of file