You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/14 15:16:05 UTC
[3/5] ignite git commit: ignite-4768 txs
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index d448446..7a69a6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -27,10 +27,12 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
+import java.util.UUID;
import javax.cache.Cache;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreSession;
@@ -61,11 +63,17 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -904,7 +912,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
private static class SessionData {
/** */
@GridToStringExclude
- private final IgniteInternalTx tx;
+ private final TxProxy tx;
/** */
private String cacheName;
@@ -914,7 +922,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
private Map<Object, Object> props;
/** */
- private Object attachment;
+ private Object attach;
/** */
private final Set<CacheStoreManager> started =
@@ -927,8 +935,8 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
* @param tx Current transaction.
* @param cacheName Cache name.
*/
- private SessionData(@Nullable IgniteInternalTx tx, @Nullable String cacheName) {
- this.tx = tx;
+ private SessionData(@Nullable final IgniteInternalTx tx, @Nullable String cacheName) {
+ this.tx = tx != null ? new TxProxy(tx) : null;
this.cacheName = cacheName;
}
@@ -936,7 +944,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
* @return Transaction.
*/
@Nullable private Transaction transaction() {
- return tx != null ? tx.proxy() : null;
+ return tx;
}
/**
@@ -950,12 +958,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/**
- * @param attachment Attachment.
+ * @param attach Attachment.
*/
- private Object attach(Object attachment) {
- Object prev = this.attachment;
+ private Object attach(Object attach) {
+ Object prev = this.attach;
- this.attachment = attachment;
+ this.attach = attach;
return prev;
}
@@ -964,7 +972,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
* @return Attachment.
*/
private Object attachment() {
- return attachment;
+ return attach;
}
/**
@@ -998,7 +1006,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(SessionData.class, this, "tx", CU.txString(tx));
+ return S.toString(SessionData.class, this, "tx", CU.txString(tx != null ? tx.tx : null));
}
}
@@ -1298,4 +1306,116 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
}
}
+
+ /**
+ * {@link Transaction} view over an {@link IgniteInternalTx}: accessors delegate, lifecycle and async calls throw.
+ */
+ private static class TxProxy implements Transaction {
+ /** Wrapped internal transaction that all delegating methods forward to. */
+ private final IgniteInternalTx tx;
+
+ /**
+ * @param tx Internal transaction to wrap, asserted to be non-null.
+ */
+ TxProxy(IgniteInternalTx tx) {
+ assert tx != null;
+
+ this.tx = tx;
+ }
+
+ /** {@inheritDoc} Delegates to the wrapped transaction. */
+ @Override public IgniteUuid xid() {
+ return tx.xid();
+ }
+
+ /** {@inheritDoc} Delegates to the wrapped transaction. */
+ @Override public UUID nodeId() {
+ return tx.nodeId();
+ }
+
+ /** {@inheritDoc} Delegates to the wrapped transaction. */
+ @Override public long threadId() {
+ return tx.threadId();
+ }
+
+ /** {@inheritDoc} Delegates to the wrapped transaction. */
+ @Override public long startTime() {
+ return tx.startTime();
+ }
+
+ /** {@inheritDoc} Delegates to the wrapped transaction. */
+ @Override public TransactionIsolation isolation() {
+ return tx.isolation();
+ }
+
+ /** {@inheritDoc} Delegates to the wrapped transaction. */
+ @Override public TransactionConcurrency concurrency() {
+ return tx.concurrency();
+ }
+
+ /** {@inheritDoc} Delegates to the wrapped transaction. */
+ @Override public boolean implicit() {
+ return tx.implicit();
+ }
+
+ /** {@inheritDoc} Delegates to the wrapped transaction. */
+ @Override public boolean isInvalidate() {
+ return tx.isInvalidate();
+ }
+
+ /** {@inheritDoc} Delegates to the wrapped transaction. */
+ @Override public TransactionState state() {
+ return tx.state();
+ }
+
+ /** {@inheritDoc} Delegates to the wrapped transaction. */
+ @Override public long timeout() {
+ return tx.timeout();
+ }
+
+ /** {@inheritDoc} Not supported by this proxy: always throws {@link UnsupportedOperationException}. */
+ @Override public long timeout(long timeout) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} Delegates to the wrapped transaction (the only mutating call that is forwarded). */
+ @Override public boolean setRollbackOnly() {
+ return tx.setRollbackOnly();
+ }
+
+ /** {@inheritDoc} Delegates to the wrapped transaction. */
+ @Override public boolean isRollbackOnly() {
+ return tx.isRollbackOnly();
+ }
+
+ /** {@inheritDoc} Not supported by this proxy: always throws {@link UnsupportedOperationException}. */
+ @Override public void commit() throws IgniteException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} Not supported by this proxy: always throws {@link UnsupportedOperationException}. */
+ @Override public void close() throws IgniteException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} Not supported by this proxy: always throws {@link UnsupportedOperationException}. */
+ @Override public void rollback() throws IgniteException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} Not supported by this proxy: always throws {@link UnsupportedOperationException}. */
+ @Override public IgniteAsyncSupport withAsync() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} Always {@code false}: this proxy has no asynchronous mode. */
+ @Override public boolean isAsync() {
+ return false;
+ }
+
+ /** {@inheritDoc} Not supported by this proxy: always throws {@link UnsupportedOperationException}. */
+ @Override public <R> IgniteFuture<R> future() {
+ throw new UnsupportedOperationException();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index dd900fe..e3adfc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -57,9 +57,6 @@ public interface IgniteInternalTx extends AutoCloseable {
/** Transaction is being finalized by user. */
USER_FINISH,
- /** Recovery request is received, user finish requests should be ignored. */
- RECOVERY_WAIT,
-
/** Transaction is being finalized by recovery procedure. */
RECOVERY_FINISH
}
@@ -689,11 +686,6 @@ public interface IgniteInternalTx extends AutoCloseable {
public boolean hasTransforms();
/**
- * @return Public API proxy.
- */
- public TransactionProxy proxy();
-
- /**
* @param topVer New topology version.
*/
public void onRemap(AffinityTopologyVersion topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
index 7c7b5a8..ddafbac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
@@ -22,6 +22,7 @@ import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.IgniteTransactionsEx;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.transactions.Transaction;
@@ -91,7 +92,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
}
/** {@inheritDoc} */
- @Override public IgniteInternalTx txStartEx(
+ @Override public GridNearTxLocal txStartEx(
GridCacheContext ctx,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
@@ -113,7 +114,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
}
/** {@inheritDoc} */
- @Override public IgniteInternalTx txStartEx(
+ @Override public GridNearTxLocal txStartEx(
GridCacheContext ctx,
TransactionConcurrency concurrency,
TransactionIsolation isolation)
@@ -141,7 +142,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
* @return Transaction.
*/
@SuppressWarnings("unchecked")
- private IgniteInternalTx txStart0(
+ private GridNearTxLocal txStart0(
TransactionConcurrency concurrency,
TransactionIsolation isolation,
long timeout,
@@ -151,11 +152,12 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
cctx.kernalContext().gateway().readLock();
try {
- IgniteInternalTx tx = cctx.tm().userTx(sysCacheCtx);
+ GridNearTxLocal tx = cctx.tm().userTx(sysCacheCtx);
if (tx != null)
throw new IllegalStateException("Failed to start new transaction " +
"(current thread already has a transaction): " + tx);
+
tx = cctx.tm().newTx(
false,
false,
@@ -178,7 +180,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
/** {@inheritDoc} */
@Nullable @Override public Transaction tx() {
- IgniteInternalTx tx = cctx.tm().userTx();
+ GridNearTxLocal tx = cctx.tm().userTx();
return tx != null ? tx.proxy() : null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index b07a117..98f1140 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -251,10 +251,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
/** Store used flag. */
protected boolean storeEnabled = true;
- /** */
- @GridToStringExclude
- private TransactionProxyImpl proxy;
-
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -546,15 +542,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
break;
- case RECOVERY_WAIT:
- FINALIZING_UPD.compareAndSet(this, FinalizationStatus.NONE, FinalizationStatus.RECOVERY_WAIT);
-
- FinalizationStatus cur = finalizing;
-
- res = cur == FinalizationStatus.RECOVERY_WAIT || cur == FinalizationStatus.RECOVERY_FINISH;
-
- break;
-
case RECOVERY_FINISH:
FinalizationStatus old = finalizing;
@@ -564,7 +551,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
default:
throw new IllegalArgumentException("Cannot set finalization status: " + status);
-
}
if (res) {
@@ -1257,7 +1243,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
* @throws IgniteCheckedException If batch update failed.
*/
@SuppressWarnings({"CatchGenericClass"})
- protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException {
+ protected final void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException {
if (!storeEnabled() || internal() ||
(!local() && near())) // No need to work with local store at GridNearTxRemote.
return;
@@ -1806,14 +1792,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public TransactionProxy proxy() {
- if (proxy == null)
- proxy = new TransactionProxyImpl(this, cctx, false);
-
- return proxy;
- }
-
- /** {@inheritDoc} */
@Override public boolean equals(Object o) {
return o == this || (o instanceof IgniteTxAdapter && xidVer.equals(((IgniteTxAdapter)o).xidVer));
}
@@ -2398,11 +2376,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
- @Override public TransactionProxy proxy() {
- return null;
- }
-
- /** {@inheritDoc} */
@Override public boolean equals(Object o) {
return this == o || o instanceof IgniteInternalTx && xid.equals(((IgniteInternalTx)o).xid());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 14a7ed0..e1d12af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -423,7 +423,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/**
* @param val Value to set.
*/
- void setAndMarkValid(CacheObject val) {
+ public void setAndMarkValid(CacheObject val) {
setAndMarkValid(op(), val, this.val.hasWriteValue(), this.val.hasReadValue());
}
@@ -451,7 +451,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
* Marks this entry as value-has-bean-read. Effectively, makes values enlisted to transaction visible
* to further peek operations.
*/
- void markValid() {
+ public void markValid() {
prevVal.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 56a7fa2..331ca31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1551,7 +1551,7 @@ public class IgniteTxHandler {
// Prepare prior to reordering, so the pending locks added
// in prepare phase will get properly ordered as well.
- tx.prepare();
+ tx.prepareRemoteTx();
if (req.last()) {
assert !F.isEmpty(req.transactionNodes()) :
@@ -1644,7 +1644,7 @@ public class IgniteTxHandler {
// Prepare prior to reordering, so the pending locks added
// in prepare phase will get properly ordered as well.
- tx.prepare();
+ tx.prepareRemoteTx();
if (req.last())
tx.state(PREPARED);