You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/19 07:16:09 UTC

[2/9] incubator-ignite git commit: GG-9141 - Fixes for DR transactions.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
index 2b974e9..df4dd58 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -81,6 +81,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
      * @param ctx   Cache registry.
      * @param implicit Implicit flag.
      * @param implicitSingle Implicit with one key flag.
+     * @param sys System flag.
      * @param concurrency Concurrency.
      * @param isolation Isolation.
      * @param timeout Timeout.
@@ -92,6 +93,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
         GridCacheSharedContext<K, V> ctx,
         boolean implicit,
         boolean implicitSingle,
+        boolean sys,
         GridCacheTxConcurrency concurrency,
         GridCacheTxIsolation isolation,
         long timeout,
@@ -104,10 +106,11 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
         int taskNameHash
     ) {
         super(
+            ctx,
             ctx.versions().next(),
             implicit,
             implicitSingle,
-            ctx,
+            sys,
             concurrency,
             isolation,
             timeout,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 5bc5a3e..9098b73 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -30,6 +30,7 @@ import java.util.*;
 import java.util.concurrent.atomic.*;
 
 import static org.gridgain.grid.cache.GridCacheTxState.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
 import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
 
 /**
@@ -640,7 +641,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
             add(fut); // Append new future.
 
             try {
-                cctx.io().send(n, req);
+                cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
             }
             catch (IgniteCheckedException e) {
                 // Fail the whole thing.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 30b7aef..431e134 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -242,19 +242,19 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
         }
 
         switch (commState.idx) {
-            case 21:
+            case 22:
                 if (!commState.putGridUuid(futId))
                     return false;
 
                 commState.idx++;
 
-            case 22:
+            case 23:
                 if (!commState.putBoolean(last))
                     return false;
 
                 commState.idx++;
 
-            case 23:
+            case 24:
                 if (lastBackups != null) {
                     if (commState.it == null) {
                         if (!commState.putInt(lastBackups.size()))
@@ -281,31 +281,31 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
 
                 commState.idx++;
 
-            case 24:
+            case 25:
                 if (!commState.putGridUuid(miniId))
                     return false;
 
                 commState.idx++;
 
-            case 25:
+            case 26:
                 if (!commState.putBoolean(near))
                     return false;
 
                 commState.idx++;
 
-            case 26:
+            case 27:
                 if (!commState.putLong(topVer))
                     return false;
 
                 commState.idx++;
 
-            case 27:
+            case 28:
                 if (!commState.putUuid(subjId))
                     return false;
 
                 commState.idx++;
 
-            case 28:
+            case 29:
                 if (!commState.putInt(taskNameHash))
                     return false;
 
@@ -325,7 +325,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
             return false;
 
         switch (commState.idx) {
-            case 21:
+            case 22:
                 IgniteUuid futId0 = commState.getGridUuid();
 
                 if (futId0 == GRID_UUID_NOT_READ)
@@ -335,7 +335,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
 
                 commState.idx++;
 
-            case 22:
+            case 23:
                 if (buf.remaining() < 1)
                     return false;
 
@@ -343,7 +343,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
 
                 commState.idx++;
 
-            case 23:
+            case 24:
                 if (commState.readSize == -1) {
                     if (buf.remaining() < 4)
                         return false;
@@ -372,7 +372,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
 
                 commState.idx++;
 
-            case 24:
+            case 25:
                 IgniteUuid miniId0 = commState.getGridUuid();
 
                 if (miniId0 == GRID_UUID_NOT_READ)
@@ -382,7 +382,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
 
                 commState.idx++;
 
-            case 25:
+            case 26:
                 if (buf.remaining() < 1)
                     return false;
 
@@ -390,7 +390,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
 
                 commState.idx++;
 
-            case 26:
+            case 27:
                 if (buf.remaining() < 8)
                     return false;
 
@@ -398,7 +398,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
 
                 commState.idx++;
 
-            case 27:
+            case 28:
                 UUID subjId0 = commState.getUuid();
 
                 if (subjId0 == UUID_NOT_READ)
@@ -408,7 +408,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
 
                 commState.idx++;
 
-            case 28:
+            case 29:
                 if (buf.remaining() < 4)
                     return false;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
index aa3546b..938e2d2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -62,6 +62,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
      * @param rmtThreadId Remote thread ID.
      * @param xidVer XID version.
      * @param commitVer Commit version.
+     * @param sys System flag.
      * @param concurrency Concurrency level (should be pessimistic).
      * @param isolation Transaction isolation.
      * @param invalidate Invalidate flag.
@@ -73,24 +74,25 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
      * @throws IgniteCheckedException If unmarshalling failed.
      */
     public GridNearTxRemote(
+        GridCacheSharedContext<K, V> ctx,
         ClassLoader ldr,
         UUID nodeId,
         UUID nearNodeId,
         long rmtThreadId,
         GridCacheVersion xidVer,
         GridCacheVersion commitVer,
+        boolean sys,
         GridCacheTxConcurrency concurrency,
         GridCacheTxIsolation isolation,
         boolean invalidate,
         long timeout,
         Collection<GridCacheTxEntry<K, V>> writeEntries,
-        GridCacheSharedContext<K, V> ctx,
         int txSize,
         @Nullable GridCacheTxKey grpLockKey,
         @Nullable UUID subjId,
         int taskNameHash
     ) throws IgniteCheckedException {
-        super(ctx, nodeId, rmtThreadId, xidVer, commitVer, concurrency, isolation, invalidate, timeout, txSize,
+        super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
             grpLockKey, subjId, taskNameHash);
 
         assert nearNodeId != null;
@@ -119,6 +121,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
      * @param rmtThreadId Remote thread ID.
      * @param xidVer XID version.
      * @param commitVer Commit version.
+     * @param sys System flag.
      * @param concurrency Concurrency level (should be pessimistic).
      * @param isolation Transaction isolation.
      * @param invalidate Invalidate flag.
@@ -128,23 +131,24 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
      * @param grpLockKey Collection of group lock keys if this is a group-lock transaction.
      */
     public GridNearTxRemote(
+        GridCacheSharedContext<K, V> ctx,
         UUID nodeId,
         UUID nearNodeId,
         GridCacheVersion nearXidVer,
         long rmtThreadId,
         GridCacheVersion xidVer,
         GridCacheVersion commitVer,
+        boolean sys,
         GridCacheTxConcurrency concurrency,
         GridCacheTxIsolation isolation,
         boolean invalidate,
         long timeout,
-        GridCacheSharedContext<K, V> ctx,
         int txSize,
         @Nullable GridCacheTxKey grpLockKey,
         @Nullable UUID subjId,
         int taskNameHash
     ) {
-        super(ctx, nodeId, rmtThreadId, xidVer, commitVer, concurrency, isolation, invalidate, timeout, txSize,
+        super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
             grpLockKey, subjId, taskNameHash);
 
         assert nearNodeId != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
index cebd888..aac60fd 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
@@ -60,8 +60,8 @@ class GridLocalTx<K, V> extends GridCacheTxLocalAdapter<K, V> {
         @Nullable UUID subjId,
         int taskNameHash
     ) {
-        super(ctx, ctx.versions().next(), implicit, implicitSingle, concurrency, isolation, timeout, false, true, txSize,
-            null, false, subjId, taskNameHash);
+        super(ctx, ctx.versions().next(), implicit, implicitSingle, false, concurrency, isolation, timeout, false, true,
+            txSize, null, false, subjId, taskNameHash);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java
index 60eec9e..aece7cf 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java
@@ -12,6 +12,7 @@ package org.gridgain.grid.kernal.processors.cache.transactions;
 import org.apache.ignite.*;
 import org.apache.ignite.lang.*;
 import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
@@ -25,7 +26,7 @@ import static org.gridgain.grid.cache.GridCacheTxIsolation.*;
 /**
  * Grid transactions implementation.
  */
-public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
+public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
     /** Cache shared context. */
     private GridCacheSharedContext<K, V> cctx;
 
@@ -44,7 +45,9 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
             cfg.getDefaultTxConcurrency(),
             cfg.getDefaultTxIsolation(),
             cfg.getDefaultTxTimeout(),
-            0);
+            0,
+            false
+        );
     }
 
     /** {@inheritDoc} */
@@ -58,7 +61,8 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
             concurrency,
             isolation,
             cfg.getDefaultTxTimeout(),
-            0
+            0,
+            false
         );
     }
 
@@ -74,7 +78,24 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
             concurrency,
             isolation,
             timeout,
-            txSize
+            txSize,
+            false
+        );
+    }
+
+    @Override public GridCacheTx txStartSystem(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation,
+        long timeout, int txSize) {
+        A.notNull(concurrency, "concurrency");
+        A.notNull(isolation, "isolation");
+        A.ensure(timeout >= 0, "timeout cannot be negative");
+        A.ensure(txSize >= 0, "transaction size cannot be negative");
+
+        return txStart0(
+            concurrency,
+            isolation,
+            timeout,
+            txSize,
+            true
         );
     }
 
@@ -83,10 +104,11 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
      * @param isolation Transaction isolation.
      * @param timeout Transaction timeout.
      * @param txSize Expected transaction size.
+     * @param sys System flag.
      * @return Transaction.
      */
     private GridCacheTx txStart0(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation,
-        long timeout, int txSize) {
+        long timeout, int txSize, boolean sys) {
         GridTransactionsConfiguration cfg = cctx.gridConfig().getTransactionsConfiguration();
 
         if (!cfg.isTxSerializableEnabled() && isolation == SERIALIZABLE)
@@ -102,6 +124,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
         tx = cctx.tm().newTx(
             false,
             false,
+            sys,
             concurrency,
             isolation,
             timeout,
@@ -128,7 +151,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
             throw new IllegalArgumentException("Failed to find cache with given name (cache is not configured): " +
                 cacheName);
 
-        return txStartGroupLock(cache.context(), affinityKey, concurrency, isolation, false, timeout, txSize);
+        return txStartGroupLock(cache.context(), affinityKey, concurrency, isolation, false, timeout, txSize, false);
     }
 
     /** {@inheritDoc} */
@@ -142,7 +165,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
 
         Object grpLockKey = cache.context().affinity().partitionAffinityKey(partId);
 
-        return txStartGroupLock(cache.context(), grpLockKey, concurrency, isolation, true, timeout, txSize);
+        return txStartGroupLock(cache.context(), grpLockKey, concurrency, isolation, true, timeout, txSize, false);
     }
 
     /**
@@ -155,13 +178,14 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
      *      should be a unique partition-specific key.
      * @param timeout Tx timeout.
      * @param txSize Expected transaction size.
+     * @param sys System flag.
      * @return Started transaction.
      * @throws IllegalStateException If other transaction was already started.
      * @throws IgniteCheckedException In case of error.
      */
     @SuppressWarnings("unchecked")
     private GridCacheTx txStartGroupLock(GridCacheContext ctx, Object grpLockKey, GridCacheTxConcurrency concurrency,
-        GridCacheTxIsolation isolation, boolean partLock, long timeout, int txSize)
+        GridCacheTxIsolation isolation, boolean partLock, long timeout, int txSize, boolean sys)
         throws IllegalStateException, IgniteCheckedException {
         GridCacheTx tx = cctx.tm().userTx();
 
@@ -172,6 +196,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
         GridCacheTxLocalAdapter<K, V> tx0 = cctx.tm().newTx(
             false,
             false,
+            sys,
             concurrency,
             isolation,
             timeout,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java b/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java
index 6044900..e1e24bb 100644
--- a/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java
+++ b/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java
@@ -64,16 +64,17 @@ public class GridCacheJtaManager<K, V> extends GridCacheJtaManagerAdapter<K, V>
                                 .getTransactionsConfiguration();
 
                             tx = cctx.tm().newTx(
-                                false,
-                                false,
+                                /*implicit*/false,
+                                /*implicit single*/false,
+                                /*system*/false,
                                 tCfg.getDefaultTxConcurrency(),
                                 tCfg.getDefaultTxIsolation(),
                                 tCfg.getDefaultTxTimeout(),
-                                false,
-                                true,
-                                0,
-                                /** group lock keys */null,
-                                /** partition lock */false
+                                /*invalidate*/false,
+                                /*store enabled*/true,
+                                /*tx size*/0,
+                                /*group lock keys*/null,
+                                /*partition lock*/false
                             );
                         }