You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/03 03:03:16 UTC

[07/50] [abbrv] ignite git commit: Merge branch sprint-3 into ignite-264

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 8f4700f,71e173b..9a0044f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@@ -298,7 -303,10 +299,7 @@@ public class GridNearTransactionalCache
                                          req.taskNameHash()
                                      );
  
-                                     tx = ctx.tm().onCreated(tx);
 -                                    if (req.groupLock())
 -                                        tx.groupLockKey(txKey);
 -
+                                     tx = ctx.tm().onCreated(null, tx);
  
                                      if (tx == null || !ctx.tm().onStarted(tx))
                                          throw new IgniteTxRollbackCheckedException("Failed to acquire lock " +
@@@ -661,12 -669,17 +661,12 @@@
              if (map == null || map.isEmpty())
                  return;
  
-             for (Map.Entry<ClusterNode, GridNearUnlockRequest<K, V>> mapping : map.entrySet()) {
 -            Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
 -            Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
 -
+             for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) {
                  ClusterNode n = mapping.getKey();
  
-                 GridDistributedUnlockRequest<K, V> req = mapping.getValue();
+                 GridDistributedUnlockRequest req = mapping.getValue();
  
-                 if (!F.isEmpty(req.keyBytes()) || !F.isEmpty(req.keys())) {
+                 if (!F.isEmpty(req.keys())) {
 -                    req.completedVersions(committed, rolledback);
 -
                      // We don't wait for reply to this message.
                      ctx.io().send(n, req, ctx.ioPolicy());
                  }

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index e6930d0,5d1a306..a995d47
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@@ -21,9 -21,9 +21,10 @@@ import org.apache.ignite.*
  import org.apache.ignite.cluster.*;
  import org.apache.ignite.internal.*;
  import org.apache.ignite.internal.cluster.*;
+ import org.apache.ignite.internal.processors.affinity.*;
  import org.apache.ignite.internal.processors.cache.*;
  import org.apache.ignite.internal.processors.cache.distributed.*;
 +import org.apache.ignite.internal.processors.cache.distributed.dht.*;
  import org.apache.ignite.internal.processors.cache.transactions.*;
  import org.apache.ignite.internal.processors.cache.version.*;
  import org.apache.ignite.internal.transactions.*;
@@@ -356,100 -321,11 +350,100 @@@ public final class GridNearTxFinishFutu
      }
  
      /**
 +     *
 +     */
 +    private void checkBackup() {
 +        assert mappings.size() <= 1;
 +
 +        for (UUID nodeId : mappings.keySet()) {
 +            Collection<UUID> backups = tx.transactionNodes().get(nodeId);
 +
 +            if (!F.isEmpty(backups)) {
 +                assert backups.size() == 1;
 +
 +                UUID backupId = F.first(backups);
 +
 +                ClusterNode backup = ctx.discovery().node(backupId);
 +
 +                // Nothing to do if backup has left the grid.
 +                if (backup == null)
 +                    return;
 +
 +                MiniFuture mini = new MiniFuture(backup);
 +
 +                add(mini);
 +
 +                GridDhtTxFinishRequest<K, V> finishReq = new GridDhtTxFinishRequest<>(
 +                    cctx.localNodeId(),
 +                    futureId(),
 +                    mini.futureId(),
 +                    tx.topologyVersion(),
 +                    tx.xidVersion(),
 +                    tx.commitVersion(),
 +                    tx.threadId(),
 +                    tx.isolation(),
 +                    true,
 +                    false,
 +                    tx.system(),
 +                    false,
 +                    true,
 +                    true,
 +                    0,
 +                    null,
 +                    0);
 +
 +                finishReq.checkCommitted(true);
 +
 +                try {
 +                    cctx.io().send(backup, finishReq, tx.ioPolicy());
 +                }
 +                catch (ClusterTopologyCheckedException e) {
 +                    mini.onResult(e);
 +                }
 +                catch (IgniteCheckedException e) {
 +                    mini.onResult(e);
 +                }
 +            }
 +        }
 +    }
 +
 +    /**
 +     *
 +     */
 +    private boolean needFinishOnePhase() {
 +        for (Integer cacheId : tx.activeCacheIds()) {
 +            GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
 +
 +            if (cacheCtx.isNear())
 +                return true;
 +        }
 +
 +        return false;
 +    }
 +
 +    /**
 +     *
 +     */
 +    private void finishOnePhase() {
 +        // No need to send messages as transaction was already committed on remote node.
 +        // Finish local mapping only as we need to send commit message to backups.
-         for (GridDistributedTxMapping<K, V> m : mappings.values()) {
++        for (GridDistributedTxMapping m : mappings.values()) {
 +            if (m.node().isLocal()) {
 +                IgniteInternalFuture<IgniteInternalTx> fut = cctx.tm().txHandler().finishColocatedLocal(commit, tx);
 +
 +                // Add new future.
 +                if (fut != null)
 +                    add(fut);
 +            }
 +        }
 +    }
 +
 +    /**
       * @param mappings Mappings.
       */
-     private void finish(Iterable<GridDistributedTxMapping<K, V>> mappings) {
+     private void finish(Iterable<GridDistributedTxMapping> mappings) {
          // Create mini futures.
-         for (GridDistributedTxMapping<K, V> m : mappings)
+         for (GridDistributedTxMapping m : mappings)
              finish(m);
      }
  
@@@ -536,24 -416,12 +531,15 @@@
  
          /** Keys. */
          @GridToStringInclude
-         private GridDistributedTxMapping<K, V> m;
+         private GridDistributedTxMapping m;
  
 +        /** Backup node (used for backup check). */
 +        private ClusterNode backup;
 +
          /**
-          * Empty constructor required for {@link Externalizable}.
-          */
-         public MiniFuture() {
-             // No-op.
-         }
- 
-         /**
           * @param m Mapping.
           */
-         MiniFuture(GridDistributedTxMapping<K, V> m) {
-             super(cctx.kernalContext());
- 
+         MiniFuture(GridDistributedTxMapping m) {
              this.m = m;
          }
  
@@@ -614,9 -471,7 +600,9 @@@
          /**
           * @param res Result callback.
           */
-         void onResult(GridNearTxFinishResponse<K, V> res) {
+         void onResult(GridNearTxFinishResponse res) {
 +            assert backup == null;
 +
              if (res.error() != null)
                  onDone(res.error());
              else

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index 6ebcb59,7b0b811..10c19e8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@@ -83,11 -89,15 +86,23 @@@ public class GridNearTxFinishRequest ex
          boolean syncRollback,
          boolean explicitLock,
          boolean storeEnabled,
-         long topVer,
 -        @NotNull AffinityTopologyVersion topVer,
 -        GridCacheVersion baseVer,
 -        Collection<GridCacheVersion> committedVers,
 -        Collection<GridCacheVersion> rolledbackVers,
++        AffinityTopologyVersion topVer,
          int txSize,
          @Nullable UUID subjId,
          int taskNameHash) {
-         super(xidVer, futId, null, threadId, commit, invalidate, sys, syncCommit, syncRollback, txSize);
 -        super(xidVer, futId, null, threadId, commit, invalidate, sys, plc, syncCommit, syncRollback, baseVer,
 -            committedVers, rolledbackVers, txSize, null);
++        super(
++            xidVer, 
++            futId, 
++            null, 
++            threadId, 
++            commit, 
++            invalidate, 
++            sys,
++            plc,
++            syncCommit, 
++            syncRollback, 
++            txSize
++        );
  
          this.explicitLock = explicitLock;
          this.storeEnabled = storeEnabled;

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index b6bc993,1db4902..b411b99
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@@ -83,11 -80,8 +80,11 @@@ public class GridNearTxLocal extends Gr
      private boolean colocatedLocallyMapped;
  
      /** Info for entries accessed locally in optimistic transaction. */
-     private Map<IgniteTxKey<K>, IgniteCacheExpiryPolicy> accessMap;
+     private Map<IgniteTxKey, IgniteCacheExpiryPolicy> accessMap;
  
 +    /** */
 +    private boolean needCheckBackup;
 +
      /**
       * Empty constructor required for {@link Externalizable}.
       */
@@@ -134,10 -134,13 +133,12 @@@
              timeout,
              invalidate,
              storeEnabled,
 +            false,
              txSize,
 -            grpLockKey,
 -            partLock,
              subjId,
              taskNameHash);
+ 
+         initResult();
      }
  
      /** {@inheritDoc} */
@@@ -276,7 -269,10 +281,7 @@@
      }
  
      /** {@inheritDoc} */
-     @Override public Collection<IgniteTxEntry<K, V>> optimisticLockEntries() {
+     @Override public Collection<IgniteTxEntry> optimisticLockEntries() {
 -        if (groupLock())
 -            return super.optimisticLockEntries();
 -
          return optimisticLockEntries;
      }
  
@@@ -547,13 -549,22 +551,13 @@@
  
      /**
       * @param mapping Mapping to order.
 -     * @param pendingVers Pending versions.
 -     * @param committedVers Committed versions.
 -     * @param rolledbackVers Rolled back versions.
       */
-     void readyNearLocks(GridDistributedTxMapping<K, V> mapping) {
-         Collection<IgniteTxEntry<K, V>> entries = F.concat(false, mapping.reads(), mapping.writes());
 -    void readyNearLocks(GridDistributedTxMapping mapping,
 -        Collection<GridCacheVersion> pendingVers,
 -        Collection<GridCacheVersion> committedVers,
 -        Collection<GridCacheVersion> rolledbackVers)
 -    {
 -        Collection<IgniteTxEntry> entries = groupLock() ?
 -            Collections.singletonList(groupLockEntry()) :
 -            F.concat(false, mapping.reads(), mapping.writes());
++    void readyNearLocks(GridDistributedTxMapping mapping) {
++        Collection<IgniteTxEntry> entries = F.concat(false, mapping.reads(), mapping.writes());
  
-         for (IgniteTxEntry<K, V> txEntry : entries) {
+         for (IgniteTxEntry txEntry : entries) {
              while (true) {
-                 GridCacheContext<K, V> cacheCtx = txEntry.cached().context();
+                 GridCacheContext cacheCtx = txEntry.cached().context();
  
                  assert cacheCtx.isNear();
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index d43e409,4d70afb..b7cdbb5
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@@ -196,12 -189,7 +189,12 @@@ public final class GridNearTxPrepareFut
       * @param mappings Remaining mappings.
       * @param e Error.
       */
-     void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping<K, V>> mappings, Throwable e) {
+     void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping> mappings, Throwable e) {
 +        if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) {
 +            if (tx.onePhaseCommit())
 +                tx.markForBackupCheck();
 +        }
 +
          if (err.compareAndSet(null, e)) {
              boolean marked = tx.setRollbackOnly();
  
@@@ -441,30 -430,32 +435,26 @@@
      private void prepare0() {
          assert tx.optimistic();
  
 -        try {
 -            prepare(
 -                tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
 -                tx.writeEntries());
 +        prepare(
-             tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry<K, V>>emptyList(),
++            tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
 +            tx.writeEntries());
  
 -            markInitialized();
 -        }
 -        catch (IgniteCheckedException e) {
 -            onDone(e);
 -        }
 +        markInitialized();
      }
  
      /**
       * @param reads Read entries.
       * @param writes Write entries.
 -     * @throws IgniteCheckedException If transaction is group-lock and some key was mapped to to the local node.
       */
      private void prepare(
-         Iterable<IgniteTxEntry<K, V>> reads,
-         Iterable<IgniteTxEntry<K, V>> writes
+         Iterable<IgniteTxEntry> reads,
+         Iterable<IgniteTxEntry> writes
 -    ) throws IgniteCheckedException {
 +    ) {
          assert tx.optimistic();
  
-         GridDiscoveryTopologySnapshot snapshot = tx.topologySnapshot();
- 
-         assert snapshot != null;
- 
-         long topVer = snapshot.topologyVersion();
+         AffinityTopologyVersion topVer = tx.topologyVersion();
  
-         assert topVer > 0;
+         assert topVer.topologyVersion() > 0;
  
          txMapping = new GridDhtTxMapping<>();
  
@@@ -739,15 -737,16 +729,15 @@@
       * @param entry Transaction entry.
       * @param topVer Topology version.
       * @param cur Current mapping.
 -     * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key.
       * @return Mapping.
       */
-     private GridDistributedTxMapping<K, V> map(
-         IgniteTxEntry<K, V> entry,
-         long topVer,
-         GridDistributedTxMapping<K, V> cur,
+     private GridDistributedTxMapping map(
+         IgniteTxEntry entry,
+         AffinityTopologyVersion topVer,
+         GridDistributedTxMapping cur,
          boolean waitLock
 -    ) throws IgniteCheckedException {
 +    ) {
-         GridCacheContext<K, V> cacheCtx = entry.context();
+         GridCacheContext cacheCtx = entry.context();
  
          List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
  
@@@ -763,17 -762,23 +753,17 @@@
                  ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']');
          }
  
 -        if (tx.groupLock() && !primary.isLocal())
 -            throw new IgniteCheckedException("Failed to prepare group lock transaction (local node is not primary for " +
 -                " key)[key=" + entry.key() + ", primaryNodeId=" + primary.id() + ']');
 -
          // Must re-initialize cached entry while holding topology lock.
          if (cacheCtx.isNear())
-             entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer), entry.keyBytes());
+             entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer));
          else if (!cacheCtx.isLocal())
-             entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true), entry.keyBytes());
+             entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true));
          else
-             entry.cached(cacheCtx.local().entryEx(entry.key(), topVer), entry.keyBytes());
+             entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
  
          if (cacheCtx.isNear() || cacheCtx.isLocal()) {
 -            if (waitLock && entry.explicitVersion() == null) {
 -                if (!tx.groupLock() || tx.groupLockKey().equals(entry.txKey()))
 -                    lockKeys.add(entry.txKey());
 -            }
 +            if (waitLock && entry.explicitVersion() == null)
 +                lockKeys.add(entry.txKey());
          }
  
          if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 66653ad,846022c..f345e65
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@@ -93,10 -96,12 +94,10 @@@ public class GridNearTxPrepareRequest e
       */
      public GridNearTxPrepareRequest(
          IgniteUuid futId,
-         long topVer,
-         IgniteInternalTx<K, V> tx,
-         Collection<IgniteTxEntry<K, V>> reads,
-         Collection<IgniteTxEntry<K, V>> writes,
+         AffinityTopologyVersion topVer,
+         IgniteInternalTx tx,
+         Collection<IgniteTxEntry> reads,
+         Collection<IgniteTxEntry> writes,
 -        IgniteTxKey grpLockKey,
 -        boolean partLock,
          boolean near,
          Map<UUID, Collection<UUID>> txNodes,
          boolean last,

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index a119288,2456674..ded5409
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@@ -310,8 -359,14 +336,8 @@@ public class GridNearTxPrepareResponse 
  
                  writer.incrementState();
  
-             case 14:
-                 if (!writer.writeByteArray("retValBytes", retValBytes))
 -            case 17:
 -                if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
 -                    return false;
 -
 -                writer.incrementState();
 -
+             case 18:
+                 if (!writer.writeMessage("retVal", retVal))
                      return false;
  
                  writer.incrementState();
@@@ -380,8 -443,16 +414,8 @@@
  
                  reader.incrementState();
  
-             case 14:
-                 retValBytes = reader.readByteArray("retValBytes");
 -            case 17:
 -                pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
 -
 -                if (!reader.isLastRead())
 -                    return false;
 -
 -                reader.incrementState();
 -
+             case 18:
+                 retVal = reader.readMessage("retVal");
  
                  if (!reader.isLastRead())
                      return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index fd0105e,9969f65..c3a5b1d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@@ -51,8 -52,11 +52,8 @@@ public class GridNearTxRemote extends G
      private GridCacheVersion nearXidVer;
  
      /** Owned versions. */
-     private Map<IgniteTxKey<K>, GridCacheVersion> owned;
+     private Map<IgniteTxKey, GridCacheVersion> owned;
  
 -    /** Group lock flag. */
 -    private boolean grpLock;
 -
      /**
       * Empty constructor required for {@link Externalizable}.
       */
@@@ -92,13 -98,14 +94,27 @@@
          TransactionIsolation isolation,
          boolean invalidate,
          long timeout,
-         Collection<IgniteTxEntry<K, V>> writeEntries,
+         Collection<IgniteTxEntry> writeEntries,
          int txSize,
 -        @Nullable IgniteTxKey grpLockKey,
          @Nullable UUID subjId,
          int taskNameHash
      ) throws IgniteCheckedException {
-         super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
-             subjId, taskNameHash);
 -        super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
 -            txSize, grpLockKey, subjId, taskNameHash);
++        super(
++            ctx, 
++            nodeId, 
++            rmtThreadId, 
++            xidVer, 
++            commitVer, 
++            sys, 
++            plc, 
++            concurrency, 
++            isolation, 
++            invalidate, 
++            timeout, 
++            txSize,
++            subjId, 
++            taskNameHash
++        );
  
          assert nearNodeId != null;
  
@@@ -133,9 -141,10 +150,9 @@@
       * @param timeout Timeout.
       * @param ctx Cache registry.
       * @param txSize Expected transaction size.
 -     * @param grpLockKey Collection of group lock keys if this is a group-lock transaction.
       */
      public GridNearTxRemote(
-         GridCacheSharedContext<K, V> ctx,
+         GridCacheSharedContext ctx,
          UUID nodeId,
          UUID nearNodeId,
          GridCacheVersion nearXidVer,
@@@ -151,8 -161,9 +169,22 @@@
          @Nullable UUID subjId,
          int taskNameHash
      ) {
-         super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
-             subjId, taskNameHash);
 -        super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
 -            txSize, grpLockKey, subjId, taskNameHash);
++        super(
++            ctx, 
++            nodeId, 
++            rmtThreadId, 
++            xidVer, 
++            commitVer,
++            sys,
++            plc,
++            concurrency, 
++            isolation, 
++            invalidate, 
++            timeout, 
++            txSize,
++            subjId, 
++            taskNameHash
++        );
  
          assert nearNodeId != null;
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index e11a5b2,acd3202..90b5ee8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@@ -227,9 -232,10 +230,9 @@@ public abstract class IgniteTxAdapter e
       * @param isolation Isolation.
       * @param timeout Timeout.
       * @param txSize Transaction size.
 -     * @param grpLockKey Group lock key if this is group-lock transaction.
       */
      protected IgniteTxAdapter(
-         GridCacheSharedContext<K, V> cctx,
+         GridCacheSharedContext<?, ?> cctx,
          GridCacheVersion xidVer,
          boolean implicit,
          boolean implicitSingle,
@@@ -284,9 -293,10 +290,9 @@@
       * @param isolation Isolation.
       * @param timeout Timeout.
       * @param txSize Transaction size.
 -     * @param grpLockKey Group lock key if this is group-lock transaction.
       */
      protected IgniteTxAdapter(
-         GridCacheSharedContext<K, V> cctx,
+         GridCacheSharedContext<?, ?> cctx,
          UUID nodeId,
          GridCacheVersion xidVer,
          GridCacheVersion startVer,
@@@ -360,8 -385,31 +379,8 @@@
      }
  
      /** {@inheritDoc} */
-     @Override public Collection<IgniteTxEntry<K, V>> optimisticLockEntries() {
+     @Override public Collection<IgniteTxEntry> optimisticLockEntries() {
 -        if (!groupLock())
 -            return writeEntries();
 -        else {
 -            if (!F.isEmpty(invalidParts)) {
 -                assert invalidParts.size() == 1 : "Only one partition expected for group lock transaction " +
 -                    "[tx=" + this + ", invalidParts=" + invalidParts + ']';
 -                assert groupLockEntry() == null : "Group lock key should be rejected " +
 -                    "[tx=" + this + ", groupLockEntry=" + groupLockEntry() + ']';
 -                assert F.isEmpty(writeMap()) : "All entries should be rejected for group lock transaction " +
 -                    "[tx=" + this + ", writes=" + writeMap() + ']';
 -
 -                return Collections.emptyList();
 -            }
 -
 -            IgniteTxEntry grpLockEntry = groupLockEntry();
 -
 -            assert grpLockEntry != null || (near() && !local()):
 -                "Group lock entry was not enlisted into transaction [tx=" + this +
 -                ", grpLockKey=" + groupLockKey() + ']';
 -
 -            return grpLockEntry == null ?
 -                Collections.<IgniteTxEntry>emptyList() :
 -                Collections.singletonList(grpLockEntry);
 -        }
 +        return writeEntries();
      }
  
      /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 22d8cf9,95d3527..c3e734c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@@ -82,11 -75,9 +75,12 @@@ public class IgniteTxEntry implements G
  
      /** Transform. */
      @GridToStringInclude
-     private Collection<T2<EntryProcessor<K, V, ?>, Object[]>> entryProcessorsCol;
+     @GridDirectTransient
+     private Collection<T2<EntryProcessor<Object, Object, Object>, Object[]>> entryProcessorsCol;
  
 +    /** Transient field for calculated entry processor value. */
 +    private V entryProcessorCalcVal;
 +
      /** Transform closure bytes. */
      @GridToStringExclude
      private byte[] transformClosBytes;
@@@ -131,15 -129,18 +132,15 @@@
      private transient boolean locked;
  
      /** Assigned node ID (required only for partitioned cache). */
-     private transient UUID nodeId;
+     @GridDirectTransient
+     private UUID nodeId;
  
      /** Flag if this node is a back up node. */
+     @GridDirectTransient
      private boolean locMapped;
  
-     /** Deployment enabled flag. */
-     private boolean depEnabled;
 -    /** Group lock entry flag. */
 -    private boolean grpLock;
--
      /** Expiry policy. */
+     @GridDirectTransient
      private ExpiryPolicy expiryPlc;
  
      /** Expiry policy transfer flag. */
@@@ -283,7 -296,7 +280,6 @@@
          cp.ttl = ttl;
          cp.conflictExpireTime = conflictExpireTime;
          cp.explicitVer = explicitVer;
-         cp.depEnabled = depEnabled;
 -        cp.grpLock = grpLock;
          cp.conflictVer = conflictVer;
          cp.expiryPlc = expiryPlc;
  
@@@ -831,89 -764,87 +747,81 @@@
      }
  
      /** {@inheritDoc} */
-     @Override public void writeExternal(ObjectOutput out) throws IOException {
-         out.writeBoolean(depEnabled);
+     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+         writer.setBuffer(buf);
  
-         if (depEnabled) {
-             U.writeByteArray(out, keyBytes);
-             U.writeByteArray(out, transformClosBytes);
-             U.writeByteArray(out, filterBytes);
-         }
-         else {
-             out.writeObject(key);
-             U.writeCollection(out, entryProcessorsCol);
-             U.writeArray(out, filters);
+         if (!writer.isHeaderWritten()) {
+             if (!writer.writeHeader(directType(), fieldsCount()))
+                 return false;
+ 
+             writer.onHeaderWritten();
          }
  
-         out.writeInt(cacheId);
+         switch (writer.state()) {
+             case 0:
+                 if (!writer.writeInt("cacheId", cacheId))
+                     return false;
  
-         val.writeTo(out);
+                 writer.incrementState();
  
-         out.writeLong(ttl);
+             case 1:
+                 if (!writer.writeLong("conflictExpireTime", conflictExpireTime))
+                     return false;
  
-         CU.writeVersion(out, explicitVer);
+                 writer.incrementState();
  
-         if (conflictExpireTime != CU.EXPIRE_TIME_CALCULATE) {
-             out.writeBoolean(true);
-             out.writeLong(conflictExpireTime);
-         }
-         else
-             out.writeBoolean(false);
+             case 2:
+                 if (!writer.writeMessage("conflictVer", conflictVer))
+                     return false;
  
-         CU.writeVersion(out, conflictVer);
+                 writer.incrementState();
  
-         out.writeObject(transferExpiryPlc ? new IgniteExternalizableExpiryPolicy(expiryPlc) : null);
-     }
+             case 3:
+                 if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
+                     return false;
  
-     /** {@inheritDoc} */
-     @SuppressWarnings({"unchecked"})
-     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-         depEnabled = in.readBoolean();
- 
-         if (depEnabled) {
-             keyBytes = U.readByteArray(in);
-             transformClosBytes = U.readByteArray(in);
-             filterBytes = U.readByteArray(in);
-         }
-         else {
-             key = (K)in.readObject();
-             entryProcessorsCol = U.readCollection(in);
-             filters = GridCacheUtils.readEntryFilterArray(in);
-         }
+                 writer.incrementState();
  
-         cacheId = in.readInt();
+             case 4:
+                 if (!writer.writeMessage("explicitVer", explicitVer))
+                     return false;
  
-         val.readFrom(in);
+                 writer.incrementState();
  
-         ttl = in.readLong();
+             case 5:
+                 if (!writer.writeObjectArray("filters",
+                     !F.isEmptyOrNulls(filters) ? filters : null, MessageCollectionItemType.MSG))
+                     return false;
  
-         explicitVer = CU.readVersion(in);
+                 writer.incrementState();
  
-         conflictExpireTime = in.readBoolean() ? in.readLong() : CU.EXPIRE_TIME_CALCULATE;
-         conflictVer = CU.readVersion(in);
 -            case 6:
 -                if (!writer.writeBoolean("grpLock", grpLock))
 -                    return false;
 -
 -                writer.incrementState();
 -
+             case 7:
+                 if (!writer.writeMessage("key", key))
+                     return false;
  
-         expiryPlc = (ExpiryPolicy)in.readObject();
-     }
+                 writer.incrementState();
  
-     /** {@inheritDoc} */
-     @Override public Object ggClassId() {
-         return GG_CLASS_ID;
-     }
+             case 8:
+                 if (!writer.writeByteArray("transformClosBytes", transformClosBytes))
+                     return false;
  
-     /** {@inheritDoc} */
-     @Override public Class<?> deployClass() {
-         ClassLoader clsLdr = getClass().getClassLoader();
+                 writer.incrementState();
  
-         V val = value();
+             case 9:
+                 if (!writer.writeLong("ttl", ttl))
+                     return false;
  
-         // First of all check classes that may be loaded by class loader other than application one.
-         return key != null && !clsLdr.equals(key.getClass().getClassLoader()) ?
-             key.getClass() : val != null ? val.getClass() : getClass();
-     }
+                 writer.incrementState();
  
-     /** {@inheritDoc} */
-     @Override public ClassLoader classLoader() {
-         return deployClass().getClassLoader();
+             case 10:
+                 if (!writer.writeMessage("val", val))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+         }
+ 
+         return true;
      }
  
      /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index ca58d6b,da30a94..8e0380c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@@ -409,32 -409,17 +409,32 @@@ public class IgniteTxHandler 
          assert nodeId != null;
          assert res != null;
  
 -        GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.xid(),
 -            res.futureId());
 +        if (res.checkCommitted()) {
-             GridNearTxFinishFuture<K, V> fut = (GridNearTxFinishFuture<K, V>)ctx.mvcc().<IgniteInternalTx>future(
++            GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(
 +                res.xid(), res.futureId());
  
 -        if (fut == null) {
 -            if (log.isDebugEnabled())
 -                log.debug("Received response for unknown future (will ignore): " + res);
 +            if (fut == null) {
 +                if (log.isDebugEnabled())
 +                    log.debug("Received response for unknown future (will ignore): " + res);
  
 -            return;
 +                return;
 +            }
 +
 +            fut.onResult(nodeId, res);
          }
 +        else {
-             GridDhtTxFinishFuture<K, V> fut = (GridDhtTxFinishFuture<K, V>)ctx.mvcc().<IgniteInternalTx>future(
++            GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(
 +                res.xid(), res.futureId());
  
 -        fut.onResult(nodeId, res);
 +            if (fut == null) {
 +                if (log.isDebugEnabled())
 +                    log.debug("Received response for unknown future (will ignore): " + res);
 +
 +                return;
 +            }
 +
 +            fut.onResult(nodeId, res);
 +        }
      }
  
      /**
@@@ -756,9 -758,9 +756,9 @@@
              finish(nodeId, nearTx, req);
  
          if (dhtTx != null && !dhtTx.done()) {
-             dhtTx.finishFuture().listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+             dhtTx.finishFuture().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
                  @Override public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) {
 -                    sendReply(nodeId, req);
 +                    sendReply(nodeId, req, true);
                  }
              });
          }
@@@ -870,17 -873,9 +870,17 @@@
       * @param nodeId Node id that originated finish request.
       * @param req Request.
       */
-     protected void sendReply(UUID nodeId, GridDhtTxFinishRequest<K, V> req, boolean committed) {
 -    protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req) {
++    protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) {
          if (req.replyRequired()) {
-             GridDhtTxFinishResponse<K, V> res = new GridDhtTxFinishResponse<>(req.version(), req.futureId(), req.miniId());
 -            GridCacheMessage res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
++            GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
 +
 +            if (req.checkCommitted()) {
 +                res.checkCommitted(true);
 +
 +                if (!committed)
 +                    res.checkCommittedError(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
 +                        "(transaction has been rolled back on backup node): " + req.version()));
 +            }
  
              try {
                  ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
@@@ -1090,64 -1083,15 +1092,64 @@@
       * @param nodeId Node ID.
       * @param req Request.
       */
 -    protected void processCheckPreparedTxRequest(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest req) {
 +    protected void processCheckPreparedTxRequest(
 +        final UUID nodeId,
-         final GridCacheOptimisticCheckPreparedTxRequest<K, V> req
++        final GridCacheOptimisticCheckPreparedTxRequest req
 +    ) {
          if (log.isDebugEnabled())
              log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']');
  
 -        boolean prepared = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
 +        if (req.nearCheck()) {
 +            IgniteInternalFuture<Boolean> fut = ctx.tm().nearTxCommitted(req.nearXidVersion());
 +
-             fut.listenAsync(new CI1<IgniteInternalFuture<Boolean>>() {
++            fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
 +                @Override public void apply(IgniteInternalFuture<Boolean> f) {
 +                    try {
 +                        boolean prepared = f.get();
  
 -        GridCacheOptimisticCheckPreparedTxResponse res =
 -            new GridCacheOptimisticCheckPreparedTxResponse(req.version(), req.futureId(), req.miniId(), prepared);
 +                        sendCheckPrepareTxResponse(nodeId,
 +                            new GridCacheOptimisticCheckPreparedTxResponse<K, V>(
 +                                req.version(),
 +                                req.futureId(),
 +                                req.miniId(),
 +                                prepared),
 +                            req.system());
  
 +                    }
 +                    catch (IgniteCheckedException e) {
 +                        U.error(log, "Failed to wait for transaction check prepared future " +
 +                            "(will send rolled back response): " + req.nearXidVersion(), e);
 +
 +                        sendCheckPrepareTxResponse(nodeId,
 +                            new GridCacheOptimisticCheckPreparedTxResponse<K, V>(
 +                                req.version(),
 +                                req.futureId(),
 +                                req.miniId(),
 +                                false),
 +                            req.system());
 +                    }
 +                }
 +            });
 +        }
 +        else {
 +            boolean prepared = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
 +
 +            sendCheckPrepareTxResponse(nodeId,
-                 new GridCacheOptimisticCheckPreparedTxResponse<K, V>(req.version(), req.futureId(), req.miniId(), prepared),
++                new GridCacheOptimisticCheckPreparedTxResponse(req.version(), req.futureId(), req.miniId(), prepared),
 +                req.system());
 +        }
 +    }
 +
 +    /**
 +     * @param nodeId Node ID.
 +     * @param res Response to send.
 +     * @param sys System pool flag.
 +     */
 +    private void sendCheckPrepareTxResponse(
 +        UUID nodeId,
-         GridCacheOptimisticCheckPreparedTxResponse<K, V> res,
++        GridCacheOptimisticCheckPreparedTxResponse res,
 +        boolean sys
 +    ) {
          try {
              if (log.isDebugEnabled())
                  log.debug("Sending check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index ca18a69,ca85838..55952c8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@@ -108,9 -122,11 +110,9 @@@ public abstract class IgniteTxLocalAdap
       * @param isolation Isolation.
       * @param timeout Timeout.
       * @param txSize Expected transaction size.
 -     * @param grpLockKey Group lock key if this is a group-lock transaction.
 -     * @param partLock {@code True} if this is a group-lock transaction and lock is acquired for whole partition.
       */
      protected IgniteTxLocalAdapter(
-         GridCacheSharedContext<K, V> cctx,
+         GridCacheSharedContext cctx,
          GridCacheVersion xidVer,
          boolean implicit,
          boolean implicitSingle,
@@@ -125,8 -141,14 +128,24 @@@
          @Nullable UUID subjId,
          int taskNameHash
      ) {
-         super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, concurrency, isolation, timeout, invalidate,
-             storeEnabled, onePhaseCommit, txSize, subjId, taskNameHash);
 -        super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, plc, concurrency, isolation, timeout,
 -            invalidate, storeEnabled, txSize, grpLockKey, subjId, taskNameHash);
 -
 -        assert !partLock || grpLockKey != null;
 -
 -        this.partLock = partLock;
++        super(
++            cctx, 
++            xidVer, 
++            implicit, 
++            implicitSingle, 
++            /*local*/true, 
++            sys, 
++            plc,
++            concurrency, 
++            isolation, 
++            timeout,
++            invalidate,
++            storeEnabled, 
++            onePhaseCommit, 
++            txSize, 
++            subjId, 
++            taskNameHash
++        );
  
          minVer = xidVer;
      }
@@@ -456,16 -491,16 +483,16 @@@
       * @throws IgniteCheckedException If batch update failed.
       */
      @SuppressWarnings({"CatchGenericClass"})
-     protected void batchStoreCommit(Iterable<IgniteTxEntry<K, V>> writeEntries) throws IgniteCheckedException {
-         GridCacheStoreManager<K, V> store = store();
+     protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException {
+         GridCacheStoreManager store = store();
  
          if (store != null && store.writeThrough() && storeEnabled() &&
 -            (!internal() || groupLock()) && (near() || store.writeToStoreFromDht())) {
 +            !internal() && (near() || store.writeToStoreFromDht())) {
              try {
                  if (writeEntries != null) {
-                     Map<K, IgniteBiTuple<V, GridCacheVersion>> putMap = null;
-                     List<K> rmvCol = null;
-                     GridCacheStoreManager<K, V> writeStore = null;
+                     Map<Object, IgniteBiTuple<Object, GridCacheVersion>> putMap = null;
+                     List<Object> rmvCol = null;
+                     GridCacheStoreManager writeStore = null;
  
                      boolean skipNear = near() && store.writeToStoreFromDht();
  
@@@ -951,9 -981,9 +970,9 @@@
              }
          }
          else {
-             GridCacheStoreManager<K, V> store = store();
+             GridCacheStoreManager store = store();
  
 -            if (store != null && (!internal() || groupLock())) {
 +            if (store != null && !internal()) {
                  try {
                      store.txEnd(this, true);
                  }
@@@ -1013,10 -1086,10 +1032,10 @@@
  
                  cctx.tm().rollbackTx(this);
  
-                 GridCacheStoreManager<K, V> store = store();
+                 GridCacheStoreManager store = store();
  
                  if (store != null && (near() || store.writeToStoreFromDht())) {
 -                    if (!internal() || groupLock())
 +                    if (!internal())
                          store.txEnd(this, false);
                  }
              }
@@@ -1062,11 -1137,13 +1083,11 @@@
  
          cacheCtx.checkSecurity(GridSecurityPermission.CACHE_READ);
  
 -        groupLockSanityCheck(cacheCtx, keys);
 -
          boolean single = keysCnt == 1;
  
-         Collection<K> lockKeys = null;
+         Collection<KeyCacheObject> lockKeys = null;
  
-         long topVer = topologyVersion();
+         AffinityTopologyVersion topVer = topologyVersion();
  
          // In this loop we cover only read-committed or optimistic transactions.
          // Transactions that are pessimistic and not read-committed are covered
@@@ -1092,17 -1166,11 +1110,11 @@@
                      if (!F.isEmpty(txEntry.entryProcessors()))
                          val = txEntry.applyEntryProcessors(val);
  
-                     if (val != null) {
-                         V val0 = val;
- 
-                         if (cacheCtx.portableEnabled())
-                             val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
- 
-                         map.put(key, (V)CU.skipValue(val0, skipVals));
-                     }
+                     if (val != null)
+                         cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializePortable, false);
                  }
                  else {
 -                    assert txEntry.op() == TRANSFORM || (groupLock() && !txEntry.groupLockEntry());
 +                    assert txEntry.op() == TRANSFORM;
  
                      while (true) {
                          try {
@@@ -1177,9 -1246,9 +1190,9 @@@
                      try {
                          GridCacheVersion ver = entry.version();
  
-                         V val = null;
+                         CacheObject val = null;
  
 -                        if (!pessimistic() || readCommitted() || groupLock() && !skipVals) {
 +                        if ((!pessimistic() || readCommitted()) && !skipVals) {
                              IgniteCacheExpiryPolicy accessPlc =
                                  optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
  
@@@ -1336,159 -1411,182 +1352,181 @@@
          if (log.isDebugEnabled())
              log.debug("Loading missed values for missed map: " + missedMap);
  
-         final Collection<K> loaded = new HashSet<>();
- 
-         return new GridEmbeddedFuture<>(cctx.kernalContext(),
-             loadMissing(
-                 cacheCtx,
-                 true, false, missedMap.keySet(), deserializePortable, skipVals, new CI2<K, V>() {
-                 /** */
-                 private GridCacheVersion nextVer;
+         final Collection<KeyCacheObject> loaded = new HashSet<>();
  
-                 @Override public void apply(K key, V val) {
-                     if (isRollbackOnly()) {
-                         if (log.isDebugEnabled())
-                             log.debug("Ignoring loaded value for read because transaction was rolled back: " +
-                                 IgniteTxLocalAdapter.this);
+         return new GridEmbeddedFuture<>(
+             new C2<Boolean, Exception, Map<K, V>>() {
+                 @Override public Map<K, V> apply(Boolean b, Exception e) {
+                     if (e != null) {
+                         setRollbackOnly();
  
-                         return;
+                         throw new GridClosureException(e);
                      }
  
-                     GridCacheVersion ver = missedMap.get(key);
- 
-                     if (ver == null) {
-                         if (log.isDebugEnabled())
-                             log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']');
+                     if (!b && !readCommitted()) {
+                         // There is no store - we must mark the entries.
+                         for (KeyCacheObject key : missedMap.keySet()) {
+                             IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
  
-                         return;
+                             if (txEntry != null)
+                                 txEntry.markValid();
+                         }
                      }
  
-                     V visibleVal = val;
+                     if (readCommitted()) {
+                         Collection<KeyCacheObject> notFound = new HashSet<>(missedMap.keySet());
  
-                     IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+                         notFound.removeAll(loaded);
  
-                     IgniteTxEntry<K, V> txEntry = entry(txKey);
+                         // In read-committed mode touch entries that have just been read.
+                         for (KeyCacheObject key : notFound) {
+                             IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
  
-                     if (txEntry != null) {
-                         if (!readCommitted())
-                             txEntry.readValue(val);
+                             GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().peekEx(key) :
+                                 txEntry.cached();
  
-                         if (!F.isEmpty(txEntry.entryProcessors()))
-                             visibleVal = txEntry.applyEntryProcessors(visibleVal);
+                             if (entry != null)
+                                 cacheCtx.evicts().touch(entry, topologyVersion());
+                         }
                      }
  
-                     // In pessimistic mode we hold the lock, so filter validation
-                     // should always be valid.
-                     if (pessimistic())
-                         ver = null;
+                     return map;
+                 }
+             },
+             loadMissing(
+                 cacheCtx,
+                 true,
+                 false,
+                 missedMap.keySet(),
+                 deserializePortable,
+                 skipVals,
+                 new CI2<KeyCacheObject, Object>() {
+                     /** */
+                     private GridCacheVersion nextVer;
  
-                     // Initialize next version.
-                     if (nextVer == null)
-                         nextVer = cctx.versions().next(topologyVersion());
+                     @Override public void apply(KeyCacheObject key, Object val) {
+                         if (isRollbackOnly()) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Ignoring loaded value for read because transaction was rolled back: " +
+                                     IgniteTxLocalAdapter.this);
  
-                     while (true) {
-                         assert txEntry != null || readCommitted() || skipVals;
+                             return;
+                         }
  
-                         GridCacheEntryEx<K, V> e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
+                         GridCacheVersion ver = missedMap.get(key);
  
-                         try {
-                             // Must initialize to true since even if filter didn't pass,
-                             // we still record the transaction value.
-                             boolean set;
+                         if (ver == null) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']');
  
-                             try {
-                                 set = e.versionedValue(val, ver, nextVer);
-                             }
-                             catch (GridCacheEntryRemovedException ignore) {
-                                 if (log.isDebugEnabled())
-                                     log.debug("Got removed entry in transaction getAll method " +
-                                         "(will try again): " + e);
+                             return;
+                         }
  
-                                 if (pessimistic() && !readCommitted() && !isRollbackOnly()) {
-                                     U.error(log, "Inconsistent transaction state (entry got removed while " +
-                                         "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]");
+                         CacheObject cacheVal = cacheCtx.toCacheObject(val);
  
-                                     setRollbackOnly();
+                         CacheObject visibleVal = cacheVal;
  
-                                     return;
-                                 }
+                         IgniteTxKey txKey = cacheCtx.txKey(key);
  
-                                 if (txEntry != null)
-                                     txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes());
+                         IgniteTxEntry txEntry = entry(txKey);
  
-                                 continue; // While loop.
-                             }
+                         if (txEntry != null) {
+                             if (!readCommitted())
+                                 txEntry.readValue(cacheVal);
  
-                             // In pessimistic mode, we should always be able to set.
-                             assert set || !pessimistic();
+                             if (!F.isEmpty(txEntry.entryProcessors()))
+                                 visibleVal = txEntry.applyEntryProcessors(visibleVal);
+                         }
  
-                             if (readCommitted() || skipVals) {
-                                 cacheCtx.evicts().touch(e, topologyVersion());
+                         // In pessimistic mode we hold the lock, so filter validation
+                         // should always be valid.
+                         if (pessimistic())
+                             ver = null;
  
-                                 if (visibleVal != null)
-                                     map.put(key, (V)CU.skipValue(visibleVal, skipVals));
-                             }
-                             else {
-                                 assert txEntry != null;
+                         // Initialize next version.
+                         if (nextVer == null)
+                             nextVer = cctx.versions().next(topologyVersion());
  
-                                 txEntry.setAndMarkValid(val);
+                         while (true) {
 -                            assert txEntry != null || readCommitted() || groupLock() || skipVals;
++                            assert txEntry != null || readCommitted() || skipVals;
  
-                                 if (visibleVal != null)
-                                     map.put(key, visibleVal);
-                             }
+                             GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
  
-                             loaded.add(key);
+                             try {
+                                 // Must initialize to true since even if filter didn't pass,
+                                 // we still record the transaction value.
+                                 boolean set;
  
-                             if (log.isDebugEnabled())
-                                 log.debug("Set value loaded from store into entry from transaction [set=" + set +
-                                     ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']');
+                                 try {
+                                     set = e.versionedValue(cacheVal, ver, nextVer);
+                                 }
+                                 catch (GridCacheEntryRemovedException ignore) {
+                                     if (log.isDebugEnabled())
+                                         log.debug("Got removed entry in transaction getAll method " +
+                                             "(will try again): " + e);
  
-                             break; // While loop.
-                         }
-                         catch (IgniteCheckedException ex) {
-                             throw new IgniteException("Failed to put value for cache entry: " + e, ex);
-                         }
-                     }
-                 }
-             }),
-             new C2<Boolean, Exception, Map<K, V>>() {
-                 @Override public Map<K, V> apply(Boolean b, Exception e) {
-                     if (e != null) {
-                         setRollbackOnly();
 -                                    if (pessimistic() && !readCommitted() && !isRollbackOnly() &&
 -                                        (!groupLock() || F.eq(e.key(), groupLockKey()))) {
++                                    if (pessimistic() && !readCommitted() && !isRollbackOnly()) {
+                                         U.error(log, "Inconsistent transaction state (entry got removed while " +
+                                             "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]");
  
-                         throw new GridClosureException(e);
-                     }
+                                         setRollbackOnly();
  
-                     if (!b && !readCommitted()) {
-                         // There is no store - we must mark the entries.
-                         for (K key : missedMap.keySet()) {
-                             IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key));
+                                         return;
+                                     }
  
-                             if (txEntry != null)
-                                 txEntry.markValid();
-                         }
-                     }
+                                     if (txEntry != null)
+                                         txEntry.cached(entryEx(cacheCtx, txKey));
  
-                     if (readCommitted()) {
-                         Collection<K> notFound = new HashSet<>(missedMap.keySet());
+                                     continue; // While loop.
+                                 }
  
-                         notFound.removeAll(loaded);
+                                 // In pessimistic mode, we should always be able to set.
+                                 assert set || !pessimistic();
  
-                         // In read-committed mode touch entries that have just been read.
-                         for (K key : notFound) {
-                             IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key));
 -                                if (readCommitted() || groupLock() || skipVals) {
++                                if (readCommitted() || skipVals) {
+                                     cacheCtx.evicts().touch(e, topologyVersion());
  
-                             GridCacheEntryEx<K, V> entry = txEntry == null ? cacheCtx.cache().peekEx(key) :
-                                 txEntry.cached();
+                                     if (visibleVal != null) {
+                                         cacheCtx.addResult(map,
+                                             key,
+                                             visibleVal,
+                                             skipVals,
+                                             keepCacheObjects,
+                                             deserializePortable,
+                                             false);
+                                     }
+                                 }
+                                 else {
+                                     assert txEntry != null;
+ 
+                                     txEntry.setAndMarkValid(cacheVal);
+ 
+                                     if (visibleVal != null) {
+                                         cacheCtx.addResult(map,
+                                             key,
+                                             visibleVal,
+                                             skipVals,
+                                             keepCacheObjects,
+                                             deserializePortable,
+                                             false);
+                                     }
+                                 }
  
-                             if (entry != null)
-                                 cacheCtx.evicts().touch(entry, topologyVersion());
+                                 loaded.add(key);
+ 
+                                 if (log.isDebugEnabled())
+                                     log.debug("Set value loaded from store into entry from transaction [set=" + set +
+                                         ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']');
+ 
+                                 break; // While loop.
+                             }
+                             catch (IgniteCheckedException ex) {
+                                 throw new IgniteException("Failed to put value for cache entry: " + e, ex);
+                             }
                          }
                      }
- 
-                     return map;
-                 }
-             });
+                 })
+         );
      }
  
      /** {@inheritDoc} */
@@@ -1526,13 -1625,14 +1565,14 @@@
                  missed,
                  keysCnt,
                  deserializePortable,
-                 skipVals);
+                 skipVals,
+                 keepCacheObjects);
  
              if (single && missed.isEmpty())
-                 return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
+                 return new GridFinishedFuture<>(retMap);
  
              // Handle locks.
 -            if (pessimistic() && !readCommitted() && !groupLock() && !skipVals) {
 +            if (pessimistic() && !readCommitted() && !skipVals) {
                  if (expiryPlc == null)
                      expiryPlc = cacheCtx.expiry();
  
@@@ -1677,9 -1789,9 +1729,9 @@@
                  }
              }
              else {
 -                assert optimistic() || readCommitted() || groupLock() || skipVals;
 +                assert optimistic() || readCommitted() || skipVals;
  
-                 final Collection<K> redos = new ArrayList<>();
+                 final Collection<KeyCacheObject> redos = new ArrayList<>();
  
                  if (!missed.isEmpty()) {
                      if (!readCommitted())
@@@ -1872,7 -2002,9 +1942,7 @@@
              if (invokeMap != null)
                  transform = true;
  
-             for (K key : keys) {
 -            groupLockSanityCheck(cacheCtx, keys);
 -
+             for (Object key : keys) {
                  if (key == null) {
                      setRollbackOnly();
  
@@@ -2027,9 -2158,12 +2096,9 @@@
                              if (!implicit() && readCommitted())
                                  cacheCtx.evicts().touch(entry, topologyVersion());
  
-                             enlisted.add(key);
 -                            if (groupLock() && !lockOnly)
 -                                txEntry.groupLockEntry(true);
 -
+                             enlisted.add(cacheKey);
  
 -                            if ((!pessimistic() && !implicit()) || (groupLock() && !lockOnly)) {
 +                            if ((!pessimistic() && !implicit())) {
                                  txEntry.markValid();
  
                                  if (old == null) {
@@@ -2496,22 -2602,11 +2537,11 @@@
                  drMap,
                  null);
  
 -            if (pessimistic() && !groupLock()) {
 +            if (pessimistic()) {
                  // Loose all skipped.
-                 final Set<K> loaded = loadFut.get();
- 
-                 final Collection<K> keys;
+                 final Set<KeyCacheObject> loaded = loadFut.get();
  
-                 if (keySet != null ) {
-                     keys = new ArrayList<>(keySet.size());
- 
-                     for (K k : keySet) {
-                         if (k != null && (loaded == null || !loaded.contains(k)))
-                             keys.add(k);
-                     }
-                 }
-                 else
-                     keys = Collections.emptyList();
+                 final Collection<KeyCacheObject> keys = F.view(enlisted, F0.notIn(loaded));
  
                  if (log.isDebugEnabled())
                      log.debug("Before acquiring transaction lock for put on keys: " + keys);
@@@ -2733,9 -2824,9 +2759,9 @@@
              // Acquire locks only after having added operation to the write set.
              // Otherwise, during rollback we will not know whether locks need
              // to be rolled back.
 -            if (pessimistic() && !groupLock()) {
 +            if (pessimistic()) {
                  // Loose all skipped.
-                 final Collection<? extends K> passedKeys = F.view(enlisted, F0.notIn(loadFut.get()));
+                 final Collection<KeyCacheObject> passedKeys = F.view(enlisted, F0.notIn(loadFut.get()));
  
                  if (log.isDebugEnabled())
                      log.debug("Before acquiring transaction lock for remove on keys: " + passedKeys);

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index e372095,9f1a70e..e06cde0
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@@ -131,13 -138,27 +133,13 @@@ public interface IgniteTxLocalEx extend
       * @return Future for asynchronous remove.
       */
      public IgniteInternalFuture<?> removeAllDrAsync(
-         GridCacheContext<K, V> cacheCtx,
-         Map<? extends K, GridCacheVersion> drMap);
+         GridCacheContext cacheCtx,
+         Map<KeyCacheObject, GridCacheVersion> drMap);
  
      /**
 -     * Performs keys locking for affinity-based group lock transactions.
 -     *
 -     * @param cacheCtx Cache context.
 -     * @param keys Keys to lock.
 -     * @return Lock future.
 -     */
 -    public <K> IgniteInternalFuture<?> groupLockAsync(GridCacheContext cacheCtx, Collection<K> keys);
 -
 -    /**
 -     * @return {@code True} if keys from the same partition are allowed to be enlisted in group-lock transaction.
 -     */
 -    public boolean partitionLock();
 -
 -    /**
       * @return Return value for
       */
-     public GridCacheReturn<V> implicitSingleResult();
+     public GridCacheReturn implicitSingleResult();
  
      /**
       * Finishes transaction (either commit or rollback).

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 974144a,7ea6e3a..14e1db2
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@@ -200,9 -205,10 +205,9 @@@ public class IgniteTxManager extends Gr
                  }
  
                  if (tx instanceof IgniteTxRemoteEx) {
-                     IgniteTxRemoteEx<K, V> rmtTx = (IgniteTxRemoteEx<K, V>)tx;
+                     IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
  
 -                    rmtTx.doneRemote(tx.xidVersion(), Collections.<GridCacheVersion>emptyList(),
 -                        Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList());
 +                    rmtTx.doneRemote(tx.xidVersion());
                  }
  
                  tx.commit();
@@@ -334,8 -340,8 +339,8 @@@
       * @return {@code True} if transaction has been committed or rolled back,
       *      {@code false} otherwise.
       */
-     public boolean isCompleted(IgniteInternalTx<K, V> tx) {
+     public boolean isCompleted(IgniteInternalTx tx) {
 -        return completedVers.containsKey(tx.xidVersion());
 +        return completedVers.containsKey(tx.writeVersion());
      }
  
      /**
@@@ -345,19 -351,24 +350,21 @@@
       * @param isolation Isolation.
       * @param timeout transaction timeout.
       * @param txSize Expected transaction size.
 -     * @param grpLockKey Group lock key if this is a group-lock transaction.
 -     * @param partLock {@code True} if partition is locked.
       * @return New transaction.
       */
-     public IgniteTxLocalAdapter<K, V> newTx(
+     public IgniteTxLocalAdapter newTx(
          boolean implicit,
          boolean implicitSingle,
-         boolean sys,
+         @Nullable GridCacheContext sysCacheCtx,
          TransactionConcurrency concurrency,
          TransactionIsolation isolation,
          long timeout,
          boolean invalidate,
          boolean storeEnabled,
 -        int txSize,
 -        @Nullable IgniteTxKey grpLockKey,
 -        boolean partLock) {
 +        int txSize
 +    ) {
+         assert sysCacheCtx == null || sysCacheCtx.system();
 -
++        
          UUID subjId = null; // TODO GG-9141 how to get subj ID?
  
          int taskNameHash = cctx.kernalContext().job().currentTaskNameHash();
@@@ -936,18 -969,44 +963,18 @@@
      }
  
      /**
 -     * Gets committed transactions starting from the given version (inclusive). // TODO: GG-4011: why inclusive?
 -     *
 -     * @param min Start (or minimum) version.
 -     * @return Committed transactions starting from the given version (non-inclusive).
 -     */
 -    public Collection<GridCacheVersion> committedVersions(GridCacheVersion min) {
 -        ConcurrentNavigableMap<GridCacheVersion, Boolean> tail
 -            = completedVers.tailMap(min, true);
 -
 -        return F.isEmpty(tail) ? Collections.<GridCacheVersion>emptyList() : copyOf(tail, true);
 -    }
 -
 -    /**
 -     * Gets rolledback transactions starting from the given version (inclusive). // TODO: GG-4011: why inclusive?
 -     *
 -     * @param min Start (or minimum) version.
 -     * @return Committed transactions starting from the given version (non-inclusive).
 -     */
 -    public Collection<GridCacheVersion> rolledbackVersions(GridCacheVersion min) {
 -        ConcurrentNavigableMap<GridCacheVersion, Boolean> tail
 -            = completedVers.tailMap(min, true);
 -
 -        return F.isEmpty(tail) ? Collections.<GridCacheVersion>emptyList() : copyOf(tail, false);
 -    }
 -
 -    /**
       * @param tx Tx to remove.
       */
-     public void removeCommittedTx(IgniteInternalTx<K, V> tx) {
+     public void removeCommittedTx(IgniteInternalTx tx) {
 -        completedVers.remove(tx.xidVersion(), true);
 +        completedVers.remove(tx.writeVersion(), true);
      }
  
      /**
       * @param tx Committed transaction.
       * @return If transaction was not already present in committed set.
       */
-     public boolean addCommittedTx(IgniteInternalTx<K, V> tx) {
+     public boolean addCommittedTx(IgniteInternalTx tx) {
 -        return addCommittedTx(tx.xidVersion(), tx.nearXidVersion());
 +        return addCommittedTx(tx.writeVersion(), tx.nearXidVersion());
      }
  
      /**
@@@ -1069,9 -1198,18 +1096,9 @@@
                  completedVers.firstKey() + ", lastVer=" + completedVers.lastKey() + ", tx=" + tx.xid() + ']');
          }
  
-         ConcurrentMap<GridCacheVersion, IgniteInternalTx<K, V>> txIdMap = transactionMap(tx);
+         ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
  
          if (txIdMap.remove(tx.xidVersion(), tx)) {
 -            // 2. Must process completed entries before unlocking!
 -            processCompletedEntries(tx);
 -
 -            if (tx instanceof GridDhtTxLocal) {
 -                GridDhtTxLocal dhtTxLoc = (GridDhtTxLocal)tx;
 -
 -                collectPendingVersions(dhtTxLoc);
 -            }
 -
              // 3.1 Call dataStructures manager.
              cctx.kernalContext().dataStructures().onTxCommitted(tx);
  
@@@ -1277,11 -1445,11 +1331,11 @@@
      /**
       * @param tx Transaction to notify evictions for.
       */
-     private void notifyEvitions(IgniteInternalTx<K, V> tx) {
+     private void notifyEvitions(IgniteInternalTx tx) {
 -        if (tx.internal() && !tx.groupLock())
 +        if (tx.internal())
              return;
  
-         for (IgniteTxEntry<K, V> txEntry : tx.allEntries())
+         for (IgniteTxEntry txEntry : tx.allEntries())
              txEntry.cached().context().evicts().touch(txEntry, tx.local());
      }
  
@@@ -1721,9 -1911,10 +1775,9 @@@
          }
  
          if (tx instanceof GridDistributedTxRemoteAdapter) {
-             IgniteTxRemoteEx<K,V> rmtTx = (IgniteTxRemoteEx<K, V>)tx;
+             IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
  
 -            rmtTx.doneRemote(tx.xidVersion(), Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList(),
 -                Collections.<GridCacheVersion>emptyList());
 +            rmtTx.doneRemote(tx.xidVersion());
          }
  
          if (commit)
@@@ -1747,9 -1938,12 +1801,9 @@@
          }
  
          if (tx instanceof GridDistributedTxRemoteAdapter) {
-             IgniteTxRemoteEx<K,V> rmtTx = (IgniteTxRemoteEx<K, V>)tx;
+             IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
  
 -            rmtTx.doneRemote(tx.xidVersion(),
 -                Collections.<GridCacheVersion>emptyList(),
 -                Collections.<GridCacheVersion>emptyList(),
 -                Collections.<GridCacheVersion>emptyList());
 +            rmtTx.doneRemote(tx.xidVersion());
          }
  
          try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/905a139e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
----------------------------------------------------------------------