You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/20 17:45:58 UTC
[5/6] ignite git commit: IGNITE-7910 Transaction rollback on PME
IGNITE-7910 Transaction rollback on PME
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee5fbce3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee5fbce3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee5fbce3
Branch: refs/heads/ignite-2.5.1.b5
Commit: ee5fbce35f95ede8f115a21a785adccb7f85aca6
Parents: 553e0ff
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Apr 20 20:18:33 2018 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Apr 20 20:18:34 2018 +0300
----------------------------------------------------------------------
.../ignite/tests/utils/TestTransaction.java | 5 +
.../java/org/apache/ignite/IgniteCluster.java | 7 +
.../org/apache/ignite/IgniteTransactions.java | 23 +-
.../configuration/TransactionConfiguration.java | 41 +
.../ignite/internal/TransactionsMXBeanImpl.java | 162 ++++
.../cluster/IgniteClusterAsyncImpl.java | 5 +
.../internal/cluster/IgniteClusterImpl.java | 15 +
.../ignite/internal/commandline/Arguments.java | 16 +-
.../ignite/internal/commandline/Command.java | 5 +-
.../internal/commandline/CommandHandler.java | 345 ++++++-
.../optimized/OptimizedMarshallerUtils.java | 2 +-
.../processors/cache/GridCacheAdapter.java | 88 +-
.../GridCachePartitionExchangeManager.java | 29 +-
.../processors/cache/GridCacheProcessor.java | 103 ++-
.../cache/GridCacheSharedContext.java | 19 +-
.../processors/cache/GridCacheUtils.java | 4 +-
...eoutOnPartitionMapExchangeChangeMessage.java | 128 +++
...TimeoutOnPartitionMapExchangeChangeTask.java | 57 ++
.../distributed/GridCacheTxFinishSync.java | 3 +-
.../distributed/GridDistributedTxMapping.java | 2 +-
.../GridDistributedTxRemoteAdapter.java | 23 +-
.../distributed/dht/GridDhtLockFuture.java | 363 ++++----
.../dht/GridDhtTransactionalCacheAdapter.java | 48 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 29 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 187 +++-
.../distributed/dht/GridDhtTxPrepareFuture.java | 2 +-
.../dht/GridDhtTxPrepareRequest.java | 36 +-
.../dht/colocated/GridDhtColocatedCache.java | 4 +-
.../colocated/GridDhtColocatedLockFuture.java | 121 ++-
.../GridDhtPartitionsExchangeFuture.java | 25 +-
.../distributed/near/GridNearLockFuture.java | 115 ++-
...arOptimisticSerializableTxPrepareFuture.java | 6 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 3 +-
.../near/GridNearTxFastFinishFuture.java | 12 +-
.../near/GridNearTxFinishFuture.java | 96 +-
.../cache/distributed/near/GridNearTxLocal.java | 395 +++++---
.../cache/local/GridLocalLockFuture.java | 80 +-
.../GridCacheDatabaseSharedManager.java | 1 -
.../store/GridCacheStoreManagerAdapter.java | 5 +
.../transactions/IgniteTransactionsImpl.java | 36 +-
.../cache/transactions/IgniteTxAdapter.java | 19 +-
.../cache/transactions/IgniteTxHandler.java | 7 +-
.../transactions/IgniteTxLocalAdapter.java | 20 +-
.../cache/transactions/IgniteTxManager.java | 121 ++-
.../transactions/TransactionProxyImpl.java | 9 +
.../TransactionProxyRollbackOnlyImpl.java | 80 ++
.../internal/util/GridPartitionStateMap.java | 2 +-
.../ignite/internal/visor/tx/VisorTxInfo.java | 130 +++
.../internal/visor/tx/VisorTxOperation.java | 43 +
.../internal/visor/tx/VisorTxProjection.java | 42 +
.../internal/visor/tx/VisorTxSortOrder.java | 55 ++
.../ignite/internal/visor/tx/VisorTxTask.java | 248 +++++
.../internal/visor/tx/VisorTxTaskArg.java | 205 +++++
.../internal/visor/tx/VisorTxTaskResult.java | 80 ++
.../ignite/mxbean/TransactionsMXBean.java | 99 ++
.../apache/ignite/transactions/Transaction.java | 18 +-
.../resources/META-INF/classnames.properties | 7 +
.../internal/TestRecordingCommunicationSpi.java | 3 +-
.../internal/TransactionsMXBeanImplTest.java | 118 +++
.../commandline/CommandHandlerParsingTest.java | 100 ++
.../SetTxTimeoutOnPartitionMapExchangeTest.java | 166 ++++
.../CacheLateAffinityAssignmentTest.java | 5 -
.../IgniteTxRemoveTimeoutObjectsNearTest.java | 30 +
.../IgniteTxRemoveTimeoutObjectsTest.java | 6 +
...ePrimaryNodeFailureRecoveryAbstractTest.java | 2 +-
.../dht/IgniteCacheTxRecoveryRollbackTest.java | 2 +-
.../near/GridCacheNearTxMultiNodeSelfTest.java | 2 +-
.../GridCachePartitionedTxSalvageSelfTest.java | 8 +-
.../GridCacheLocalTxMultiThreadedSelfTest.java | 2 +-
.../cache/transactions/TxLabelTest.java | 63 ++
.../transactions/TxMultiCacheAsyncOpsTest.java | 136 +++
...OptimisticPrepareOnUnstableTopologyTest.java | 250 +++++
.../TxPessimisticDeadlockDetectionTest.java | 2 +-
.../TxRollbackAsyncNearCacheTest.java | 28 +
.../cache/transactions/TxRollbackAsyncTest.java | 919 +++++++++++++++++++
.../TxRollbackAsyncWithPersistenceTest.java | 59 ++
...ollbackOnTimeoutNoDeadlockDetectionTest.java | 6 +-
.../transactions/TxRollbackOnTimeoutTest.java | 118 ++-
.../TxRollbackOnTopologyChangeTest.java | 228 +++++
.../igfs/IgfsDataManagerSelfTest.java | 6 +-
.../cache/GridAbstractCacheStoreSelfTest.java | 7 +-
.../multijvm/IgniteClusterProcessProxy.java | 5 +
.../ignite/testsuites/IgniteBasicTestSuite.java | 4 +
.../testsuites/IgniteCacheTestSuite3.java | 2 +
.../testsuites/IgniteCacheTestSuite6.java | 18 +
.../ignite/util/GridCommandHandlerTest.java | 247 +++++
.../processors/cache/jta/CacheJtaManager.java | 3 +-
.../ApiParity/ClusterParityTest.cs | 1 +
.../ApiParity/TransactionsParityTest.cs | 13 +-
.../Cache/CacheAbstractTransactionalTest.cs | 18 +-
90 files changed, 5599 insertions(+), 809 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
index e587bd7..be2211f 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
@@ -147,6 +147,11 @@ public class TestTransaction implements Transaction {
}
/** {@inheritDoc} */
+ @Nullable @Override public String label() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public void resume() throws IgniteException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
index 7329d68..b501333 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
@@ -421,6 +421,13 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport {
public void enableStatistics(Collection<String> caches, boolean enabled);
/**
+ * Sets transaction timeout on partition map exchange.
+ *
+ * @param timeout Transaction timeout on partition map exchange in milliseconds.
+ */
+ public void setTxTimeoutOnPartitionMapExchange(long timeout);
+
+ /**
* If local client node disconnected from cluster returns future
* that will be completed when client reconnected.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
index dfe6a1a..2bb7101 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
@@ -17,6 +17,7 @@
package org.apache.ignite;
+import java.util.Collection;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.transactions.Transaction;
@@ -103,4 +104,24 @@ public interface IgniteTransactions {
* Resets transaction metrics.
*/
public void resetMetrics();
-}
\ No newline at end of file
+
+ /**
+ * Returns a list of active transactions initiated by this node.
+ * <p>
+ * Note: returned transaction handle will only support getters, {@link Transaction#close()},
+ * {@link Transaction#rollback()}, {@link Transaction#rollbackAsync()} methods.
+ * Trying to invoke other methods will lead to UnsupportedOperationException.
+ *
+ * @return Transactions started on local node.
+ */
+ public Collection<Transaction> localActiveTransactions();
+
+ /**
+ * Returns instance of Ignite Transactions to mark a transaction with a special label.
+ *
+ * @param lb label.
+ * @return {@code This} for chaining.
+ * @throws NullPointerException if label is null.
+ */
+ public IgniteTransactions withLabel(String lb);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
index 0063afc..818f823 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
@@ -42,6 +42,9 @@ public class TransactionConfiguration implements Serializable {
/** Default transaction timeout. */
public static final long DFLT_TRANSACTION_TIMEOUT = 0;
+ /** Transaction timeout on partition map synchronization. */
+ public static final long TX_TIMEOUT_ON_PARTITION_MAP_EXCHANGE = 0;
+
/** Default size of pessimistic transactions log. */
public static final int DFLT_PESSIMISTIC_TX_LOG_LINGER = 10_000;
@@ -57,6 +60,9 @@ public class TransactionConfiguration implements Serializable {
/** Default transaction timeout. */
private long dfltTxTimeout = DFLT_TRANSACTION_TIMEOUT;
+ /** Transaction timeout on partition map exchange. */
+ private volatile long txTimeoutOnPartitionMapExchange = TX_TIMEOUT_ON_PARTITION_MAP_EXCHANGE;
+
/** Pessimistic tx log size. */
private int pessimisticTxLogSize;
@@ -89,6 +95,7 @@ public class TransactionConfiguration implements Serializable {
dfltConcurrency = cfg.getDefaultTxConcurrency();
dfltIsolation = cfg.getDefaultTxIsolation();
dfltTxTimeout = cfg.getDefaultTxTimeout();
+ txTimeoutOnPartitionMapExchange = cfg.getTxTimeoutOnPartitionMapExchange();
pessimisticTxLogLinger = cfg.getPessimisticTxLogLinger();
pessimisticTxLogSize = cfg.getPessimisticTxLogSize();
txSerEnabled = cfg.isTxSerializableEnabled();
@@ -192,6 +199,40 @@ public class TransactionConfiguration implements Serializable {
}
/**
+ * Some Ignite operations provoke partition map exchange process within Ignite to ensure the partitions distribution
+ * state is synchronized cluster-wide. Topology update events and a start of a new distributed cache are examples
+ * of those operations.
+ * <p>
+ * When the partition map exchange starts, Ignite acquires a global lock at a particular stage. The lock can't be
+ * obtained until pending transactions are running in parallel. If there is a transaction that runs for a while,
+ * then it will prevent the partition map exchange process from the start freezing some operations such as a new
+ * node join process.
+ * <p>
+ * This property allows to rollback such long transactions to let Ignite acquire the lock faster and initiate the
+ * partition map exchange process. The timeout is enforced only at the time of the partition map exchange process.
+ * <p>
+ * If not set, default value is {@link #TX_TIMEOUT_ON_PARTITION_MAP_EXCHANGE} which means transactions will never be
+ * rolled back on partition map exchange.
+ *
+ * @return Transaction timeout for partition map synchronization in milliseconds.
+ */
+ public long getTxTimeoutOnPartitionMapExchange() {
+ return txTimeoutOnPartitionMapExchange;
+ }
+
+ /**
+ * Sets the transaction timeout that will be enforced if the partition map exchange process starts.
+ *
+ * @param txTimeoutOnPartitionMapExchange Transaction timeout value in milliseconds.
+ * @return {@code this} for chaining.
+ */
+ public TransactionConfiguration setTxTimeoutOnPartitionMapExchange(long txTimeoutOnPartitionMapExchange) {
+ this.txTimeoutOnPartitionMapExchange = txTimeoutOnPartitionMapExchange;
+
+ return this;
+ }
+
+ /**
* Gets size of pessimistic transactions log stored on node in order to recover transaction commit if originating
* node has left grid before it has sent all messages to transaction nodes.
* <p>
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
new file mode 100644
index 0000000..6937ebd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.internal.visor.tx.VisorTxInfo;
+import org.apache.ignite.internal.visor.tx.VisorTxOperation;
+import org.apache.ignite.internal.visor.tx.VisorTxProjection;
+import org.apache.ignite.internal.visor.tx.VisorTxSortOrder;
+import org.apache.ignite.internal.visor.tx.VisorTxTask;
+import org.apache.ignite.internal.visor.tx.VisorTxTaskArg;
+import org.apache.ignite.internal.visor.tx.VisorTxTaskResult;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.mxbean.TransactionsMXBean;
+
+/**
+ * TransactionsMXBean implementation.
+ */
+public class TransactionsMXBeanImpl implements TransactionsMXBean {
+ /** */
+ private final GridKernalContextImpl ctx;
+
+ /**
+ * @param ctx Context.
+ */
+ TransactionsMXBeanImpl(GridKernalContextImpl ctx) {
+ this.ctx = ctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getActiveTransactions(Long minDuration, Integer minSize, String prj, String consistentIds,
+ String xid, String lbRegex, Integer limit, String order, boolean detailed, boolean kill) {
+ try {
+ IgniteCompute compute = ctx.cluster().get().compute();
+
+ VisorTxProjection proj = null;
+
+ if (prj != null) {
+ if ("clients".equals(prj))
+ proj = VisorTxProjection.CLIENT;
+ else if ("servers".equals(prj))
+ proj = VisorTxProjection.SERVER;
+ }
+
+ List<String> consIds = null;
+
+ if (consistentIds != null)
+ consIds = Arrays.stream(consistentIds.split(",")).collect(Collectors.toList());
+
+ VisorTxSortOrder sortOrder = null;
+
+ if (order != null) {
+ if ("DURATION".equals(order))
+ sortOrder = VisorTxSortOrder.DURATION;
+ else if ("SIZE".equals(order))
+ sortOrder = VisorTxSortOrder.SIZE;
+ }
+
+ VisorTxTaskArg arg = new VisorTxTaskArg(kill ? VisorTxOperation.KILL : VisorTxOperation.LIST,
+ limit, minDuration == null ? null : minDuration * 1000, minSize, null, proj, consIds, xid, lbRegex, sortOrder);
+
+ Map<ClusterNode, VisorTxTaskResult> res = compute.execute(new VisorTxTask(),
+ new VisorTaskArgument<>(ctx.cluster().get().localNode().id(), arg, false));
+
+ if (detailed) {
+ StringWriter sw = new StringWriter();
+
+ PrintWriter w = new PrintWriter(sw);
+
+ for (Map.Entry<ClusterNode, VisorTxTaskResult> entry : res.entrySet()) {
+ if (entry.getValue().getInfos().isEmpty())
+ continue;
+
+ ClusterNode key = entry.getKey();
+
+ w.println(key.toString());
+
+ for (VisorTxInfo info : entry.getValue().getInfos())
+ w.println(" Tx: [xid=" + info.getXid() +
+ ", label=" + info.getLabel() +
+ ", state=" + info.getState() +
+ ", duration=" + info.getDuration() / 1000 +
+ ", isolation=" + info.getIsolation() +
+ ", concurrency=" + info.getConcurrency() +
+ ", timeout=" + info.getTimeout() +
+ ", size=" + info.getSize() +
+ ", dhtNodes=" + F.transform(info.getPrimaryNodes(), new IgniteClosure<UUID, String>() {
+ @Override public String apply(UUID id) {
+ return U.id8(id);
+ }
+ }) +
+ ']');
+ }
+
+ w.flush();
+
+ return sw.toString();
+ }
+ else {
+ int cnt = 0;
+
+ for (VisorTxTaskResult result : res.values())
+ cnt += result.getInfos().size();
+
+ return Integer.toString(cnt);
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getTxTimeoutOnPartitionMapExchange() {
+ return ctx.config().getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setTxTimeoutOnPartitionMapExchange(long timeout) {
+ try {
+ ctx.grid().context().cache().setTxTimeoutOnPartitionMapExchange(timeout);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TransactionsMXBeanImpl.class, this);
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
index 43e97b5..98fb8ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
@@ -172,6 +172,11 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
}
/** {@inheritDoc} */
+ @Override public void setTxTimeoutOnPartitionMapExchange(long timeout) {
+ cluster.setTxTimeoutOnPartitionMapExchange(timeout);
+ }
+
+ /** {@inheritDoc} */
@Override public Ignite ignite() {
return cluster.ignite();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
index b69923b..e28342c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
@@ -502,6 +502,21 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
}
/** {@inheritDoc} */
+ @Override public void setTxTimeoutOnPartitionMapExchange(long timeout) {
+ guard();
+
+ try {
+ ctx.cache().setTxTimeoutOnPartitionMapExchange(timeout);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteCluster withAsync() {
return new IgniteClusterAsyncImpl(this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/commandline/Arguments.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/Arguments.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/Arguments.java
index 23c8eec..660edf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/Arguments.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/Arguments.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.commandline;
+import org.apache.ignite.internal.visor.tx.VisorTxTaskArg;
+
/**
* Bean with all parsed and validated arguments.
*/
@@ -49,6 +51,9 @@ public class Arguments {
*/
private String baselineArgs;
+ /** Transaction arguments. */
+ private final VisorTxTaskArg txArg;
+
/**
* @param cmd Command.
* @param host Host.
@@ -57,11 +62,12 @@ public class Arguments {
* @param pwd Password.
* @param baselineAct Baseline action.
* @param baselineArgs Baseline args.
+ * @param txArg TX arg.
* @param force Force flag.
*/
public Arguments(Command cmd, String host, String port, String user, String pwd,
String baselineAct, String baselineArgs,
- boolean force
+ VisorTxTaskArg txArg, boolean force
) {
this.cmd = cmd;
this.host = host;
@@ -71,6 +77,7 @@ public class Arguments {
this.baselineAct = baselineAct;
this.baselineArgs = baselineArgs;
this.force = force;
+ this.txArg = txArg;
}
/**
@@ -123,6 +130,13 @@ public class Arguments {
}
/**
+ * @return Transaction arguments.
+ */
+ public VisorTxTaskArg transactionArguments() {
+ return txArg;
+ }
+
+ /**
* @return Force option.
*/
public boolean force() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/commandline/Command.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/Command.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/Command.java
index e73a24f..c8c7db5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/Command.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/Command.java
@@ -31,7 +31,10 @@ public enum Command {
STATE("--state"),
/** */
- BASELINE("--baseline");
+ BASELINE("--baseline"),
+
+ /** */
+ TX("--tx");
/** */
private final String text;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
index 5993f59..ba267d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
@@ -24,6 +24,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
+import java.util.UUID;
+import java.util.logging.Logger;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.client.GridClientAuthenticationException;
import org.apache.ignite.internal.client.GridClientClosedException;
@@ -39,15 +44,23 @@ import org.apache.ignite.internal.client.GridServerUnreachableException;
import org.apache.ignite.internal.client.impl.connection.GridClientConnectionResetException;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorTaskArgument;
import org.apache.ignite.internal.visor.baseline.VisorBaselineNode;
import org.apache.ignite.internal.visor.baseline.VisorBaselineOperation;
import org.apache.ignite.internal.visor.baseline.VisorBaselineTask;
import org.apache.ignite.internal.visor.baseline.VisorBaselineTaskArg;
import org.apache.ignite.internal.visor.baseline.VisorBaselineTaskResult;
+import org.apache.ignite.internal.visor.tx.VisorTxInfo;
+import org.apache.ignite.internal.visor.tx.VisorTxOperation;
+import org.apache.ignite.internal.visor.tx.VisorTxProjection;
+import org.apache.ignite.internal.visor.tx.VisorTxSortOrder;
+import org.apache.ignite.internal.visor.tx.VisorTxTask;
+import org.apache.ignite.internal.visor.tx.VisorTxTaskArg;
+import org.apache.ignite.internal.visor.tx.VisorTxTaskResult;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.plugin.security.SecurityCredentials;
import org.apache.ignite.plugin.security.SecurityCredentialsBasicProvider;
-import org.jetbrains.annotations.NotNull;
import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
@@ -55,6 +68,7 @@ import static org.apache.ignite.internal.commandline.Command.ACTIVATE;
import static org.apache.ignite.internal.commandline.Command.BASELINE;
import static org.apache.ignite.internal.commandline.Command.DEACTIVATE;
import static org.apache.ignite.internal.commandline.Command.STATE;
+import static org.apache.ignite.internal.commandline.Command.TX;
import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.ADD;
import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.COLLECT;
import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.REMOVE;
@@ -65,6 +79,9 @@ import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.V
* Class that execute several commands passed via command line.
*/
public class CommandHandler {
+ /** Logger. */
+ private static final Logger log = Logger.getLogger(CommandHandler.class.getName());
+
/** */
static final String DFLT_HOST = "127.0.0.1";
@@ -87,6 +104,9 @@ public class CommandHandler {
private static final String CMD_USER = "--user";
/** */
+ public static final String CONFIRM_MSG = "yes";
+
+ /** */
private static final String BASELINE_ADD = "add";
/** */
@@ -126,11 +146,44 @@ public class CommandHandler {
private static final Scanner IN = new Scanner(System.in);
/** */
+ private static final String TX_LIMIT = "limit";
+
+ /** */
+ private static final String TX_ORDER = "order";
+
+ /** */
+ private static final String TX_SERVERS = "servers";
+
+ /** */
+ private static final String TX_CLIENTS = "clients";
+
+ /** */
+ private static final String TX_DURATION = "minDuration";
+
+ /** */
+ private static final String TX_SIZE = "minSize";
+
+ /** */
+ private static final String TX_LABEL = "label";
+
+ /** */
+ private static final String TX_NODES = "nodes";
+
+ /** */
+ private static final String TX_XID = "xid";
+
+ /** */
+ private static final String TX_KILL = "kill";
+
+ /** */
private Iterator<String> argsIt;
/** */
private String peekedArg;
+ /** */
+ private Object lastOperationResult;
+
/**
* Output specified string to console.
*
@@ -198,7 +251,7 @@ public class CommandHandler {
if (prompt == null)
return true;
- return "y".equalsIgnoreCase(readLine(prompt));
+ return CONFIRM_MSG.equalsIgnoreCase(readLine(prompt));
}
/**
@@ -219,9 +272,15 @@ public class CommandHandler {
case BASELINE:
if (!BASELINE_COLLECT.equals(args.baselineAction()))
str = "Warning: the command will perform changes in baseline.";
+ break;
+
+ case TX:
+ if (args.transactionArguments().getOperation() == VisorTxOperation.KILL)
+ str = "Warning: the command will kill some transactions.";
+ break;
}
- return str == null ? null : str + "\nPress 'y' to continue...";
+ return str == null ? null : str + "\nPress 'yes' to continue . . . ";
}
/**
@@ -301,14 +360,39 @@ public class CommandHandler {
}
/**
+ * @param client Client.
+ * @param arg Task argument.
+ * @return Task result.
+ * @throws GridClientException If failed to execute task.
+ */
+ private Map<UUID, VisorTxTaskResult> executeTransactionsTask(GridClient client,
+ VisorTxTaskArg arg) throws GridClientException {
+
+ return executeTask(client, VisorTxTask.class, arg);
+ }
+
+ /**
*
- * @param client Client
+ * @param client Client.
+ * @param taskCls Task class.
+ * @param taskArgs Task arguments.
* @return Task result.
* @throws GridClientException If failed to execute task.
*/
private <R> R executeTask(GridClient client, Class<?> taskCls, Object taskArgs) throws GridClientException {
GridClientCompute compute = client.compute();
+ GridClientNode node = getBalancedNode(compute);
+
+ return compute.execute(taskCls.getName(),
+ new VisorTaskArgument<>(node.nodeId(), taskArgs, false));
+ }
+
+ /**
+ * @param compute instance
+ * @return balanced node
+ */
+ private GridClientNode getBalancedNode(GridClientCompute compute) throws GridClientException {
List<GridClientNode> nodes = new ArrayList<>();
for (GridClientNode node : compute.nodes())
@@ -318,10 +402,7 @@ public class CommandHandler {
if (F.isEmpty(nodes))
throw new GridClientDisconnectedException("Connectable node not found", null);
- GridClientNode node = compute.balancer().balancedNode(nodes);
-
- return compute.projection(node).execute(taskCls.getName(),
- new VisorTaskArgument<>(node.nodeId(), taskArgs, false));
+ return compute.balancer().balancedNode(nodes);
}
/**
@@ -368,13 +449,7 @@ public class CommandHandler {
case ADD:
case REMOVE:
case SET:
- if(F.isEmpty(s))
- throw new IllegalArgumentException("Empty list of consistent IDs");
-
- List<String> consistentIds = new ArrayList<>();
-
- for (String consistentId : s.split(","))
- consistentIds.add(consistentId.trim());
+ List<String> consistentIds = getConsistentIds(s);
return new VisorBaselineTaskArg(op, -1, consistentIds);
@@ -394,6 +469,22 @@ public class CommandHandler {
}
/**
+ * @param s String of consisted ids delimited by comma.
+ * @return List of consistent ids.
+ */
+ private List<String> getConsistentIds(String s) {
+ if (F.isEmpty(s))
+ throw new IllegalArgumentException("Empty list of consistent IDs");
+
+ List<String> consistentIds = new ArrayList<>();
+
+ for (String consistentId : s.split(","))
+ consistentIds.add(consistentId.trim());
+
+ return consistentIds;
+ }
+
+ /**
* Print baseline topology.
*
* @param res Task result with baseline topology.
@@ -404,6 +495,7 @@ public class CommandHandler {
nl();
Map<String, VisorBaselineNode> baseline = res.getBaseline();
+
Map<String, VisorBaselineNode> servers = res.getServers();
if (F.isEmpty(baseline))
@@ -532,6 +624,57 @@ public class CommandHandler {
}
/**
+ * Dump transactions information.
+ *
+ * @param client Client.
+ * @param arg Transaction search arguments
+ */
+ private void transactions(GridClient client, VisorTxTaskArg arg) throws GridClientException {
+ try {
+ Map<ClusterNode, VisorTxTaskResult> res = executeTask(client, VisorTxTask.class, arg);
+
+ lastOperationResult = res;
+
+ if (res.isEmpty())
+ log("Nothing found.");
+ else if (arg.getOperation() == VisorTxOperation.KILL)
+ log("Killed transactions:");
+ else
+ log("Matching transactions:");
+
+ for (Map.Entry<ClusterNode, VisorTxTaskResult> entry : res.entrySet()) {
+ if (entry.getValue().getInfos().isEmpty())
+ continue;
+
+ ClusterNode key = entry.getKey();
+
+ log(key.toString());
+
+ for (VisorTxInfo info : entry.getValue().getInfos())
+ log(" Tx: [xid=" + info.getXid() +
+ ", label=" + info.getLabel() +
+ ", state=" + info.getState() +
+ ", duration=" + info.getDuration() / 1000 +
+ ", isolation=" + info.getIsolation() +
+ ", concurrency=" + info.getConcurrency() +
+ ", timeout=" + info.getTimeout() +
+ ", size=" + info.getSize() +
+ ", dhtNodes=" + F.transform(info.getPrimaryNodes(), new IgniteClosure<UUID, String>() {
+ @Override public String apply(UUID id) {
+ return U.id8(id);
+ }
+ }) +
+ ']');
+ }
+ }
+ catch (Throwable e) {
+ log("Failed to perform operation.");
+
+ throw e;
+ }
+ }
+
+ /**
* @param e Exception to check.
* @return {@code true} if specified exception is {@link GridClientAuthenticationException}.
*/
@@ -603,7 +746,7 @@ public class CommandHandler {
* @return Arguments bean.
* @throws IllegalArgumentException In case arguments aren't valid.
*/
- @NotNull Arguments parseAndValidate(List<String> rawArgs) {
+ Arguments parseAndValidate(List<String> rawArgs) {
String host = DFLT_HOST;
String port = DFLT_PORT;
@@ -622,6 +765,8 @@ public class CommandHandler {
initArgIterator(rawArgs);
+ VisorTxTaskArg txArgs = null;
+
while (hasNextArg()) {
String str = nextArg("").toLowerCase();
@@ -632,7 +777,14 @@ public class CommandHandler {
case ACTIVATE:
case DEACTIVATE:
case STATE:
- commands.add(Command.of(str));
+ commands.add(cmd);
+ break;
+
+ case TX:
+ commands.add(TX);
+
+ txArgs = parseTransactionArguments();
+
break;
case BASELINE:
@@ -652,6 +804,11 @@ public class CommandHandler {
baselineArgs = nextArg("Expected baseline arguments");
}
}
+
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unexpected command: " + str);
}
}
else {
@@ -685,6 +842,7 @@ public class CommandHandler {
case CMD_FORCE:
force = true;
break;
+
default:
throw new IllegalArgumentException("Unexpected argument: " + str);
}
@@ -707,7 +865,138 @@ public class CommandHandler {
if (hasUsr != hasPwd)
throw new IllegalArgumentException("Both user and password should be specified");
- return new Arguments(cmd, host, port, user, pwd, baselineAct, baselineArgs, force);
+ return new Arguments(cmd, host, port, user, pwd, baselineAct, baselineArgs, txArgs, force);
+ }
+
+ /**
+ * @return Transaction arguments.
+ */
+ private VisorTxTaskArg parseTransactionArguments() {
+ VisorTxProjection proj = null;
+
+ Integer limit = null;
+
+ VisorTxSortOrder sortOrder = null;
+
+ Long duration = null;
+
+ Integer size = null;
+
+ String lbRegex = null;
+
+ List<String> consistentIds = null;
+
+ VisorTxOperation op = VisorTxOperation.LIST;
+
+ String xid = null;
+
+ boolean end = false;
+
+ do {
+ String str = peekNextArg();
+
+ if (str == null)
+ break;
+
+ switch (str) {
+ case TX_LIMIT:
+ nextArg("");
+
+ limit = (int) nextLongArg(TX_LIMIT);
+ break;
+
+ case TX_ORDER:
+ nextArg("");
+
+ sortOrder = VisorTxSortOrder.fromString(nextArg(TX_ORDER));
+
+ break;
+
+ case TX_SERVERS:
+ nextArg("");
+
+ proj = VisorTxProjection.SERVER;
+ break;
+
+ case TX_CLIENTS:
+ nextArg("");
+
+ proj = VisorTxProjection.CLIENT;
+ break;
+
+ case TX_NODES:
+ nextArg("");
+
+ consistentIds = getConsistentIds(nextArg(TX_NODES));
+ break;
+
+ case TX_DURATION:
+ nextArg("");
+
+ duration = nextLongArg(TX_DURATION) * 1000L;
+ break;
+
+ case TX_SIZE:
+ nextArg("");
+
+ size = (int) nextLongArg(TX_SIZE);
+ break;
+
+ case TX_LABEL:
+ nextArg("");
+
+ lbRegex = nextArg(TX_LABEL);
+
+ try {
+ Pattern.compile(lbRegex);
+ }
+ catch (PatternSyntaxException e) {
+ throw new IllegalArgumentException("Illegal regex syntax");
+ }
+
+ break;
+
+ case TX_XID:
+ nextArg("");
+
+ xid = nextArg(TX_XID);
+ break;
+
+ case TX_KILL:
+ nextArg("");
+
+ op = VisorTxOperation.KILL;
+ break;
+
+ default:
+ end = true;
+ }
+ }
+ while (!end);
+
+ if (proj != null && consistentIds != null)
+ throw new IllegalArgumentException("Projection can't be used together with list of consistent ids.");
+
+ return new VisorTxTaskArg(op, limit, duration, size, null, proj, consistentIds, xid, lbRegex, sortOrder);
+ }
+
+ /**
+ * @return Numeric value.
+ */
+ private long nextLongArg(String lb) {
+ String str = nextArg("Expecting " + lb);
+
+ try {
+ long val = Long.parseLong(str);
+
+ if (val < 0)
+ throw new IllegalArgumentException("Invalid value for " + lb + ": " + val);
+
+ return val;
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid value for " + lb + ": " + str);
+ }
}
/**
@@ -734,8 +1023,11 @@ public class CommandHandler {
usage(" Remove nodes from baseline topology:", BASELINE, " remove consistentId1[,consistentId2,....,consistentIdN] [--force]");
usage(" Set baseline topology:", BASELINE, " set consistentId1[,consistentId2,....,consistentIdN] [--force]");
usage(" Set baseline topology based on version:", BASELINE, " version topologyVersion [--force]");
+ usage(" List or kill transactions:", TX, " [xid XID] [minDuration SECONDS] " +
+ "[minSize SIZE] [label PATTERN_REGEX] [servers|clients] " +
+ "[nodes consistentId1[,consistentId2,....,consistentIdN] [limit NUMBER] [order DURATION|SIZE] [kill] [--force]");
- log("By default cluster deactivation and changes in baseline topology commands request interactive confirmation. ");
+ log("By default commands affecting the cluster require interactive confirmation. ");
log(" --force option can be used to execute commands without prompting for confirmation.");
nl();
@@ -757,7 +1049,7 @@ public class CommandHandler {
Arguments args = parseAndValidate(rawArgs);
if (!confirm(args)) {
- log("Operation canceled.");
+ log("Operation cancelled.");
return EXIT_CODE_OK;
}
@@ -789,6 +1081,10 @@ public class CommandHandler {
case BASELINE:
baseline(client, args.baselineAction(), args.baselineArguments());
break;
+
+ case TX:
+ transactions(client, args.transactionArguments());
+ break;
}
}
@@ -816,5 +1112,14 @@ public class CommandHandler {
System.exit(hnd.execute(Arrays.asList(args)));
}
+
+ /**
+ * Used for tests.
+ * @return Last operation result;
+ */
+ @SuppressWarnings("unchecked")
+ public <T> T getLastOperationResult() {
+ return (T)lastOperationResult;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerUtils.java
index aa4bfd6..4626df7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerUtils.java
@@ -203,7 +203,7 @@ class OptimizedMarshallerUtils {
try {
registered = ctx.registerClassName(JAVA_ID, typeId, cls.getName());
}
- catch (IgniteCheckedException e) {
+ catch (Exception e) {
throw new IOException("Failed to register class: " + cls.getName(), e);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index bd613a1..44a64f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4081,7 +4081,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
READ_COMMITTED,
tCfg.getDefaultTxTimeout(),
!ctx.skipStore(),
- 0
+ 0,
+ null
);
assert tx != null;
@@ -4105,7 +4106,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
tx.xid(), e);
}
catch (IgniteCheckedException | AssertionError | RuntimeException e1) {
- U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1);
+ U.error(log, "Failed to rollback transaction (cache may contain stale locks): " +
+ CU.txString(tx), e1);
if (e != e1)
e.addSuppressed(e1);
@@ -4180,7 +4182,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
READ_COMMITTED,
txCfg.getDefaultTxTimeout(),
!skipStore,
- 0);
+ 0,
+ null);
return asyncOp(tx, op, opCtx, /*retry*/false);
}
@@ -4224,6 +4227,31 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final GridNearTxLocal tx0 = tx;
+ final CX1 clo = new CX1<IgniteInternalFuture<T>, T>() {
+ @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
+ try {
+ return tFut.get();
+ }
+ catch (IgniteTxTimeoutCheckedException | IgniteTxRollbackCheckedException | NodeStoppingException e) {
+ throw e;
+ }
+ catch (IgniteCheckedException e1) {
+ try {
+ tx0.rollbackNearTxLocalAsync();
+ }
+ catch (Throwable e2) {
+ if (e1 != e2)
+ e1.addSuppressed(e2);
+ }
+
+ throw e1;
+ }
+ finally {
+ ctx.shared().txContextReset();
+ }
+ }
+ };
+
if (fut != null && !fut.isDone()) {
IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
new IgniteOutClosure<IgniteInternalFuture>() {
@@ -4233,31 +4261,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
try {
- return op.op(tx0, opCtx).chain(new CX1<IgniteInternalFuture<T>, T>() {
- @Override
- public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
- try {
- return tFut.get();
- }
- catch (IgniteTxRollbackCheckedException | NodeStoppingException e) {
- throw e;
- }
- catch (IgniteCheckedException e1) {
- try {
- tx0.rollbackNearTxLocalAsync();
- }
- catch (Throwable e2) {
- if (e1 != e2)
- e1.addSuppressed(e2);
- }
-
- throw e1;
- }
- finally {
- ctx.shared().txContextReset();
- }
- }
- });
+ return op.op(tx0, opCtx).chain(clo);
}
finally {
// It is necessary to clear tx context in this thread as well.
@@ -4274,30 +4278,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<T> f;
try {
- f = op.op(tx, opCtx).chain(new CX1<IgniteInternalFuture<T>, T>() {
- @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
- try {
- return tFut.get();
- }
- catch (IgniteTxRollbackCheckedException | NodeStoppingException e) {
- throw e;
- }
- catch (IgniteCheckedException e1) {
- try {
- tx0.rollbackNearTxLocalAsync(e1 instanceof IgniteTxTimeoutCheckedException);
- }
- catch (Throwable e2) {
- if (e2 != e1)
- e1.addSuppressed(e2);
- }
-
- throw e1;
- }
- finally {
- ctx.shared().txContextReset();
- }
- }
- });
+ f = op.op(tx, opCtx).chain(clo);
}
finally {
// It is necessary to clear tx context in this thread as well.
@@ -4858,7 +4839,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
READ_COMMITTED,
CU.transactionConfiguration(ctx, ctx.kernalContext().config()).getDefaultTxTimeout(),
opCtx == null || !opCtx.skipStore(),
- 0);
+ 0,
+ null);
IgniteInternalFuture<T> fut = asyncOp(tx, op, opCtx, retry);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7ea161a..90f57c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -36,7 +36,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -47,6 +46,7 @@ import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -2417,23 +2417,28 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
int dumpCnt = 0;
- final long futTimeout = 2 * cctx.gridConfig().getNetworkTimeout();
+ IgniteConfiguration cfg = cctx.gridConfig();
+
+ long rollbackTimeout = cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange();
+
+ final long dumpTimeout = 2 * cctx.gridConfig().getNetworkTimeout();
long nextDumpTime = 0;
while (true) {
try {
- resVer = exchFut.get(futTimeout, TimeUnit.MILLISECONDS);
+ resVer = exchFut.get(rollbackTimeout > 0 ? rollbackTimeout : dumpTimeout);
break;
}
catch (IgniteFutureTimeoutCheckedException ignored) {
- U.warn(diagnosticLog, "Failed to wait for partition map exchange [" +
- "topVer=" + exchFut.initialVersion() +
- ", node=" + cctx.localNodeId() + "]. " +
- "Dumping pending objects that might be the cause: ");
-
if (nextDumpTime <= U.currentTimeMillis()) {
+ U.warn(diagnosticLog, "Failed to wait for partition map exchange [" +
+ "topVer=" + exchFut.initialVersion() +
+ ", node=" + cctx.localNodeId() + "]. " +
+ (rollbackTimeout == 0 ? "Consider changing TransactionConfiguration.txTimeoutOnPartitionMapSynchronization to non default value to avoid this message. " : "") +
+ "Dumping pending objects that might be the cause: ");
+
try {
dumpDebugInfo(exchFut);
}
@@ -2441,7 +2446,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
U.error(diagnosticLog, "Failed to dump debug information: " + e, e);
}
- nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, futTimeout);
+ nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, dumpTimeout);
+ }
+
+ if (rollbackTimeout > 0) {
+ rollbackTimeout = 0; // Try automatic rollback only once.
+
+ cctx.tm().rollbackOnTopologyChange(exchFut.initialVersion());
}
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index bceb8c7..39c7e71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -229,6 +229,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Enable/disable cache statistics futures. */
private ConcurrentMap<UUID, EnableStatisticsFuture> enableStatisticsFuts = new ConcurrentHashMap<>();
+ /** The futures for changing transaction timeout on partition map exchange. */
+ private ConcurrentMap<UUID, TxTimeoutOnPartitionMapExchangeChangeFuture> txTimeoutOnPartitionMapExchangeFuts =
+ new ConcurrentHashMap<>();
+
/** */
private ClusterCachesInfo cachesInfo;
@@ -380,6 +384,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
processStatisticsModeChange(task0.message());
}
+ else if (task instanceof TxTimeoutOnPartitionMapExchangeChangeTask) {
+ TxTimeoutOnPartitionMapExchangeChangeTask task0 = (TxTimeoutOnPartitionMapExchangeChangeTask)task;
+
+ processTxTimeoutOnPartitionMapExchangeChange(task0.message());
+ }
else if (task instanceof StopCachesOnClientReconnectExchangeTask) {
StopCachesOnClientReconnectExchangeTask task0 = (StopCachesOnClientReconnectExchangeTask)task;
@@ -683,7 +692,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx = createSharedContext(ctx, sessionListeners);
- transactions = new IgniteTransactionsImpl(sharedCtx);
+ transactions = new IgniteTransactionsImpl(sharedCtx, null);
// Start shared managers.
for (GridCacheSharedManager mgr : sharedCtx.managers())
@@ -1057,6 +1066,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
for (EnableStatisticsFuture fut : enableStatisticsFuts.values())
fut.onDone(err);
+ for (TxTimeoutOnPartitionMapExchangeChangeFuture fut : txTimeoutOnPartitionMapExchangeFuts.values())
+ fut.onDone(err);
+
for (CacheGroupContext grp : cacheGrps.values())
grp.onDisconnected(reconnectFut);
@@ -2618,6 +2630,43 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * Callback invoked from discovery thread when discovery custom message is received.
+ *
+ * @param msg Discovery message for changing transaction timeout on partition map exchange.
+ */
+ public void onTxTimeoutOnPartitionMapExchangeChange(TxTimeoutOnPartitionMapExchangeChangeMessage msg) {
+ assert msg != null;
+
+ if (msg.isInit()) {
+ TransactionConfiguration cfg = ctx.config().getTransactionConfiguration();
+
+ if (cfg.getTxTimeoutOnPartitionMapExchange() != msg.getTimeout())
+ cfg.setTxTimeoutOnPartitionMapExchange(msg.getTimeout());
+ }
+ else {
+ TxTimeoutOnPartitionMapExchangeChangeFuture fut = txTimeoutOnPartitionMapExchangeFuts.get(
+ msg.getRequestId());
+
+ if (fut != null)
+ fut.onDone();
+ }
+ }
+
+ /**
+ * The task for changing transaction timeout on partition map exchange processed by exchange worker.
+ *
+ * @param msg Message.
+ */
+ public void processTxTimeoutOnPartitionMapExchangeChange(TxTimeoutOnPartitionMapExchangeChangeMessage msg) {
+ assert msg != null;
+
+ long timeout = ctx.config().getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange();
+
+ if (timeout != msg.getTimeout())
+ ctx.config().getTransactionConfiguration().setTxTimeoutOnPartitionMapExchange(msg.getTimeout());
+ }
+
+ /**
* @param stoppedCaches Stopped caches.
*/
private void stopCachesOnClientReconnect(Collection<GridCacheAdapter> stoppedCaches) {
@@ -3413,6 +3462,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (msg instanceof CacheStatisticsModeChangeMessage)
onCacheStatisticsModeChange((CacheStatisticsModeChangeMessage)msg);
+ if (msg instanceof TxTimeoutOnPartitionMapExchangeChangeMessage)
+ onTxTimeoutOnPartitionMapExchangeChange((TxTimeoutOnPartitionMapExchangeChangeMessage)msg);
+
return false;
}
@@ -3963,6 +4015,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
for (EnableStatisticsFuture fut : enableStatisticsFuts.values())
fut.onDone(err);
+
+ for (TxTimeoutOnPartitionMapExchangeChangeFuture fut : txTimeoutOnPartitionMapExchangeFuts.values())
+ fut.onDone(err);
}
/**
@@ -4392,6 +4447,26 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * Sets transaction timeout on partition map exchange.
+ *
+ * @param timeout Transaction timeout on partition map exchange in milliseconds.
+ */
+ public void setTxTimeoutOnPartitionMapExchange(long timeout) throws IgniteCheckedException {
+ UUID requestId = UUID.randomUUID();
+
+ TxTimeoutOnPartitionMapExchangeChangeFuture fut = new TxTimeoutOnPartitionMapExchangeChangeFuture(requestId);
+
+ txTimeoutOnPartitionMapExchangeFuts.put(requestId, fut);
+
+ TxTimeoutOnPartitionMapExchangeChangeMessage msg = new TxTimeoutOnPartitionMapExchangeChangeMessage(
+ requestId, timeout);
+
+ ctx.grid().context().discovery().sendCustomEvent(msg);
+
+ fut.get();
+ }
+
+ /**
* @param obj Object to clone.
* @return Object copy.
* @throws IgniteCheckedException If failed.
@@ -4624,4 +4699,30 @@ public class GridCacheProcessor extends GridProcessorAdapter {
return S.toString(EnableStatisticsFuture.class, this);
}
}
+
+ /**
+ * The future for changing transaction timeout on partition map exchange.
+ */
+ private class TxTimeoutOnPartitionMapExchangeChangeFuture extends GridFutureAdapter<Void> {
+ /** */
+ private UUID id;
+
+ /**
+ * @param id Future ID.
+ */
+ private TxTimeoutOnPartitionMapExchangeChangeFuture(UUID id) {
+ this.id = id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
+ txTimeoutOnPartitionMapExchangeFuts.remove(id, this);
+ return super.onDone(res, err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TxTimeoutOnPartitionMapExchangeChangeFuture.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index b5c4096..b195508 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -71,6 +71,7 @@ import org.apache.ignite.plugin.PluginProvider;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY;
+import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
/**
* Shared context.
@@ -955,9 +956,14 @@ public class GridCacheSharedContext<K, V> {
* @throws IgniteCheckedException If failed.
*/
public void endTx(GridNearTxLocal tx) throws IgniteCheckedException {
- tx.txState().awaitLastFuture(this);
+ boolean clearThreadMap = txMgr.threadLocalTx(null) == tx;
+
+ if (clearThreadMap)
+ tx.txState().awaitLastFuture(this);
+ else
+ tx.state(MARKED_ROLLBACK);
- tx.close();
+ tx.close(clearThreadMap);
}
/**
@@ -983,9 +989,14 @@ public class GridCacheSharedContext<K, V> {
* @return Rollback future.
*/
public IgniteInternalFuture rollbackTxAsync(GridNearTxLocal tx) throws IgniteCheckedException {
- tx.txState().awaitLastFuture(this);
+ boolean clearThreadMap = txMgr.threadLocalTx(null) == tx;
+
+ if (clearThreadMap)
+ tx.txState().awaitLastFuture(this);
+ else
+ tx.state(MARKED_ROLLBACK);
- return tx.rollbackNearTxLocalAsync();
+ return tx.rollbackNearTxLocalAsync(clearThreadMap, false);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index e244c75..bb64cc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -817,13 +817,15 @@ public class GridCacheUtils {
if (tx == null)
return "null";
- return tx.getClass().getSimpleName() + "[id=" + tx.xid() +
+ return tx.getClass().getSimpleName() + "[xid=" + tx.xid() +
+ ", xidVersion=" + tx.xidVersion() +
", concurrency=" + tx.concurrency() +
", isolation=" + tx.isolation() +
", state=" + tx.state() +
", invalidate=" + tx.isInvalidate() +
", rollbackOnly=" + tx.isRollbackOnly() +
", nodeId=" + tx.nodeId() +
+ ", timeout=" + tx.timeout() +
", duration=" + (U.currentTimeMillis() - tx.startTime()) + ']';
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxTimeoutOnPartitionMapExchangeChangeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxTimeoutOnPartitionMapExchangeChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxTimeoutOnPartitionMapExchangeChangeMessage.java
new file mode 100644
index 0000000..5589c94
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxTimeoutOnPartitionMapExchangeChangeMessage.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Discovery message for changing transaction timeout on partition map exchange.
+ */
+public class TxTimeoutOnPartitionMapExchangeChangeMessage implements DiscoveryCustomMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final IgniteUuid id = IgniteUuid.randomUuid();
+
+ /** Request ID. */
+ private final UUID reqId;
+
+ /** Transaction timeout on partition map exchange in milliseconds. */
+ private final long timeout;
+
+ /** Init flag. */
+ private final boolean isInit;
+
+ /**
+ * Constructor for response.
+ *
+ * @param req Request message.
+ */
+ public TxTimeoutOnPartitionMapExchangeChangeMessage(TxTimeoutOnPartitionMapExchangeChangeMessage req) {
+ this.reqId = req.reqId;
+ this.timeout = req.timeout;
+ this.isInit = false;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param reqId Request ID.
+ * @param timeout Transaction timeout on partition map exchange in milliseconds.
+ */
+ public TxTimeoutOnPartitionMapExchangeChangeMessage(UUID reqId, long timeout) {
+ this.reqId = reqId;
+ this.timeout = timeout;
+ this.isInit = true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+ return isInit() ? new TxTimeoutOnPartitionMapExchangeChangeMessage(this) : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean stopProcess() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
+ DiscoCache discoCache) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Gets request ID.
+ *
+ * @return Request ID.
+ */
+ public UUID getRequestId() {
+ return reqId;
+ }
+
+ /**
+ * Gets transaction timeout on partition map exchange in milliseconds.
+ *
+ * @return Transaction timeout on partition map exchange in milliseconds.
+ */
+ public long getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * Gets init flag.
+ *
+ * @return Init flag.
+ */
+ public boolean isInit() {
+ return isInit;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TxTimeoutOnPartitionMapExchangeChangeMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxTimeoutOnPartitionMapExchangeChangeTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxTimeoutOnPartitionMapExchangeChangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxTimeoutOnPartitionMapExchangeChangeTask.java
new file mode 100644
index 0000000..6edfa58
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TxTimeoutOnPartitionMapExchangeChangeTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * The task for changing transaction timeout on partition map exchange.
+ */
+public class TxTimeoutOnPartitionMapExchangeChangeTask implements CachePartitionExchangeWorkerTask {
+ /** Discovery message. */
+ private final TxTimeoutOnPartitionMapExchangeChangeMessage msg;
+
+ /**
+ * Constructor.
+ *
+ * @param msg Discovery message.
+ */
+ public TxTimeoutOnPartitionMapExchangeChangeTask(TxTimeoutOnPartitionMapExchangeChangeMessage msg) {
+ assert msg != null;
+ this.msg = msg;
+ }
+
+ /**
+ * Gets discovery message.
+ *
+ * @return Discovery message.
+ */
+ public TxTimeoutOnPartitionMapExchangeChangeMessage message() {
+ return msg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean skipForExchangeMerge() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TxTimeoutOnPartitionMapExchangeChangeTask.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
index 02d1e8e..7cc368a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
@@ -338,4 +339,4 @@ public class GridCacheTxFinishSync<K, V> {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index 45903aa..481c954 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -41,7 +41,7 @@ public class GridDistributedTxMapping {
/** Entries. */
@GridToStringInclude
- private Collection<IgniteTxEntry> entries;
+ private final Collection<IgniteTxEntry> entries;
/** Explicit lock flag. */
private boolean explicitLock;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee5fbce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 27044eb..a692b2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -117,6 +117,9 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
@GridToStringInclude
protected IgniteTxRemoteState txState;
+ /** {@code True} if tx should skip adding itself to completed version map on finish. */
+ private boolean skipCompletedVers;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -228,8 +231,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
@Override public GridTuple<CacheObject> peek(GridCacheContext cacheCtx,
boolean failFast,
KeyCacheObject key)
- throws GridCacheFilterFailedException
- {
+ throws GridCacheFilterFailedException {
assert false : "Method peek can only be called on user transaction: " + this;
throw new IllegalStateException("Method peek can only be called on user transaction: " + this);
@@ -869,7 +871,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
// Note that we don't evict near entries here -
// they will be deleted by their corresponding transactions.
if (state(ROLLING_BACK) || state() == UNKNOWN) {
- cctx.tm().rollbackTx(this, false);
+ cctx.tm().rollbackTx(this, false, skipCompletedVers);
state(ROLLED_BACK);
}
@@ -899,6 +901,20 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
/**
+ * @return {@code True} if tx should skip adding itself to completed version map on finish.
+ */
+ public boolean skipCompletedVersions() {
+ return skipCompletedVers;
+ }
+
+ /**
+ * @param skipCompletedVers {@code True} if tx should skip adding itself to completed version map on finish.
+ */
+ public void skipCompletedVersions(boolean skipCompletedVers) {
+ this.skipCompletedVers = skipCompletedVers;
+ }
+
+ /**
* Adds explicit version if there is one.
*
* @param e Transaction entry.
@@ -925,4 +941,5 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
@Override public String toString() {
return GridToStringBuilder.toString(GridDistributedTxRemoteAdapter.class, this, "super", super.toString());
}
+
}