You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/02/02 04:28:31 UTC
[39/52] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1'
of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-61
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 0000000,e703dff..e095e1f
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@@ -1,0 -1,1127 +1,1115 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.distributed.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import javax.cache.expiry.*;
+ import javax.cache.processor.*;
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
+
+ /**
+ * Lite DHT cache update request sent from near node to primary node.
+ */
+ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> implements GridCacheDeployable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** Target node ID. */
+ @GridDirectTransient
+ private UUID nodeId;
+
+ /** Future version. */
+ private GridCacheVersion futVer;
+
+ /** Fast map flag. */
+ private boolean fastMap;
+
+ /** Update version. Set to non-null if fastMap is {@code true}. */
+ private GridCacheVersion updateVer;
+
+ /** Topology version. */
+ private long topVer;
+
+ /** Write synchronization mode. */
+ private CacheWriteSynchronizationMode syncMode;
+
+ /** Update operation. */
+ private GridCacheOperation op;
+
+ /** Keys to update. */
+ @GridDirectTransient
+ @GridToStringInclude
+ private List<K> keys;
+
+ /** Key bytes. */
+ @GridDirectCollection(byte[].class)
+ private List<byte[]> keyBytes;
+
+ /** Values to update. */
+ @GridDirectTransient
+ private List<Object> vals;
+
+ /** Value bytes. */
+ @GridDirectCollection(GridCacheValueBytes.class)
+ private List<GridCacheValueBytes> valBytes;
+
+ /** Optional arguments for entry processor. */
+ @GridDirectTransient
+ private Object[] invokeArgs;
+
+ /** Entry processor arguments bytes. */
+ private byte[][] invokeArgsBytes;
+
+ /** DR versions. */
+ @GridDirectCollection(GridCacheVersion.class)
+ private List<GridCacheVersion> drVers;
+
+ /** DR TTLs. */
+ private GridLongList drTtls;
+
+ /** DR expire times. */
+ private GridLongList drExpireTimes;
+
+ /** Return value flag. */
+ private boolean retval;
+
+ /** Expiry policy. */
+ @GridDirectTransient
+ private ExpiryPolicy expiryPlc;
+
+ /** Expiry policy bytes. */
+ private byte[] expiryPlcBytes;
+
+ /** Filter. */
+ @GridDirectTransient
+ private IgnitePredicate<CacheEntry<K, V>>[] filter;
+
+ /** Filter bytes. */
+ private byte[][] filterBytes;
+
+ /** Flag indicating whether request contains primary keys. */
+ private boolean hasPrimary;
+
+ /** Force transform backups flag. */
+ @GridDirectVersion(2)
+ private boolean forceTransformBackups;
+
+ /** Subject ID. */
+ @GridDirectVersion(3)
+ private UUID subjId;
+
+ /** Task name hash. */
+ @GridDirectVersion(4)
+ private int taskNameHash;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridNearAtomicUpdateRequest() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param cacheId Cache ID.
+ * @param nodeId Node ID.
+ * @param futVer Future version.
+ * @param fastMap Fast map scheme flag.
+ * @param updateVer Update version set if fast map is performed.
+ * @param topVer Topology version.
+ * @param syncMode Synchronization mode.
+ * @param op Cache update operation.
+ * @param retval Return value required flag.
+ * @param forceTransformBackups Force transform backups flag.
+ * @param expiryPlc Expiry policy.
+ * @param invokeArgs Optional arguments for entry processor.
+ * @param filter Optional filter for atomic check.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ */
+ public GridNearAtomicUpdateRequest(
+ int cacheId,
+ UUID nodeId,
+ GridCacheVersion futVer,
+ boolean fastMap,
+ @Nullable GridCacheVersion updateVer,
+ long topVer,
+ CacheWriteSynchronizationMode syncMode,
+ GridCacheOperation op,
+ boolean retval,
+ boolean forceTransformBackups,
+ @Nullable ExpiryPolicy expiryPlc,
+ @Nullable Object[] invokeArgs,
+ @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter,
+ @Nullable UUID subjId,
+ int taskNameHash
+ ) {
+ this.cacheId = cacheId;
+ this.nodeId = nodeId;
+ this.futVer = futVer;
+ this.fastMap = fastMap;
+ this.updateVer = updateVer;
+
+ this.topVer = topVer;
+ this.syncMode = syncMode;
+ this.op = op;
+ this.retval = retval;
+ this.forceTransformBackups = forceTransformBackups;
+ this.expiryPlc = expiryPlc;
+ this.invokeArgs = invokeArgs;
+ this.filter = filter;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+
+ keys = new ArrayList<>();
+ vals = new ArrayList<>();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /**
+ * @return Mapped node ID.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ */
+ public void nodeId(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return Subject ID.
+ */
+ public UUID subjectId() {
+ return subjId;
+ }
+
+ /**
+ * @return Task name hash.
+ */
+ public int taskNameHash() {
+ return taskNameHash;
+ }
+
+ /**
+ * @return Future version.
+ */
+ public GridCacheVersion futureVersion() {
+ return futVer;
+ }
+
+ /**
+ * @return Flag indicating whether this is fast-map update.
+ */
+ public boolean fastMap() {
+ return fastMap;
+ }
+
+ /**
+ * @return Update version for fast-map request.
+ */
+ public GridCacheVersion updateVersion() {
+ return updateVer;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ @Override public long topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Cache write synchronization mode.
+ */
+ public CacheWriteSynchronizationMode writeSynchronizationMode() {
+ return syncMode;
+ }
+
+ /**
+ * @return Expiry policy.
+ */
+ public ExpiryPolicy expiry() {
+ return expiryPlc;
+ }
+
+ /**
+ * @return Return value flag.
+ */
+ public boolean returnValue() {
+ return retval;
+ }
+
+ /**
+ * @return Filter.
+ */
+ @Nullable public IgnitePredicate<CacheEntry<K, V>>[] filter() {
+ return filter;
+ }
+
+ /**
++ * Appends one key, together with its optional value and DR attributes, to this request.
++ * <p>
++ * The DR version, TTL and expire-time lists are created lazily, on the first entry that
++ * actually carries the attribute, and are back-filled for the keys added before it
++ * ({@code null} for versions, {@code -1} for TTLs and expire times). Once a list exists,
++ * every subsequent entry appends exactly one element to it so that index {@code i} in each
++ * list always corresponds to index {@code i} in {@link #keys()}.
++ *
+ * @param key Key to add.
+ * @param val Optional update value.
+ * @param drTtl DR TTL (optional).
+ * @param drExpireTime DR expire time (optional).
+ * @param drVer DR version (optional).
+ * @param primary If given key is primary on this mapping.
+ */
+ public void addUpdateEntry(K key,
+ @Nullable Object val,
+ long drTtl,
+ long drExpireTime,
+ @Nullable GridCacheVersion drVer,
+ boolean primary) {
+ assert val != null || op == DELETE;
+ assert op != TRANSFORM || val instanceof EntryProcessor;
+
+ keys.add(key);
+ vals.add(val);
+
+ hasPrimary |= primary;
+
+ // In case there is no DR, do not create the list.
+ if (drVer != null) {
+ if (drVers == null) {
+ drVers = new ArrayList<>();
+
+ for (int i = 0; i < keys.size() - 1; i++)
+ drVers.add(null);
+ }
+
+ drVers.add(drVer);
+ }
+ else if (drVers != null)
+ drVers.add(drVer);
+
+ if (drTtl >= 0) {
+ if (drTtls == null) {
+ drTtls = new GridLongList(keys.size());
+
+ for (int i = 0; i < keys.size() - 1; i++)
+ drTtls.add(-1);
+ }
+
+ drTtls.add(drTtl);
+ }
++ else if (drTtls != null)
++ drTtls.add(-1); // Same "not set" marker as the back-fill above; keeps the list aligned with keys.
+
+ if (drExpireTime >= 0) {
+ if (drExpireTimes == null) {
+ drExpireTimes = new GridLongList(keys.size());
+
+ for (int i = 0; i < keys.size() - 1; i++)
+ drExpireTimes.add(-1);
+ }
+
+ drExpireTimes.add(drExpireTime);
+ }
++ else if (drExpireTimes != null)
++ drExpireTimes.add(-1); // Same "not set" marker as the back-fill above; keeps the list aligned with keys.
+ }
+
+ /**
+ * @return Keys for this update request.
+ */
+ public List<K> keys() {
+ return keys;
+ }
+
+ /**
+ * @return Values for this update request.
+ */
+ public List<Object> values() {
+ return vals;
+ }
+
+ /**
+ * @return Update operation.
+ */
+ public GridCacheOperation operation() {
+ return op;
+ }
+
+ /**
+ * @return Optional arguments for entry processor.
+ */
+ @Nullable public Object[] invokeArguments() {
+ return invokeArgs;
+ }
+
+ /**
+ * @param idx Key index.
+ * @return Value.
+ */
+ public V value(int idx) {
+ assert op == UPDATE : op;
+
+ return (V)vals.get(idx);
+ }
+
+ /**
+ * @param idx Key index.
+ * @return Entry processor.
+ */
+ public EntryProcessor<K, V, ?> entryProcessor(int idx) {
+ assert op == TRANSFORM : op;
+
+ return (EntryProcessor<K, V, ?>)vals.get(idx);
+ }
+
+ /**
+ * Returns the object to be written for the key at the given index.
+ * <p>
+ * The value from the values list is preferred when that list is present and holds a
+ * non-null element at {@code idx}. Otherwise, if value bytes are present and the tuple at
+ * {@code idx} reports {@code isPlain()}, the tuple's byte array itself is returned
+ * (presumably the user value was a raw {@code byte[]} - confirm against
+ * {@code GridCacheValueBytes}). In all other cases {@code null} is returned.
+ *
+ * @param idx Index to get.
+ * @return Write value - either value, or transform closure.
+ */
+ public Object writeValue(int idx) {
+ if (vals != null) {
+ Object val = vals.get(idx);
+
+ if (val != null)
+ return val;
+ }
+
+ if (valBytes != null) {
+ GridCacheValueBytes valBytesTuple = valBytes.get(idx);
+
+ if (valBytesTuple != null && valBytesTuple.isPlain())
+ return valBytesTuple.get();
+ }
+
+ return null;
+ }
+
+ /**
+ * Returns the value bytes stored for the key at the given index.
+ * <p>
+ * Bytes are returned only when the operation is not {@code TRANSFORM}, value bytes are
+ * present, and the tuple at {@code idx} is non-null and does not report {@code isPlain()};
+ * otherwise {@code null} is returned.
+ *
+ * @param idx Key index.
+ * @return Value bytes, or {@code null} if not available under the conditions above.
+ */
+ public byte[] valueBytes(int idx) {
+ if (op != TRANSFORM && valBytes != null) {
+ GridCacheValueBytes valBytesTuple = valBytes.get(idx);
+
+ if (valBytesTuple != null && !valBytesTuple.isPlain())
+ return valBytesTuple.get();
+ }
+
+ return null;
+ }
+
+ /**
+ * @return DR versions.
+ */
+ @Nullable public List<GridCacheVersion> drVersions() {
+ return drVers;
+ }
+
+ /**
+ * @param idx Index.
+ * @return DR version.
+ */
+ @Nullable public GridCacheVersion drVersion(int idx) {
+ if (drVers != null) {
+ assert idx >= 0 && idx < drVers.size();
+
+ return drVers.get(idx);
+ }
+
+ return null;
+ }
+
+ /**
+ * @return DR TTLs.
+ */
+ @Nullable public GridLongList drTtls() {
+ return drTtls;
+ }
+
+ /**
+ * @param idx Index.
+ * @return DR TTL.
+ */
+ public long drTtl(int idx) {
+ if (drTtls != null) {
+ assert idx >= 0 && idx < drTtls.size();
+
+ return drTtls.get(idx);
+ }
+
+ return -1L;
+ }
+
+ /**
+ * @return DR expire times.
+ */
+ @Nullable public GridLongList drExpireTimes() {
+ return drExpireTimes;
+ }
+
+ /**
+ * @param idx Index.
+ * @return DR expire time.
+ */
+ public long drExpireTime(int idx) {
+ if (drExpireTimes != null) {
+ assert idx >= 0 && idx < drExpireTimes.size();
+
+ return drExpireTimes.get(idx);
+ }
+
+ return -1L;
+ }
+
+ /**
+ * @return Flag indicating whether this request contains primary keys.
+ */
+ public boolean hasPrimary() {
+ return hasPrimary;
+ }
+
+ /**
+ * @return Force transform backups flag.
+ */
+ public boolean forceTransformBackups() {
+ return forceTransformBackups;
+ }
+
+ /**
+ * @param forceTransformBackups Force transform backups flag.
+ */
+ public void forceTransformBackups(boolean forceTransformBackups) {
+ this.forceTransformBackups = forceTransformBackups;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Converts keys, values, filter and entry processor arguments into their byte
+ * representations and, when an expiry policy is set, marshals it wrapped in
+ * {@code IgniteExternalizableExpiryPolicy}.
+ *
+ * @param ctx Shared cache context passed to the marshalling helpers.
+ */
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ keyBytes = marshalCollection(keys, ctx);
+ valBytes = marshalValuesCollection(vals, ctx);
+ filterBytes = marshalFilter(filter, ctx);
+ invokeArgsBytes = marshalInvokeArguments(invokeArgs, ctx);
+
+ if (expiryPlc != null)
+ expiryPlcBytes = CU.marshal(ctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ keys = unmarshalCollection(keyBytes, ctx, ldr);
+ vals = unmarshalValueBytesCollection(valBytes, ctx, ldr);
+ filter = unmarshalFilter(filterBytes, ctx, ldr);
+ invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
+
+ if (expiryPlcBytes != null)
+ expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, ldr);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridNearAtomicUpdateRequest _clone = new GridNearAtomicUpdateRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridNearAtomicUpdateRequest _clone = (GridNearAtomicUpdateRequest)_msg;
+
+ _clone.nodeId = nodeId;
+ _clone.futVer = futVer;
+ _clone.fastMap = fastMap;
+ _clone.updateVer = updateVer;
+ _clone.topVer = topVer;
+ _clone.syncMode = syncMode;
+ _clone.op = op;
+ _clone.keys = keys;
+ _clone.keyBytes = keyBytes;
+ _clone.vals = vals;
+ _clone.valBytes = valBytes;
+ _clone.invokeArgs = invokeArgs;
+ _clone.invokeArgsBytes = invokeArgsBytes;
+ _clone.drVers = drVers;
+ _clone.drTtls = drTtls;
+ _clone.drExpireTimes = drExpireTimes;
+ _clone.retval = retval;
+ _clone.expiryPlc = expiryPlc;
+ _clone.expiryPlcBytes = expiryPlcBytes;
+ _clone.filter = filter;
+ _clone.filterBytes = filterBytes;
+ _clone.hasPrimary = hasPrimary;
+ _clone.forceTransformBackups = forceTransformBackups;
+ _clone.subjId = subjId;
+ _clone.taskNameHash = taskNameHash;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 3:
- if (!commState.putLongList(drExpireTimes))
++ if (!commState.putLongList("drExpireTimes", drExpireTimes))
+ return false;
+
+ commState.idx++;
+
+ case 4:
- if (!commState.putLongList(drTtls))
++ if (!commState.putLongList("drTtls", drTtls))
+ return false;
+
+ commState.idx++;
+
+ case 5:
+ if (drVers != null) {
+ if (commState.it == null) {
- if (!commState.putInt(drVers.size()))
++ if (!commState.putInt(null, drVers.size()))
+ return false;
+
+ commState.it = drVers.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putCacheVersion((GridCacheVersion)commState.cur))
++ if (!commState.putCacheVersion(null, (GridCacheVersion)commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 6:
- if (!commState.putByteArray(expiryPlcBytes))
++ if (!commState.putByteArray("expiryPlcBytes", expiryPlcBytes))
+ return false;
+
+ commState.idx++;
+
+ case 7:
- if (!commState.putBoolean(fastMap))
++ if (!commState.putBoolean("fastMap", fastMap))
+ return false;
+
+ commState.idx++;
+
+ case 8:
+ if (filterBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(filterBytes.length))
++ if (!commState.putInt(null, filterBytes.length))
+ return false;
+
+ commState.it = arrayIterator(filterBytes);
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putByteArray((byte[])commState.cur))
++ if (!commState.putByteArray(null, (byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 9:
- if (!commState.putCacheVersion(futVer))
++ if (!commState.putCacheVersion("futVer", futVer))
+ return false;
+
+ commState.idx++;
+
+ case 10:
- if (!commState.putBoolean(hasPrimary))
++ if (!commState.putBoolean("hasPrimary", hasPrimary))
+ return false;
+
+ commState.idx++;
+
+ case 11:
+ if (invokeArgsBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(invokeArgsBytes.length))
++ if (!commState.putInt(null, invokeArgsBytes.length))
+ return false;
+
+ commState.it = arrayIterator(invokeArgsBytes);
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putByteArray((byte[])commState.cur))
++ if (!commState.putByteArray(null, (byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 12:
+ if (keyBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(keyBytes.size()))
++ if (!commState.putInt(null, keyBytes.size()))
+ return false;
+
+ commState.it = keyBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putByteArray((byte[])commState.cur))
++ if (!commState.putByteArray(null, (byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 13:
- if (!commState.putEnum(op))
++ if (!commState.putEnum("op", op))
+ return false;
+
+ commState.idx++;
+
+ case 14:
- if (!commState.putBoolean(retval))
++ if (!commState.putBoolean("retval", retval))
+ return false;
+
+ commState.idx++;
+
+ case 15:
- if (!commState.putEnum(syncMode))
++ if (!commState.putEnum("syncMode", syncMode))
+ return false;
+
+ commState.idx++;
+
+ case 16:
- if (!commState.putLong(topVer))
++ if (!commState.putLong("topVer", topVer))
+ return false;
+
+ commState.idx++;
+
+ case 17:
- if (!commState.putCacheVersion(updateVer))
++ if (!commState.putCacheVersion("updateVer", updateVer))
+ return false;
+
+ commState.idx++;
+
+ case 18:
+ if (valBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(valBytes.size()))
++ if (!commState.putInt(null, valBytes.size()))
+ return false;
+
+ commState.it = valBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putValueBytes((GridCacheValueBytes)commState.cur))
++ if (!commState.putValueBytes(null, (GridCacheValueBytes)commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 19:
- if (!commState.putBoolean(forceTransformBackups))
++ if (!commState.putBoolean("forceTransformBackups", forceTransformBackups))
+ return false;
+
+ commState.idx++;
+
+ case 20:
- if (!commState.putUuid(subjId))
++ if (!commState.putUuid("subjId", subjId))
+ return false;
+
+ commState.idx++;
+
+ case 21:
- if (!commState.putInt(taskNameHash))
++ if (!commState.putInt("taskNameHash", taskNameHash))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
++ // Fields are read in case order with fall-through. commState.idx is advanced after each
++ // field, and the method returns false as soon as a field could not be read completely
++ // (presumably so a later call with more data resumes at the same case).
+ switch (commState.idx) {
+ case 3:
- GridLongList drExpireTimes0 = commState.getLongList();
++ drExpireTimes = commState.getLongList("drExpireTimes");
+
- if (drExpireTimes0 == LONG_LIST_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- drExpireTimes = drExpireTimes0;
-
+ commState.idx++;
+
+ case 4:
- GridLongList drTtls0 = commState.getLongList();
++ drTtls = commState.getLongList("drTtls");
+
- if (drTtls0 == LONG_LIST_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- drTtls = drTtls0;
-
+ commState.idx++;
+
+ case 5:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (drVers == null)
+ drVers = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- GridCacheVersion _val = commState.getCacheVersion();
++ GridCacheVersion _val = commState.getCacheVersion(null);
+
- if (_val == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ drVers.add((GridCacheVersion)_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 6:
- byte[] expiryPlcBytes0 = commState.getByteArray();
++ // Assign the field itself: declaring a local of the same name here would shadow it
++ // and the received policy bytes would never reach finishUnmarshal().
++ expiryPlcBytes = commState.getByteArray("expiryPlcBytes");
+
- if (expiryPlcBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- expiryPlcBytes = expiryPlcBytes0;
-
+ commState.idx++;
+
+ case 7:
- if (buf.remaining() < 1)
- return false;
++ fastMap = commState.getBoolean("fastMap");
+
- fastMap = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 8:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (filterBytes == null)
+ filterBytes = new byte[commState.readSize][];
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- byte[] _val = commState.getByteArray();
++ byte[] _val = commState.getByteArray(null);
+
- if (_val == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ filterBytes[i] = (byte[])_val;
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 9:
- GridCacheVersion futVer0 = commState.getCacheVersion();
++ futVer = commState.getCacheVersion("futVer");
+
- if (futVer0 == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- futVer = futVer0;
-
+ commState.idx++;
+
+ case 10:
- if (buf.remaining() < 1)
- return false;
++ hasPrimary = commState.getBoolean("hasPrimary");
+
- hasPrimary = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 11:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (invokeArgsBytes == null)
+ invokeArgsBytes = new byte[commState.readSize][];
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- byte[] _val = commState.getByteArray();
++ byte[] _val = commState.getByteArray(null);
+
- if (_val == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ invokeArgsBytes[i] = (byte[])_val;
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 12:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (keyBytes == null)
+ keyBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- byte[] _val = commState.getByteArray();
++ byte[] _val = commState.getByteArray(null);
+
- if (_val == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ keyBytes.add((byte[])_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 13:
- if (buf.remaining() < 1)
- return false;
++ byte op0 = commState.getByte("op");
+
- byte op0 = commState.getByte();
++ if (!commState.lastRead())
++ return false;
+
+ op = GridCacheOperation.fromOrdinal(op0);
+
+ commState.idx++;
+
+ case 14:
- if (buf.remaining() < 1)
- return false;
++ retval = commState.getBoolean("retval");
+
- retval = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 15:
- if (buf.remaining() < 1)
- return false;
++ byte syncMode0 = commState.getByte("syncMode");
+
- byte syncMode0 = commState.getByte();
++ if (!commState.lastRead())
++ return false;
+
+ syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncMode0);
+
+ commState.idx++;
+
+ case 16:
- if (buf.remaining() < 8)
- return false;
++ topVer = commState.getLong("topVer");
+
- topVer = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 17:
- GridCacheVersion updateVer0 = commState.getCacheVersion();
++ updateVer = commState.getCacheVersion("updateVer");
+
- if (updateVer0 == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- updateVer = updateVer0;
-
+ commState.idx++;
+
+ case 18:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (valBytes == null)
+ valBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- GridCacheValueBytes _val = commState.getValueBytes();
++ GridCacheValueBytes _val = commState.getValueBytes(null);
+
- if (_val == VAL_BYTES_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ valBytes.add((GridCacheValueBytes)_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 19:
- if (buf.remaining() < 1)
- return false;
++ forceTransformBackups = commState.getBoolean("forceTransformBackups");
+
- forceTransformBackups = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 20:
- UUID subjId0 = commState.getUuid();
++ subjId = commState.getUuid("subjId");
+
- if (subjId0 == UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- subjId = subjId0;
-
+ commState.idx++;
+
+ case 21:
- if (buf.remaining() < 4)
- return false;
++ taskNameHash = commState.getInt("taskNameHash");
+
- taskNameHash = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 39;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearAtomicUpdateRequest.class, this, "filter", Arrays.toString(filter),
+ "parent", super.toString());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 0000000,e88de2e..42ee3db
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@@ -1,0 -1,798 +1,782 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+
+ /**
+ * DHT atomic cache near update response.
+ */
+ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> implements GridCacheDeployable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** Node ID this reply should be sent to. */
+ @GridDirectTransient
+ private UUID nodeId;
+
+ /** Future version. */
+ private GridCacheVersion futVer;
+
+ /** Update error. */
+ @GridDirectTransient
+ private volatile IgniteCheckedException err;
+
+ /** Serialized error. */
+ private byte[] errBytes;
+
+ /** Return value. */
+ @GridDirectTransient
+ private GridCacheReturn<Object> retVal;
+
+ /** Serialized return value. */
+ private byte[] retValBytes;
+
+ /** Failed keys. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private volatile Collection<K> failedKeys;
+
+ /** Serialized failed keys. */
+ private byte[] failedKeysBytes;
+
+ /** Keys that should be remapped. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Collection<K> remapKeys;
+
+ /** Serialized keys that should be remapped. */
+ private byte[] remapKeysBytes;
+
+ /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */
+ @GridDirectCollection(int.class)
+ @GridDirectVersion(1)
+ private List<Integer> nearValsIdxs;
+
+ /** Indexes of keys for which update was skipped (used if originating node has near cache). */
+ @GridDirectCollection(int.class)
+ @GridDirectVersion(1)
+ private List<Integer> nearSkipIdxs;
+
+ /** Values generated on primary node which should be put to originating node's near cache. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private List<V> nearVals;
+
+ /** Serialized values generated on primary node which should be put to originating node's near cache. */
+ @GridToStringInclude
+ @GridDirectCollection(GridCacheValueBytes.class)
+ @GridDirectVersion(1)
+ private List<GridCacheValueBytes> nearValBytes;
+
+ /** Version generated on primary node to be used for originating node's near cache update. */
+ @GridDirectVersion(1)
+ private GridCacheVersion nearVer;
+
+ /** Near TTLs. */
+ private GridLongList nearTtls;
+
+ /** Near expire times. */
+ private GridLongList nearExpireTimes;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridNearAtomicUpdateResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param nodeId Node ID this reply should be sent to.
+ * @param futVer Future version.
+ */
+ public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer) {
+ this.cacheId = cacheId;
+ this.nodeId = nodeId;
+ this.futVer = futVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /**
+ * @return Node ID this response should be sent to.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ */
+ public void nodeId(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return Future version.
+ */
+ public GridCacheVersion futureVersion() {
+ return futVer;
+ }
+
+ /**
+ * @return Update error, if any.
+ */
+ public Throwable error() {
+ return err;
+ }
+
+ /**
+ * @return Collection of failed keys.
+ */
+ public Collection<K> failedKeys() {
+ return failedKeys;
+ }
+
+ /**
+ * @return Return value.
+ */
+ public GridCacheReturn<Object> returnValue() {
+ return retVal;
+ }
+
+ /**
+ * @param retVal Return value.
+ */
+ public void returnValue(GridCacheReturn<Object> retVal) {
+ this.retVal = retVal;
+ }
+
+ /**
+ * @param remapKeys Remap keys.
+ */
+ public void remapKeys(Collection<K> remapKeys) {
+ this.remapKeys = remapKeys;
+ }
+
+ /**
+ * @return Remap keys.
+ */
+ public Collection<K> remapKeys() {
+ return remapKeys;
+ }
+
+ /**
+ * Adds value to be put in near cache on originating node.
+ * <p>
+ * The key index, the value and its serialized form are appended to three lists that are created
+ * together on first use. TTL and expire time are recorded through {@link #addNearTtl(int, long, long)}.
+ *
+ * @param keyIdx Key index.
+ * @param val Value.
+ * @param valBytes Value bytes; when {@code null}, a {@code null} entry is stored in the serialized values list.
+ * @param ttl TTL for near cache update.
+ * @param expireTime Expire time for near cache update.
+ */
+ public void addNearValue(int keyIdx,
+ @Nullable V val,
+ @Nullable byte[] valBytes,
+ long ttl,
+ long expireTime) {
+ // The three lists are created together, so checking one of them is enough.
+ if (nearValsIdxs == null) {
+ nearValsIdxs = new ArrayList<>();
+ nearValBytes = new ArrayList<>();
+ nearVals = new ArrayList<>();
+ }
+
+ addNearTtl(keyIdx, ttl, expireTime);
+
+ // Append to all three lists so that entries at the same position describe the same key.
+ nearValsIdxs.add(keyIdx);
+ nearVals.add(val);
+ nearValBytes.add(valBytes != null ? GridCacheValueBytes.marshaled(valBytes) : null);
+ }
+
+ /**
+ * Records TTL and expire time for a near cache update.
+ * <p>
+ * Each list is created only when the first non-negative value is supplied, and is then filled with
+ * {@code -1} for the {@code keyIdx} positions before it. Once a list exists, every call appends to it,
+ * including negative values.
+ *
+ * @param keyIdx Key index.
+ * @param ttl TTL for near cache update.
+ * @param expireTime Expire time for near cache update.
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ public void addNearTtl(int keyIdx, long ttl, long expireTime) {
+ if (ttl >= 0) {
+ if (nearTtls == null) {
+ nearTtls = new GridLongList(16);
+
+ // Backfill the positions before keyIdx with -1.
+ for (int i = 0; i < keyIdx; i++)
+ nearTtls.add(-1L);
+ }
+ }
+
+ // Appended whenever the list exists, even if ttl is negative.
+ if (nearTtls != null)
+ nearTtls.add(ttl);
+
+ if (expireTime >= 0) {
+ if (nearExpireTimes == null) {
+ nearExpireTimes = new GridLongList(16);
+
+ // Backfill the positions before keyIdx with -1.
+ for (int i = 0; i < keyIdx; i++)
+ nearExpireTimes.add(-1);
+ }
+ }
+
+ // Appended whenever the list exists, even if expireTime is negative.
+ if (nearExpireTimes != null)
+ nearExpireTimes.add(expireTime);
+ }
+
+ /**
+  * Looks up the expire time recorded for a near cache update.
+  *
+  * @param idx Index.
+  * @return Expire time stored at the given index, or {@code -1} if no expire times were recorded.
+  */
+ public long nearExpireTime(int idx) {
+     if (nearExpireTimes == null)
+         return -1L;
+
+     assert idx >= 0 && idx < nearExpireTimes.size();
+
+     return nearExpireTimes.get(idx);
+ }
+
+ /**
+  * Looks up the TTL recorded for a near cache update.
+  *
+  * @param idx Index.
+  * @return TTL stored at the given index, or {@code -1} if no TTLs were recorded.
+  */
+ public long nearTtl(int idx) {
+     if (nearTtls == null)
+         return -1L;
+
+     assert idx >= 0 && idx < nearTtls.size();
+
+     return nearTtls.get(idx);
+ }
+
+ /**
+ * @param nearVer Version generated on primary node to be used for originating node's near cache update.
+ */
+ public void nearVersion(GridCacheVersion nearVer) {
+ this.nearVer = nearVer;
+ }
+
+ /**
+ * @return Version generated on primary node to be used for originating node's near cache update.
+ */
+ public GridCacheVersion nearVersion() {
+ return nearVer;
+ }
+
+ /**
+ * Registers a key whose update was skipped. The index is added to the skipped list, and
+ * {@code -1} is passed as both TTL and expire time to {@link #addNearTtl(int, long, long)}.
+ *
+ * @param keyIdx Index of key for which update was skipped
+ */
+ public void addSkippedIndex(int keyIdx) {
+ if (nearSkipIdxs == null)
+ nearSkipIdxs = new ArrayList<>();
+
+ nearSkipIdxs.add(keyIdx);
+
+ // -1 values are appended only if the TTL / expire time lists already exist.
+ addNearTtl(keyIdx, -1L, -1L);
+ }
+
+ /**
+ * @return Indexes of keys for which update was skipped
+ */
+ @Nullable public List<Integer> skippedIndexes() {
+ return nearSkipIdxs;
+ }
+
+ /**
+ * @return Indexes of keys for which values were generated on primary node.
+ */
+ @Nullable public List<Integer> nearValuesIndexes() {
+ return nearValsIdxs;
+ }
+
+ /**
+ * @param idx Index.
+ * @return Value generated on primary node which should be put to originating node's near cache.
+ */
+ @Nullable public V nearValue(int idx) {
+ return nearVals.get(idx);
+ }
+
+ /**
+  * Returns the serialized near cache value stored at the given position.
+  *
+  * @param idx Index.
+  * @return Serialized value generated on primary node, or {@code null} if there are no serialized
+  *      values, the entry is {@code null}, or the entry holds plain bytes.
+  */
+ @Nullable public byte[] nearValueBytes(int idx) {
+     if (nearValBytes == null)
+         return null;
+
+     GridCacheValueBytes bytes = nearValBytes.get(idx);
+
+     boolean marshaled = bytes != null && !bytes.isPlain();
+
+     return marshaled ? bytes.get() : null;
+ }
+
+ /**
+ * Adds key to collection of failed keys.
+ *
+ * @param key Key to add.
+ * @param e Error cause.
+ */
+ public synchronized void addFailedKey(K key, Throwable e) {
+ if (failedKeys == null)
+ failedKeys = new ConcurrentLinkedQueue<>();
+
+ failedKeys.add(key);
+
+ if (err == null)
+ err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+ err.addSuppressed(e);
+ }
+
+ /**
+  * Adds keys to collection of failed keys and records the failure cause.
+  * <p>
+  * The first call creates the update error; the cause of every call is attached to it as a suppressed
+  * exception.
+  *
+  * @param keys Keys to add; {@code null} is tolerated, in which case only the error is recorded.
+  * @param e Error cause.
+  */
+ public synchronized void addFailedKeys(Collection<K> keys, Throwable e) {
+     // A missing key collection must not prevent the error below from being reported.
+     if (keys != null) {
+         if (failedKeys == null)
+             failedKeys = new ArrayList<>(keys.size());
+
+         failedKeys.addAll(keys);
+     }
+
+     if (err == null)
+         err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+     err.addSuppressed(e);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Serializes the error, return value, failed keys and remap keys (each only when set) and the near values.
+ *
+ * @param ctx Cache shared context whose marshaller is used for serialization.
+ */
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (err != null)
+ errBytes = ctx.marshaller().marshal(err);
+
+ if (retVal != null)
+ retValBytes = ctx.marshaller().marshal(retVal);
+
+ if (failedKeys != null)
+ failedKeysBytes = ctx.marshaller().marshal(failedKeys);
+
+ if (remapKeys != null)
+ remapKeysBytes = ctx.marshaller().marshal(remapKeys);
+
+ // NOTE(review): unconditionally replaces nearValBytes, including entries added by addNearValue - confirm intended.
+ nearValBytes = marshalValuesCollection(nearVals, ctx);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Restores the error, return value, failed keys and remap keys from their serialized forms (each only
+ * when the bytes are present) and rebuilds the near values from the serialized near values.
+ */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (errBytes != null)
+ err = ctx.marshaller().unmarshal(errBytes, ldr);
+
+ if (retValBytes != null)
+ retVal = ctx.marshaller().unmarshal(retValBytes, ldr);
+
+ if (failedKeysBytes != null)
+ failedKeys = ctx.marshaller().unmarshal(failedKeysBytes, ldr);
+
+ if (remapKeysBytes != null)
+ remapKeys = ctx.marshaller().unmarshal(remapKeysBytes, ldr);
+
+ // Assigned unconditionally; nearVals takes whatever the helper returns for nearValBytes.
+ nearVals = unmarshalValueBytesCollection(nearValBytes, ctx, ldr);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridNearAtomicUpdateResponse _clone = new GridNearAtomicUpdateResponse();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridNearAtomicUpdateResponse _clone = (GridNearAtomicUpdateResponse)_msg;
+
+ _clone.nodeId = nodeId;
+ _clone.futVer = futVer;
+ _clone.err = err;
+ _clone.errBytes = errBytes;
+ _clone.retVal = retVal;
+ _clone.retValBytes = retValBytes;
+ _clone.failedKeys = failedKeys;
+ _clone.failedKeysBytes = failedKeysBytes;
+ _clone.remapKeys = remapKeys;
+ _clone.remapKeysBytes = remapKeysBytes;
+ _clone.nearValsIdxs = nearValsIdxs;
+ _clone.nearSkipIdxs = nearSkipIdxs;
+ _clone.nearVals = nearVals;
+ _clone.nearValBytes = nearValBytes;
+ _clone.nearVer = nearVer;
+ _clone.nearTtls = nearTtls;
+ _clone.nearExpireTimes = nearExpireTimes;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("fallthrough")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 3:
- if (!commState.putByteArray(errBytes))
++ if (!commState.putByteArray("errBytes", errBytes))
+ return false;
+
+ commState.idx++;
+
+ case 4:
- if (!commState.putByteArray(failedKeysBytes))
++ if (!commState.putByteArray("failedKeysBytes", failedKeysBytes))
+ return false;
+
+ commState.idx++;
+
+ case 5:
- if (!commState.putCacheVersion(futVer))
++ if (!commState.putCacheVersion("futVer", futVer))
+ return false;
+
+ commState.idx++;
+
+ case 6:
- if (!commState.putByteArray(remapKeysBytes))
++ if (!commState.putByteArray("remapKeysBytes", remapKeysBytes))
+ return false;
+
+ commState.idx++;
+
+ case 7:
- if (!commState.putByteArray(retValBytes))
++ if (!commState.putByteArray("retValBytes", retValBytes))
+ return false;
+
+ commState.idx++;
+
+ case 8:
+ if (nearSkipIdxs != null) {
+ if (commState.it == null) {
- if (!commState.putInt(nearSkipIdxs.size()))
++ if (!commState.putInt(null, nearSkipIdxs.size()))
+ return false;
+
+ commState.it = nearSkipIdxs.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putInt((int)commState.cur))
++ if (!commState.putInt(null, (int)commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 9:
+ if (nearValBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(nearValBytes.size()))
++ if (!commState.putInt(null, nearValBytes.size()))
+ return false;
+
+ commState.it = nearValBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putValueBytes((GridCacheValueBytes)commState.cur))
++ if (!commState.putValueBytes(null, (GridCacheValueBytes)commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 10:
+ if (nearValsIdxs != null) {
+ if (commState.it == null) {
- if (!commState.putInt(nearValsIdxs.size()))
++ if (!commState.putInt(null, nearValsIdxs.size()))
+ return false;
+
+ commState.it = nearValsIdxs.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putInt((int)commState.cur))
++ if (!commState.putInt(null, (int)commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 11:
- if (!commState.putCacheVersion(nearVer))
++ if (!commState.putCacheVersion("nearVer", nearVer))
+ return false;
+
+ commState.idx++;
+
+ case 12:
- if (!commState.putLongList(nearExpireTimes))
++ if (!commState.putLongList("nearExpireTimes", nearExpireTimes))
+ return false;
+
+ commState.idx++;
+
+ case 13:
- if (!commState.putLongList(nearTtls))
++ if (!commState.putLongList("nearTtls", nearTtls))
+ return false;
+
+ commState.idx++;
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("fallthrough")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 3:
- byte[] errBytes0 = commState.getByteArray();
++ errBytes = commState.getByteArray("errBytes");
+
- if (errBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- errBytes = errBytes0;
-
+ commState.idx++;
+
+ case 4:
- byte[] failedKeysBytes0 = commState.getByteArray();
++ failedKeysBytes = commState.getByteArray("failedKeysBytes");
+
- if (failedKeysBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- failedKeysBytes = failedKeysBytes0;
-
+ commState.idx++;
+
+ case 5:
- GridCacheVersion futVer0 = commState.getCacheVersion();
++ futVer = commState.getCacheVersion("futVer");
+
- if (futVer0 == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- futVer = futVer0;
-
+ commState.idx++;
+
+ case 6:
- byte[] remapKeysBytes0 = commState.getByteArray();
++ remapKeysBytes = commState.getByteArray("remapKeysBytes");
+
- if (remapKeysBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- remapKeysBytes = remapKeysBytes0;
-
+ commState.idx++;
+
+ case 7:
- byte[] retValBytes0 = commState.getByteArray();
++ retValBytes = commState.getByteArray("retValBytes");
+
- if (retValBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- retValBytes = retValBytes0;
-
+ commState.idx++;
+
+ case 8:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (nearSkipIdxs == null)
+ nearSkipIdxs = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- if (buf.remaining() < 4)
- return false;
++ int _val = commState.getInt(null);
+
- int _val = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+
+ nearSkipIdxs.add((Integer)_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 9:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (nearValBytes == null)
+ nearValBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- GridCacheValueBytes _val = commState.getValueBytes();
++ GridCacheValueBytes _val = commState.getValueBytes(null);
+
- if (_val == VAL_BYTES_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ nearValBytes.add((GridCacheValueBytes)_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 10:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (nearValsIdxs == null)
+ nearValsIdxs = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- if (buf.remaining() < 4)
- return false;
++ int _val = commState.getInt(null);
+
- int _val = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+
+ nearValsIdxs.add((Integer)_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 11:
- GridCacheVersion nearVer0 = commState.getCacheVersion();
++ nearVer = commState.getCacheVersion("nearVer");
+
- if (nearVer0 == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- nearVer = nearVer0;
-
+ commState.idx++;
+
+ case 12:
- GridLongList nearExpireTimes0 = commState.getLongList();
++ nearExpireTimes = commState.getLongList("nearExpireTimes");
+
- if (nearExpireTimes0 == LONG_LIST_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- nearExpireTimes = nearExpireTimes0;
-
+ commState.idx++;
+
+ case 13:
- GridLongList nearTtls0 = commState.getLongList();
++ nearTtls = commState.getLongList("nearTtls");
+
- if (nearTtls0 == LONG_LIST_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- nearTtls = nearTtls0;
-
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 40;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+     // Pass the superclass representation, not the literal "parent", so that inherited
+     // message state is included, as the sibling message classes do.
+     return S.toString(GridNearAtomicUpdateResponse.class, this, super.toString());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
index 0000000,e7fde44..1d94c2e
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
@@@ -1,0 -1,333 +1,329 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * Force keys request. This message is sent by node while preloading to force
+ * another node to put given keys into the next batch of transmitting entries.
+ */
+ public class GridDhtForceKeysRequest<K, V> extends GridCacheMessage<K, V> implements GridCacheDeployable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Mini-future ID. */
+ private IgniteUuid miniId;
+
+ /** Serialized keys. */
+ @GridDirectCollection(byte[].class)
+ private Collection<byte[]> keyBytes;
+
+ /** Keys to request. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Collection<K> keys;
+
+ /** Topology version for which keys are requested. */
+ private long topVer;
+
+ /**
+ * @param cacheId Cache ID.
+ * @param futId Future ID.
+ * @param miniId Mini-future ID.
+ * @param keys Keys.
+ * @param topVer Topology version.
+ */
+ GridDhtForceKeysRequest(
+ int cacheId,
+ IgniteUuid futId,
+ IgniteUuid miniId,
+ Collection<K> keys,
+ long topVer
+ ) {
+ assert futId != null;
+ assert miniId != null;
+ assert !F.isEmpty(keys);
+
+ this.cacheId = cacheId;
+ this.futId = futId;
+ this.miniId = miniId;
+ this.keys = keys;
+ this.topVer = topVer;
+ }
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public GridDhtForceKeysRequest() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean allowForStartup() {
+ return true;
+ }
+
+ /**
+ * @param keys Collection of keys.
+ */
+ public GridDhtForceKeysRequest(Collection<K> keys) {
+ assert !F.isEmpty(keys);
+
+ this.keys = keys;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Mini-future ID.
+ */
+ public IgniteUuid miniId() {
+ return miniId;
+ }
+
+ /**
+ * @return Collection of serialized keys.
+ */
+ public Collection<byte[]> keyBytes() {
+ return keyBytes;
+ }
+
+ /**
+ * @return Keys.
+ */
+ public Collection<K> keys() {
+ return keys;
+ }
+
+ /**
+ * @return Topology version for which keys are requested.
+ */
+ @Override public long topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Serializes the keys unless serialized keys are already present.
+ *
+ * @param ctx Cache shared context passed to the key marshalling helper.
+ */
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ // Skip marshalling when serialized keys are already present.
+ if (keyBytes == null)
+ keyBytes = marshalCollection(keys, ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (keys == null)
+ keys = unmarshalCollection(keyBytes, ctx, ldr);
+ }
+
+ /**
+  * @return Key count, taken from the serialized keys when present, otherwise from the
+  *      deserialized keys; {@code 0} if neither collection has been set yet.
+  */
+ private int keyCount() {
+     if (keyBytes != null)
+         return keyBytes.size();
+
+     // Both collections are null on an instance created through the no-arg constructor,
+     // and toString() must not throw in that state.
+     return keys != null ? keys.size() : 0;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridDhtForceKeysRequest _clone = new GridDhtForceKeysRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridDhtForceKeysRequest _clone = (GridDhtForceKeysRequest)_msg;
+
+ _clone.futId = futId;
+ _clone.miniId = miniId;
+ _clone.keyBytes = keyBytes;
+ _clone.keys = keys;
+ _clone.topVer = topVer;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 3:
- if (!commState.putGridUuid(futId))
++ if (!commState.putGridUuid("futId", futId))
+ return false;
+
+ commState.idx++;
+
+ case 4:
+ if (keyBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(keyBytes.size()))
++ if (!commState.putInt(null, keyBytes.size()))
+ return false;
+
+ commState.it = keyBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putByteArray((byte[])commState.cur))
++ if (!commState.putByteArray(null, (byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 5:
- if (!commState.putGridUuid(miniId))
++ if (!commState.putGridUuid("miniId", miniId))
+ return false;
+
+ commState.idx++;
+
+ case 6:
- if (!commState.putLong(topVer))
++ if (!commState.putLong("topVer", topVer))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 3:
- IgniteUuid futId0 = commState.getGridUuid();
++ futId = commState.getGridUuid("futId");
+
- if (futId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- futId = futId0;
-
+ commState.idx++;
+
+ case 4:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (keyBytes == null)
+ keyBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- byte[] _val = commState.getByteArray();
++ byte[] _val = commState.getByteArray(null);
+
- if (_val == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ keyBytes.add((byte[])_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 5:
- IgniteUuid miniId0 = commState.getGridUuid();
++ miniId = commState.getGridUuid("miniId");
+
- if (miniId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- miniId = miniId0;
-
+ commState.idx++;
+
+ case 6:
- if (buf.remaining() < 8)
- return false;
++ topVer = commState.getLong("topVer");
+
- topVer = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 41;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtForceKeysRequest.class, this, "keyCnt", keyCount(), "super", super.toString());
+ }
+ }