You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/25 13:31:42 UTC
[43/51] [abbrv] ignite git commit: Review.
http://git-wip-us.apache.org/repos/asf/ignite/blob/28c20c30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
new file mode 100644
index 0000000..74cdc6e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * DHT atomic cache backup update response.
+ */
+public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements GridCacheDeployable, GridDhtAtomicUpdateResponseInterface {
+ /** Serialization version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Message index, returned by {@link #lookupIndex()}. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** Future version. */
+ private GridCacheVersion futVer;
+
+ /** Failed keys; the list is created on the first {@link #addFailedKey(KeyCacheObject, Throwable)} call. */
+ @GridToStringInclude
+ @GridDirectCollection(KeyCacheObject.class)
+ private List<KeyCacheObject> failedKeys;
+
+ /** Update error; not written by {@code writeTo}, it is carried on the wire as {@link #errBytes}. */
+ @GridDirectTransient
+ private IgniteCheckedException err;
+
+ /** Serialized update error, produced from {@link #err} in {@code prepareMarshal}. */
+ private byte[] errBytes;
+
+ /** Evicted readers; the list is created on the first {@link #addNearEvicted(KeyCacheObject)} call. */
+ @GridToStringInclude
+ @GridDirectCollection(KeyCacheObject.class)
+ private List<KeyCacheObject> nearEvicted;
+
+ /**
+ * Empty constructor required by {@link Externalizable}; it assigns none of the fields declared in this class.
+ */
+ public GridDhtAtomicUpdateResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param futVer Future version, later exposed through {@link #futureVersion()}.
+ * @param addDepInfo Deployment info flag, later exposed through {@link #addDeploymentInfo()}.
+ */
+ public GridDhtAtomicUpdateResponse(int cacheId, GridCacheVersion futVer, boolean addDepInfo) {
+ this.cacheId = cacheId;
+ this.futVer = futVer;
+ this.addDepInfo = addDepInfo;
+ }
+
+ /** {@inheritDoc} Returns {@link #CACHE_MSG_IDX}. */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /**
+ * @return Future version supplied to the constructor or read from the wire by {@code readFrom}.
+ */
+ @Override public GridCacheVersion futureVersion() {
+ return futVer;
+ }
+
+ /**
+ * Sets update error, overwriting any error stored earlier (including one built by {@code addFailedKey}).
+ *
+ * @param err Error.
+ */
+ @Override public void onError(IgniteCheckedException err) {
+ this.err = err;
+ }
+
+ /** {@inheritDoc} Returns the {@code err} field as is, without copying or wrapping it. */
+ @Override public IgniteCheckedException error() {
+ return err;
+ }
+
+ /**
+ * @return Failed keys; may be {@code null} because the list is only created on demand.
+ */
+ @Override public Collection<KeyCacheObject> failedKeys() {
+ return failedKeys;
+ }
+
+ /**
+ * Adds key to collection of failed keys and records its cause as a suppressed exception of the update error.
+ *
+ * @param key Key to add.
+ * @param e Error cause; passed to {@link Throwable#addSuppressed(Throwable)} of the (lazily created) update error.
+ */
+ @Override public void addFailedKey(KeyCacheObject key, Throwable e) {
+ if (failedKeys == null)
+ failedKeys = new ArrayList<>();
+
+ failedKeys.add(key);
+
+ if (err == null)
+ err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+ err.addSuppressed(e);
+ }
+
+ /**
+ * @return Evicted readers; may be {@code null} because the list is only created on demand.
+ */
+ @Override public Collection<KeyCacheObject> nearEvicted() {
+ return nearEvicted;
+ }
+
+ /**
+ * Adds near evicted key, creating the backing list on first use.
+ *
+ * @param key Evicted key.
+ */
+ @Override public void addNearEvicted(KeyCacheObject key) {
+ if (nearEvicted == null)
+ nearEvicted = new ArrayList<>();
+
+ nearEvicted.add(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ prepareMarshalCacheObjects(failedKeys, cctx);
+
+ prepareMarshalCacheObjects(nearEvicted, cctx);
+
+ if (errBytes == null)
+ errBytes = ctx.marshaller().marshal(err);
+ }
+
+ /** {@inheritDoc} Also finishes unmarshalling of both key lists and restores {@code err} from {@code errBytes} if it is not already set. */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
+
+ finishUnmarshalCacheObjects(nearEvicted, cctx, ldr);
+
+ if (errBytes != null && err == null)
+ err = ctx.marshaller().unmarshal(errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
+
+ /** {@inheritDoc} Returns the {@code addDepInfo} field. */
+ @Override public boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /** {@inheritDoc} Writes errBytes, failedKeys, futVer and nearEvicted as states 3-6; the cases have no {@code break}, so a call starts at {@code writer.state()} and runs on until a write returns {@code false}. */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeByteArray("errBytes", errBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeMessage("futVer", futVer))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} Reads errBytes, failedKeys, futVer and nearEvicted in states 3-6 (same order as {@code writeTo}); returns {@code false} as soon as {@code reader.isLastRead()} reports {@code false}. */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ errBytes = reader.readByteArray("errBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ futVer = reader.readMessage("futVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridDhtAtomicUpdateResponse.class);
+ }
+
+ /** {@inheritDoc} The direct type code of this message is the constant 39. */
+ @Override public byte directType() {
+ return 39;
+ }
+
+ /** {@inheritDoc} Four fields are written by this class (states 3-6); the remaining three presumably belong to the superclass - TODO confirm. */
+ @Override public byte fieldsCount() {
+ return 7;
+ }
+
+ /** {@inheritDoc} Built by {@code S.toString} for this class and instance. */
+ @Override public String toString() {
+ return S.toString(GridDhtAtomicUpdateResponse.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/28c20c30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java
deleted file mode 100644
index d1c3654..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java
+++ /dev/null
@@ -1,989 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import java.io.Externalizable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
-
-/**
- * Lite DHT cache update request sent from near node to primary node.
- */
-public class GridNearAtomicMultipleUpdateRequest extends GridCacheMessage
- implements GridNearAtomicUpdateRequestInterface, GridCacheDeployable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Message index. */
- public static final int CACHE_MSG_IDX = nextIndexId();
-
- /** Target node ID. */
- @GridDirectTransient
- private UUID nodeId;
-
- /** Future version. */
- private GridCacheVersion futVer;
-
- /** Fast map flag. */
- private boolean fastMap;
-
- /** Update version. Set to non-null if fastMap is {@code true}. */
- private GridCacheVersion updateVer;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
- /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
- private boolean topLocked;
-
- /** Write synchronization mode. */
- private CacheWriteSynchronizationMode syncMode;
-
- /** Update operation. */
- private GridCacheOperation op;
-
- /** Keys to update. */
- @GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
- private List<KeyCacheObject> keys;
-
- /** Values to update. */
- @GridDirectCollection(CacheObject.class)
- private List<CacheObject> vals;
-
- /** Entry processors. */
- @GridDirectTransient
- private List<EntryProcessor<Object, Object, Object>> entryProcessors;
-
- /** Entry processors bytes. */
- @GridDirectCollection(byte[].class)
- private List<byte[]> entryProcessorsBytes;
-
- /** Optional arguments for entry processor. */
- @GridDirectTransient
- private Object[] invokeArgs;
-
- /** Entry processor arguments bytes. */
- private byte[][] invokeArgsBytes;
-
- /** Conflict versions. */
- @GridDirectCollection(GridCacheVersion.class)
- private List<GridCacheVersion> conflictVers;
-
- /** Conflict TTLs. */
- private GridLongList conflictTtls;
-
- /** Conflict expire times. */
- private GridLongList conflictExpireTimes;
-
- /** Return value flag. */
- private boolean retval;
-
- /** Expiry policy. */
- @GridDirectTransient
- private ExpiryPolicy expiryPlc;
-
- /** Expiry policy bytes. */
- private byte[] expiryPlcBytes;
-
- /** Filter. */
- private CacheEntryPredicate[] filter;
-
- /** Flag indicating whether request contains primary keys. */
- private boolean hasPrimary;
-
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name hash. */
- private int taskNameHash;
-
- /** Skip write-through to a persistent storage. */
- private boolean skipStore;
-
- /** */
- private boolean clientReq;
-
- /** Keep binary flag. */
- private boolean keepBinary;
-
- /** */
- @GridDirectTransient
- private GridNearAtomicUpdateResponseInterface res;
-
- /** Maximum possible size of inner collections. */
- @GridDirectTransient
- private int initSize;
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public GridNearAtomicMultipleUpdateRequest() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param cacheId Cache ID.
- * @param nodeId Node ID.
- * @param futVer Future version.
- * @param fastMap Fast map scheme flag.
- * @param updateVer Update version set if fast map is performed.
- * @param topVer Topology version.
- * @param topLocked Topology locked flag.
- * @param syncMode Synchronization mode.
- * @param op Cache update operation.
- * @param retval Return value required flag.
- * @param expiryPlc Expiry policy.
- * @param invokeArgs Optional arguments for entry processor.
- * @param filter Optional filter for atomic check.
- * @param subjId Subject ID.
- * @param taskNameHash Task name hash code.
- * @param skipStore Skip write-through to a persistent storage.
- * @param keepBinary Keep binary flag.
- * @param clientReq Client node request flag.
- * @param addDepInfo Deployment info flag.
- * @param maxEntryCnt Maximum entries count.
- */
- public GridNearAtomicMultipleUpdateRequest(
- int cacheId,
- UUID nodeId,
- GridCacheVersion futVer,
- boolean fastMap,
- @Nullable GridCacheVersion updateVer,
- @NotNull AffinityTopologyVersion topVer,
- boolean topLocked,
- CacheWriteSynchronizationMode syncMode,
- GridCacheOperation op,
- boolean retval,
- @Nullable ExpiryPolicy expiryPlc,
- @Nullable Object[] invokeArgs,
- @Nullable CacheEntryPredicate[] filter,
- @Nullable UUID subjId,
- int taskNameHash,
- boolean skipStore,
- boolean keepBinary,
- boolean clientReq,
- boolean addDepInfo,
- int maxEntryCnt
- ) {
- assert futVer != null;
-
- this.cacheId = cacheId;
- this.nodeId = nodeId;
- this.futVer = futVer;
- this.fastMap = fastMap;
- this.updateVer = updateVer;
-
- this.topVer = topVer;
- this.topLocked = topLocked;
- this.syncMode = syncMode;
- this.op = op;
- this.retval = retval;
- this.expiryPlc = expiryPlc;
- this.invokeArgs = invokeArgs;
- this.filter = filter;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
- this.skipStore = skipStore;
- this.keepBinary = keepBinary;
- this.clientReq = clientReq;
- this.addDepInfo = addDepInfo;
-
- // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries
- // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys
- // participate in request. As such, we know upper bound of all collections in request. If this bound is lower
- // than 10, we use it.
- initSize = Math.min(maxEntryCnt, 10);
-
- keys = new ArrayList<>(initSize);
- }
-
- /** {@inheritDoc} */
- @Override public int lookupIndex() {
- return CACHE_MSG_IDX;
- }
-
- /** {@inheritDoc} */
- @Override public UUID nodeId() {
- return nodeId;
- }
-
- /** {@inheritDoc} */
- @Override public void nodeId(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /** {@inheritDoc} */
- @Override public UUID subjectId() {
- return subjId;
- }
-
- /** {@inheritDoc} */
- @Override public int taskNameHash() {
- return taskNameHash;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion futureVersion() {
- return futVer;
- }
-
- /** {@inheritDoc} */
- @Override public boolean fastMap() {
- return fastMap;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion updateVersion() {
- return updateVer;
- }
-
- /** {@inheritDoc} */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /** {@inheritDoc} */
- @Override public boolean topologyLocked() {
- return topLocked;
- }
-
- /** {@inheritDoc} */
- @Override public boolean clientRequest() {
- return clientReq;
- }
-
- /** {@inheritDoc} */
- @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
- return syncMode;
- }
-
- /** {@inheritDoc} */
- @Override public ExpiryPolicy expiry() {
- return expiryPlc;
- }
-
- /** {@inheritDoc} */
- @Override public boolean returnValue() {
- return retval;
- }
-
- /** {@inheritDoc} */
- @Nullable public CacheEntryPredicate[] filter() {
- return filter;
- }
-
- /** {@inheritDoc} */
- @Override public boolean skipStore() {
- return skipStore;
- }
-
- /** {@inheritDoc} */
- @Override public boolean keepBinary() {
- return keepBinary;
- }
-
- /**
- * @param key Key to add.
- * @param val Optional update value.
- * @param conflictTtl Conflict TTL (optional).
- * @param conflictExpireTime Conflict expire time (optional).
- * @param conflictVer Conflict version (optional).
- * @param primary If given key is primary on this mapping.
- */
- @SuppressWarnings("unchecked")
- public void addUpdateEntry(KeyCacheObject key,
- @Nullable Object val,
- long conflictTtl,
- long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer,
- boolean primary) {
- EntryProcessor<Object, Object, Object> entryProcessor = null;
-
- if (op == TRANSFORM) {
- assert val instanceof EntryProcessor : val;
-
- entryProcessor = (EntryProcessor<Object, Object, Object>)val;
- }
-
- assert val != null || op == DELETE;
-
- keys.add(key);
-
- if (entryProcessor != null) {
- if (entryProcessors == null)
- entryProcessors = new ArrayList<>(initSize);
-
- entryProcessors.add(entryProcessor);
- }
- else if (val != null) {
- assert val instanceof CacheObject : val;
-
- if (vals == null)
- vals = new ArrayList<>(initSize);
-
- vals.add((CacheObject)val);
- }
-
- hasPrimary |= primary;
-
- // In case there is no conflict, do not create the list.
- if (conflictVer != null) {
- if (conflictVers == null) {
- conflictVers = new ArrayList<>(initSize);
-
- for (int i = 0; i < keys.size() - 1; i++)
- conflictVers.add(null);
- }
-
- conflictVers.add(conflictVer);
- }
- else if (conflictVers != null)
- conflictVers.add(null);
-
- if (conflictTtl >= 0) {
- if (conflictTtls == null) {
- conflictTtls = new GridLongList(keys.size());
-
- for (int i = 0; i < keys.size() - 1; i++)
- conflictTtls.add(CU.TTL_NOT_CHANGED);
- }
-
- conflictTtls.add(conflictTtl);
- }
-
- if (conflictExpireTime >= 0) {
- if (conflictExpireTimes == null) {
- conflictExpireTimes = new GridLongList(keys.size());
-
- for (int i = 0; i < keys.size() - 1; i++)
- conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
- }
-
- conflictExpireTimes.add(conflictExpireTime);
- }
- }
-
- /** {@inheritDoc} */
- @Override public List<KeyCacheObject> keys() {
- return keys;
- }
-
- /** {@inheritDoc} */
- @Override public List<?> values() {
- return op == TRANSFORM ? entryProcessors : vals;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheOperation operation() {
- return op;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public Object[] invokeArguments() {
- return invokeArgs;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public CacheObject value(int idx) {
- assert op == UPDATE : op;
-
- return vals.get(idx);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
- assert op == TRANSFORM : op;
-
- return entryProcessors.get(idx);
- }
-
- /** {@inheritDoc} */
- @Override public CacheObject writeValue(int idx) {
- if (vals != null)
- return vals.get(idx);
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public List<GridCacheVersion> conflictVersions() {
- return conflictVers;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public GridCacheVersion conflictVersion(int idx) {
- if (conflictVers != null) {
- assert idx >= 0 && idx < conflictVers.size();
-
- return conflictVers.get(idx);
- }
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public long conflictTtl(int idx) {
- if (conflictTtls != null) {
- assert idx >= 0 && idx < conflictTtls.size();
-
- return conflictTtls.get(idx);
- }
-
- return CU.TTL_NOT_CHANGED;
- }
-
- /** {@inheritDoc} */
- @Override public long conflictExpireTime(int idx) {
- if (conflictExpireTimes != null) {
- assert idx >= 0 && idx < conflictExpireTimes.size();
-
- return conflictExpireTimes.get(idx);
- }
-
- return CU.EXPIRE_TIME_CALCULATE;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasPrimary() {
- return hasPrimary;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onResponse(GridNearAtomicUpdateResponseInterface res) {
- if (this.res == null) {
- this.res = res;
-
- return true;
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public GridNearAtomicUpdateResponseInterface response() {
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
- prepareMarshalCacheObjects(keys, cctx);
-
- if (filter != null) {
- boolean hasFilter = false;
-
- for (CacheEntryPredicate p : filter) {
- if (p != null) {
- hasFilter = true;
-
- p.prepareMarshal(cctx);
- }
- }
-
- if (!hasFilter)
- filter = null;
- }
-
- if (expiryPlc != null && expiryPlcBytes == null)
- expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
-
- if (op == TRANSFORM) {
- // force addition of deployment info for entry processors if P2P is enabled globally.
- if (!addDepInfo && ctx.deploymentEnabled())
- addDepInfo = true;
-
- if (entryProcessorsBytes == null)
- entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
-
- if (invokeArgsBytes == null)
- invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
- }
- else
- prepareMarshalCacheObjects(vals, cctx);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
- finishUnmarshalCacheObjects(keys, cctx, ldr);
-
- if (op == TRANSFORM) {
- if (entryProcessors == null)
- entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
-
- if (invokeArgs == null)
- invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
- }
- else
- finishUnmarshalCacheObjects(vals, cctx, ldr);
-
- if (filter != null) {
- for (CacheEntryPredicate p : filter) {
- if (p != null)
- p.finishUnmarshal(cctx, ldr);
- }
- }
-
- if (expiryPlcBytes != null && expiryPlc == null)
- expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return addDepInfo;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 3:
- if (!writer.writeBoolean("clientReq", clientReq))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeMessage("conflictTtls", conflictTtls))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeBoolean("fastMap", fastMap))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeMessage("futVer", futVer))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeBoolean("hasPrimary", hasPrimary))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeBoolean("keepBinary", keepBinary))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 16:
- if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 17:
- if (!writer.writeBoolean("retval", retval))
- return false;
-
- writer.incrementState();
-
- case 18:
- if (!writer.writeBoolean("skipStore", skipStore))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeUuid("subjId", subjId))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 21:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 22:
- if (!writer.writeBoolean("topLocked", topLocked))
- return false;
-
- writer.incrementState();
-
- case 23:
- if (!writer.writeMessage("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeMessage("updateVer", updateVer))
- return false;
-
- writer.incrementState();
-
- case 25:
- if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 3:
- clientReq = reader.readBoolean("clientReq");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- conflictExpireTimes = reader.readMessage("conflictExpireTimes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- conflictTtls = reader.readMessage("conflictTtls");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- fastMap = reader.readBoolean("fastMap");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- futVer = reader.readMessage("futVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- hasPrimary = reader.readBoolean("hasPrimary");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 14:
- keepBinary = reader.readBoolean("keepBinary");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 16:
- byte opOrd;
-
- opOrd = reader.readByte("op");
-
- if (!reader.isLastRead())
- return false;
-
- op = GridCacheOperation.fromOrdinal(opOrd);
-
- reader.incrementState();
-
- case 17:
- retval = reader.readBoolean("retval");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 18:
- skipStore = reader.readBoolean("skipStore");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- subjId = reader.readUuid("subjId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
- byte syncModeOrd;
-
- syncModeOrd = reader.readByte("syncMode");
-
- if (!reader.isLastRead())
- return false;
-
- syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
- reader.incrementState();
-
- case 21:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 22:
- topLocked = reader.readBoolean("topLocked");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 23:
- topVer = reader.readMessage("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 24:
- updateVer = reader.readMessage("updateVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 25:
- vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridNearAtomicMultipleUpdateRequest.class);
- }
-
- /** {@inheritDoc} */
- @Override public void cleanup(boolean clearKeys) {
- vals = null;
- entryProcessors = null;
- entryProcessorsBytes = null;
- invokeArgs = null;
- invokeArgsBytes = null;
-
- if (clearKeys)
- keys = null;
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 40;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 26;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearAtomicMultipleUpdateRequest.class, this, "filter", Arrays.toString(filter),
- "parent", super.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/28c20c30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateResponse.java
deleted file mode 100644
index 24e7c3a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateResponse.java
+++ /dev/null
@@ -1,642 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import java.io.Externalizable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheReturn;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * DHT atomic cache near update response.
- */
-public class GridNearAtomicMultipleUpdateResponse extends GridCacheMessage implements GridCacheDeployable, GridNearAtomicUpdateResponseInterface {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Cache message index. */
- public static final int CACHE_MSG_IDX = nextIndexId();
-
- /** Node ID this reply should be sent to. */
- @GridDirectTransient
- private UUID nodeId;
-
- /** Future version. */
- private GridCacheVersion futVer;
-
- /** Update error. */
- @GridDirectTransient
- private volatile IgniteCheckedException err;
-
- /** Serialized error. */
- private byte[] errBytes;
-
- /** Return value. */
- @GridToStringInclude
- private GridCacheReturn ret;
-
- /** Failed keys. */
- @GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
- private volatile Collection<KeyCacheObject> failedKeys;
-
- /** Keys that should be remapped. */
- @GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
- private List<KeyCacheObject> remapKeys;
-
- /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */
- @GridDirectCollection(int.class)
- private List<Integer> nearValsIdxs;
-
- /** Indexes of keys for which update was skipped (used if originating node has near cache). */
- @GridDirectCollection(int.class)
- private List<Integer> nearSkipIdxs;
-
- /** Values generated on primary node which should be put to originating node's near cache. */
- @GridToStringInclude
- @GridDirectCollection(CacheObject.class)
- private List<CacheObject> nearVals;
-
- /** Version generated on primary node to be used for originating node's near cache update. */
- private GridCacheVersion nearVer;
-
- /** Near TTLs. */
- private GridLongList nearTtls;
-
- /** Near expire times. */
- private GridLongList nearExpireTimes;
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public GridNearAtomicMultipleUpdateResponse() {
- // No-op.
- }
-
- /**
- * @param cacheId Cache ID.
- * @param nodeId Node ID this reply should be sent to.
- * @param futVer Future version.
- * @param addDepInfo Deployment info flag.
- */
- public GridNearAtomicMultipleUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, boolean addDepInfo) {
- assert futVer != null;
-
- this.cacheId = cacheId;
- this.nodeId = nodeId;
- this.futVer = futVer;
- this.addDepInfo = addDepInfo;
- }
-
- /** {@inheritDoc} */
- @Override public int lookupIndex() {
- return CACHE_MSG_IDX;
- }
-
- /**
- * @return Node ID this response should be sent to.
- */
- @Override public UUID nodeId() {
- return nodeId;
- }
-
- /**
- * @param nodeId Node ID.
- */
- @Override public void nodeId(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /**
- * @return Future version.
- */
- @Override public GridCacheVersion futureVersion() {
- return futVer;
- }
-
- /**
- * Sets update error.
- *
- * @param err Error.
- */
- @Override public void error(IgniteCheckedException err) {
- this.err = err;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteCheckedException error() {
- return err;
- }
-
- /**
- * @return Collection of failed keys.
- */
- @Override public Collection<KeyCacheObject> failedKeys() {
- return failedKeys;
- }
-
- /**
- * @return Return value.
- */
- @Override public GridCacheReturn returnValue() {
- return ret;
- }
-
- /**
- * @param ret Return value.
- */
- @Override @SuppressWarnings("unchecked")
- public void returnValue(GridCacheReturn ret) {
- this.ret = ret;
- }
-
- /**
- * @param remapKeys Remap keys.
- */
- @Override public void remapKeys(List<KeyCacheObject> remapKeys) {
- this.remapKeys = remapKeys;
- }
-
- /**
- * @return Remap keys.
- */
- @Override public Collection<KeyCacheObject> remapKeys() {
- return remapKeys;
- }
-
- /**
- * Adds value to be put in near cache on originating node.
- *
- * @param keyIdx Key index.
- * @param val Value.
- * @param ttl TTL for near cache update.
- * @param expireTime Expire time for near cache update.
- */
- @Override public void addNearValue(int keyIdx,
- @Nullable CacheObject val,
- long ttl,
- long expireTime) {
- if (nearValsIdxs == null) {
- nearValsIdxs = new ArrayList<>();
- nearVals = new ArrayList<>();
- }
-
- addNearTtl(keyIdx, ttl, expireTime);
-
- nearValsIdxs.add(keyIdx);
- nearVals.add(val);
- }
-
- /**
- * @param keyIdx Key index.
- * @param ttl TTL for near cache update.
- * @param expireTime Expire time for near cache update.
- */
- @Override @SuppressWarnings("ForLoopReplaceableByForEach")
- public void addNearTtl(int keyIdx, long ttl, long expireTime) {
- if (ttl >= 0) {
- if (nearTtls == null) {
- nearTtls = new GridLongList(16);
-
- for (int i = 0; i < keyIdx; i++)
- nearTtls.add(-1L);
- }
- }
-
- if (nearTtls != null)
- nearTtls.add(ttl);
-
- if (expireTime >= 0) {
- if (nearExpireTimes == null) {
- nearExpireTimes = new GridLongList(16);
-
- for (int i = 0; i < keyIdx; i++)
- nearExpireTimes.add(-1);
- }
- }
-
- if (nearExpireTimes != null)
- nearExpireTimes.add(expireTime);
- }
-
- /**
- * @param idx Index.
- * @return Expire time for near cache update.
- */
- @Override public long nearExpireTime(int idx) {
- if (nearExpireTimes != null) {
- assert idx >= 0 && idx < nearExpireTimes.size();
-
- return nearExpireTimes.get(idx);
- }
-
- return -1L;
- }
-
- /**
- * @param idx Index.
- * @return TTL for near cache update.
- */
- @Override public long nearTtl(int idx) {
- if (nearTtls != null) {
- assert idx >= 0 && idx < nearTtls.size();
-
- return nearTtls.get(idx);
- }
-
- return -1L;
- }
-
- /**
- * @param nearVer Version generated on primary node to be used for originating node's near cache update.
- */
- @Override public void nearVersion(GridCacheVersion nearVer) {
- this.nearVer = nearVer;
- }
-
- /**
- * @return Version generated on primary node to be used for originating node's near cache update.
- */
- @Override public GridCacheVersion nearVersion() {
- return nearVer;
- }
-
- /**
- * @param keyIdx Index of key for which update was skipped
- */
- @Override public void addSkippedIndex(int keyIdx) {
- if (nearSkipIdxs == null)
- nearSkipIdxs = new ArrayList<>();
-
- nearSkipIdxs.add(keyIdx);
-
- addNearTtl(keyIdx, -1L, -1L);
- }
-
- /**
- * @return Indexes of keys for which update was skipped
- */
- @Override @Nullable public List<Integer> skippedIndexes() {
- return nearSkipIdxs;
- }
-
- /**
- * @return Indexes of keys for which values were generated on primary node.
- */
- @Override @Nullable public List<Integer> nearValuesIndexes() {
- return nearValsIdxs;
- }
-
- /**
- * @param idx Index.
- * @return Value generated on primary node which should be put to originating node's near cache.
- */
- @Override @Nullable public CacheObject nearValue(int idx) {
- return nearVals.get(idx);
- }
-
- /**
- * Adds key to collection of failed keys.
- *
- * @param key Key to add.
- * @param e Error cause.
- */
- @Override public synchronized void addFailedKey(KeyCacheObject key, Throwable e) {
- if (failedKeys == null)
- failedKeys = new ConcurrentLinkedQueue<>();
-
- failedKeys.add(key);
-
- if (err == null)
- err = new IgniteCheckedException("Failed to update keys on primary node.");
-
- err.addSuppressed(e);
- }
-
- /**
- * Adds keys to collection of failed keys.
- *
- * @param keys Key to add.
- * @param e Error cause.
- */
- @Override public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
- if (keys != null) {
- if (failedKeys == null)
- failedKeys = new ArrayList<>(keys.size());
-
- failedKeys.addAll(keys);
- }
-
- if (err == null)
- err = new IgniteCheckedException("Failed to update keys on primary node.");
-
- err.addSuppressed(e);
- }
-
- /**
- * Adds keys to collection of failed keys.
- *
- * @param keys Key to add.
- * @param e Error cause.
- * @param ctx Context.
- */
- @Override public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e,
- GridCacheContext ctx) {
- if (failedKeys == null)
- failedKeys = new ArrayList<>(keys.size());
-
- failedKeys.addAll(keys);
-
- if (err == null)
- err = new IgniteCheckedException("Failed to update keys on primary node.");
-
- err.addSuppressed(e);
- }
-
- /** {@inheritDoc}
- * @param ctx*/
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- if (err != null && errBytes == null)
- errBytes = ctx.marshaller().marshal(err);
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
- prepareMarshalCacheObjects(failedKeys, cctx);
-
- prepareMarshalCacheObjects(remapKeys, cctx);
-
- prepareMarshalCacheObjects(nearVals, cctx);
-
- if (ret != null)
- ret.prepareMarshal(cctx);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- if (errBytes != null && err == null)
- err = ctx.marshaller().unmarshal(errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
- finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
-
- finishUnmarshalCacheObjects(remapKeys, cctx, ldr);
-
- finishUnmarshalCacheObjects(nearVals, cctx, ldr);
-
- if (ret != null)
- ret.finishUnmarshal(cctx, ldr);
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return addDepInfo;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 3:
- if (!writer.writeByteArray("errBytes", errBytes))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeMessage("futVer", futVer))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeMessage("nearTtls", nearTtls))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeMessage("nearVer", nearVer))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeMessage("ret", ret))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 3:
- errBytes = reader.readByteArray("errBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- futVer = reader.readMessage("futVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- nearExpireTimes = reader.readMessage("nearExpireTimes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- nearTtls = reader.readMessage("nearTtls");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- nearVer = reader.readMessage("nearVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- ret = reader.readMessage("ret");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridNearAtomicMultipleUpdateResponse.class);
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 41;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 14;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearAtomicMultipleUpdateResponse.class, this, "parent");
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/28c20c30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index de89d91..7e6dc20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -482,13 +482,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
*
* @param mappings Mappings to send.
*/
- private void doUpdate(Map<UUID, GridNearAtomicMultipleUpdateRequest> mappings) {
+ private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
UUID locNodeId = cctx.localNodeId();
- GridNearAtomicMultipleUpdateRequest locUpdate = null;
+ GridNearAtomicUpdateRequest locUpdate = null;
// Send messages to remote nodes first, then run local update.
- for (GridNearAtomicMultipleUpdateRequest req : mappings.values()) {
+ for (GridNearAtomicUpdateRequest req : mappings.values()) {
if (locNodeId.equals(req.nodeId())) {
assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
", req=" + req + ']';
@@ -537,7 +537,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
/** Mappings if operations is mapped to more than one node. */
@GridToStringInclude
- private Map<UUID, GridNearAtomicMultipleUpdateRequest> mappings;
+ private Map<UUID, GridNearAtomicUpdateRequest> mappings;
/** */
private int resCnt;
@@ -588,7 +588,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
req.futureVersion(),
cctx.deploymentEnabled());
else
- res = new GridNearAtomicMultipleUpdateResponse(cctx.cacheId(),
+ res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
nodeId,
req.futureVersion(),
cctx.deploymentEnabled());
@@ -743,7 +743,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
if (rcvAll && nearEnabled) {
if (mappings != null) {
- for (GridNearAtomicMultipleUpdateRequest req0 : mappings.values()) {
+ for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
GridNearAtomicUpdateResponseInterface res0 = req0.response();
assert res0 != null : req0;
@@ -827,7 +827,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
req.futureVersion(),
cctx.deploymentEnabled());
else
- res = new GridNearAtomicMultipleUpdateResponse(cctx.cacheId(),
+ res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
req.nodeId(),
req.futureVersion(),
cctx.deploymentEnabled());
@@ -854,7 +854,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
Exception err = null;
GridNearAtomicUpdateRequestInterface singleReq0 = null;
- Map<UUID, GridNearAtomicMultipleUpdateRequest> mappings0 = null;
+ Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
int size = keys.size();
@@ -883,7 +883,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
singleReq0 = mapSingleUpdate(topVer, topNodes, futVer, updVer);
}
else {
- Map<UUID, GridNearAtomicMultipleUpdateRequest> pendingMappings = mapUpdate(topNodes,
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
topVer,
futVer,
updVer,
@@ -895,7 +895,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
if (syncMode == PRIMARY_SYNC) {
mappings0 = U.newHashMap(pendingMappings.size());
- for (GridNearAtomicMultipleUpdateRequest req : pendingMappings.values()) {
+ for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
if (req.hasPrimary())
mappings0.put(req.nodeId(), req);
}
@@ -1006,7 +1006,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
* @throws Exception If failed.
*/
@SuppressWarnings("ConstantConditions")
- private Map<UUID, GridNearAtomicMultipleUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+ private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
AffinityTopologyVersion topVer,
GridCacheVersion futVer,
@Nullable GridCacheVersion updVer,
@@ -1026,7 +1026,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
if (conflictRmvVals != null)
conflictRmvValsIt = conflictRmvVals.iterator();
- Map<UUID, GridNearAtomicMultipleUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
// Create mappings first, then send messages.
for (Object key : keys) {
@@ -1094,10 +1094,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
UUID nodeId = affNode.id();
- GridNearAtomicMultipleUpdateRequest mapped = pendingMappings.get(nodeId);
+ GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
if (mapped == null) {
- mapped = new GridNearAtomicMultipleUpdateRequest(
+ mapped = new GridNearAtomicUpdateRequest(
cctx.cacheId(),
nodeId,
futVer,
@@ -1238,7 +1238,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
cctx.deploymentEnabled());
}
else {
- GridNearAtomicMultipleUpdateRequest req = new GridNearAtomicMultipleUpdateRequest(
+ GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
cctx.cacheId(),
primary.id(),
futVer,