You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/02/02 04:28:34 UTC
[42/52] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1'
of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-61
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index 0000000,7d5741f..11f454f
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@@ -1,0 -1,777 +1,771 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.transactions.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * Transaction prepare request for optimistic and eventually consistent
+ * transactions.
+ */
+ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMessage<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Thread ID. */
+ @GridToStringInclude
+ private long threadId;
+
+ /** Transaction concurrency. */
+ @GridToStringInclude
+ private IgniteTxConcurrency concurrency;
+
+ /** Transaction isolation. */
+ @GridToStringInclude
+ private IgniteTxIsolation isolation;
+
+ /** Commit version for EC transactions. */
+ @GridToStringInclude
+ private GridCacheVersion commitVer;
+
+ /** Transaction timeout. */
+ @GridToStringInclude
+ private long timeout;
+
+ /** Invalidation flag. */
+ @GridToStringInclude
+ private boolean invalidate;
+
+ /** Transaction read set. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Collection<IgniteTxEntry<K, V>> reads;
+
+ /** */
+ @GridDirectCollection(byte[].class)
+ private Collection<byte[]> readsBytes;
+
+ /** Transaction write entries. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Collection<IgniteTxEntry<K, V>> writes;
+
+ /** */
+ @GridDirectCollection(byte[].class)
+ private Collection<byte[]> writesBytes;
+
+ /** DHT versions to verify. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Map<IgniteTxKey<K>, GridCacheVersion> dhtVers;
+
+ /** Serialized map. */
+ @GridToStringExclude
+ private byte[] dhtVersBytes;
+
+ /** Group lock key, if any. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private IgniteTxKey grpLockKey;
+
+ /** Group lock key bytes. */
+ @GridToStringExclude
+ private byte[] grpLockKeyBytes;
+
+ /** Partition lock flag. */
+ private boolean partLock;
+
+ /** Expected transaction size. */
+ private int txSize;
+
+ /** Transaction nodes mapping (primary node -> related backup nodes). */
+ @GridDirectTransient
+ private Map<UUID, Collection<UUID>> txNodes;
+
+ /** */
+ private byte[] txNodesBytes;
+
+ /** System flag. */
+ private boolean sys;
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public GridDistributedTxPrepareRequest() {
+ /* No-op. */
+ }
+
+ /**
+ * @param tx Cache transaction.
+ * @param reads Read entries.
+ * @param writes Write entries.
+ * @param grpLockKey Group lock key.
+ * @param partLock {@code True} if preparing group-lock transaction with partition lock.
+ * @param txNodes Transaction nodes mapping.
+ */
+ public GridDistributedTxPrepareRequest(
+ IgniteTxEx<K, V> tx,
+ @Nullable Collection<IgniteTxEntry<K, V>> reads,
+ Collection<IgniteTxEntry<K, V>> writes,
+ IgniteTxKey grpLockKey,
+ boolean partLock,
+ Map<UUID, Collection<UUID>> txNodes
+ ) {
+ super(tx.xidVersion(), 0);
+
+ commitVer = null;
+ threadId = tx.threadId();
+ concurrency = tx.concurrency();
+ isolation = tx.isolation();
+ timeout = tx.timeout();
+ invalidate = tx.isInvalidate();
+ txSize = tx.size();
+ sys = tx.system();
+
+ this.reads = reads;
+ this.writes = writes;
+ this.grpLockKey = grpLockKey;
+ this.partLock = partLock;
+ this.txNodes = txNodes;
+ }
+
+ /**
+ * @return Transaction nodes mapping.
+ */
+ public Map<UUID, Collection<UUID>> transactionNodes() {
+ return txNodes;
+ }
+
+ /**
+ * @return System flag.
+ */
+ public boolean system() {
+ return sys;
+ }
+
+ /**
+ * Adds version to be verified on remote node.
+ *
+ * @param key Key for which version is verified.
+ * @param dhtVer DHT version to check.
+ */
+ public void addDhtVersion(IgniteTxKey<K> key, @Nullable GridCacheVersion dhtVer) {
+ if (dhtVers == null)
+ dhtVers = new HashMap<>();
+
+ dhtVers.put(key, dhtVer);
+ }
+
+ /**
+ * @return Map of versions to be verified.
+ */
+ public Map<IgniteTxKey<K>, GridCacheVersion> dhtVersions() {
+ return dhtVers == null ? Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap() : dhtVers;
+ }
+
+ /**
+ * @return Thread ID.
+ */
+ public long threadId() {
+ return threadId;
+ }
+
+ /**
+ * @return Commit version.
+ */
+ public GridCacheVersion commitVersion() { return commitVer; }
+
+ /**
+ * @return Invalidate flag.
+ */
+ public boolean isInvalidate() { return invalidate; }
+
+ /**
+ * @return Transaction timeout.
+ */
+ public long timeout() {
+ return timeout;
+ }
+
+ /**
+ * @return Concurrency.
+ */
+ public IgniteTxConcurrency concurrency() {
+ return concurrency;
+ }
+
+ /**
+ * @return Isolation level.
+ */
+ public IgniteTxIsolation isolation() {
+ return isolation;
+ }
+
+ /**
+ * @return Read set.
+ */
+ public Collection<IgniteTxEntry<K, V>> reads() {
+ return reads;
+ }
+
+ /**
+ * @return Write entries.
+ */
+ public Collection<IgniteTxEntry<K, V>> writes() {
+ return writes;
+ }
+
+ /**
+ * @param reads Reads.
+ */
+ protected void reads(Collection<IgniteTxEntry<K, V>> reads) {
+ this.reads = reads;
+ }
+
+ /**
+ * @param writes Writes.
+ */
+ protected void writes(Collection<IgniteTxEntry<K, V>> writes) {
+ this.writes = writes;
+ }
+
+ /**
+ * @return Group lock key if preparing group-lock transaction.
+ */
+ @Nullable public IgniteTxKey groupLockKey() {
+ return grpLockKey;
+ }
+
+ /**
+ * @return {@code True} if preparing group-lock transaction with partition lock.
+ */
+ public boolean partitionLock() {
+ return partLock;
+ }
+
+ /**
+ * @return Expected transaction size.
+ */
+ public int txSize() {
+ return txSize;
+ }
+
+ /** {@inheritDoc}
+ * @param ctx*/
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (writes != null) {
+ marshalTx(writes, ctx);
+
+ writesBytes = new ArrayList<>(writes.size());
+
+ for (IgniteTxEntry<K, V> e : writes)
+ writesBytes.add(ctx.marshaller().marshal(e));
+ }
+
+ if (reads != null) {
+ marshalTx(reads, ctx);
+
+ readsBytes = new ArrayList<>(reads.size());
+
+ for (IgniteTxEntry<K, V> e : reads)
+ readsBytes.add(ctx.marshaller().marshal(e));
+ }
+
+ if (grpLockKey != null && grpLockKeyBytes == null)
+ grpLockKeyBytes = ctx.marshaller().marshal(grpLockKey);
+
+ if (dhtVers != null && dhtVersBytes == null)
+ dhtVersBytes = ctx.marshaller().marshal(dhtVers);
+
+ if (txNodes != null)
+ txNodesBytes = ctx.marshaller().marshal(txNodes);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (writesBytes != null) {
+ writes = new ArrayList<>(writesBytes.size());
+
+ for (byte[] arr : writesBytes)
+ writes.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr));
+
+ unmarshalTx(writes, false, ctx, ldr);
+ }
+
+ if (readsBytes != null) {
+ reads = new ArrayList<>(readsBytes.size());
+
+ for (byte[] arr : readsBytes)
+ reads.add(ctx.marshaller().<IgniteTxEntry<K, V>>unmarshal(arr, ldr));
+
+ unmarshalTx(reads, false, ctx, ldr);
+ }
+
+ if (grpLockKeyBytes != null && grpLockKey == null)
+ grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr);
+
+ if (dhtVersBytes != null && dhtVers == null)
+ dhtVers = ctx.marshaller().unmarshal(dhtVersBytes, ldr);
+
+ if (txNodesBytes != null)
+ txNodes = ctx.marshaller().unmarshal(txNodesBytes, ldr);
+ }
+
+ /**
+ *
+ * @param out Output.
+ * @param col Set to write.
+ * @throws IOException If write failed.
+ */
+ private void writeCollection(ObjectOutput out, Collection<IgniteTxEntry<K, V>> col) throws IOException {
+ boolean empty = F.isEmpty(col);
+
+ if (!empty) {
+ out.writeInt(col.size());
+
+ for (IgniteTxEntry<K, V> e : col) {
+ V val = e.value();
+ boolean hasWriteVal = e.hasWriteValue();
+ boolean hasReadVal = e.hasReadValue();
+
+ try {
+ // Don't serialize value if invalidate is set to true.
+ if (invalidate)
+ e.value(null, false, false);
+
+ out.writeObject(e);
+ }
+ finally {
+ // Set original value back.
+ e.value(val, hasWriteVal, hasReadVal);
+ }
+ }
+ }
+ else
+ out.writeInt(-1);
+ }
+
+ /**
+ * @param in Input.
+ * @return Deserialized set.
+ * @throws IOException If deserialization failed.
+ * @throws ClassNotFoundException If deserialized class could not be found.
+ */
+ @SuppressWarnings({"unchecked"})
+ @Nullable private Collection<IgniteTxEntry<K, V>> readCollection(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ List<IgniteTxEntry<K, V>> col = null;
+
+ int size = in.readInt();
+
+ // Check null flag.
+ if (size != -1) {
+ col = new ArrayList<>(size);
+
+ for (int i = 0; i < size; i++)
+ col.add((IgniteTxEntry<K, V>)in.readObject());
+ }
+
+ return col == null ? Collections.<IgniteTxEntry<K,V>>emptyList() : col;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors",
+ "OverriddenMethodCallDuringObjectConstruction"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridDistributedTxPrepareRequest _clone = new GridDistributedTxPrepareRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridDistributedTxPrepareRequest _clone = (GridDistributedTxPrepareRequest)_msg;
+
+ _clone.threadId = threadId;
+ _clone.concurrency = concurrency;
+ _clone.isolation = isolation;
+ _clone.commitVer = commitVer;
+ _clone.timeout = timeout;
+ _clone.invalidate = invalidate;
+ _clone.reads = reads;
+ _clone.readsBytes = readsBytes;
+ _clone.writes = writes;
+ _clone.writesBytes = writesBytes;
+ _clone.dhtVers = dhtVers;
+ _clone.dhtVersBytes = dhtVersBytes;
+ _clone.grpLockKey = grpLockKey;
+ _clone.grpLockKeyBytes = grpLockKeyBytes;
+ _clone.partLock = partLock;
+ _clone.txSize = txSize;
+ _clone.txNodes = txNodes;
+ _clone.txNodesBytes = txNodesBytes;
+ _clone.sys = sys;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 8:
- if (!commState.putCacheVersion(commitVer))
++ if (!commState.putCacheVersion("commitVer", commitVer))
+ return false;
+
+ commState.idx++;
+
+ case 9:
- if (!commState.putEnum(concurrency))
++ if (!commState.putEnum("concurrency", concurrency))
+ return false;
+
+ commState.idx++;
+
+ case 10:
- if (!commState.putByteArray(dhtVersBytes))
++ if (!commState.putByteArray("dhtVersBytes", dhtVersBytes))
+ return false;
+
+ commState.idx++;
+
+ case 11:
- if (!commState.putByteArray(grpLockKeyBytes))
++ if (!commState.putByteArray("grpLockKeyBytes", grpLockKeyBytes))
+ return false;
+
+ commState.idx++;
+
+ case 12:
- if (!commState.putBoolean(invalidate))
++ if (!commState.putBoolean("invalidate", invalidate))
+ return false;
+
+ commState.idx++;
+
+ case 13:
- if (!commState.putEnum(isolation))
++ if (!commState.putEnum("isolation", isolation))
+ return false;
+
+ commState.idx++;
+
+ case 14:
- if (!commState.putBoolean(partLock))
++ if (!commState.putBoolean("partLock", partLock))
+ return false;
+
+ commState.idx++;
+
+ case 15:
+ if (readsBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(readsBytes.size()))
++ if (!commState.putInt(null, readsBytes.size()))
+ return false;
+
+ commState.it = readsBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putByteArray((byte[])commState.cur))
++ if (!commState.putByteArray(null, (byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 16:
- if (!commState.putLong(threadId))
++ if (!commState.putBoolean("sys", sys))
+ return false;
+
+ commState.idx++;
+
+ case 17:
- if (!commState.putLong(timeout))
++ if (!commState.putLong("threadId", threadId))
+ return false;
+
+ commState.idx++;
+
+ case 18:
- if (!commState.putByteArray(txNodesBytes))
++ if (!commState.putLong("timeout", timeout))
+ return false;
+
+ commState.idx++;
+
+ case 19:
- if (!commState.putInt(txSize))
++ if (!commState.putByteArray("txNodesBytes", txNodesBytes))
+ return false;
+
+ commState.idx++;
+
+ case 20:
++ if (!commState.putInt("txSize", txSize))
++ return false;
++
++ commState.idx++;
++
++ case 21:
+ if (writesBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(writesBytes.size()))
++ if (!commState.putInt(null, writesBytes.size()))
+ return false;
+
+ commState.it = writesBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putByteArray((byte[])commState.cur))
++ if (!commState.putByteArray(null, (byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
- case 21:
- if (!commState.putBoolean(sys))
- return false;
-
- commState.idx++;
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 8:
- GridCacheVersion commitVer0 = commState.getCacheVersion();
++ commitVer = commState.getCacheVersion("commitVer");
+
- if (commitVer0 == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- commitVer = commitVer0;
-
+ commState.idx++;
+
+ case 9:
- if (buf.remaining() < 1)
- return false;
++ byte concurrency0 = commState.getByte("concurrency");
+
- byte concurrency0 = commState.getByte();
++ if (!commState.lastRead())
++ return false;
+
+ concurrency = IgniteTxConcurrency.fromOrdinal(concurrency0);
+
+ commState.idx++;
+
+ case 10:
- byte[] dhtVersBytes0 = commState.getByteArray();
++ dhtVersBytes = commState.getByteArray("dhtVersBytes");
+
- if (dhtVersBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- dhtVersBytes = dhtVersBytes0;
-
+ commState.idx++;
+
+ case 11:
- byte[] grpLockKeyBytes0 = commState.getByteArray();
++ grpLockKeyBytes = commState.getByteArray("grpLockKeyBytes");
+
- if (grpLockKeyBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- grpLockKeyBytes = grpLockKeyBytes0;
-
+ commState.idx++;
+
+ case 12:
- if (buf.remaining() < 1)
- return false;
++ invalidate = commState.getBoolean("invalidate");
+
- invalidate = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 13:
- if (buf.remaining() < 1)
- return false;
++ byte isolation0 = commState.getByte("isolation");
+
- byte isolation0 = commState.getByte();
++ if (!commState.lastRead())
++ return false;
+
+ isolation = IgniteTxIsolation.fromOrdinal(isolation0);
+
+ commState.idx++;
+
+ case 14:
- if (buf.remaining() < 1)
- return false;
++ partLock = commState.getBoolean("partLock");
+
- partLock = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 15:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (readsBytes == null)
+ readsBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- byte[] _val = commState.getByteArray();
++ byte[] _val = commState.getByteArray(null);
+
- if (_val == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ readsBytes.add((byte[])_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 16:
- if (buf.remaining() < 8)
- return false;
++ sys = commState.getBoolean("sys");
+
- threadId = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 17:
- if (buf.remaining() < 8)
- return false;
++ threadId = commState.getLong("threadId");
+
- timeout = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 18:
- byte[] txNodesBytes0 = commState.getByteArray();
++ timeout = commState.getLong("timeout");
+
- if (txNodesBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- txNodesBytes = txNodesBytes0;
-
+ commState.idx++;
+
+ case 19:
- if (buf.remaining() < 4)
- return false;
++ txNodesBytes = commState.getByteArray("txNodesBytes");
+
- txSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 20:
++ txSize = commState.getInt("txSize");
++
++ if (!commState.lastRead())
++ return false;
++
++ commState.idx++;
++
++ case 21:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (writesBytes == null)
+ writesBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- byte[] _val = commState.getByteArray();
++ byte[] _val = commState.getByteArray(null);
+
- if (_val == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ writesBytes.add((byte[])_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
- case 21:
- if (buf.remaining() < 1)
- return false;
-
- sys = commState.getBoolean();
-
- commState.idx++;
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 26;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return GridToStringBuilder.toString(GridDistributedTxPrepareRequest.class, this,
+ "super", super.toString());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index 0000000,837f8b0..b61c667
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@@ -1,0 -1,251 +1,247 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * Response to prepare request.
+ */
+ public class GridDistributedTxPrepareResponse<K, V> extends GridDistributedBaseMessage<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Collections of local lock candidates. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Map<K, Collection<GridCacheMvccCandidate<K>>> cands;
+
+ /** */
+ private byte[] candsBytes;
+
+ /** Error. */
+ @GridToStringExclude
+ @GridDirectTransient
+ private Throwable err;
+
+ /** Serialized error. */
+ private byte[] errBytes;
+
+ /**
+ * Empty constructor (required by {@link Externalizable}).
+ */
+ public GridDistributedTxPrepareResponse() {
+ /* No-op. */
+ }
+
+ /**
+ * @param xid Transaction ID.
+ */
+ public GridDistributedTxPrepareResponse(GridCacheVersion xid) {
+ super(xid, 0);
+ }
+
+ /**
+ * @param xid Lock ID.
+ * @param err Error.
+ */
+ public GridDistributedTxPrepareResponse(GridCacheVersion xid, Throwable err) {
+ super(xid, 0);
+
+ this.err = err;
+ }
+
+ /**
+ * @return Error.
+ */
+ public Throwable error() {
+ return err;
+ }
+
+ /**
+ * @param err Error to set.
+ */
+ public void error(Throwable err) {
+ this.err = err;
+ }
+
+ /**
+ * @return Rollback flag.
+ */
+ public boolean isRollback() {
+ return err != null;
+ }
+
+ /**
+ * @param cands Candidates map to set.
+ */
+ public void candidates(Map<K, Collection<GridCacheMvccCandidate<K>>> cands) {
+ this.cands = cands;
+ }
+
+ /** {@inheritDoc}
+ * @param ctx*/
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (candsBytes == null && cands != null) {
+ if (ctx.deploymentEnabled()) {
+ for (K k : cands.keySet())
+ prepareObject(k, ctx);
+ }
+
+ candsBytes = CU.marshal(ctx, cands);
+ }
+
+ if (err != null)
+ errBytes = ctx.marshaller().marshal(err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (candsBytes != null && cands == null)
+ cands = ctx.marshaller().unmarshal(candsBytes, ldr);
+
+ if (errBytes != null)
+ err = ctx.marshaller().unmarshal(errBytes, ldr);
+ }
+
+ /**
+ *
+ * @param key Candidates key.
+ * @return Collection of lock candidates at given index.
+ */
+ @Nullable public Collection<GridCacheMvccCandidate<K>> candidatesForKey(K key) {
+ assert key != null;
+
+ if (cands == null)
+ return null;
+
+ return cands.get(key);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors",
+ "OverriddenMethodCallDuringObjectConstruction"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridDistributedTxPrepareResponse _clone = new GridDistributedTxPrepareResponse();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridDistributedTxPrepareResponse _clone = (GridDistributedTxPrepareResponse)_msg;
+
+ _clone.cands = cands;
+ _clone.candsBytes = candsBytes;
+ _clone.err = err;
+ _clone.errBytes = errBytes;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 8:
- if (!commState.putByteArray(candsBytes))
++ if (!commState.putByteArray("candsBytes", candsBytes))
+ return false;
+
+ commState.idx++;
+
+ case 9:
- if (!commState.putByteArray(errBytes))
++ if (!commState.putByteArray("errBytes", errBytes))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 8:
- byte[] candsBytes0 = commState.getByteArray();
++ candsBytes = commState.getByteArray("candsBytes");
+
- if (candsBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- candsBytes = candsBytes0;
-
+ commState.idx++;
+
+ case 9:
- byte[] errBytes0 = commState.getByteArray();
++ errBytes = commState.getByteArray("errBytes");
+
- if (errBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- errBytes = errBytes0;
-
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 27;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return GridToStringBuilder.toString(GridDistributedTxPrepareResponse.class, this, "err",
+ err == null ? "null" : err.toString(), "super", super.toString());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
index 0000000,7dcabd1..9971d88
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
@@@ -1,0 -1,239 +1,239 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * Lock request message.
+ */
+ public class GridDistributedUnlockRequest<K, V> extends GridDistributedBaseMessage<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Keys to unlock. */
+ @GridDirectCollection(byte[].class)
+ private List<byte[]> keyBytes;
+
+ /** Keys. */
+ @GridDirectTransient
+ private List<K> keys;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridDistributedUnlockRequest() {
+ /* No-op. */
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param keyCnt Key count.
+ */
+ public GridDistributedUnlockRequest(int cacheId, int keyCnt) {
+ super(keyCnt);
+
+ this.cacheId = cacheId;
+ }
+
+ /**
+ * @return Key to lock.
+ */
+ public List<byte[]> keyBytes() {
+ return keyBytes;
+ }
+
+ /**
+ * @return Keys.
+ */
+ public List<K> keys() {
+ return keys;
+ }
+
+ /**
+ * @param key Key.
+ * @param bytes Key bytes.
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void addKey(K key, byte[] bytes, GridCacheContext<K, V> ctx) throws IgniteCheckedException {
+ boolean depEnabled = ctx.deploymentEnabled();
+
+ if (depEnabled)
+ prepareObject(key, ctx.shared());
+
+ if (keys == null)
+ keys = new ArrayList<>(keysCount());
+
+ keys.add(key);
+
+ if (keyBytes == null)
+ keyBytes = new ArrayList<>(keysCount());
+
+ keyBytes.add(bytes);
+ }
+
+ /** {@inheritDoc}
+ * @param ctx*/
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (F.isEmpty(keyBytes) && !F.isEmpty(keys))
+ keyBytes = marshalCollection(keys, ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (keys == null && !F.isEmpty(keyBytes))
+ keys = unmarshalCollection(keyBytes, ctx, ldr);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors",
+ "OverriddenMethodCallDuringObjectConstruction"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridDistributedUnlockRequest _clone = new GridDistributedUnlockRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridDistributedUnlockRequest _clone = (GridDistributedUnlockRequest)_msg;
+
+ _clone.keyBytes = keyBytes;
+ _clone.keys = keys;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 8:
+ if (keyBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(keyBytes.size()))
++ if (!commState.putInt(null, keyBytes.size()))
+ return false;
+
+ commState.it = keyBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putByteArray((byte[])commState.cur))
++ if (!commState.putByteArray(null, (byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 8:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (keyBytes == null)
+ keyBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- byte[] _val = commState.getByteArray();
++ byte[] _val = commState.getByteArray(null);
+
- if (_val == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ keyBytes.add((byte[])_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 28;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDistributedUnlockRequest.class, this, "keyBytesSize",
+ keyBytes == null ? 0 : keyBytes.size(), "super", super.toString());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index 0000000,4fffd45..50d8f1c
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@@ -1,0 -1,139 +1,139 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.nio.*;
+
+ /**
+ * Affinity assignment request.
+ */
+ public class GridDhtAffinityAssignmentRequest<K, V> extends GridCacheMessage<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Topology version being queried. */
+ private long topVer;
+
+ /**
+ * Empty constructor.
+ */
+ public GridDhtAffinityAssignmentRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param topVer Topology version.
+ */
+ public GridDhtAffinityAssignmentRequest(int cacheId, long topVer) {
+ this.cacheId = cacheId;
+ this.topVer = topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean allowForStartup() {
+ return true;
+ }
+
+ /**
+ * @return Requested topology version.
+ */
+ @Override public long topologyVersion() {
+ return topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 79;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridDhtAffinityAssignmentRequest _clone = new GridDhtAffinityAssignmentRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridDhtAffinityAssignmentRequest _clone = (GridDhtAffinityAssignmentRequest)_msg;
+
+ _clone.topVer = topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 3:
- if (!commState.putLong(topVer))
++ if (!commState.putLong("topVer", topVer))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 3:
- if (buf.remaining() < 8)
- return false;
++ topVer = commState.getLong("topVer");
+
- topVer = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtAffinityAssignmentRequest.class, this);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index 0000000,226bd00..669f5d0
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@@ -1,0 -1,196 +1,194 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * Affinity assignment response.
+ */
+ public class GridDhtAffinityAssignmentResponse<K, V> extends GridCacheMessage<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Topology version. */
+ private long topVer;
+
+ /** Affinity assignment. */
+ @GridDirectTransient
+ @GridToStringInclude
+ private List<List<ClusterNode>> affAssignment;
+
+ /** Affinity assignment bytes. */
+ private byte[] affAssignmentBytes;
+
+ /**
+ * Empty constructor.
+ */
+ public GridDhtAffinityAssignmentResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param topVer Topology version.
+ * @param affAssignment Affinity assignment.
+ */
+ public GridDhtAffinityAssignmentResponse(int cacheId, long topVer, List<List<ClusterNode>> affAssignment) {
+ this.cacheId = cacheId;
+ this.topVer = topVer;
+ this.affAssignment = affAssignment;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean allowForStartup() {
+ return true;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ @Override public long topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Affinity assignment.
+ */
+ public List<List<ClusterNode>> affinityAssignment() {
+ return affAssignment;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 80;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridDhtAffinityAssignmentResponse _clone = new GridDhtAffinityAssignmentResponse();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridDhtAffinityAssignmentResponse _clone = (GridDhtAffinityAssignmentResponse)_msg;
+
+ _clone.topVer = topVer;
+ _clone.affAssignment = affAssignment;
+ _clone.affAssignmentBytes = affAssignmentBytes;
+ }
+
+ /** {@inheritDoc}
+ * @param ctx*/
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (affAssignment != null)
+ affAssignmentBytes = ctx.marshaller().marshal(affAssignment);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (affAssignmentBytes != null)
+ affAssignment = ctx.marshaller().unmarshal(affAssignmentBytes, ldr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 3:
- if (!commState.putByteArray(affAssignmentBytes))
++ if (!commState.putByteArray("affAssignmentBytes", affAssignmentBytes))
+ return false;
+
+ commState.idx++;
+
+ case 4:
- if (!commState.putLong(topVer))
++ if (!commState.putLong("topVer", topVer))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 3:
- byte[] affAssignmentBytes0 = commState.getByteArray();
++ affAssignmentBytes = commState.getByteArray("affAssignmentBytes");
+
- if (affAssignmentBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- affAssignmentBytes = affAssignmentBytes0;
-
+ commState.idx++;
+
+ case 4:
- if (buf.remaining() < 8)
- return false;
++ topVer = commState.getLong("topVer");
+
- topVer = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtAffinityAssignmentResponse.class, this);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index 0000000,483fc2f..b5fe8b3
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@@ -1,0 -1,597 +1,587 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.distributed.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.transactions.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * DHT lock request.
+ */
+ public class GridDhtLockRequest<K, V> extends GridDistributedLockRequest<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Near keys. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private List<K> nearKeys;
+
+ /** Near keys to lock. */
+ @GridToStringExclude
+ @GridDirectCollection(byte[].class)
+ private List<byte[]> nearKeyBytes;
+
+ /** Invalidate reader flags. */
+ private BitSet invalidateEntries;
+
+ /** Mini future ID. */
+ private IgniteUuid miniId;
+
+ /** Owner mapped version, if any. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Map<K, GridCacheVersion> owned;
+
+ /** Owner mapped version bytes. */
+ private byte[] ownedBytes;
+
+ /** Topology version. */
+ private long topVer;
+
+ /** Subject ID. */
+ @GridDirectVersion(1)
+ private UUID subjId;
+
+ /** Task name hash. */
+ @GridDirectVersion(2)
+ private int taskNameHash;
+
+ /** Indexes of keys needed to be preloaded. */
+ @GridDirectVersion(3)
+ private BitSet preloadKeys;
+
+ /** TTL for read operation. */
+ private long accessTtl;
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ public GridDhtLockRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param nodeId Node ID.
+ * @param nearXidVer Near transaction ID.
+ * @param threadId Thread ID.
+ * @param futId Future ID.
+ * @param miniId Mini future ID.
+ * @param lockVer Cache version.
+ * @param topVer Topology version.
+ * @param isInTx {@code True} if implicit transaction lock.
+ * @param isRead Indicates whether implicit lock is for read or write operation.
+ * @param isolation Transaction isolation.
+ * @param isInvalidate Invalidation flag.
+ * @param timeout Lock timeout.
+ * @param dhtCnt DHT count.
+ * @param nearCnt Near count.
+ * @param txSize Expected transaction size.
+ * @param grpLockKey Group lock key.
+ * @param partLock {@code True} if partition lock.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ * @param accessTtl TTL for read operation.
+ */
+ public GridDhtLockRequest(
+ int cacheId,
+ UUID nodeId,
+ GridCacheVersion nearXidVer,
+ long threadId,
+ IgniteUuid futId,
+ IgniteUuid miniId,
+ GridCacheVersion lockVer,
+ long topVer,
+ boolean isInTx,
+ boolean isRead,
+ IgniteTxIsolation isolation,
+ boolean isInvalidate,
+ long timeout,
+ int dhtCnt,
+ int nearCnt,
+ int txSize,
+ @Nullable IgniteTxKey grpLockKey,
+ boolean partLock,
+ @Nullable UUID subjId,
+ int taskNameHash,
+ long accessTtl
+ ) {
+ super(cacheId,
+ nodeId,
+ nearXidVer,
+ threadId,
+ futId,
+ lockVer,
+ isInTx,
+ isRead,
+ isolation,
+ isInvalidate,
+ timeout,
+ dhtCnt == 0 ? nearCnt : dhtCnt,
+ txSize,
+ grpLockKey,
+ partLock);
+
+ this.topVer = topVer;
+
+ nearKeyBytes = nearCnt == 0 ? Collections.<byte[]>emptyList() : new ArrayList<byte[]>(nearCnt);
+ nearKeys = nearCnt == 0 ? Collections.<K>emptyList() : new ArrayList<K>(nearCnt);
+ invalidateEntries = new BitSet(dhtCnt == 0 ? nearCnt : dhtCnt);
+
+ assert miniId != null;
+
+ this.miniId = miniId;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.accessTtl = accessTtl;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean allowForStartup() {
+ return true;
+ }
+
+ /**
+ * @return Near node ID.
+ */
+ public UUID nearNodeId() {
+ return nodeId();
+ }
+
+ /**
+ * @return Subject ID.
+ */
+ public UUID subjectId() {
+ return subjId;
+ }
+
+ /**
+ * @return Task name hash.
+ */
+ public int taskNameHash() {
+ return taskNameHash;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ @Override public long topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Near keys.
+ */
+ public List<byte[]> nearKeyBytes() {
+ return nearKeyBytes == null ? Collections.<byte[]>emptyList() : nearKeyBytes;
+ }
+
+ /**
+ * Adds a Near key.
+ *
+ * @param key Key.
+ * @param keyBytes Key bytes.
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void addNearKey(K key, byte[] keyBytes, GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ if (ctx.deploymentEnabled())
+ prepareObject(key, ctx);
+
+ nearKeys.add(key);
+
+ if (keyBytes != null)
+ nearKeyBytes.add(keyBytes);
+ }
+
+ /**
+ * @return Near keys.
+ */
+ public List<K> nearKeys() {
+ return nearKeys == null ? Collections.<K>emptyList() : nearKeys;
+ }
+
+ /**
+ * Adds a DHT key.
+ *
+ * @param key Key.
+ * @param keyBytes Key bytes.
+ * @param writeEntry Write entry.
+ * @param drVer DR version.
+ * @param invalidateEntry Flag indicating whether node should attempt to invalidate reader.
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void addDhtKey(
+ K key,
+ byte[] keyBytes,
+ IgniteTxEntry<K, V> writeEntry,
+ @Nullable GridCacheVersion drVer,
+ boolean invalidateEntry,
+ GridCacheContext<K, V> ctx
+ ) throws IgniteCheckedException {
+ invalidateEntries.set(idx, invalidateEntry);
+
+ addKeyBytes(key, keyBytes, writeEntry, false, null, drVer, ctx);
+ }
+
+ /**
+ * Marks last added key for preloading.
+ */
+ public void markLastKeyForPreload() {
+ assert idx > 0;
+
+ if (preloadKeys == null)
+ preloadKeys = new BitSet();
+
+ preloadKeys.set(idx - 1, true);
+ }
+
+ /**
+ * @param idx Key index.
+ * @return {@code True} if need to preload key with given index.
+ */
+ public boolean needPreloadKey(int idx) {
+ return preloadKeys != null && preloadKeys.get(idx);
+ }
+
+ /**
+ * Sets owner and its mapped version.
+ *
+ * @param key Key.
+ * @param keyBytes Key bytes.
+ * @param ownerMapped Owner mapped version.
+ */
+ public void owned(K key, byte[] keyBytes, GridCacheVersion ownerMapped) {
+ if (owned == null)
+ owned = new GridLeanMap<>(3);
+
+ owned.put(key, ownerMapped);
+ }
+
+ /**
+ * @param key Key.
+ * @return Owner and its mapped versions.
+ */
+ @Nullable public GridCacheVersion owned(K key) {
+ return owned == null ? null : owned.get(key);
+ }
+
+ /**
+ * @param idx Entry index to check.
+ * @return {@code True} if near entry should be invalidated.
+ */
+ public boolean invalidateNearEntry(int idx) {
+ return invalidateEntries.get(idx);
+ }
+
+ /**
+ * @return Mini ID.
+ */
+ public IgniteUuid miniId() {
+ return miniId;
+ }
+
+ /**
+ * @return TTL for read operation.
+ */
+ public long accessTtl() {
+ return accessTtl;
+ }
+
+ /** {@inheritDoc}
+ * @param ctx*/
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ assert F.isEmpty(nearKeys) || !F.isEmpty(nearKeyBytes);
+
+ if (owned != null)
+ ownedBytes = CU.marshal(ctx, owned);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (nearKeys == null && nearKeyBytes != null)
+ nearKeys = unmarshalCollection(nearKeyBytes, ctx, ldr);
+
+ if (ownedBytes != null)
+ owned = ctx.marshaller().unmarshal(ownedBytes, ldr);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridDhtLockRequest _clone = new GridDhtLockRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridDhtLockRequest _clone = (GridDhtLockRequest)_msg;
+
+ _clone.nearKeys = nearKeys;
+ _clone.nearKeyBytes = nearKeyBytes;
+ _clone.invalidateEntries = invalidateEntries;
+ _clone.miniId = miniId;
+ _clone.owned = owned;
+ _clone.ownedBytes = ownedBytes;
+ _clone.topVer = topVer;
+ _clone.subjId = subjId;
+ _clone.taskNameHash = taskNameHash;
+ _clone.preloadKeys = preloadKeys;
+ _clone.accessTtl = accessTtl;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 24:
- if (!commState.putBitSet(invalidateEntries))
++ if (!commState.putBitSet("invalidateEntries", invalidateEntries))
+ return false;
+
+ commState.idx++;
+
+ case 25:
- if (!commState.putGridUuid(miniId))
++ if (!commState.putGridUuid("miniId", miniId))
+ return false;
+
+ commState.idx++;
+
+ case 26:
+ if (nearKeyBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(nearKeyBytes.size()))
++ if (!commState.putInt(null, nearKeyBytes.size()))
+ return false;
+
+ commState.it = nearKeyBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putByteArray((byte[])commState.cur))
++ if (!commState.putByteArray(null, (byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 27:
- if (!commState.putByteArray(ownedBytes))
++ if (!commState.putByteArray("ownedBytes", ownedBytes))
+ return false;
+
+ commState.idx++;
+
+ case 28:
- if (!commState.putLong(topVer))
++ if (!commState.putLong("topVer", topVer))
+ return false;
+
+ commState.idx++;
+
+ case 29:
- if (!commState.putUuid(subjId))
++ if (!commState.putUuid("subjId", subjId))
+ return false;
+
+ commState.idx++;
+
+ case 30:
- if (!commState.putInt(taskNameHash))
++ if (!commState.putInt("taskNameHash", taskNameHash))
+ return false;
+
+ commState.idx++;
+
+ case 31:
- if (!commState.putBitSet(preloadKeys))
++ if (!commState.putBitSet("preloadKeys", preloadKeys))
+ return false;
+
+ commState.idx++;
+
+ case 32:
+ if (!commState.putLong(accessTtl))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 24:
- BitSet invalidateEntries0 = commState.getBitSet();
++ invalidateEntries = commState.getBitSet("invalidateEntries");
+
- if (invalidateEntries0 == BIT_SET_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- invalidateEntries = invalidateEntries0;
-
+ commState.idx++;
+
+ case 25:
- IgniteUuid miniId0 = commState.getGridUuid();
++ miniId = commState.getGridUuid("miniId");
+
- if (miniId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- miniId = miniId0;
-
+ commState.idx++;
+
+ case 26:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (nearKeyBytes == null)
+ nearKeyBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- byte[] _val = commState.getByteArray();
++ byte[] _val = commState.getByteArray(null);
+
- if (_val == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ nearKeyBytes.add((byte[])_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 27:
- byte[] ownedBytes0 = commState.getByteArray();
++ ownedBytes = commState.getByteArray("ownedBytes");
+
- if (ownedBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- ownedBytes = ownedBytes0;
-
+ commState.idx++;
+
+ case 28:
- if (buf.remaining() < 8)
- return false;
++ topVer = commState.getLong("topVer");
+
- topVer = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 29:
- UUID subjId0 = commState.getUuid();
++ subjId = commState.getUuid("subjId");
+
- if (subjId0 == UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- subjId = subjId0;
-
+ commState.idx++;
+
+ case 30:
- if (buf.remaining() < 4)
- return false;
++ taskNameHash = commState.getInt("taskNameHash");
+
- taskNameHash = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 31:
- BitSet preloadKeys0 = commState.getBitSet();
++ preloadKeys = commState.getBitSet("preloadKeys");
+
- if (preloadKeys0 == BIT_SET_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- preloadKeys = preloadKeys0;
-
+ commState.idx++;
+
+ case 32:
+ if (buf.remaining() < 8)
+ return false;
+
+ accessTtl = commState.getLong();
+
+ commState.idx++;
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 29;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtLockRequest.class, this, "nearKeyBytesSize", nearKeyBytes.size(),
+ "super", super.toString());
+ }
+ }