You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/02/02 04:28:35 UTC
[43/52] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1'
of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-61
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
index 0000000,3189649..db47889
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
@@@ -1,0 -1,834 +1,822 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.transactions.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * Lock request message.
+ */
+ public class GridDistributedLockRequest<K, V> extends GridDistributedBaseMessage<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Sender node ID. */
+ private UUID nodeId;
+
+ /** Near transaction version. */
+ private GridCacheVersion nearXidVer;
+
+ /** Thread ID. */
+ private long threadId;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Max wait timeout. */
+ private long timeout;
+
+ /** Indicates whether lock is obtained within a scope of transaction. */
+ private boolean isInTx;
+
+ /** Invalidate flag for transactions. */
+ private boolean isInvalidate;
+
+ /** Indicates whether implicit lock is for read or write operation. */
+ private boolean isRead;
+
+ /** Transaction isolation. */
+ private IgniteTxIsolation isolation;
+
+ /** Key bytes for keys to lock. */
+ @GridDirectCollection(byte[].class)
+ private List<byte[]> keyBytes;
+
+ /** Keys. */
+ @GridDirectTransient
+ private List<K> keys;
+
+ /** Write entries. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private List<IgniteTxEntry<K, V>> writeEntries;
+
+ /** Serialized write entries. */
+ private byte[] writeEntriesBytes;
+
+ /** Array indicating whether value should be returned for a key. */
+ @GridToStringInclude
+ private boolean[] retVals;
+
+ /** Key-bytes index. */
+ @GridDirectTransient
+ protected int idx;
+
+ /** Key count. */
+ private int txSize;
+
+ /** Group lock key if this is a group-lock transaction. */
+ @GridDirectTransient
+ private IgniteTxKey grpLockKey;
+
+ /** Group lock key bytes. */
+ private byte[] grpLockKeyBytes;
+
+ /** Partition lock flag. Only if group-lock transaction. */
+ private boolean partLock;
+
+ /** DR versions. */
+ @GridToStringInclude
+ private GridCacheVersion[] drVersByIdx;
+
+ /**
+ * Empty constructor.
+ */
+ public GridDistributedLockRequest() {
+ /* No-op. */
+ }
+
+ /**
+ * Creates a lock request for {@code keyCnt} keys. The keys themselves are added afterwards
+ * via {@link #addKeyBytes}; only the per-key return flags array is allocated here.
+ *
+ * @param cacheId Cache ID.
+ * @param nodeId Node ID.
+ * @param nearXidVer Near transaction ID.
+ * @param threadId Thread ID.
+ * @param futId Future ID.
+ * @param lockVer Cache version.
+ * @param isInTx {@code True} if implicit transaction lock.
+ * @param isRead Indicates whether implicit lock is for read or write operation.
+ * @param isolation Transaction isolation.
+ * @param isInvalidate Invalidation flag.
+ * @param timeout Lock timeout.
+ * @param keyCnt Number of keys.
+ * @param txSize Expected transaction size.
+ * @param grpLockKey Group lock key if this is a group-lock transaction.
+ * @param partLock {@code True} if this is a group-lock transaction request and whole partition is
+ * locked.
+ */
+ public GridDistributedLockRequest(
+ int cacheId,
+ UUID nodeId,
+ @Nullable GridCacheVersion nearXidVer,
+ long threadId,
+ IgniteUuid futId,
+ GridCacheVersion lockVer,
+ boolean isInTx,
+ boolean isRead,
+ IgniteTxIsolation isolation,
+ boolean isInvalidate,
+ long timeout,
+ int keyCnt,
+ int txSize,
+ @Nullable IgniteTxKey grpLockKey,
+ boolean partLock
+ ) {
+ super(lockVer, keyCnt);
+
+ assert keyCnt > 0;
+ assert futId != null;
+ assert !isInTx || isolation != null; // Isolation is mandatory only for in-transaction locks.
+
+ this.cacheId = cacheId;
+ this.nodeId = nodeId;
+ this.nearXidVer = nearXidVer;
+ this.threadId = threadId;
+ this.futId = futId;
+ this.isInTx = isInTx;
+ this.isRead = isRead;
+ this.isolation = isolation;
+ this.isInvalidate = isInvalidate;
+ this.timeout = timeout;
+ this.txSize = txSize;
+ this.grpLockKey = grpLockKey;
+ this.partLock = partLock;
+
+ retVals = new boolean[keyCnt]; // One "return value" flag per key.
+ }
+
+ /**
+ *
+ * @return Node ID.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Near transaction ID.
+ */
+ public GridCacheVersion nearXidVersion() {
+ return nearXidVer;
+ }
+
+ /**
+ *
+ * @return Owner node thread ID.
+ */
+ public long threadId() {
+ return threadId;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return {@code True} if implicit transaction lock.
+ */
+ public boolean inTx() {
+ return isInTx;
+ }
+
+ /**
+ * @return Invalidate flag.
+ */
+ public boolean isInvalidate() {
+ return isInvalidate;
+ }
+
+ /**
+ * @return {@code True} if lock is implicit and for a read operation.
+ */
+ public boolean txRead() {
+ return isRead;
+ }
+
+ /**
+ * @param idx Key index.
+ * @return Flag indicating whether a value should be returned.
+ */
+ public boolean returnValue(int idx) {
+ return retVals[idx];
+ }
+
+ /**
+ * @return Return flags.
+ */
+ public boolean[] returnFlags() {
+ return retVals;
+ }
+
+ /**
+ * @return Transaction isolation or <tt>null</tt> if not in transaction.
+ */
+ public IgniteTxIsolation isolation() {
+ return isolation;
+ }
+
+ /**
+ *
+ * @return Key to lock.
+ */
+ public List<byte[]> keyBytes() {
+ return keyBytes;
+ }
+
+ /**
+ * @return Write entries list.
+ */
+ public List<IgniteTxEntry<K, V>> writeEntries() {
+ return writeEntries;
+ }
+
+ /**
+ * @return Tx size.
+ */
+ public int txSize() {
+ return txSize;
+ }
+
+ /**
+ * Adds a key at the current position of this request and then advances the position.
+ * Candidates, DR version and the return flag are stored under that same position.
+ * <p>
+ * Write entries may only start being added with the very first key (this is asserted).
+ *
+ * @param key Key.
+ * @param keyBytes Key bytes (possibly {@code null}, in which case nothing is added to the key bytes list).
+ * @param writeEntry Write entry (possibly {@code null}).
+ * @param retVal Flag indicating whether value should be returned.
+ * @param cands Candidates.
+ * @param drVer DR version.
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void addKeyBytes(
+ K key,
+ @Nullable byte[] keyBytes,
+ @Nullable IgniteTxEntry<K, V> writeEntry,
+ boolean retVal,
+ @Nullable Collection<GridCacheMvccCandidate<K>> cands,
+ @Nullable GridCacheVersion drVer,
+ GridCacheContext<K, V> ctx
+ ) throws IgniteCheckedException {
+ if (ctx.deploymentEnabled())
+ prepareObject(key, ctx.shared());
+
+ if (keyBytes != null) {
+ if (this.keyBytes == null)
+ this.keyBytes = new ArrayList<>(keysCount());
+
+ this.keyBytes.add(keyBytes); // NOTE(review): skipped for null bytes, so list positions match key positions only if bytes are passed for every key - confirm with callers.
+ }
+
+ if (keys == null)
+ keys = new ArrayList<>(keysCount());
+
+ keys.add(key);
+
+ candidatesByIndex(idx, cands);
+ drVersionByIndex(idx, drVer);
+
+ retVals[idx] = retVal;
+
+ if (writeEntry != null) {
+ if (writeEntries == null) {
+ assert idx == 0 : "Cannot start adding write entries in the middle of lock message [idx=" + idx +
+ ", writeEntry=" + writeEntry + ']';
+
+ writeEntries = new ArrayList<>(keysCount());
+ }
+
+ writeEntries.add(writeEntry);
+ }
+
+ idx++; // The next call fills the next position.
+ }
+
+ /**
+ * @return Unmarshalled keys.
+ */
+ public List<K> keys() {
+ return keys;
+ }
+
+ /**
+ * @return {@code True} if lock request for group-lock transaction.
+ */
+ public boolean groupLock() {
+ return grpLockKey != null;
+ }
+
+ /**
+ * @return Group lock key.
+ */
+ @Nullable public IgniteTxKey groupLockKey() {
+ return grpLockKey;
+ }
+
+ /**
+ * @return {@code True} if partition is locked in group-lock transaction.
+ */
+ public boolean partitionLock() {
+ return partLock;
+ }
+
+ /**
+ * @return Max lock wait time.
+ */
+ public long timeout() {
+ return timeout;
+ }
+
+ /**
+ * Records the DR version for the key at the given position. A {@code null} version is ignored.
+ *
+ * @param idx Key index.
+ * @param drVer DR version.
+ */
+ @SuppressWarnings({"unchecked"})
+ public void drVersionByIndex(int idx, GridCacheVersion drVer) {
+ assert idx < keysCount();
+
+ if (drVer != null) {
+ // The per-key array is allocated on first use only.
+ if (drVersByIdx == null)
+ drVersByIdx = new GridCacheVersion[keysCount()];
+
+ drVersByIdx[idx] = drVer;
+ }
+ }
+
+ /**
+ * @param idx Key index.
+ * @return DR versions for given key.
+ */
+ public GridCacheVersion drVersionByIndex(int idx) {
+ return drVersByIdx == null ? null : drVersByIdx[idx];
+ }
+
+ /**
+ * @return All DR versions.
+ */
+ public GridCacheVersion[] drVersions() {
+ return drVersByIdx;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Additionally marshals the group lock key, unless its bytes are already available,
+ * and the write entries, if there are any.
+ */
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (grpLockKey != null && grpLockKeyBytes == null) {
+ if (ctx.deploymentEnabled())
+ prepareObject(grpLockKey, ctx);
+
+ grpLockKeyBytes = CU.marshal(ctx, grpLockKey);
+ }
+
+ if (writeEntries != null) {
+ marshalTx(writeEntries, ctx);
+
+ writeEntriesBytes = ctx.marshaller().marshal(writeEntries); // Unlike the group lock key, re-marshaled on every call.
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Additionally restores keys and the group lock key from their byte form, unless they are
+ * already present, and restores write entries whenever their bytes are available.
+ */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (keys == null)
+ keys = unmarshalCollection(keyBytes, ctx, ldr);
+
+ if (grpLockKey == null && grpLockKeyBytes != null)
+ grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr);
+
+ if (writeEntriesBytes != null) {
+ writeEntries = ctx.marshaller().unmarshal(writeEntriesBytes, ldr);
+
+ unmarshalTx(writeEntries, false, ctx, ldr);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneCallsConstructors", "OverriddenMethodCallDuringObjectConstruction",
+ "CloneDoesntCallSuperClone"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridDistributedLockRequest _clone = new GridDistributedLockRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridDistributedLockRequest _clone = (GridDistributedLockRequest)_msg;
+
+ _clone.nodeId = nodeId;
+ _clone.nearXidVer = nearXidVer;
+ _clone.threadId = threadId;
+ _clone.futId = futId;
+ _clone.timeout = timeout;
+ _clone.isInTx = isInTx;
+ _clone.isInvalidate = isInvalidate;
+ _clone.isRead = isRead;
+ _clone.isolation = isolation;
+ _clone.keyBytes = keyBytes;
+ _clone.keys = keys;
+ _clone.writeEntries = writeEntries;
+ _clone.writeEntriesBytes = writeEntriesBytes;
+ _clone.retVals = retVals;
+ _clone.idx = idx;
+ _clone.txSize = txSize;
+ _clone.grpLockKey = grpLockKey;
+ _clone.grpLockKeyBytes = grpLockKeyBytes;
+ _clone.partLock = partLock;
+ _clone.drVersByIdx = drVersByIdx;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 8:
+ if (drVersByIdx != null) {
+ if (commState.it == null) {
- if (!commState.putInt(drVersByIdx.length))
++ if (!commState.putInt(null, drVersByIdx.length))
+ return false;
+
+ commState.it = arrayIterator(drVersByIdx);
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putCacheVersion((GridCacheVersion)commState.cur))
++ if (!commState.putCacheVersion(null, (GridCacheVersion)commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 9:
- if (!commState.putGridUuid(futId))
++ if (!commState.putGridUuid("futId", futId))
+ return false;
+
+ commState.idx++;
+
+ case 10:
- if (!commState.putByteArray(grpLockKeyBytes))
++ if (!commState.putByteArray("grpLockKeyBytes", grpLockKeyBytes))
+ return false;
+
+ commState.idx++;
+
+ case 11:
- if (!commState.putBoolean(isInTx))
++ if (!commState.putBoolean("isInTx", isInTx))
+ return false;
+
+ commState.idx++;
+
+ case 12:
- if (!commState.putBoolean(isInvalidate))
++ if (!commState.putBoolean("isInvalidate", isInvalidate))
+ return false;
+
+ commState.idx++;
+
+ case 13:
- if (!commState.putBoolean(isRead))
++ if (!commState.putBoolean("isRead", isRead))
+ return false;
+
+ commState.idx++;
+
+ case 14:
- if (!commState.putEnum(isolation))
++ if (!commState.putEnum("isolation", isolation))
+ return false;
+
+ commState.idx++;
+
+ case 15:
+ if (keyBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(keyBytes.size()))
++ if (!commState.putInt(null, keyBytes.size()))
+ return false;
+
+ commState.it = keyBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putByteArray((byte[])commState.cur))
++ if (!commState.putByteArray(null, (byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 16:
- if (!commState.putCacheVersion(nearXidVer))
++ if (!commState.putCacheVersion("nearXidVer", nearXidVer))
+ return false;
+
+ commState.idx++;
+
+ case 17:
- if (!commState.putUuid(nodeId))
++ if (!commState.putUuid("nodeId", nodeId))
+ return false;
+
+ commState.idx++;
+
+ case 18:
- if (!commState.putBoolean(partLock))
++ if (!commState.putBoolean("partLock", partLock))
+ return false;
+
+ commState.idx++;
+
+ case 19:
- if (!commState.putBooleanArray(retVals))
++ if (!commState.putBooleanArray("retVals", retVals))
+ return false;
+
+ commState.idx++;
+
+ case 20:
- if (!commState.putLong(threadId))
++ if (!commState.putLong("threadId", threadId))
+ return false;
+
+ commState.idx++;
+
+ case 21:
- if (!commState.putLong(timeout))
++ if (!commState.putLong("timeout", timeout))
+ return false;
+
+ commState.idx++;
+
+ case 22:
- if (!commState.putInt(txSize))
++ if (!commState.putInt("txSize", txSize))
+ return false;
+
+ commState.idx++;
+
+ case 23:
- if (!commState.putByteArray(writeEntriesBytes))
++ if (!commState.putByteArray("writeEntriesBytes", writeEntriesBytes))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 8:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (drVersByIdx == null)
+ drVersByIdx = new GridCacheVersion[commState.readSize];
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- GridCacheVersion _val = commState.getCacheVersion();
++ GridCacheVersion _val = commState.getCacheVersion(null);
+
- if (_val == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ drVersByIdx[i] = (GridCacheVersion)_val;
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 9:
- IgniteUuid futId0 = commState.getGridUuid();
++ futId = commState.getGridUuid("futId");
+
- if (futId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- futId = futId0;
-
+ commState.idx++;
+
+ case 10:
- byte[] grpLockKeyBytes0 = commState.getByteArray();
++ grpLockKeyBytes = commState.getByteArray("grpLockKeyBytes");
+
- if (grpLockKeyBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- grpLockKeyBytes = grpLockKeyBytes0;
-
+ commState.idx++;
+
+ case 11:
- if (buf.remaining() < 1)
- return false;
++ isInTx = commState.getBoolean("isInTx");
+
- isInTx = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 12:
- if (buf.remaining() < 1)
- return false;
++ isInvalidate = commState.getBoolean("isInvalidate");
+
- isInvalidate = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 13:
- if (buf.remaining() < 1)
- return false;
++ isRead = commState.getBoolean("isRead");
+
- isRead = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 14:
- if (buf.remaining() < 1)
- return false;
++ byte isolation0 = commState.getByte("isolation");
+
- byte isolation0 = commState.getByte();
++ if (!commState.lastRead())
++ return false;
+
+ isolation = IgniteTxIsolation.fromOrdinal(isolation0);
+
+ commState.idx++;
+
+ case 15:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (keyBytes == null)
+ keyBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- byte[] _val = commState.getByteArray();
++ byte[] _val = commState.getByteArray(null);
+
- if (_val == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ keyBytes.add((byte[])_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 16:
- GridCacheVersion nearXidVer0 = commState.getCacheVersion();
++ nearXidVer = commState.getCacheVersion("nearXidVer");
+
- if (nearXidVer0 == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- nearXidVer = nearXidVer0;
-
+ commState.idx++;
+
+ case 17:
- UUID nodeId0 = commState.getUuid();
++ nodeId = commState.getUuid("nodeId");
+
- if (nodeId0 == UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- nodeId = nodeId0;
-
+ commState.idx++;
+
+ case 18:
- if (buf.remaining() < 1)
- return false;
++ partLock = commState.getBoolean("partLock");
+
- partLock = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 19:
- boolean[] retVals0 = commState.getBooleanArray();
++ retVals = commState.getBooleanArray("retVals");
+
- if (retVals0 == BOOLEAN_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- retVals = retVals0;
-
+ commState.idx++;
+
+ case 20:
- if (buf.remaining() < 8)
- return false;
++ threadId = commState.getLong("threadId");
+
- threadId = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 21:
- if (buf.remaining() < 8)
- return false;
++ timeout = commState.getLong("timeout");
+
- timeout = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 22:
- if (buf.remaining() < 4)
- return false;
++ txSize = commState.getInt("txSize");
+
- txSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 23:
- byte[] writeEntriesBytes0 = commState.getByteArray();
++ writeEntriesBytes = commState.getByteArray("writeEntriesBytes");
+
- if (writeEntriesBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- writeEntriesBytes = writeEntriesBytes0;
-
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 22;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDistributedLockRequest.class, this, "keysCnt", retVals.length,
+ "super", super.toString());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
index 0000000,e24c053..9287763
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
@@@ -1,0 -1,437 +1,433 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * Lock response message.
+ */
+ public class GridDistributedLockResponse<K, V> extends GridDistributedBaseMessage<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Error. */
+ @GridDirectTransient
+ private Throwable err;
+
+ /** Serialized error. */
+ private byte[] errBytes;
+
+ /** Value bytes. */
+ @GridDirectCollection(GridCacheValueBytes.class)
+ private List<GridCacheValueBytes> valBytes;
+
+ /** Values. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private List<V> vals;
+
+ /**
+ * Empty constructor (required by {@link Externalizable}).
+ */
+ public GridDistributedLockResponse() {
+ /* No-op. */
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param lockVer Lock version.
+ * @param futId Future ID.
+ * @param cnt Key count.
+ */
+ public GridDistributedLockResponse(int cacheId,
+ GridCacheVersion lockVer,
+ IgniteUuid futId,
+ int cnt) {
+ super(lockVer, cnt);
+
+ assert futId != null;
+
+ this.cacheId = cacheId;
+ this.futId = futId;
+
+ vals = new ArrayList<>(cnt);
+ valBytes = new ArrayList<>(cnt);
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param lockVer Lock ID.
+ * @param futId Future ID.
+ * @param err Error.
+ */
+ public GridDistributedLockResponse(int cacheId,
+ GridCacheVersion lockVer,
+ IgniteUuid futId,
+ Throwable err) {
+ super(lockVer, 0);
+
+ assert futId != null;
+
+ this.cacheId = cacheId;
+ this.futId = futId;
+ this.err = err;
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param lockVer Lock ID.
+ * @param futId Future ID.
+ * @param cnt Count.
+ * @param err Error.
+ */
+ public GridDistributedLockResponse(int cacheId,
+ GridCacheVersion lockVer,
+ IgniteUuid futId,
+ int cnt,
+ Throwable err) {
+ super(lockVer, cnt);
+
+ assert futId != null;
+
+ this.cacheId = cacheId;
+ this.futId = futId;
+ this.err = err;
+
+ vals = new ArrayList<>(cnt);
+ valBytes = new ArrayList<>(cnt);
+ }
+
+ /**
+ *
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Error.
+ */
+ public Throwable error() {
+ return err;
+ }
+
+ /**
+ * @param err Error to set.
+ */
+ public void error(Throwable err) {
+ this.err = err;
+ }
+
+ /**
+ * @param idx Index of locked flag.
+ * @return Value of locked flag at given index.
+ */
+ public boolean isCurrentlyLocked(int idx) {
+ assert idx >= 0;
+
+ Collection<GridCacheMvccCandidate<K>> cands = candidatesByIndex(idx);
+
+ for (GridCacheMvccCandidate<K> cand : cands)
+ if (cand.owner())
+ return true;
+
+ return false;
+ }
+
+ /**
+ * @param idx Candidates index.
+ * @param cands Collection of candidates.
+ * @param committedVers Committed versions relative to lock version.
+ * @param rolledbackVers Rolled back versions relative to lock version.
+ */
+ public void setCandidates(int idx, Collection<GridCacheMvccCandidate<K>> cands,
+ Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) {
+ assert idx >= 0;
+
+ completedVersions(committedVers, rolledbackVers);
+
+ candidatesByIndex(idx, cands);
+ }
+
+ /**
+ * @param idx Value index.
+ *
+ * @return Value bytes (possibly {@code null}).
+ */
+ @Nullable public byte[] valueBytes(int idx) {
+ if (!F.isEmpty(valBytes)) {
+ GridCacheValueBytes res = valBytes.get(idx);
+
+ if (res != null && !res.isPlain())
+ return res.get();
+ }
+
+ return null;
+ }
+
+ /**
+ * Appends a value together with its byte representation to this response.
+ *
+ * @param val Value.
+ * @param valBytes Value bytes (possibly {@code null}).
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void addValueBytes(V val, @Nullable byte[] valBytes, GridCacheContext<K, V> ctx) throws IgniteCheckedException {
+ if (ctx.deploymentEnabled())
+ prepareObject(val, ctx.shared());
+
+ final GridCacheValueBytes holder;
+
+ // A byte[] value is stored as plain bytes; anything else uses the marshaled bytes when they are given.
+ if (val instanceof byte[])
+ holder = GridCacheValueBytes.plain(val);
+ else if (valBytes != null)
+ holder = GridCacheValueBytes.marshaled(valBytes);
+ else
+ holder = null;
+
+ this.valBytes.add(holder);
+
+ vals.add(val);
+ }
+
+ /**
+ * @return Number of values held by this response, {@code 0} if the values list was never created.
+ */
+ protected int valuesSize() {
+ // The empty constructor and the error-only constructor do not create the values list.
+ return vals == null ? 0 : vals.size();
+ }
+
+ /**
+ * Gets the value stored at the given position, looking first into the values list and then,
+ * for plain value bytes, into the value bytes list.
+ *
+ * @param idx Index.
+ * @return Value for given index, or {@code null} if it is found in neither list.
+ */
+ @Nullable public V value(int idx) {
+ V direct = F.isEmpty(vals) ? null : vals.get(idx);
+
+ if (direct != null)
+ return direct;
+
+ // Nothing in the values list: a byte[] value may still be available as plain value bytes.
+ GridCacheValueBytes holder = F.isEmpty(valBytes) ? null : valBytes.get(idx);
+
+ if (holder != null && holder.isPlain())
+ return (V)holder.get();
+
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Additionally marshals the values, if no value bytes have been collected yet,
+ * and the error, if one is set.
+ */
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (F.isEmpty(valBytes) && !F.isEmpty(vals))
+ valBytes = marshalValuesCollection(vals, ctx);
+
+ if (err != null)
+ errBytes = ctx.marshaller().marshal(err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (F.isEmpty(vals) && !F.isEmpty(valBytes))
+ vals = unmarshalValueBytesCollection(valBytes, ctx, ldr);
+
+ if (errBytes != null)
+ err = ctx.marshaller().unmarshal(errBytes, ldr);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors",
+ "OverriddenMethodCallDuringObjectConstruction"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridDistributedLockResponse _clone = new GridDistributedLockResponse();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridDistributedLockResponse _clone = (GridDistributedLockResponse)_msg;
+
+ _clone.futId = futId;
+ _clone.err = err;
+ _clone.errBytes = errBytes;
+ _clone.valBytes = valBytes;
+ _clone.vals = vals;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 8:
- if (!commState.putByteArray(errBytes))
++ if (!commState.putByteArray("errBytes", errBytes))
+ return false;
+
+ commState.idx++;
+
+ case 9:
- if (!commState.putGridUuid(futId))
++ if (!commState.putGridUuid("futId", futId))
+ return false;
+
+ commState.idx++;
+
+ case 10:
+ if (valBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(valBytes.size()))
++ if (!commState.putInt(null, valBytes.size()))
+ return false;
+
+ commState.it = valBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putValueBytes((GridCacheValueBytes)commState.cur))
++ if (!commState.putValueBytes(null, (GridCacheValueBytes)commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 8:
- byte[] errBytes0 = commState.getByteArray();
++ errBytes = commState.getByteArray("errBytes");
+
- if (errBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- errBytes = errBytes0;
-
+ commState.idx++;
+
+ case 9:
- IgniteUuid futId0 = commState.getGridUuid();
++ futId = commState.getGridUuid("futId");
+
- if (futId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- futId = futId0;
-
+ commState.idx++;
+
+ case 10:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (valBytes == null)
+ valBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- GridCacheValueBytes _val = commState.getValueBytes();
++ GridCacheValueBytes _val = commState.getValueBytes(null);
+
- if (_val == VAL_BYTES_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ valBytes.add((GridCacheValueBytes)_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 23;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDistributedLockResponse.class, this,
+ "valBytesLen", valBytes == null ? 0 : valBytes.size(),
+ "super", super.toString());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index 0000000,fef3eda..8b84546
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@@ -1,0 -1,696 +1,692 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * Transaction completion message.
+ */
+ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMessage<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Thread ID. */
+ private long threadId;
+
+ /** Commit version. */
+ private GridCacheVersion commitVer;
+
+ /** Invalidate flag. */
+ private boolean invalidate;
+
+ /** Commit flag. */
+ private boolean commit;
+
+ /** Sync commit flag. */
+ private boolean syncCommit;
+
+ /** Sync rollback flag. */
+ private boolean syncRollback;
+
+ /** Min version used as base for completed versions. */
+ private GridCacheVersion baseVer;
+
+ /** Transaction write entries. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Collection<IgniteTxEntry<K, V>> writeEntries;
+
+ /** */
+ @GridDirectCollection(byte[].class)
+ private Collection<byte[]> writeEntriesBytes;
+
+ /** Write entries which have not been transferred to nodes during lock request. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Collection<IgniteTxEntry<K, V>> recoveryWrites;
+
+ /** */
+ @GridDirectCollection(byte[].class)
+ private Collection<byte[]> recoveryWritesBytes;
+
+ /** Expected txSize. */
+ private int txSize;
+
+ /** Group lock key. */
+ @GridDirectTransient
+ private IgniteTxKey grpLockKey;
+
+ /** Group lock key bytes. */
+ private byte[] grpLockKeyBytes;
+
+ /** System flag. */
+ private boolean sys;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridDistributedTxFinishRequest() {
+ /* No-op. */
+ }
+
+ /**
+ * @param xidVer Transaction ID.
+ * @param futId Future ID.
+ * @param threadId Thread ID.
+ * @param commitVer Commit version.
+ * @param commit Commit flag.
+ * @param invalidate Invalidate flag.
+ * @param sys System flag.
+ * @param baseVer Base version.
+ * @param committedVers Committed versions.
+ * @param rolledbackVers Rolled back versions.
+ * @param txSize Expected transaction size.
+ * @param writeEntries Write entries.
+ * @param recoveryWrites Recovery entries. In pessimistic mode, these are the entries which were not transferred to remote nodes
+ * with lock requests. {@code Null} for optimistic mode.
+ * @param grpLockKey Group lock key if this is a group-lock transaction.
+ */
+ public GridDistributedTxFinishRequest(
+ GridCacheVersion xidVer,
+ IgniteUuid futId,
+ @Nullable GridCacheVersion commitVer,
+ long threadId,
+ boolean commit,
+ boolean invalidate,
+ boolean sys,
+ boolean syncCommit,
+ boolean syncRollback,
+ GridCacheVersion baseVer,
+ Collection<GridCacheVersion> committedVers,
+ Collection<GridCacheVersion> rolledbackVers,
+ int txSize,
+ Collection<IgniteTxEntry<K, V>> writeEntries,
+ Collection<IgniteTxEntry<K, V>> recoveryWrites,
+ @Nullable IgniteTxKey grpLockKey
+ ) {
+ super(xidVer, writeEntries == null ? 0 : writeEntries.size());
+ assert xidVer != null;
+
+ this.futId = futId;
+ this.commitVer = commitVer;
+ this.threadId = threadId;
+ this.commit = commit;
+ this.invalidate = invalidate;
+ this.sys = sys;
+ this.syncCommit = syncCommit;
+ this.syncRollback = syncRollback;
+ this.baseVer = baseVer;
+ this.txSize = txSize;
+ this.writeEntries = writeEntries;
+ this.recoveryWrites = recoveryWrites;
+ this.grpLockKey = grpLockKey;
+
+ completedVersions(committedVers, rolledbackVers);
+ }
+
+ /**
+ * Clones write entries so that near entries are not passed to DHT cache.
+ */
+ public void cloneEntries() {
+ if (F.isEmpty(writeEntries))
+ return;
+
+ Collection<IgniteTxEntry<K, V>> cp = new ArrayList<>(writeEntries.size());
+
+ for (IgniteTxEntry<K, V> e : writeEntries) {
+ GridCacheContext<K, V> cacheCtx = e.context();
+
+ // Clone only if it is a near cache.
+ if (cacheCtx.isNear())
+ cp.add(e.cleanCopy(cacheCtx.nearTx().dht().context()));
+ else
+ cp.add(e);
+ }
+
+ writeEntries = cp;
+ }
+
+ /**
+ * @return System flag.
+ */
+ public boolean system() {
+ return sys;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Thread ID.
+ */
+ public long threadId() {
+ return threadId;
+ }
+
+ /**
+ * @return Commit version.
+ */
+ public GridCacheVersion commitVersion() {
+ return commitVer;
+ }
+
+ /**
+ * @return Commit flag.
+ */
+ public boolean commit() {
+ return commit;
+ }
+
+ /**
+ *
+ * @return Invalidate flag.
+ */
+ public boolean isInvalidate() {
+ return invalidate;
+ }
+
+ /**
+ * @return Sync commit flag.
+ */
+ public boolean syncCommit() {
+ return syncCommit;
+ }
+
+ /**
+ * @return Sync rollback flag.
+ */
+ public boolean syncRollback() {
+ return syncRollback;
+ }
+
+ /**
+ * @return Base version.
+ */
+ public GridCacheVersion baseVersion() {
+ return baseVer;
+ }
+
+ /**
+ * @return Write entries.
+ */
+ public Collection<IgniteTxEntry<K, V>> writes() {
+ return writeEntries;
+ }
+
+ /**
+ * @return Recovery entries.
+ */
+ public Collection<IgniteTxEntry<K, V>> recoveryWrites() {
+ return recoveryWrites;
+ }
+
+ /**
+ * @return Expected tx size.
+ */
+ public int txSize() {
+ return txSize;
+ }
+
+ /**
+ *
+ * @return {@code True} if reply is required.
+ */
+ public boolean replyRequired() {
+ return commit ? syncCommit : syncRollback;
+ }
+
+ /**
+ * @return {@code True} if group lock transaction.
+ */
+ public boolean groupLock() {
+ return grpLockKey != null;
+ }
+
+ /**
+ * @return Group lock key.
+ */
+ @Nullable public IgniteTxKey groupLockKey() {
+ return grpLockKey;
+ }
+
+ /** {@inheritDoc}
+ * @param ctx Cache shared context. */
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (writeEntries != null) {
+ marshalTx(writeEntries, ctx);
+
+ writeEntriesBytes = new ArrayList<>(writeEntries.size());
+
+ for (IgniteTxEntry<K, V> e : writeEntries)
+ writeEntriesBytes.add(ctx.marshaller().marshal(e));
+ }
+
+ if (recoveryWrites != null) {
+ marshalTx(recoveryWrites, ctx);
+
+ recoveryWritesBytes = new ArrayList<>(recoveryWrites.size());
+
+ for (IgniteTxEntry<K, V> e : recoveryWrites)
+ recoveryWritesBytes.add(ctx.marshaller().marshal(e));
+ }
+
+ if (grpLockKey != null && grpLockKeyBytes == null) {
+ if (ctx.deploymentEnabled())
+ prepareObject(grpLockKey, ctx);
+
+ grpLockKeyBytes = CU.marshal(ctx, grpLockKey);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (writeEntriesBytes != null) {
+ writeEntries = new ArrayList<>(writeEntriesBytes.size());
+
+ for (byte[] arr : writeEntriesBytes)
+ writeEntries.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr));
+
+ unmarshalTx(writeEntries, false, ctx, ldr);
+ }
+
+ if (recoveryWritesBytes != null) {
+ recoveryWrites = new ArrayList<>(recoveryWritesBytes.size());
+
+ for (byte[] arr : recoveryWritesBytes)
+ recoveryWrites.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr));
+
+ unmarshalTx(recoveryWrites, false, ctx, ldr);
+ }
+
+ if (grpLockKeyBytes != null && grpLockKey == null)
+ grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors",
+ "OverriddenMethodCallDuringObjectConstruction"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridDistributedTxFinishRequest _clone = new GridDistributedTxFinishRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridDistributedTxFinishRequest _clone = (GridDistributedTxFinishRequest)_msg;
+
+ _clone.futId = futId;
+ _clone.threadId = threadId;
+ _clone.commitVer = commitVer;
+ _clone.invalidate = invalidate;
+ _clone.commit = commit;
++ _clone.syncCommit = syncCommit;
++ _clone.syncRollback = syncRollback;
+ _clone.baseVer = baseVer;
+ _clone.writeEntries = writeEntries;
+ _clone.writeEntriesBytes = writeEntriesBytes;
+ _clone.recoveryWrites = recoveryWrites;
+ _clone.recoveryWritesBytes = recoveryWritesBytes;
+ _clone.txSize = txSize;
+ _clone.grpLockKey = grpLockKey;
+ _clone.grpLockKeyBytes = grpLockKeyBytes;
+ _clone.sys = sys;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 8:
- if (!commState.putCacheVersion(baseVer))
++ if (!commState.putCacheVersion("baseVer", baseVer))
+ return false;
+
+ commState.idx++;
+
+ case 9:
- if (!commState.putBoolean(commit))
++ if (!commState.putBoolean("commit", commit))
+ return false;
+
+ commState.idx++;
+
+ case 10:
- if (!commState.putCacheVersion(commitVer))
++ if (!commState.putCacheVersion("commitVer", commitVer))
+ return false;
+
+ commState.idx++;
+
+ case 11:
- if (!commState.putGridUuid(futId))
++ if (!commState.putGridUuid("futId", futId))
+ return false;
+
+ commState.idx++;
+
+ case 12:
- if (!commState.putByteArray(grpLockKeyBytes))
++ if (!commState.putByteArray("grpLockKeyBytes", grpLockKeyBytes))
+ return false;
+
+ commState.idx++;
+
+ case 13:
- if (!commState.putBoolean(invalidate))
++ if (!commState.putBoolean("invalidate", invalidate))
+ return false;
+
+ commState.idx++;
+
+ case 14:
- if (!commState.putBoolean(syncCommit))
- return false;
-
- commState.idx++;
-
- case 15:
- if (!commState.putBoolean(syncRollback))
- return false;
-
- commState.idx++;
-
- case 16:
+ if (recoveryWritesBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(recoveryWritesBytes.size()))
++ if (!commState.putInt(null, recoveryWritesBytes.size()))
+ return false;
+
+ commState.it = recoveryWritesBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putByteArray((byte[])commState.cur))
++ if (!commState.putByteArray(null, (byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
++ case 15:
++ if (!commState.putBoolean("syncCommit", syncCommit))
++ return false;
++
++ commState.idx++;
++
++ case 16:
++ if (!commState.putBoolean("syncRollback", syncRollback))
++ return false;
++
++ commState.idx++;
++
+ case 17:
- if (!commState.putLong(threadId))
++ if (!commState.putBoolean("sys", sys))
+ return false;
+
+ commState.idx++;
+
+ case 18:
- if (!commState.putInt(txSize))
++ if (!commState.putLong("threadId", threadId))
+ return false;
+
+ commState.idx++;
+
+ case 19:
++ if (!commState.putInt("txSize", txSize))
++ return false;
++
++ commState.idx++;
++
++ case 20:
+ if (writeEntriesBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(writeEntriesBytes.size()))
++ if (!commState.putInt(null, writeEntriesBytes.size()))
+ return false;
+
+ commState.it = writeEntriesBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putByteArray((byte[])commState.cur))
++ if (!commState.putByteArray(null, (byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
- case 20:
- if (!commState.putBoolean(sys))
- return false;
-
- commState.idx++;
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 8:
- GridCacheVersion baseVer0 = commState.getCacheVersion();
++ baseVer = commState.getCacheVersion("baseVer");
+
- if (baseVer0 == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- baseVer = baseVer0;
-
+ commState.idx++;
+
+ case 9:
- if (buf.remaining() < 1)
- return false;
++ commit = commState.getBoolean("commit");
+
- commit = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 10:
- GridCacheVersion commitVer0 = commState.getCacheVersion();
++ commitVer = commState.getCacheVersion("commitVer");
+
- if (commitVer0 == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- commitVer = commitVer0;
-
+ commState.idx++;
+
+ case 11:
- IgniteUuid futId0 = commState.getGridUuid();
++ futId = commState.getGridUuid("futId");
+
- if (futId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- futId = futId0;
-
+ commState.idx++;
+
+ case 12:
- byte[] grpLockKeyBytes0 = commState.getByteArray();
++ grpLockKeyBytes = commState.getByteArray("grpLockKeyBytes");
+
- if (grpLockKeyBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- grpLockKeyBytes = grpLockKeyBytes0;
-
+ commState.idx++;
+
+ case 13:
- if (buf.remaining() < 1)
- return false;
-
- invalidate = commState.getBoolean();
-
- commState.idx++;
-
- case 14:
- if (buf.remaining() < 1)
- return false;
-
- syncCommit = commState.getBoolean();
-
- commState.idx++;
++ invalidate = commState.getBoolean("invalidate");
+
- case 15:
- if (buf.remaining() < 1)
++ if (!commState.lastRead())
+ return false;
+
- syncRollback = commState.getBoolean();
-
+ commState.idx++;
+
- case 16:
++ case 14:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (recoveryWritesBytes == null)
+ recoveryWritesBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- byte[] _val = commState.getByteArray();
++ byte[] _val = commState.getByteArray(null);
+
- if (_val == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ recoveryWritesBytes.add((byte[])_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
- case 17:
- if (buf.remaining() < 8)
++ case 15:
++ syncCommit = commState.getBoolean("syncCommit");
++
++ if (!commState.lastRead())
+ return false;
+
- threadId = commState.getLong();
++ commState.idx++;
++
++ case 16:
++ syncRollback = commState.getBoolean("syncRollback");
++
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
- case 18:
- if (buf.remaining() < 4)
++ case 17:
++ sys = commState.getBoolean("sys");
++
++ if (!commState.lastRead())
+ return false;
+
- txSize = commState.getInt();
++ commState.idx++;
++
++ case 18:
++ threadId = commState.getLong("threadId");
++
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 19:
++ txSize = commState.getInt("txSize");
++
++ if (!commState.lastRead())
++ return false;
++
++ commState.idx++;
++
++ case 20:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (writeEntriesBytes == null)
+ writeEntriesBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- byte[] _val = commState.getByteArray();
++ byte[] _val = commState.getByteArray(null);
+
- if (_val == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ writeEntriesBytes.add((byte[])_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
- case 20:
- if (buf.remaining() < 1)
- return false;
-
- sys = commState.getBoolean();
-
- commState.idx++;
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 24;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return GridToStringBuilder.toString(GridDistributedTxFinishRequest.class, this,
+ "super", super.toString());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
index 0000000,02f8bb4..0b2acfa
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
@@@ -1,0 -1,173 +1,169 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed;
+
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+
+ import java.io.*;
+ import java.nio.*;
+
+ /**
+ * Transaction finish response.
+ */
+ public class GridDistributedTxFinishResponse<K, V> extends GridCacheMessage<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private GridCacheVersion txId;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridDistributedTxFinishResponse() {
+ /* No-op. */
+ }
+
+ /**
+ * @param txId Transaction id.
+ * @param futId Future ID.
+ */
+ public GridDistributedTxFinishResponse(GridCacheVersion txId, IgniteUuid futId) {
+ assert txId != null;
+ assert futId != null;
+
+ this.txId = txId;
+ this.futId = futId;
+ }
+
+ /**
+ *
+ * @return Transaction id.
+ */
+ public GridCacheVersion xid() {
+ return txId;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors",
+ "OverriddenMethodCallDuringObjectConstruction"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridDistributedTxFinishResponse _clone = new GridDistributedTxFinishResponse();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridDistributedTxFinishResponse _clone = (GridDistributedTxFinishResponse)_msg;
+
+ _clone.txId = txId;
+ _clone.futId = futId;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 3:
- if (!commState.putGridUuid(futId))
++ if (!commState.putGridUuid("futId", futId))
+ return false;
+
+ commState.idx++;
+
+ case 4:
- if (!commState.putCacheVersion(txId))
++ if (!commState.putCacheVersion("txId", txId))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 3:
- IgniteUuid futId0 = commState.getGridUuid();
++ futId = commState.getGridUuid("futId");
+
- if (futId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- futId = futId0;
-
+ commState.idx++;
+
+ case 4:
- GridCacheVersion txId0 = commState.getCacheVersion();
++ txId = commState.getCacheVersion("txId");
+
- if (txId0 == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- txId = txId0;
-
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 25;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return GridToStringBuilder.toString(GridDistributedTxFinishResponse.class, this);
+ }
+ }