You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/14 15:16:05 UTC

[3/5] ignite git commit: ignite-4768 txs

http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index d448446..7a69a6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -27,10 +27,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.UUID;
 import javax.cache.Cache;
 import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreSession;
@@ -61,11 +63,17 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncSupport;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.lifecycle.LifecycleAware;
 import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -904,7 +912,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
     private static class SessionData {
         /** */
         @GridToStringExclude
-        private final IgniteInternalTx tx;
+        private final TxProxy tx;
 
         /** */
         private String cacheName;
@@ -914,7 +922,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
         private Map<Object, Object> props;
 
         /** */
-        private Object attachment;
+        private Object attach;
 
         /** */
         private final Set<CacheStoreManager> started =
@@ -927,8 +935,8 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
          * @param tx Current transaction.
          * @param cacheName Cache name.
          */
-        private SessionData(@Nullable IgniteInternalTx tx, @Nullable String cacheName) {
-            this.tx = tx;
+        private SessionData(@Nullable final IgniteInternalTx tx, @Nullable String cacheName) {
+            this.tx = tx != null ? new TxProxy(tx) : null;
             this.cacheName = cacheName;
         }
 
@@ -936,7 +944,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
          * @return Transaction.
          */
         @Nullable private Transaction transaction() {
-            return tx != null ? tx.proxy() : null;
+            return tx;
         }
 
         /**
@@ -950,12 +958,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
         }
 
         /**
-         * @param attachment Attachment.
+         * @param attach Attachment.
          */
-        private Object attach(Object attachment) {
-            Object prev = this.attachment;
+        private Object attach(Object attach) {
+            Object prev = this.attach;
 
-            this.attachment = attachment;
+            this.attach = attach;
 
             return prev;
         }
@@ -964,7 +972,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
          * @return Attachment.
          */
         private Object attachment() {
-            return attachment;
+            return attach;
         }
 
         /**
@@ -998,7 +1006,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(SessionData.class, this, "tx", CU.txString(tx));
+            return S.toString(SessionData.class, this, "tx", CU.txString(tx != null ? tx.tx : null));
         }
     }
 
@@ -1298,4 +1306,116 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
             }
         }
     }
+
+    /**
+     * Limited {@link Transaction} view of an {@link IgniteInternalTx}; unsupported calls throw (see methods).
+     */
+    private static class TxProxy implements Transaction {
+        /** Wrapped internal transaction that all supported calls delegate to. */
+        private final IgniteInternalTx tx;
+
+        /**
+         * @param tx Internal transaction to wrap, asserted to be non-{@code null}.
+         */
+        TxProxy(IgniteInternalTx tx) {
+            assert tx != null;
+
+            this.tx = tx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid xid() {
+            return tx.xid();
+        }
+
+        /** {@inheritDoc} */
+        @Override public UUID nodeId() {
+            return tx.nodeId();
+        }
+
+        /** {@inheritDoc} */
+        @Override public long threadId() {
+            return tx.threadId();
+        }
+
+        /** {@inheritDoc} */
+        @Override public long startTime() {
+            return tx.startTime();
+        }
+
+        /** {@inheritDoc} */
+        @Override public TransactionIsolation isolation() {
+            return tx.isolation();
+        }
+
+        /** {@inheritDoc} */
+        @Override public TransactionConcurrency concurrency() {
+            return tx.concurrency();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean implicit() {
+            return tx.implicit();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isInvalidate() {
+            return tx.isInvalidate();
+        }
+
+        /** {@inheritDoc} */
+        @Override public TransactionState state() {
+            return tx.state();
+        }
+
+        /** {@inheritDoc} */
+        @Override public long timeout() {
+            return tx.timeout();
+        }
+
+        /** {@inheritDoc} This proxy always throws {@link UnsupportedOperationException}. */
+        @Override public long timeout(long timeout) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean setRollbackOnly() {
+            return tx.setRollbackOnly();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isRollbackOnly() {
+            return tx.isRollbackOnly();
+        }
+
+        /** {@inheritDoc} This proxy always throws {@link UnsupportedOperationException}. */
+        @Override public void commit() throws IgniteException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} This proxy always throws {@link UnsupportedOperationException}. */
+        @Override public void close() throws IgniteException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} This proxy always throws {@link UnsupportedOperationException}. */
+        @Override public void rollback() throws IgniteException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} This proxy always throws {@link UnsupportedOperationException}. */
+        @Override public IgniteAsyncSupport withAsync() {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} This proxy always returns {@code false}. */
+        @Override public boolean isAsync() {
+            return false;
+        }
+
+        /** {@inheritDoc} This proxy always throws {@link UnsupportedOperationException}. */
+        @Override public <R> IgniteFuture<R> future() {
+            throw new UnsupportedOperationException();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index dd900fe..e3adfc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -57,9 +57,6 @@ public interface IgniteInternalTx extends AutoCloseable {
         /** Transaction is being finalized by user. */
         USER_FINISH,
 
-        /** Recovery request is received, user finish requests should be ignored. */
-        RECOVERY_WAIT,
-
         /** Transaction is being finalized by recovery procedure. */
         RECOVERY_FINISH
     }
@@ -689,11 +686,6 @@ public interface IgniteInternalTx extends AutoCloseable {
     public boolean hasTransforms();
 
     /**
-     * @return Public API proxy.
-     */
-    public TransactionProxy proxy();
-
-    /**
      * @param topVer New topology version.
      */
     public void onRemap(AffinityTopologyVersion topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
index 7c7b5a8..ddafbac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
@@ -22,6 +22,7 @@ import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.IgniteTransactionsEx;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.transactions.Transaction;
@@ -91,7 +92,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalTx txStartEx(
+    @Override public GridNearTxLocal txStartEx(
         GridCacheContext ctx,
         TransactionConcurrency concurrency,
         TransactionIsolation isolation,
@@ -113,7 +114,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalTx txStartEx(
+    @Override public GridNearTxLocal txStartEx(
         GridCacheContext ctx,
         TransactionConcurrency concurrency,
         TransactionIsolation isolation)
@@ -141,7 +142,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
      * @return Transaction.
      */
     @SuppressWarnings("unchecked")
-    private IgniteInternalTx txStart0(
+    private GridNearTxLocal txStart0(
         TransactionConcurrency concurrency,
         TransactionIsolation isolation,
         long timeout,
@@ -151,11 +152,12 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
         cctx.kernalContext().gateway().readLock();
 
         try {
-            IgniteInternalTx tx = cctx.tm().userTx(sysCacheCtx);
+            GridNearTxLocal tx = cctx.tm().userTx(sysCacheCtx);
 
             if (tx != null)
                 throw new IllegalStateException("Failed to start new transaction " +
                     "(current thread already has a transaction): " + tx);
+
             tx = cctx.tm().newTx(
                 false,
                 false,
@@ -178,7 +180,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
 
     /** {@inheritDoc} */
     @Nullable @Override public Transaction tx() {
-        IgniteInternalTx tx = cctx.tm().userTx();
+        GridNearTxLocal tx = cctx.tm().userTx();
 
         return tx != null ? tx.proxy() : null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index b07a117..98f1140 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -251,10 +251,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     /** Store used flag. */
     protected boolean storeEnabled = true;
 
-    /** */
-    @GridToStringExclude
-    private TransactionProxyImpl proxy;
-
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -546,15 +542,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
 
                 break;
 
-            case RECOVERY_WAIT:
-                FINALIZING_UPD.compareAndSet(this, FinalizationStatus.NONE, FinalizationStatus.RECOVERY_WAIT);
-
-                FinalizationStatus cur = finalizing;
-
-                res = cur == FinalizationStatus.RECOVERY_WAIT || cur == FinalizationStatus.RECOVERY_FINISH;
-
-                break;
-
             case RECOVERY_FINISH:
                 FinalizationStatus old = finalizing;
 
@@ -564,7 +551,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
 
             default:
                 throw new IllegalArgumentException("Cannot set finalization status: " + status);
-
         }
 
         if (res) {
@@ -1257,7 +1243,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
      * @throws IgniteCheckedException If batch update failed.
      */
     @SuppressWarnings({"CatchGenericClass"})
-    protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException {
+    protected final void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException {
         if (!storeEnabled() || internal() ||
             (!local() && near())) // No need to work with local store at GridNearTxRemote.
             return;
@@ -1806,14 +1792,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     }
 
     /** {@inheritDoc} */
-    @Override public TransactionProxy proxy() {
-        if (proxy == null)
-            proxy = new TransactionProxyImpl(this, cctx, false);
-
-        return proxy;
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean equals(Object o) {
         return o == this || (o instanceof IgniteTxAdapter && xidVer.equals(((IgniteTxAdapter)o).xidVer));
     }
@@ -2398,11 +2376,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
-        @Override public TransactionProxy proxy() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
         @Override public boolean equals(Object o) {
             return this == o || o instanceof IgniteInternalTx && xid.equals(((IgniteInternalTx)o).xid());
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 14a7ed0..e1d12af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -423,7 +423,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     /**
      * @param val Value to set.
      */
-    void setAndMarkValid(CacheObject val) {
+    public void setAndMarkValid(CacheObject val) {
         setAndMarkValid(op(), val, this.val.hasWriteValue(), this.val.hasReadValue());
     }
 
@@ -451,7 +451,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
      * Marks this entry as value-has-bean-read. Effectively, makes values enlisted to transaction visible
      * to further peek operations.
      */
-    void markValid() {
+    public void markValid() {
         prevVal.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue());
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 56a7fa2..331ca31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1551,7 +1551,7 @@ public class IgniteTxHandler {
 
             // Prepare prior to reordering, so the pending locks added
             // in prepare phase will get properly ordered as well.
-            tx.prepare();
+            tx.prepareRemoteTx();
 
             if (req.last()) {
                 assert !F.isEmpty(req.transactionNodes()) :
@@ -1644,7 +1644,7 @@ public class IgniteTxHandler {
 
             // Prepare prior to reordering, so the pending locks added
             // in prepare phase will get properly ordered as well.
-            tx.prepare();
+            tx.prepareRemoteTx();
 
             if (req.last())
                 tx.state(PREPARED);