You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/02/02 04:28:36 UTC
[44/52] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1'
of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-61
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 0000000,85b07bf..6a46bee
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@@ -1,0 -1,734 +1,732 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.marshaller.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.nio.*;
+ import java.util.*;
+ import java.util.concurrent.atomic.*;
+
+ /**
+ * Parent of all cache messages.
+ */
+ public abstract class GridCacheMessage<K, V> extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Maximum number of cache lookup indexes. */
+ public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 256;
+
+ /** Cache message index field name. */
+ public static final String CACHE_MSG_INDEX_FIELD_NAME = "CACHE_MSG_IDX";
+
+ /** Message index id. */
+ private static final AtomicInteger msgIdx = new AtomicInteger();
+
+ /** Null message ID. */
+ private static final long NULL_MSG_ID = -1;
+
+ /** ID of this message. */
+ private long msgId = NULL_MSG_ID;
+
+ /** Deployment info attached to this message, set via {@link #prepare(GridDeploymentInfo)}. */
+ @GridToStringInclude
+ private GridDeploymentInfoBean depInfo;
+
+ /** Class loading error passed to {@link #onClassError(Exception)}; not written by {@code writeTo}. */
+ @GridDirectTransient
+ private Exception err;
+
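+ /** Set once global deployment info has been applied; makes further {@code prepareObject} calls a no-op. */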
+ @GridDirectTransient
+ private boolean skipPrepare;
+
+ /** Cache ID. */
+ protected int cacheId;
+
+ /**
+ * Gets the next index for an indexed message by atomically incrementing the shared static
+ * counter. The value held before the increment is returned.
+ *
+ * @return Next message index.
+ */
+ public static int nextIndexId() {
+ return msgIdx.getAndIncrement();
+ }
+
+ /**
+ * @return {@code True} if this message is a preloader message. This base implementation
+ * always returns {@code false}.
+ */
+ public boolean allowForStartup() {
+ return false;
+ }
+
+ /**
+ * @return {@code True} if this is a transactional message. This base implementation
+ * always returns {@code false}.
+ */
+ public boolean transactional() {
+ return false;
+ }
+
+ /**
+ * @return {@code True} if class loading errors should be ignored, {@code false} otherwise.
+ * This base implementation always returns {@code false}.
+ */
+ public boolean ignoreClassErrors() {
+ return false;
+ }
+
+ /**
+ * Gets message lookup index. All messages that do not return -1 from this method must return a unique
+ * number in range from 0 to {@link #MAX_CACHE_MSG_LOOKUP_INDEX}. This base implementation returns -1.
+ *
+ * @return Message lookup index.
+ */
+ public int lookupIndex() {
+ return -1;
+ }
+
+ /**
+ * If a class loading error occurred during unmarshalling and {@link #ignoreClassErrors()}
+ * returns {@code true}, then the error will be passed into this method. The error is stored
+ * on the message and can be read back with {@link #classError()}.
+ *
+ * @param err Error.
+ */
+ public void onClassError(Exception err) {
+ this.err = err;
+ }
+
+ /**
+ * @return Error set via {@link #onClassError(Exception)} method, or {@code null} if none was set.
+ */
+ public Exception classError() {
+ return err;
+ }
+
+ /**
+ * @return Message ID; {@code -1} (the null message ID) until an ID has been assigned.
+ */
+ public long messageId() {
+ return msgId;
+ }
+
+ /**
+ * Sets message ID. This method is package-private and is intended to be called only
+ * by {@link GridCacheIoManager}.
+ *
+ * @param msgId New message ID.
+ */
+ void messageId(long msgId) {
+ this.msgId = msgId;
+ }
+
+ /**
+ * Gets the ID of the cache this message is associated with.
+ *
+ * @return Cache ID.
+ */
+ public int cacheId() {
+ return cacheId;
+ }
+
+ /**
+ * Sets the ID of the cache this message is associated with.
+ *
+ * @param cacheId Cache ID.
+ */
+ public void cacheId(int cacheId) {
+ this.cacheId = cacheId;
+ }
+
+ /**
+ * Gets topology version, or -1 if a topology version is not required for this message.
+ * This base implementation returns -1.
+ *
+ * @return Topology version.
+ */
+ public long topologyVersion() {
+ return -1;
+ }
+
+ /**
+ * Prepares every predicate in the array for marshalling by passing it to
+ * {@link #prepareObject(Object, GridCacheSharedContext)}. A {@code null} array is ignored.
+ *
+ * @param filters Predicate filters.
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected final void prepareFilter(@Nullable IgnitePredicate<CacheEntry<K, V>>[] filters,
+ GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ if (filters != null)
+ for (IgnitePredicate filter : filters)
+ prepareObject(filter, ctx);
+ }
+
+ /**
+ * Attaches deployment information for the given object to this message.
+ * <p>
+ * Does nothing if the object is {@code null} or if global deployment info was already applied
+ * by an earlier call. If the deployment manager reports global deployment info, that info is
+ * passed to {@link #prepare(GridDeploymentInfo)} and all later calls become no-ops. Otherwise
+ * the object's class is detected and registered with the deployment manager, and if the class
+ * loader of that class is itself a {@link GridDeploymentInfo}, the loader is passed to
+ * {@link #prepare(GridDeploymentInfo)}.
+ *
+ * @param o Object to prepare for marshalling.
+ * @param ctx Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected final void prepareObject(@Nullable Object o, GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ if (!skipPrepare && o != null) {
+ GridDeploymentInfo d = ctx.deploy().globalDeploymentInfo();
+
+ if (d != null) {
+ prepare(d);
+
+ // Global deployment has been injected.
+ skipPrepare = true;
+ }
+ else {
+ Class<?> cls = U.detectClass(o);
+
+ ctx.deploy().registerClass(cls);
+
+ ClassLoader ldr = U.detectClassLoader(cls);
+
+ if (ldr instanceof GridDeploymentInfo)
+ prepare((GridDeploymentInfo)ldr);
+ }
+ }
+ }
+
+ /**
+ * Calls {@link #prepareObject(Object, GridCacheSharedContext)} for every element of the
+ * given collection. A {@code null} collection is ignored.
+ *
+ * @param col Collection of objects to prepare for marshalling.
+ * @param ctx Cache context.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected final void prepareObjects(@Nullable Iterable<?> col, GridCacheSharedContext<K, V> ctx)
+ throws IgniteCheckedException {
+ if (col != null)
+ for (Object o : col)
+ prepareObject(o, ctx);
+ }
+
+ /**
+ * Sets deployment info on this message.
+ * <p>
+ * Nothing happens if the given instance is the one already set. If some deployment info is
+ * already set and the given one is a local {@link GridDeployment}, the existing info is kept.
+ * Otherwise the given info is stored as is when it is a {@link GridDeploymentInfoBean},
+ * or wrapped into a new {@link GridDeploymentInfoBean}.
+ *
+ * @param depInfo Deployment to set.
+ * @see GridCacheDeployable#prepare(GridDeploymentInfo)
+ */
+ public final void prepare(GridDeploymentInfo depInfo) {
+ if (depInfo != this.depInfo) {
+ if (this.depInfo != null && depInfo instanceof GridDeployment)
+ // Make sure not to replace remote deployment with local.
+ if (((GridDeployment)depInfo).local())
+ return;
+
+ this.depInfo = depInfo instanceof GridDeploymentInfoBean ?
+ (GridDeploymentInfoBean)depInfo : new GridDeploymentInfoBean(depInfo);
+ }
+ }
+
+ /**
+ * @return Preset deployment info, or {@code null} if none has been set.
+ * @see GridCacheDeployable#deployInfo()
+ */
+ public GridDeploymentInfo deployInfo() {
+ return depInfo;
+ }
+
+ /**
+ * This method is called before the whole message is serialized
+ * and is responsible for pre-marshalling state. This base implementation does nothing.
+ *
+ * @param ctx Cache context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /**
+ * This method is called after the message is deserialized and is responsible for
+ * unmarshalling state marshalled in {@link #prepareMarshal(GridCacheSharedContext)} method.
+ * This base implementation does nothing.
+ *
+ * @param ctx Context.
+ * @param ldr Class loader.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /**
+ * Marshals the given entry info and, when deployment is enabled on the context, prepares
+ * its key and value via {@link #prepareObject(Object, GridCacheSharedContext)}.
+ * A {@code null} entry is ignored.
+ *
+ * @param info Entry to marshal.
+ * @param ctx Context (must not be {@code null}).
+ * @throws IgniteCheckedException If failed.
+ */
+ protected final void marshalInfo(GridCacheEntryInfo<K, V> info, GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ assert ctx != null;
+
+ if (info != null) {
+ info.marshal(ctx);
+
+ if (ctx.deploymentEnabled()) {
+ prepareObject(info.key(), ctx);
+ prepareObject(info.value(), ctx);
+ }
+ }
+ }
+
+ /**
+ * Unmarshals the given entry info with the given class loader. A {@code null} entry is ignored.
+ *
+ * @param info Entry to unmarshal.
+ * @param ctx Context (must not be {@code null}).
+ * @param ldr Loader (must not be {@code null}).
+ * @throws IgniteCheckedException If failed.
+ */
+ protected final void unmarshalInfo(GridCacheEntryInfo<K, V> info, GridCacheContext<K, V> ctx,
+ ClassLoader ldr) throws IgniteCheckedException {
+ assert ldr != null;
+ assert ctx != null;
+
+ if (info != null)
+ info.unmarshal(ctx, ldr);
+ }
+
+ /**
+ * Marshals every entry of the given collection via
+ * {@link #marshalInfo(GridCacheEntryInfo, GridCacheSharedContext)}. A {@code null} collection is ignored.
+ *
+ * @param infos Entries to marshal.
+ * @param ctx Context (must not be {@code null}).
+ * @throws IgniteCheckedException If failed.
+ */
+ protected final void marshalInfos(Iterable<? extends GridCacheEntryInfo<K, V>> infos, GridCacheSharedContext<K, V> ctx)
+ throws IgniteCheckedException {
+ assert ctx != null;
+
+ if (infos != null)
+ for (GridCacheEntryInfo<K, V> e : infos)
+ marshalInfo(e, ctx);
+ }
+
+ /**
+ * Unmarshals every entry of the given collection via
+ * {@link #unmarshalInfo(GridCacheEntryInfo, GridCacheContext, ClassLoader)}.
+ * A {@code null} collection is ignored.
+ *
+ * @param infos Entries to unmarshal.
+ * @param ctx Context (must not be {@code null}).
+ * @param ldr Loader (must not be {@code null}).
+ * @throws IgniteCheckedException If failed.
+ */
+ protected final void unmarshalInfos(Iterable<? extends GridCacheEntryInfo<K, V>> infos,
+ GridCacheContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ assert ldr != null;
+ assert ctx != null;
+
+ if (infos != null)
+ for (GridCacheEntryInfo<K, V> e : infos)
+ unmarshalInfo(e, ctx, ldr);
+ }
+
+ /**
+ * Marshals the given transaction entries. The result of {@link #transferExpiryPolicy()} is
+ * read once and passed to every entry's marshal call. When deployment is enabled on the
+ * context, the key, value and filters of each entry are also prepared for marshalling.
+ * A {@code null} collection is ignored.
+ *
+ * @param txEntries Entries to marshal.
+ * @param ctx Context (must not be {@code null}).
+ * @throws IgniteCheckedException If failed.
+ */
+ protected final void marshalTx(Iterable<IgniteTxEntry<K, V>> txEntries, GridCacheSharedContext<K, V> ctx)
+ throws IgniteCheckedException {
+ assert ctx != null;
+
+ if (txEntries != null) {
+ boolean transferExpiry = transferExpiryPolicy();
+
+ for (IgniteTxEntry<K, V> e : txEntries) {
+ e.marshal(ctx, transferExpiry);
+
+ if (ctx.deploymentEnabled()) {
+ prepareObject(e.key(), ctx);
+ prepareObject(e.value(), ctx);
+ prepareFilter(e.filters(), ctx);
+ }
+ }
+ }
+ }
+
+ /**
+ * @return {@code True} if the expiry policy of entries should be marshalled. This base
+ * implementation always returns {@code false}.
+ */
+ protected boolean transferExpiryPolicy() {
+ return false;
+ }
+
+ /**
+ * Unmarshals every given transaction entry. A {@code null} collection is ignored.
+ *
+ * @param txEntries Entries to unmarshal.
+ * @param near Near flag, passed unchanged to each entry's unmarshal call.
+ * @param ctx Context (must not be {@code null}).
+ * @param ldr Loader (must not be {@code null}).
+ * @throws IgniteCheckedException If failed.
+ */
+ protected final void unmarshalTx(Iterable<IgniteTxEntry<K, V>> txEntries,
+ boolean near,
+ GridCacheSharedContext<K, V> ctx,
+ ClassLoader ldr) throws IgniteCheckedException {
+ assert ldr != null;
+ assert ctx != null;
+
+ if (txEntries != null) {
+ for (IgniteTxEntry<K, V> e : txEntries)
+ e.unmarshal(ctx, near, ldr);
+ }
+ }
+
+ /**
+ * Marshals invoke arguments into a byte array per argument. When deployment is enabled on
+ * the context, each argument is first prepared for marshalling. {@code null} arguments
+ * stay {@code null} in the result.
+ *
+ * @param args Arguments to marshal.
+ * @param ctx Context (must not be {@code null}).
+ * @return Marshalled arguments, or {@code null} if {@code args} is {@code null} or empty.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable protected final byte[][] marshalInvokeArguments(@Nullable Object[] args,
+ GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ assert ctx != null;
+
+ if (args == null || args.length == 0)
+ return null;
+
+ byte[][] argsBytes = new byte[args.length][];
+
+ for (int i = 0; i < args.length; i++) {
+ Object arg = args[i];
+
+ if (ctx.deploymentEnabled())
+ prepareObject(arg, ctx);
+
+ argsBytes[i] = arg == null ? null : CU.marshal(ctx, arg);
+ }
+
+ return argsBytes;
+ }
+
+
+ /**
+ * Unmarshals invoke arguments using the context marshaller and the given class loader.
+ * {@code null} elements stay {@code null} in the result.
+ *
+ * @param byteCol Marshalled arguments to unmarshal.
+ * @param ctx Context (must not be {@code null}).
+ * @param ldr Loader (must not be {@code null}).
+ * @return Unmarshalled arguments, or {@code null} if {@code byteCol} is {@code null}.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable protected final Object[] unmarshalInvokeArguments(@Nullable byte[][] byteCol,
+ GridCacheSharedContext<K, V> ctx,
+ ClassLoader ldr) throws IgniteCheckedException {
+ assert ldr != null;
+ assert ctx != null;
+
+ if (byteCol == null)
+ return null;
+
+ Object[] args = new Object[byteCol.length];
+
+ IgniteMarshaller marsh = ctx.marshaller();
+
+ for (int i = 0; i < byteCol.length; i++)
+ args[i] = byteCol[i] == null ? null : marsh.unmarshal(byteCol[i], ldr);
+
+ return args;
+ }
+
+ /**
+ * Marshals an array of predicates into a byte array per predicate. When deployment is
+ * enabled on the context, each predicate is first prepared for marshalling.
+ * {@code null} predicates stay {@code null} in the result.
+ *
+ * @param filter Predicates to marshal.
+ * @param ctx Context (must not be {@code null}).
+ * @return Marshalled predicates, or {@code null} if {@code filter} is {@code null}.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable protected final <T> byte[][] marshalFilter(@Nullable IgnitePredicate<CacheEntry<K, V>>[] filter,
+ GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ assert ctx != null;
+
+ if (filter == null)
+ return null;
+
+ byte[][] filterBytes = new byte[filter.length][];
+
+ for (int i = 0; i < filter.length; i++) {
+ IgnitePredicate<CacheEntry<K, V>> p = filter[i];
+
+ if (ctx.deploymentEnabled())
+ prepareObject(p, ctx);
+
+ filterBytes[i] = p == null ? null : CU.marshal(ctx, p);
+ }
+
+ return filterBytes;
+ }
+
+ /**
+ * Unmarshals an array of predicates using the context marshaller and the given class loader.
+ * {@code null} elements stay {@code null} in the result.
+ *
+ * @param byteCol Marshalled predicates to unmarshal.
+ * @param ctx Context (must not be {@code null}).
+ * @param ldr Loader (must not be {@code null}).
+ * @return Unmarshalled predicates, or {@code null} if {@code byteCol} is {@code null}.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings({"unchecked"})
+ @Nullable protected final <T> IgnitePredicate<CacheEntry<K, V>>[] unmarshalFilter(
+ @Nullable byte[][] byteCol, GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ assert ldr != null;
+ assert ctx != null;
+
+ if (byteCol == null)
+ return null;
+
+ IgnitePredicate<CacheEntry<K, V>>[] filter = new IgnitePredicate[byteCol.length];
+
+ IgniteMarshaller marsh = ctx.marshaller();
+
+ for (int i = 0; i < byteCol.length; i++)
+ filter[i] = byteCol[i] == null ? null :
+ marsh.<IgnitePredicate<CacheEntry<K, V>>>unmarshal(byteCol[i], ldr);
+
+ return filter;
+ }
+
+ /**
+ * Converts a collection of values into a list of {@link GridCacheValueBytes}. Values that are
+ * already {@code byte[]} are wrapped as plain bytes, other non-null values are marshalled,
+ * and {@code null} values stay {@code null}. When deployment is enabled on the context,
+ * each value is first prepared for marshalling.
+ *
+ * @param col Values collection to marshal.
+ * @param ctx Context (must not be {@code null}).
+ * @return Marshalled collection, or {@code null} if {@code col} is {@code null}.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable protected List<GridCacheValueBytes> marshalValuesCollection(@Nullable Collection<?> col,
+ GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ assert ctx != null;
+
+ if (col == null)
+ return null;
+
+ List<GridCacheValueBytes> byteCol = new ArrayList<>(col.size());
+
+ for (Object o : col) {
+ if (ctx.deploymentEnabled())
+ prepareObject(o, ctx);
+
+ byteCol.add(o == null ? null : o instanceof byte[] ? GridCacheValueBytes.plain(o) :
+ GridCacheValueBytes.marshaled(CU.marshal(ctx, o)));
+ }
+
+ return byteCol;
+ }
+
+ /**
+ * Converts a collection of {@link GridCacheValueBytes} back into values. Plain items are
+ * returned as their raw content cast to {@code T}, other items are unmarshalled with the
+ * context marshaller and the given class loader, and {@code null} items stay {@code null}.
+ * A non-null item is asserted to carry non-null content.
+ *
+ * @param byteCol Collection to unmarshal.
+ * @param ctx Context (must not be {@code null}).
+ * @param ldr Loader (must not be {@code null}).
+ * @return Unmarshalled collection, or {@code null} if {@code byteCol} is {@code null}.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable protected <T> List<T> unmarshalValueBytesCollection(@Nullable Collection<GridCacheValueBytes> byteCol,
+ GridCacheSharedContext<K, V> ctx, ClassLoader ldr)
+ throws IgniteCheckedException {
+ assert ldr != null;
+ assert ctx != null;
+
+ if (byteCol == null)
+ return null;
+
+ List<T> col = new ArrayList<>(byteCol.size());
+
+ IgniteMarshaller marsh = ctx.marshaller();
+
+ for (GridCacheValueBytes item : byteCol) {
+ assert item == null || item.get() != null;
+
+ col.add(item != null ? item.isPlain() ? (T)item.get() : marsh.<T>unmarshal(item.get(), ldr) : null);
+ }
+
+ return col;
+ }
+
+ /**
+ * Marshals every element of the collection into a byte array. When deployment is enabled on
+ * the context, each element is first prepared for marshalling. {@code null} elements stay
+ * {@code null} in the result.
+ *
+ * @param col Collection to marshal.
+ * @param ctx Context (must not be {@code null}).
+ * @return Marshalled collection, or {@code null} if {@code col} is {@code null}.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable protected List<byte[]> marshalCollection(@Nullable Collection<?> col,
+ GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ assert ctx != null;
+
+ if (col == null)
+ return null;
+
+ List<byte[]> byteCol = new ArrayList<>(col.size());
+
+ for (Object o : col) {
+ if (ctx.deploymentEnabled())
+ prepareObject(o, ctx);
+
+ byteCol.add(o == null ? null : CU.marshal(ctx, o));
+ }
+
+ return byteCol;
+ }
+
+ /**
+ * Unmarshals every byte array of the collection using the context marshaller and the given
+ * class loader. {@code null} elements stay {@code null} in the result.
+ *
+ * @param byteCol Collection to unmarshal.
+ * @param ctx Context (must not be {@code null}).
+ * @param ldr Loader (must not be {@code null}).
+ * @return Unmarshalled collection, or {@code null} if {@code byteCol} is {@code null}.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable protected <T> List<T> unmarshalCollection(@Nullable Collection<byte[]> byteCol,
+ GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ assert ldr != null;
+ assert ctx != null;
+
+ if (byteCol == null)
+ return null;
+
+ List<T> col = new ArrayList<>(byteCol.size());
+
+ IgniteMarshaller marsh = ctx.marshaller();
+
+ for (byte[] bytes : byteCol)
+ col.add(bytes == null ? null : marsh.<T>unmarshal(bytes, ldr));
+
+ return col;
+ }
+
+ /**
+ * Marshals the keys of the given map, carrying the boolean values over unchanged. Entries are
+ * inserted into the result in the iteration order of {@code map}. When deployment is enabled
+ * on the context, each key is first prepared for marshalling.
+ *
+ * @param map Map to marshal.
+ * @param ctx Context (must not be {@code null}).
+ * @return Marshalled map, or {@code null} if {@code map} is {@code null}.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("TypeMayBeWeakened") // Don't weaken type to clearly see that it's linked hash map.
+ @Nullable protected final LinkedHashMap<byte[], Boolean> marshalBooleanLinkedMap(
+ @Nullable LinkedHashMap<?, Boolean> map, GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ assert ctx != null;
+
+ if (map == null)
+ return null;
+
+ LinkedHashMap<byte[], Boolean> byteMap = U.newLinkedHashMap(map.size());
+
+ for (Map.Entry<?, Boolean> e : map.entrySet()) {
+ if (ctx.deploymentEnabled())
+ prepareObject(e.getKey(), ctx);
+
+ byteMap.put(CU.marshal(ctx, e.getKey()), e.getValue());
+ }
+
+ return byteMap;
+ }
+
+ /**
+ * Unmarshals the keys of the given map using the context marshaller and the given class
+ * loader, carrying the boolean values over unchanged. Entries are inserted into the result
+ * in the iteration order of {@code byteMap}.
+ *
+ * @param byteMap Map to unmarshal.
+ * @param ctx Context (must not be {@code null}).
+ * @param ldr Loader (must not be {@code null}).
+ * @return Unmarshalled map, or {@code null} if {@code byteMap} is {@code null}.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable protected final <K1> LinkedHashMap<K1, Boolean> unmarshalBooleanLinkedMap(
+ @Nullable Map<byte[], Boolean> byteMap, GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ assert ldr != null;
+ assert ctx != null;
+
+ if (byteMap == null)
+ return null;
+
+ LinkedHashMap<K1, Boolean> map = U.newLinkedHashMap(byteMap.size());
+
+ IgniteMarshaller marsh = ctx.marshaller();
+
+ for (Map.Entry<byte[], Boolean> e : byteMap.entrySet())
+ map.put(marsh.<K1>unmarshal(e.getKey(), ldr), e.getValue());
+
+ return map;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Copies the message ID, class error, skip-prepare flag and cache ID into the target message.
+ * Deployment info is copied as a clone when present, otherwise set to {@code null}.
+ */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridCacheMessage _clone = (GridCacheMessage)_msg;
+
+ _clone.msgId = msgId;
+ _clone.depInfo = depInfo != null ? (GridDeploymentInfoBean)depInfo.clone() : null;
+ _clone.err = err;
+ _clone.skipPrepare = skipPrepare;
+ _clone.cacheId = cacheId;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes the direct type byte once (guarded by {@code commState.typeWritten}), then the
+ * fields {@code cacheId}, {@code depInfo} and {@code msgId} in that order. The switch on
+ * {@code commState.idx} starts at the field whose index is stored there and falls through
+ * the remaining cases; the index is advanced after each field is written. Returns
+ * {@code false} as soon as a put call reports failure, {@code true} once all fields are written.
+ * In this combined diff the lines marked {@code ++} are the merge result (put calls take a
+ * field name as first argument); lines marked {@code -} come from one of the parents.
+ */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putInt(cacheId))
++ if (!commState.putInt("cacheId", cacheId))
+ return false;
+
+ commState.idx++;
+
+ case 1:
- if (!commState.putMessage(depInfo))
++ if (!commState.putMessage("depInfo", depInfo))
+ return false;
+
+ commState.idx++;
+
+ case 2:
- if (!commState.putLong(msgId))
++ if (!commState.putLong("msgId", msgId))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reads the fields {@code cacheId}, {@code depInfo} and {@code msgId} in that order. The
+ * switch on {@code commState.idx} starts at the field whose index is stored there and falls
+ * through the remaining cases; the index is advanced after each field is read. In the merge
+ * result (lines marked {@code ++}) each value is read by field name and
+ * {@code commState.lastRead()} is checked afterwards; {@code false} is returned when it
+ * reports that the value was not read. Lines marked {@code -} show the parent version, which
+ * checked the remaining buffer size or a not-read marker instead.
+ */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ switch (commState.idx) {
+ case 0:
- if (buf.remaining() < 4)
- return false;
++ cacheId = commState.getInt("cacheId");
+
- cacheId = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 1:
- Object depInfo0 = commState.getMessage();
++ depInfo = (GridDeploymentInfoBean)commState.getMessage("depInfo");
+
- if (depInfo0 == MSG_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- depInfo = (GridDeploymentInfoBean)depInfo0;
-
+ commState.idx++;
+
+ case 2:
- if (buf.remaining() < 8)
- return false;
++ msgId = commState.getLong("msgId");
+
- msgId = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates to {@code S.toString} for {@code GridCacheMessage}.
+ */
+ @Override public String toString() {
+ return S.toString(GridCacheMessage.class, this);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
index 0000000,73ad5d0..5fcef9f
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
@@@ -1,0 -1,227 +1,221 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed;
+
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.io.*;
+ import java.nio.*;
+
+ /**
+ * Message sent to check that transactions related to some optimistic transaction
+ * were prepared on remote node.
+ */
+ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> extends GridDistributedBaseMessage<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Mini future ID. */
+ private IgniteUuid miniId;
+
+ /** Near transaction ID. */
+ private GridCacheVersion nearXidVer;
+
+ /** Expected number of transactions on node. */
+ private int txNum;
+
+ /**
+ * Empty constructor required by {@link Externalizable}; also used by {@link #clone()}.
+ */
+ public GridCacheOptimisticCheckPreparedTxRequest() {
+ // No-op.
+ }
+
+ /**
+ * Creates a request for the given transaction. The transaction's XID version and {@code 0}
+ * are passed to the super constructor, and the near XID version is taken from the transaction.
+ *
+ * @param tx Transaction.
+ * @param txNum Expected number of transactions on remote node.
+ * @param futId Future ID.
+ * @param miniId Mini future ID.
+ */
+ public GridCacheOptimisticCheckPreparedTxRequest(IgniteTxEx<K, V> tx, int txNum, IgniteUuid futId, IgniteUuid miniId) {
+ super(tx.xidVersion(), 0);
+
+ nearXidVer = tx.nearXidVersion();
+ this.futId = futId;
+ this.miniId = miniId;
+ this.txNum = txNum;
+ }
+
+ /**
+ * @return Near transaction XID version.
+ */
+ public GridCacheVersion nearXidVersion() {
+ return nearXidVer;
+ }
+
+ /**
+ * Gets the ID of the future this request belongs to.
+ *
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * Gets the ID of the mini future this request belongs to.
+ *
+ * @return Mini future ID.
+ */
+ public IgniteUuid miniId() {
+ return miniId;
+ }
+
+ /**
+ * Gets the transaction count given to the constructor.
+ *
+ * @return Expected number of transactions on node.
+ */
+ public int transactions() {
+ return txNum;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Creates a new instance with the empty constructor and copies this message's state
+ * into it via {@link #clone0(GridTcpCommunicationMessageAdapter)}.
+ */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridCacheOptimisticCheckPreparedTxRequest _clone = new GridCacheOptimisticCheckPreparedTxRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * After the superclass has copied its state, copies the future ID, mini future ID,
+ * near XID version and transaction count into the target message by reference/value.
+ */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridCacheOptimisticCheckPreparedTxRequest _clone = (GridCacheOptimisticCheckPreparedTxRequest)_msg;
+
+ _clone.futId = futId;
+ _clone.miniId = miniId;
+ _clone.nearXidVer = nearXidVer;
+ _clone.txNum = txNum;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * First delegates to the superclass and returns {@code false} if it did not finish. Then
+ * writes {@code futId}, {@code miniId}, {@code nearXidVer} and {@code txNum} as field
+ * indexes 8 through 11, advancing {@code commState.idx} after each one and returning
+ * {@code false} as soon as a put call reports failure. Lines marked {@code ++} are the
+ * merge result of this combined diff; lines marked {@code -} come from one of the parents.
+ */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 8:
- if (!commState.putGridUuid(futId))
++ if (!commState.putGridUuid("futId", futId))
+ return false;
+
+ commState.idx++;
+
+ case 9:
- if (!commState.putGridUuid(miniId))
++ if (!commState.putGridUuid("miniId", miniId))
+ return false;
+
+ commState.idx++;
+
+ case 10:
- if (!commState.putCacheVersion(nearXidVer))
++ if (!commState.putCacheVersion("nearXidVer", nearXidVer))
+ return false;
+
+ commState.idx++;
+
+ case 11:
- if (!commState.putInt(txNum))
++ if (!commState.putInt("txNum", txNum))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * First delegates to the superclass and returns {@code false} if it did not finish. Then
+ * reads {@code futId}, {@code miniId}, {@code nearXidVer} and {@code txNum} as field
+ * indexes 8 through 11, advancing {@code commState.idx} after each one. In the merge result
+ * (lines marked {@code ++}) {@code commState.lastRead()} is checked after every read and
+ * {@code false} is returned when it reports that the value was not read.
+ */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 8:
- IgniteUuid futId0 = commState.getGridUuid();
++ futId = commState.getGridUuid("futId");
+
- if (futId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- futId = futId0;
-
+ commState.idx++;
+
+ case 9:
- IgniteUuid miniId0 = commState.getGridUuid();
++ miniId = commState.getGridUuid("miniId");
+
- if (miniId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- miniId = miniId0;
-
+ commState.idx++;
+
+ case 10:
- GridCacheVersion nearXidVer0 = commState.getCacheVersion();
++ nearXidVer = commState.getCacheVersion("nearXidVer");
+
- if (nearXidVer0 == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- nearXidVer = nearXidVer0;
-
+ commState.idx++;
+
+ case 11:
- if (buf.remaining() < 4)
- return false;
++ txNum = commState.getInt("txNum");
+
- txNum = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This message returns the constant {@code 18}.
+ */
+ @Override public byte directType() {
+ return 18;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates to {@code S.toString}, adding the superclass string under the name {@code "super"}.
+ */
+ @Override public String toString() {
+ return S.toString(GridCacheOptimisticCheckPreparedTxRequest.class, this, "super", super.toString());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
index 0000000,f0e03e1..e2a66fc
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxResponse.java
@@@ -1,0 -1,198 +1,194 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed;
+
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.io.*;
+ import java.nio.*;
+
+ /**
+ * Check prepared transactions response.
+ */
+ public class GridCacheOptimisticCheckPreparedTxResponse<K, V> extends GridDistributedBaseMessage<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Mini future ID. */
+ private IgniteUuid miniId;
+
+ /** Flag indicating if all remote transactions were prepared. */
+ private boolean success;
+
+ /**
+ * Empty constructor required by {@link Externalizable}; also used by {@link #clone()}.
+ */
+ public GridCacheOptimisticCheckPreparedTxResponse() {
+ // No-op.
+ }
+
+ /**
+ * Creates a response. The transaction ID and {@code 0} are passed to the super constructor.
+ *
+ * @param txId Transaction ID.
+ * @param futId Future ID.
+ * @param miniId Mini future ID.
+ * @param success {@code True} if all remote transactions were prepared, {@code false} otherwise.
+ */
+ public GridCacheOptimisticCheckPreparedTxResponse(GridCacheVersion txId, IgniteUuid futId, IgniteUuid miniId,
+ boolean success) {
+ super(txId, 0);
+
+ this.futId = futId;
+ this.miniId = miniId;
+ this.success = success;
+ }
+
+ /**
+ * Gets the ID of the future this response belongs to.
+ *
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * Gets the ID of the mini future this response belongs to.
+ *
+ * @return Mini future ID.
+ */
+ public IgniteUuid miniId() {
+ return miniId;
+ }
+
+ /**
+ * @return {@code True} if all remote transactions were prepared, {@code false} otherwise.
+ */
+ public boolean success() {
+ return success;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Creates a new instance with the empty constructor and copies this message's state
+ * into it via {@link #clone0(GridTcpCommunicationMessageAdapter)}.
+ */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridCacheOptimisticCheckPreparedTxResponse _clone = new GridCacheOptimisticCheckPreparedTxResponse();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * After the superclass has copied its state, copies the future ID, mini future ID
+ * and success flag into the target message.
+ */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridCacheOptimisticCheckPreparedTxResponse _clone = (GridCacheOptimisticCheckPreparedTxResponse)_msg;
+
+ _clone.futId = futId;
+ _clone.miniId = miniId;
+ _clone.success = success;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * First delegates to the superclass and returns {@code false} if it did not finish. Then
+ * writes {@code futId}, {@code miniId} and {@code success} as field indexes 8 through 10,
+ * advancing {@code commState.idx} after each one and returning {@code false} as soon as a
+ * put call reports failure. Lines marked {@code ++} are the merge result of this combined
+ * diff; lines marked {@code -} come from one of the parents.
+ */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 8:
- if (!commState.putGridUuid(futId))
++ if (!commState.putGridUuid("futId", futId))
+ return false;
+
+ commState.idx++;
+
+ case 9:
- if (!commState.putGridUuid(miniId))
++ if (!commState.putGridUuid("miniId", miniId))
+ return false;
+
+ commState.idx++;
+
+ case 10:
- if (!commState.putBoolean(success))
++ if (!commState.putBoolean("success", success))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * First delegates to the superclass and returns {@code false} if it did not finish. Then
+ * reads {@code futId}, {@code miniId} and {@code success} as field indexes 8 through 10,
+ * advancing {@code commState.idx} after each one. In the merge result (lines marked
+ * {@code ++}) {@code commState.lastRead()} is checked after every read and {@code false}
+ * is returned when it reports that the value was not read.
+ */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 8:
- IgniteUuid futId0 = commState.getGridUuid();
++ futId = commState.getGridUuid("futId");
+
- if (futId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- futId = futId0;
-
+ commState.idx++;
+
+ case 9:
- IgniteUuid miniId0 = commState.getGridUuid();
++ miniId = commState.getGridUuid("miniId");
+
- if (miniId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- miniId = miniId0;
-
+ commState.idx++;
+
+ case 10:
- if (buf.remaining() < 1)
- return false;
++ success = commState.getBoolean("success");
+
- success = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This message returns the constant {@code 19}.
+ */
+ @Override public byte directType() {
+ return 19;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates to {@code S.toString}, adding the superclass string under the name {@code "super"}.
+ */
+ @Override public String toString() {
+ return S.toString(GridCacheOptimisticCheckPreparedTxResponse.class, this, "super", super.toString());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java
index 0000000,2aae468..e2d45d7
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java
@@@ -1,0 -1,292 +1,284 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed;
+
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * Request sent to a remote node to check the state of transactions related to some
+ * pessimistic transaction. Carries the future and mini future IDs, the near transaction
+ * version, the originating node and thread IDs and the near-only check flag.
+ * <p>
+ * NOTE(review): the original description said the related transactions "were prepared";
+ * the class name suggests a committed check - confirm the intended wording.
+ */
+ public class GridCachePessimisticCheckCommittedTxRequest<K, V> extends GridDistributedBaseMessage<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Mini future ID. */
+ private IgniteUuid miniId;
+
+ /** Near transaction ID. */
+ private GridCacheVersion nearXidVer;
+
+ /** Originating node ID. */
+ private UUID originatingNodeId;
+
+ /** Originating thread ID. */
+ private long originatingThreadId;
+
+ /** Flag indicating that this is near-only check. */
+ @GridDirectVersion(1)
+ private boolean nearOnlyCheck;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridCachePessimisticCheckCommittedTxRequest() {
+ // No-op.
+ }
+
+ /**
+ * Creates a request for the given transaction. The mini future ID is not set here;
+ * it is assigned separately through {@link #miniId(IgniteUuid)}.
+ *
+ * @param tx Transaction; its xid version, near xid version and event node ID are copied into this request.
+ * @param originatingThreadId Originating thread ID.
+ * @param futId Future ID.
+ * @param nearOnlyCheck Flag indicating that this is near-only check.
+ */
+ public GridCachePessimisticCheckCommittedTxRequest(IgniteTxEx<K, V> tx, long originatingThreadId, IgniteUuid futId,
+ boolean nearOnlyCheck) {
+ super(tx.xidVersion(), 0);
+
+ this.futId = futId;
+ this.nearOnlyCheck = nearOnlyCheck;
+
+ nearXidVer = tx.nearXidVersion();
+ originatingNodeId = tx.eventNodeId();
+ this.originatingThreadId = originatingThreadId;
+ }
+
+ /**
+ * @return Near version.
+ */
+ public GridCacheVersion nearXidVersion() {
+ return nearXidVer;
+ }
+
+ /**
+ * @return Tx originating node ID.
+ */
+ public UUID originatingNodeId() {
+ return originatingNodeId;
+ }
+
+ /**
+ * @return Tx originating thread ID.
+ */
+ public long originatingThreadId() {
+ return originatingThreadId;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Mini future ID.
+ */
+ public IgniteUuid miniId() {
+ return miniId;
+ }
+
+ /**
+ * @param miniId Mini ID to set.
+ */
+ public void miniId(IgniteUuid miniId) {
+ this.miniId = miniId;
+ }
+
+ /**
+ * @return Flag indicating that this request was sent only to near node. If this flag is set, no finalizing
+ * will be executed on receiving (near) node since this is a user node.
+ */
+ public boolean nearOnlyCheck() {
+ return nearOnlyCheck;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Creates a new empty request through the no-arg constructor and copies this
+ * message's state into it with {@code clone0}.
+ */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridCachePessimisticCheckCommittedTxRequest _clone = new GridCachePessimisticCheckCommittedTxRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * After the superclass copy, assigns the six request-specific fields to the target;
+ * object fields are copied by reference.
+ */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridCachePessimisticCheckCommittedTxRequest _clone = (GridCachePessimisticCheckCommittedTxRequest)_msg;
+
+ _clone.futId = futId;
+ _clone.miniId = miniId;
+ _clone.nearXidVer = nearXidVer;
+ _clone.originatingNodeId = originatingNodeId;
+ _clone.originatingThreadId = originatingThreadId;
+ _clone.nearOnlyCheck = nearOnlyCheck;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes the superclass part, then the type byte (only while {@code commState.typeWritten}
+ * is {@code false}), then the fields in this order: {@code futId} (state 8), {@code miniId} (9),
+ * {@code nearXidVer} (10), {@code originatingNodeId} (11), {@code originatingThreadId} (12)
+ * and {@code nearOnlyCheck} (13). Returns {@code false} as soon as a put call returns
+ * {@code false}; {@code commState.idx} is advanced only after a successful put, so a later
+ * call re-enters the switch at the field that was not written.
+ */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) { // Cases fall through: there are no break statements.
+ case 8:
- if (!commState.putGridUuid(futId))
++ if (!commState.putGridUuid("futId", futId))
+ return false;
+
+ commState.idx++;
+
+ case 9:
- if (!commState.putGridUuid(miniId))
++ if (!commState.putGridUuid("miniId", miniId))
+ return false;
+
+ commState.idx++;
+
+ case 10:
- if (!commState.putCacheVersion(nearXidVer))
++ if (!commState.putCacheVersion("nearXidVer", nearXidVer))
+ return false;
+
+ commState.idx++;
+
+ case 11:
- if (!commState.putUuid(originatingNodeId))
++ if (!commState.putUuid("originatingNodeId", originatingNodeId))
+ return false;
+
+ commState.idx++;
+
+ case 12:
- if (!commState.putLong(originatingThreadId))
++ if (!commState.putLong("originatingThreadId", originatingThreadId))
+ return false;
+
+ commState.idx++;
+
+ case 13:
- if (!commState.putBoolean(nearOnlyCheck))
++ if (!commState.putBoolean("nearOnlyCheck", nearOnlyCheck))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Mirror of {@code writeTo}: reads the superclass part and then the fields for states 8 to 13
+ * in the same order they are written. Returns {@code false} as soon as
+ * {@code commState.lastRead()} reports {@code false}; {@code commState.idx} is advanced only
+ * after a successful read, so a later call re-enters the switch at the field that was not read.
+ */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) { // Cases fall through: there are no break statements.
+ case 8:
- IgniteUuid futId0 = commState.getGridUuid();
++ futId = commState.getGridUuid("futId");
+
- if (futId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- futId = futId0;
-
+ commState.idx++;
+
+ case 9:
- IgniteUuid miniId0 = commState.getGridUuid();
++ miniId = commState.getGridUuid("miniId");
+
- if (miniId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- miniId = miniId0;
-
+ commState.idx++;
+
+ case 10:
- GridCacheVersion nearXidVer0 = commState.getCacheVersion();
++ nearXidVer = commState.getCacheVersion("nearXidVer");
+
- if (nearXidVer0 == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- nearXidVer = nearXidVer0;
-
+ commState.idx++;
+
+ case 11:
- UUID originatingNodeId0 = commState.getUuid();
++ originatingNodeId = commState.getUuid("originatingNodeId");
+
- if (originatingNodeId0 == UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- originatingNodeId = originatingNodeId0;
-
+ commState.idx++;
+
+ case 12:
- if (buf.remaining() < 8)
- return false;
++ originatingThreadId = commState.getLong("originatingThreadId");
+
- originatingThreadId = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 13:
- if (buf.remaining() < 1)
- return false;
++ nearOnlyCheck = commState.getBoolean("nearOnlyCheck");
+
- nearOnlyCheck = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return The constant {@code 20} for this message class.
+ */
+ @Override public byte directType() {
+ return 20;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Built with {@code S.toString(...)}; the superclass representation is appended under the name "super".
+ */
+ @Override public String toString() {
+ return S.toString(GridCachePessimisticCheckCommittedTxRequest.class, this, "super", super.toString());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java
index 0000000,afb45a1..3ac0aac
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java
@@@ -1,0 -1,232 +1,226 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.nio.*;
+
+ /**
+ * Response message carrying optional committed transaction info together with the future
+ * and mini future IDs. Presumably sent in reply to
+ * {@link GridCachePessimisticCheckCommittedTxRequest} - confirm against the sender.
+ * <p>
+ * The committed transaction info itself is not written directly; it is marshalled into
+ * {@code committedTxInfoBytes} by {@code prepareMarshal} and restored by {@code finishUnmarshal}.
+ */
+ public class GridCachePessimisticCheckCommittedTxResponse<K, V> extends GridDistributedBaseMessage<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Mini future ID. */
+ private IgniteUuid miniId;
+
+ /** Committed transaction info. */
+ @GridDirectTransient
+ private GridCacheCommittedTxInfo<K, V> committedTxInfo;
+
+ /** Serialized transaction info. */
+ private byte[] committedTxInfoBytes;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridCachePessimisticCheckCommittedTxResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param txId Transaction ID.
+ * @param futId Future ID.
+ * @param miniId Mini future ID.
+ * @param committedTxInfo Committed transaction info, may be {@code null}.
+ */
+ public GridCachePessimisticCheckCommittedTxResponse(GridCacheVersion txId, IgniteUuid futId, IgniteUuid miniId,
+ @Nullable GridCacheCommittedTxInfo<K, V> committedTxInfo) {
+ super(txId, 0);
+
+ this.futId = futId;
+ this.miniId = miniId;
+ this.committedTxInfo = committedTxInfo;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Mini future ID.
+ */
+ public IgniteUuid miniId() {
+ return miniId;
+ }
+
+ /**
+ * @return Committed transaction info; {@code null} if none was passed to the constructor or,
+ * after unmarshalling, if no serialized info was present.
+ */
+ public GridCacheCommittedTxInfo<K, V> committedTxInfo() {
+ return committedTxInfo;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * When committed transaction info is present, first calls {@code marshalTx} on its recovery
+ * writes and then marshals the whole info object into {@code committedTxInfoBytes}.
+ *
+ * @param ctx Shared cache context. */
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (committedTxInfo != null) {
+ marshalTx(committedTxInfo.recoveryWrites(), ctx);
+
+ committedTxInfoBytes = ctx.marshaller().marshal(committedTxInfo);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * When serialized info is present, unmarshals it into {@code committedTxInfo} and then calls
+ * {@code unmarshalTx} on its recovery writes.
+ */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (committedTxInfoBytes != null) {
+ committedTxInfo = ctx.marshaller().unmarshal(committedTxInfoBytes, ldr);
+
+ unmarshalTx(committedTxInfo.recoveryWrites(), false, ctx, ldr);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Creates a new empty response through the no-arg constructor and copies this
+ * message's state into it with {@code clone0}.
+ */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridCachePessimisticCheckCommittedTxResponse _clone = new GridCachePessimisticCheckCommittedTxResponse();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * After the superclass copy, assigns the response-specific fields to the target by reference,
+ * including both the info object and its serialized form.
+ */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridCachePessimisticCheckCommittedTxResponse _clone = (GridCachePessimisticCheckCommittedTxResponse)_msg;
+
+ _clone.futId = futId;
+ _clone.miniId = miniId;
+ _clone.committedTxInfo = committedTxInfo;
+ _clone.committedTxInfoBytes = committedTxInfoBytes;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes the superclass part, then the type byte (only while {@code commState.typeWritten}
+ * is {@code false}), then {@code committedTxInfoBytes} (state 8), {@code futId} (state 9) and
+ * {@code miniId} (state 10). Returns {@code false} as soon as a put call returns {@code false};
+ * {@code commState.idx} is advanced only after a successful put, so a later call re-enters
+ * the switch at the field that was not written.
+ */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) { // Cases fall through: there are no break statements.
+ case 8:
- if (!commState.putByteArray(committedTxInfoBytes))
++ if (!commState.putByteArray("committedTxInfoBytes", committedTxInfoBytes))
+ return false;
+
+ commState.idx++;
+
+ case 9:
- if (!commState.putGridUuid(futId))
++ if (!commState.putGridUuid("futId", futId))
+ return false;
+
+ commState.idx++;
+
+ case 10:
- if (!commState.putGridUuid(miniId))
++ if (!commState.putGridUuid("miniId", miniId))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Mirror of {@code writeTo}: reads the superclass part and then {@code committedTxInfoBytes}
+ * (state 8), {@code futId} (state 9) and {@code miniId} (state 10). Returns {@code false} as
+ * soon as {@code commState.lastRead()} reports {@code false}; {@code commState.idx} is advanced
+ * only after a successful read.
+ */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) { // Cases fall through: there are no break statements.
+ case 8:
- byte[] committedTxInfoBytes0 = commState.getByteArray();
++ committedTxInfoBytes = commState.getByteArray("committedTxInfoBytes");
+
- if (committedTxInfoBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- committedTxInfoBytes = committedTxInfoBytes0;
-
+ commState.idx++;
+
+ case 9:
- IgniteUuid futId0 = commState.getGridUuid();
++ futId = commState.getGridUuid("futId");
+
- if (futId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- futId = futId0;
-
+ commState.idx++;
+
+ case 10:
- IgniteUuid miniId0 = commState.getGridUuid();
++ miniId = commState.getGridUuid("miniId");
+
- if (miniId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- miniId = miniId0;
-
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return The constant {@code 21} for this message class.
+ */
+ @Override public byte directType() {
+ return 21;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Built with {@code S.toString(...)}; the superclass representation is appended under the name "super".
+ */
+ @Override public String toString() {
+ return S.toString(GridCachePessimisticCheckCommittedTxResponse.class, this, "super", super.toString());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
index 0000000,514fa51..5597a4a
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
@@@ -1,0 -1,457 +1,451 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * Base for all messages in replicated cache.
+ * <p>
+ * Holds the lock or transaction version, lock candidates (by key index and by key, both
+ * marshalled into byte arrays before sending) and the collections of committed and
+ * rolled back versions.
+ */
+ public abstract class GridDistributedBaseMessage<K, V> extends GridCacheMessage<K, V> implements GridCacheDeployable,
+ GridCacheVersionable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Lock or transaction version. */
+ @GridToStringInclude
+ protected GridCacheVersion ver;
+
+ /**
+ * Candidates for every key ordered in the order of keys. These
+ * can be either local-only candidates in case of lock acquisition,
+ * or pending candidates in case of transaction commit.
+ */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Collection<GridCacheMvccCandidate<K>>[] candsByIdx;
+
+ /** Serialized form of {@code candsByIdx}, produced in {@code prepareMarshal}. */
+ @GridToStringExclude
+ private byte[] candsByIdxBytes;
+
+ /** Collections of local lock candidates. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Map<K, Collection<GridCacheMvccCandidate<K>>> candsByKey;
+
+ /** Collections of local lock candidates in serialized form. */
+ @GridToStringExclude
+ private byte[] candsByKeyBytes;
+
+ /** Committed versions with order higher than one for this message (needed for commit ordering). */
+ @GridToStringInclude
+ @GridDirectCollection(GridCacheVersion.class)
+ private Collection<GridCacheVersion> committedVers;
+
+ /** Rolled back versions with order higher than one for this message (needed for commit ordering). */
+ @GridToStringInclude
+ @GridDirectCollection(GridCacheVersion.class)
+ private Collection<GridCacheVersion> rolledbackVers;
+
+ /** Count of keys referenced in candidates array (needed only locally for optimization). */
+ @GridToStringInclude
+ @GridDirectTransient
+ private int cnt;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ protected GridDistributedBaseMessage() {
+ /* No-op. */
+ }
+
+ /**
+ * @param cnt Count of keys referenced in list of candidates; asserted to be non-negative.
+ */
+ protected GridDistributedBaseMessage(int cnt) {
+ assert cnt >= 0;
+
+ this.cnt = cnt;
+ }
+
+ /**
+ * @param ver Either lock or transaction version; asserted to be non-null.
+ * @param cnt Key count.
+ */
+ protected GridDistributedBaseMessage(GridCacheVersion ver, int cnt) {
+ this(cnt);
+
+ assert ver != null;
+
+ this.ver = ver;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Marshals {@code candsByIdx} and {@code candsByKey} into their byte array counterparts when
+ * they are set. If deployment is enabled, every key of {@code candsByKey} is first passed to
+ * {@code prepareObject}.
+ *
+ * @param ctx Shared cache context. */
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (candsByIdx != null)
+ candsByIdxBytes = ctx.marshaller().marshal(candsByIdx);
+
+ if (candsByKey != null) {
+ if (ctx.deploymentEnabled()) {
+ for (K key : candsByKey.keySet())
+ prepareObject(key, ctx);
+ }
+
+ candsByKeyBytes = CU.marshal(ctx, candsByKey);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Restores {@code candsByIdx} and {@code candsByKey} from their byte arrays when those are present.
+ */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (candsByIdxBytes != null)
+ candsByIdx = ctx.marshaller().unmarshal(candsByIdxBytes, ldr);
+
+ if (candsByKeyBytes != null)
+ candsByKey = ctx.marshaller().unmarshal(candsByKeyBytes, ldr);
+ }
+
+ /**
+ * @return Version.
+ */
+ @Override public GridCacheVersion version() {
+ return ver;
+ }
+
+ /**
+ * @param ver Version.
+ */
+ public void version(GridCacheVersion ver) {
+ this.ver = ver;
+ }
+
+ /**
+ * @param committedVers Committed versions.
+ * @param rolledbackVers Rolled back versions.
+ */
+ public void completedVersions(Collection<GridCacheVersion> committedVers,
+ Collection<GridCacheVersion> rolledbackVers) {
+ this.committedVers = committedVers;
+ this.rolledbackVers = rolledbackVers;
+ }
+
+ /**
+ * @return Committed versions, or an empty list if none were set.
+ */
+ public Collection<GridCacheVersion> committedVersions() {
+ return committedVers == null ? Collections.<GridCacheVersion>emptyList() : committedVers;
+ }
+
+ /**
+ * @return Rolled back versions, or an empty list if none were set.
+ */
+ public Collection<GridCacheVersion> rolledbackVersions() {
+ return rolledbackVers == null ? Collections.<GridCacheVersion>emptyList() : rolledbackVers;
+ }
+
+ /**
+ * Stores candidates for the key at the given index. Does nothing when the passed collection
+ * is {@code null} or empty; the backing array is allocated on first use with length {@code cnt}.
+ *
+ * @param idx Key index; asserted to be less than the key count.
+ * @param candsByIdx List of candidates for that key.
+ */
+ @SuppressWarnings({"unchecked"})
+ public void candidatesByIndex(int idx, Collection<GridCacheMvccCandidate<K>> candsByIdx) {
+ assert idx < cnt;
+
+ // If nothing to add.
+ if (candsByIdx == null || candsByIdx.isEmpty())
+ return;
+
+ if (this.candsByIdx == null)
+ this.candsByIdx = new Collection[cnt];
+
+ this.candsByIdx[idx] = candsByIdx;
+ }
+
+ /**
+ * @param idx Key index.
+ * @return Candidates for given key, or an empty list if none were stored for that index.
+ */
+ public Collection<GridCacheMvccCandidate<K>> candidatesByIndex(int idx) {
+ return candsByIdx == null || candsByIdx[idx] == null ? Collections.<GridCacheMvccCandidate<K>>emptyList() : candsByIdx[idx];
+ }
+
+ /**
+ * Stores candidates for the given key; the backing map is created on first use.
+ *
+ * @param key Candidates key.
+ * @param candsByKey Collection of local candidates.
+ */
+ public void candidatesByKey(K key, Collection<GridCacheMvccCandidate<K>> candsByKey) {
+ if (this.candsByKey == null)
+ this.candsByKey = new HashMap<>(1, 1.0f);
+
+ this.candsByKey.put(key, candsByKey);
+ }
+
+ /**
+ *
+ * @param key Candidates key; asserted to be non-null.
+ * @return Collection of lock candidates for the given key, or {@code null} if there is no
+ * candidates map or no entry for the key.
+ */
+ @Nullable public Collection<GridCacheMvccCandidate<K>> candidatesByKey(K key) {
+ assert key != null;
+
+ if (candsByKey == null)
+ return null;
+
+ return candsByKey.get(key);
+ }
+
+ /**
+ * @return Map of candidates, or an empty map if none were set.
+ */
+ public Map<K, Collection<GridCacheMvccCandidate<K>>> candidatesByKey() {
+ return candsByKey == null ? Collections.<K, Collection<GridCacheMvccCandidate<K>>>emptyMap() : candsByKey;
+ }
+
+ /**
+ * @return Count of keys referenced in candidates array (needed only locally for optimization).
+ */
+ public int keysCount() {
+ return cnt;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * After the superclass copy, assigns all fields of this class to the target; object fields
+ * are copied by reference.
+ */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridDistributedBaseMessage _clone = (GridDistributedBaseMessage)_msg;
+
+ _clone.ver = ver;
+ _clone.candsByIdx = candsByIdx;
+ _clone.candsByIdxBytes = candsByIdxBytes;
+ _clone.candsByKey = candsByKey;
+ _clone.candsByKeyBytes = candsByKeyBytes;
+ _clone.committedVers = committedVers;
+ _clone.rolledbackVers = rolledbackVers;
+ _clone.cnt = cnt;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes the superclass part, then the type byte (only while {@code commState.typeWritten}
+ * is {@code false}), then {@code candsByIdxBytes} (state 3), {@code candsByKeyBytes} (state 4),
+ * {@code committedVers} (state 5), {@code rolledbackVers} (state 6) and {@code ver} (state 7).
+ * A collection is written as its size followed by its elements, or as the single int
+ * {@code -1} when the collection is {@code null}. Returns {@code false} as soon as a put call
+ * returns {@code false}; {@code commState.idx}, {@code commState.it} and {@code commState.cur}
+ * keep the position so a later call continues from the value that was not written.
+ */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) { // Cases fall through: there are no break statements.
+ case 3:
- if (!commState.putByteArray(candsByIdxBytes))
++ if (!commState.putByteArray("candsByIdxBytes", candsByIdxBytes))
+ return false;
+
+ commState.idx++;
+
+ case 4:
- if (!commState.putByteArray(candsByKeyBytes))
++ if (!commState.putByteArray("candsByKeyBytes", candsByKeyBytes))
+ return false;
+
+ commState.idx++;
+
+ case 5:
+ if (committedVers != null) {
+ if (commState.it == null) { // Size is written once, before the iterator is created.
- if (!commState.putInt(committedVers.size()))
++ if (!commState.putInt(null, committedVers.size()))
+ return false;
+
+ commState.it = committedVers.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putCacheVersion((GridCacheVersion)commState.cur))
++ if (!commState.putCacheVersion(null, (GridCacheVersion)commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 6:
+ if (rolledbackVers != null) {
+ if (commState.it == null) { // Size is written once, before the iterator is created.
- if (!commState.putInt(rolledbackVers.size()))
++ if (!commState.putInt(null, rolledbackVers.size()))
+ return false;
+
+ commState.it = rolledbackVers.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putCacheVersion((GridCacheVersion)commState.cur))
++ if (!commState.putCacheVersion(null, (GridCacheVersion)commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 7:
- if (!commState.putCacheVersion(ver))
++ if (!commState.putCacheVersion("ver", ver))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Mirror of {@code writeTo}: reads the superclass part and then the fields for states 3 to 7.
+ * For each collection the size is read first; a non-negative size creates the list (if it is
+ * not already set) and reads that many versions, while a negative size leaves the field
+ * untouched. Returns {@code false} as soon as {@code commState.lastRead()} reports
+ * {@code false}; {@code commState.idx}, {@code commState.readSize} and
+ * {@code commState.readItems} keep the position so a later call continues from the value that
+ * was not read.
+ */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) { // Cases fall through: there are no break statements.
+ case 3:
- byte[] candsByIdxBytes0 = commState.getByteArray();
++ candsByIdxBytes = commState.getByteArray("candsByIdxBytes");
+
- if (candsByIdxBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- candsByIdxBytes = candsByIdxBytes0;
-
+ commState.idx++;
+
+ case 4:
- byte[] candsByKeyBytes0 = commState.getByteArray();
++ candsByKeyBytes = commState.getByteArray("candsByKeyBytes");
+
- if (candsByKeyBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- candsByKeyBytes = candsByKeyBytes0;
-
+ commState.idx++;
+
+ case 5:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (committedVers == null)
+ committedVers = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- GridCacheVersion _val = commState.getCacheVersion();
++ GridCacheVersion _val = commState.getCacheVersion(null);
+
- if (_val == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ committedVers.add((GridCacheVersion)_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1; // Reset collection read state before the next field.
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 6:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (rolledbackVers == null)
+ rolledbackVers = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- GridCacheVersion _val = commState.getCacheVersion();
++ GridCacheVersion _val = commState.getCacheVersion(null);
+
- if (_val == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ rolledbackVers.add((GridCacheVersion)_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1; // Reset collection read state before the next field.
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 7:
- GridCacheVersion ver0 = commState.getCacheVersion();
++ ver = commState.getCacheVersion("ver");
+
- if (ver0 == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- ver = ver0;
-
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Built with {@code S.toString(...)}; the superclass representation is appended under the name "super".
+ */
+ @Override public String toString() {
+ return S.toString(GridDistributedBaseMessage.class, this, "super", super.toString());
+ }
+ }