You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/23 16:03:51 UTC
[3/6] ignite git commit: IGNITE-6827 Configurable rollback for long
running transactions before partition exchange IGNITE-7910 Improved
transaction debugging support
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java
new file mode 100644
index 0000000..89bf274
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.tx;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorDataTransferObject;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionState;
+
+/**
+ */
+public class VisorTxInfo extends VisorDataTransferObject {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private IgniteUuid xid;
+
+ /** */
+ private long duration;
+
+ /** */
+ private TransactionIsolation isolation;
+
+ /** */
+ private TransactionConcurrency concurrency;
+
+ /** */
+ private long timeout;
+
+ /** */
+ private String lb;
+
+ /** */
+ private Collection<UUID> primaryNodes;
+
+ /** */
+ private TransactionState state;
+
+ /** */
+ private int size;
+
+ /**
+ * Default constructor.
+ */
+ public VisorTxInfo() {
+ // No-op.
+ }
+
+ /**
+ * @param xid Xid.
+ * @param duration Duration.
+ * @param isolation Isolation.
+ * @param concurrency Concurrency.
+ * @param timeout Timeout.
+ * @param lb Label.
+ * @param primaryNodes Primary nodes.
+ * @param state State.
+ * @param size Size.
+ */
+ public VisorTxInfo(IgniteUuid xid, long duration, TransactionIsolation isolation,
+ TransactionConcurrency concurrency, long timeout, String lb, Collection<UUID> primaryNodes,
+ TransactionState state, int size) {
+ this.xid = xid;
+ this.duration = duration;
+ this.isolation = isolation;
+ this.concurrency = concurrency;
+ this.timeout = timeout;
+ this.lb = lb;
+ this.primaryNodes = primaryNodes;
+ this.state = state;
+ this.size = size;
+ }
+
+ /** */
+ public IgniteUuid getXid() {
+ return xid;
+ }
+
+ /** */
+ public long getDuration() {
+ return duration;
+ }
+
+ /** */
+ public TransactionIsolation getIsolation() {
+ return isolation;
+ }
+
+ /** */
+ public TransactionConcurrency getConcurrency() {
+ return concurrency;
+ }
+
+ /** */
+ public long getTimeout() {
+ return timeout;
+ }
+
+ /** */
+ public String getLabel() {
+ return lb;
+ }
+
+ /** */
+ public Collection<UUID> getPrimaryNodes() {
+ return primaryNodes;
+ }
+
+ /** */
+ public TransactionState getState() {
+ return state;
+ }
+
+ /** */
+ public int getSize() {
+ return size;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeExternalData(ObjectOutput out) throws IOException {
+ U.writeGridUuid(out, xid);
+ out.writeLong(duration);
+ U.writeEnum(out, isolation);
+ U.writeEnum(out, concurrency);
+ out.writeLong(timeout);
+ U.writeString(out, lb);
+ U.writeCollection(out, primaryNodes);
+ U.writeEnum(out, state);
+ out.writeInt(size);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException {
+ xid = U.readGridUuid(in);
+ duration = in.readLong();
+ isolation = TransactionIsolation.fromOrdinal(in.readByte());
+ concurrency = TransactionConcurrency.fromOrdinal(in.readByte());
+ timeout = in.readLong();
+ lb = U.readString(in);
+ primaryNodes = U.readCollection(in);
+ state = TransactionState.fromOrdinal(in.readByte());
+ size = in.readInt();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(VisorTxInfo.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxOperation.java
new file mode 100644
index 0000000..4aaab8e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxOperation.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.tx;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Defines possible Visor operation on transactions.
+ */
+public enum VisorTxOperation {
+ /** List matching transactions. */
+ LIST,
+ /** Kill matching transactions. */
+ KILL;
+
+ /** Enumerated values. */
+ private static final VisorTxOperation[] VALS = values();
+
+ /**
+ * Efficiently gets enumerated value from its ordinal.
+ *
+ * @param ord Ordinal value.
+ * @return Enumerated value or {@code null} if ordinal out of range.
+ */
+ @Nullable public static VisorTxOperation fromOrdinal(int ord) {
+ return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxProjection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxProjection.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxProjection.java
new file mode 100644
index 0000000..4c81cc6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxProjection.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.tx;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ */
+public enum VisorTxProjection {
+ /** Server nodes. */
+ SERVER,
+ /** Client nodes. */
+ CLIENT;
+
+ /** Enumerated values. */
+ private static final VisorTxProjection[] VALS = values();
+
+ /**
+ * Efficiently gets enumerated value from its ordinal.
+ *
+ * @param ord Ordinal value.
+ * @return Enumerated value or {@code null} if ordinal out of range.
+ */
+ @Nullable public static VisorTxProjection fromOrdinal(int ord) {
+ return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxSortOrder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxSortOrder.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxSortOrder.java
new file mode 100644
index 0000000..382cf91
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxSortOrder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.tx;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ */
+public enum VisorTxSortOrder {
+ /** Sort by duration. */
+ DURATION,
+ /** Sort by size. */
+ SIZE;
+
+ /** Enumerated values. */
+ private static final VisorTxSortOrder[] VALS = values();
+
+ /**
+ * Efficiently gets enumerated value from its ordinal.
+ *
+ * @param ord Ordinal value.
+ * @return Enumerated value or {@code null} if ordinal out of range.
+ */
+ @Nullable public static VisorTxSortOrder fromOrdinal(int ord) {
+ return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
+ }
+
+ /**
+ * @param name Name.
+ */
+ public static VisorTxSortOrder fromString(String name) {
+ if (DURATION.toString().equals(name))
+ return DURATION;
+
+ if (SIZE.toString().equals(name))
+ return SIZE;
+
+ throw new IllegalArgumentException("Sort order is unknown: " + name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java
new file mode 100644
index 0000000..72b1740
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.tx;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorMultiNodeTask;
+import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+@GridInternal
+public class VisorTxTask extends VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterNode, VisorTxTaskResult>, VisorTxTaskResult> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override protected VisorJob<VisorTxTaskArg, VisorTxTaskResult> job(VisorTxTaskArg arg) {
+ return new VisorTxJob(arg, debug);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Collection<UUID> jobNodes(VisorTaskArgument<VisorTxTaskArg> arg) {
+ final VisorTxTaskArg taskArg = arg.getArgument();
+
+ if (taskArg.getConsistentIds() != null) {
+ return F.transform(ignite.cluster().forPredicate(new IgnitePredicate<ClusterNode>() {
+ @Override public boolean apply(ClusterNode node) {
+ return taskArg.getConsistentIds().contains((String)node.consistentId().toString());
+ }
+ }).nodes(), new IgniteClosure<ClusterNode, UUID>() {
+ @Override public UUID apply(ClusterNode node) {
+ return node.id();
+ }
+ });
+ }
+
+ if (taskArg.getProjection() == VisorTxProjection.SERVER) {
+ return F.transform(ignite.cluster().forServers().nodes(), new IgniteClosure<ClusterNode, UUID>() {
+ @Override public UUID apply(ClusterNode node) {
+ return node.id();
+ }
+ });
+ }
+
+ if (taskArg.getProjection() == VisorTxProjection.CLIENT) {
+ return F.transform(ignite.cluster().forClients().nodes(), new IgniteClosure<ClusterNode, UUID>() {
+ @Override public UUID apply(ClusterNode node) {
+ return node.id();
+ }
+ });
+ }
+
+ return F.transform(ignite.cluster().nodes(), new IgniteClosure<ClusterNode, UUID>() {
+ @Override public UUID apply(ClusterNode node) {
+ return node.id();
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override protected Map<ClusterNode, VisorTxTaskResult> reduce0(List<ComputeJobResult> results) throws IgniteException {
+ Map<ClusterNode, VisorTxTaskResult> mapRes = new TreeMap<>();
+
+ for (ComputeJobResult result : results) {
+ VisorTxTaskResult data = result.getData();
+
+ if (data == null || data.getInfos().isEmpty())
+ continue;
+
+ mapRes.put(result.getNode(), data);
+ }
+
+ return mapRes;
+ }
+
+ /**
+ *
+ */
+ private static class VisorTxJob extends VisorJob<VisorTxTaskArg, VisorTxTaskResult> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ public static final int DEFAULT_LIMIT = 50;
+
+ /**
+ * @param arg Formal job argument.
+ * @param debug Debug flag.
+ */
+ private VisorTxJob(VisorTxTaskArg arg, boolean debug) {
+ super(arg, debug);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected VisorTxTaskResult run(@Nullable VisorTxTaskArg arg) throws IgniteException {
+ if (arg == null)
+ return new VisorTxTaskResult(Collections.emptyList());
+
+ Collection<Transaction> transactions = ignite.transactions().localActiveTransactions();
+
+ List<VisorTxInfo> infos = new ArrayList<>();
+
+ int limit = arg.getLimit() == null ? DEFAULT_LIMIT : arg.getLimit();
+
+ Pattern lbMatch = null;
+
+ if (arg.getLabelRegex() != null) {
+ try {
+ lbMatch = Pattern.compile(arg.getLabelRegex());
+ }
+ catch (PatternSyntaxException ignored) {
+ // No-op.
+ }
+ }
+
+ for (Transaction transaction : transactions) {
+ GridNearTxLocal locTx = ((TransactionProxyImpl)transaction).tx();
+
+ if (arg.getXid() != null && !locTx.xid().toString().equals(arg.getXid()))
+ continue;
+
+ if (arg.getState() != null && locTx.state() != arg.getState())
+ continue;
+
+ long duration = U.currentTimeMillis() - transaction.startTime();
+
+ if (arg.getMinDuration() != null &&
+ duration < arg.getMinDuration())
+ continue;
+
+ if (arg.getMinSize() != null && locTx.size() < arg.getMinSize())
+ continue;
+
+ if (lbMatch != null && (locTx.label() == null || !lbMatch.matcher(locTx.label()).matches()))
+ continue;
+
+ Collection<UUID> mappings = new ArrayList<>();
+
+ int size = 0;
+
+ if (locTx.mappings() != null) {
+ for (GridDistributedTxMapping mapping : locTx.mappings().mappings()) {
+ mappings.add(mapping.primary().id());
+
+ size += mapping.entries().size(); // Entries are not synchronized so no visibility guaranties.
+ }
+ }
+
+ infos.add(new VisorTxInfo(locTx.xid(), duration, locTx.isolation(), locTx.concurrency(),
+ locTx.timeout(), locTx.label(), mappings, locTx.state(), size));
+
+ if (arg.getOperation() == VisorTxOperation.KILL)
+ locTx.rollbackAsync();
+
+ if (infos.size() == limit)
+ break;
+ }
+
+ Comparator<VisorTxInfo> comp = TxDurationComparator.INSTANCE;
+
+ if (arg.getSortOrder() != null) {
+ switch (arg.getSortOrder()) {
+ case DURATION:
+ comp = TxDurationComparator.INSTANCE;
+
+ break;
+
+ case SIZE:
+ comp = TxSizeComparator.INSTANCE;
+
+ break;
+
+ default:
+ }
+ }
+
+ Collections.sort(infos, comp);
+
+ return new VisorTxTaskResult(infos);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TxDurationComparator implements Comparator<VisorTxInfo> {
+ /** Instance. */
+ public static final TxDurationComparator INSTANCE = new TxDurationComparator();
+
+ /** {@inheritDoc} */
+ @Override public int compare(VisorTxInfo o1, VisorTxInfo o2) {
+ return Long.compare(o2.getDuration(), o1.getDuration());
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TxSizeComparator implements Comparator<VisorTxInfo> {
+ /** Instance. */
+ public static final TxSizeComparator INSTANCE = new TxSizeComparator();
+
+ /** {@inheritDoc} */
+ @Override public int compare(VisorTxInfo o1, VisorTxInfo o2) {
+ return Long.compare(o2.getSize(), o1.getSize());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTaskArg.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTaskArg.java
new file mode 100644
index 0000000..80bf8b5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTaskArg.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.tx;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.List;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorDataTransferObject;
+import org.apache.ignite.internal.visor.baseline.VisorBaselineTask;
+import org.apache.ignite.transactions.TransactionState;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Argument for {@link VisorTxTask}.
+ */
+public class VisorTxTaskArg extends VisorDataTransferObject {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private VisorTxOperation op;
+
+ /** */
+ private @Nullable Integer limit;
+
+ /** */
+ private @Nullable Long minDuration;
+
+ /** */
+ private @Nullable Integer minSize;
+
+ /** */
+ private @Nullable TransactionState state;
+
+ /** */
+ private @Nullable VisorTxProjection proj;
+
+ /** */
+ private @Nullable List<String> consistentIds;
+
+ /** */
+ private @Nullable String xid;
+
+ /** */
+ private @Nullable String lbRegex;
+
+ /** */
+ private @Nullable VisorTxSortOrder sortOrder;
+
+ /**
+ * Default constructor.
+ */
+ public VisorTxTaskArg() {
+ // No-op.
+ }
+
+ /**
+ * @param limit Limit to collect.
+ * @param minDuration Min duration.
+ * @param minSize Min size.
+ * @param state State.
+ * @param proj Projection.
+ * @param consistentIds Consistent ids for NODES projection.
+ * @param xid Xid.
+ * @param lbRegex Label regex.
+ * @param sortOrder Sort order.
+ */
+ public VisorTxTaskArg(VisorTxOperation op, @Nullable Integer limit, @Nullable Long minDuration, @Nullable Integer minSize,
+ @Nullable TransactionState state, @Nullable VisorTxProjection proj, @Nullable List<String> consistentIds,
+ @Nullable String xid, @Nullable String lbRegex, @Nullable VisorTxSortOrder sortOrder) {
+ this.op = op;
+ this.limit = limit;
+ this.minDuration = minDuration;
+ this.minSize = minSize;
+ this.state = state;
+ this.proj = proj;
+ this.consistentIds = consistentIds;
+ this.lbRegex = lbRegex;
+ this.xid = xid;
+ this.sortOrder = sortOrder;
+ }
+
+ /** */
+ public VisorTxOperation getOperation() {
+ return op;
+ }
+
+ /** */
+ @Nullable public Integer getLimit() {
+ return limit;
+ }
+
+ /** */
+ @Nullable public Long getMinDuration() {
+ return minDuration;
+ }
+
+ /** */
+ @Nullable public Integer getMinSize() {
+ return minSize;
+ }
+
+ /** */
+ @Nullable public TransactionState getState() {
+ return state;
+ }
+
+ /** */
+ public VisorTxProjection getProjection() {
+ return proj;
+ }
+
+ /** */
+ @Nullable public List<String> getConsistentIds() {
+ return consistentIds;
+ }
+
+ /** */
+ @Nullable public String getLabelRegex() {
+ return lbRegex;
+ }
+
+ /** */
+ @Nullable public String getXid() {
+ return xid;
+ }
+
+ /** */
+ @Nullable public VisorTxSortOrder getSortOrder() {
+ return sortOrder;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeExternalData(ObjectOutput out) throws IOException {
+ U.writeEnum(out, op);
+ out.writeInt(limit == null ? -1 : limit);
+ out.writeLong(minDuration == null ? -1 : minDuration);
+ out.writeInt(minSize == null ? -1 : minSize);
+ U.writeEnum(out, state);
+ U.writeEnum(out, proj);
+ U.writeCollection(out, consistentIds);
+ out.writeUTF(lbRegex == null ? "" : lbRegex);
+ out.writeUTF(xid == null ? "" : xid);
+ U.writeEnum(out, sortOrder);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException {
+ op = VisorTxOperation.fromOrdinal(in.readByte());
+ limit = fixNull(in.readInt());
+ minDuration = fixNull(in.readLong());
+ minSize = fixNull(in.readInt());
+ state = TransactionState.fromOrdinal(in.readByte());
+ proj = VisorTxProjection.fromOrdinal(in.readByte());
+ consistentIds = U.readList(in);
+ lbRegex = fixNull(in.readUTF());
+ xid = fixNull(in.readUTF());
+ sortOrder = VisorTxSortOrder.fromOrdinal(in.readByte());
+ }
+
+ /**
+ * @param val Value.
+ */
+ private Integer fixNull(int val) {
+ return val == -1 ? null : val;
+ }
+
+ /**
+ * @param val Value.
+ */
+ private Long fixNull(long val) {
+ return val == -1 ? null : val;
+ }
+
+ /**
+ * @param val Value.
+ */
+ private String fixNull(String val) {
+ return "".equals(val) ? null : val;
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(VisorTxTaskArg.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTaskResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTaskResult.java
new file mode 100644
index 0000000..4864a77
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTaskResult.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.tx;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorDataTransferObject;
+import org.apache.ignite.internal.visor.baseline.VisorBaselineNode;
+import org.apache.ignite.internal.visor.baseline.VisorBaselineTask;
+
+/**
+ * Result for {@link VisorTxTask}.
+ */
+public class VisorTxTaskResult extends VisorDataTransferObject {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private List<VisorTxInfo> infos;
+
+ /**
+ * Default constructor.
+ */
+ public VisorTxTaskResult() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param infos Infos.
+ */
+ public VisorTxTaskResult(List<VisorTxInfo> infos) {
+ this.infos = infos;
+ }
+
+ /** */
+ public List<VisorTxInfo> getInfos() {
+ return infos;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeExternalData(ObjectOutput out) throws IOException {
+ U.writeCollection(out, infos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException {
+ infos = U.readList(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(VisorTxTaskResult.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/mxbean/TransactionsMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/TransactionsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/TransactionsMXBean.java
new file mode 100644
index 0000000..2a125fa
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/TransactionsMXBean.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mxbean;
+
+import org.apache.ignite.configuration.TransactionConfiguration;
+
+/**
+ * Transactions MXBean interface.
+ */
+@MXBeanDescription("MBean that provides access to Ignite transactions.")
+public interface TransactionsMXBean {
+ /**
+ * @param minDuration Minimum duration.
+ * @param minSize Minimum size.
+ * @param prj Projection.
+ * @param consistentIds Consistent ids.
+ * @param xid Xid.
+ * @param lbRegex Label regex.
+ * @param limit Limit.
+ * @param order Order.
+ * @param detailed Detailed.
+ * @param kill Kill.
+ */
+ @MXBeanDescription("Returns or kills transactions matching the filter conditions.")
+ @MXBeanParametersNames(
+ {
+ "minDuration",
+ "minSize",
+ "prj",
+ "consistentIds",
+ "xid",
+ "lbRegex",
+ "limit",
+ "order",
+ "detailed",
+ "kill"
+ }
+ )
+ @MXBeanParametersDescriptions(
+ {
+ "Minimum duration (seconds).",
+ "Minimum size.",
+ "Projection (servers|clients).",
+ "Consistent ids (separated by comma).",
+ "Transaction XID.",
+ "Label regexp.",
+ "Limit a number of transactions collected on each node.",
+ "Order by DURATION|SIZE.",
+ "Show detailed description, otherwise only count.",
+ "Kill matching transactions (be careful)."
+ }
+ )
+ public String getActiveTransactions(Long minDuration, Integer minSize, String prj,
+ String consistentIds, String xid, String lbRegex, Integer limit, String order, boolean detailed, boolean kill);
+
+ /**
+ * Gets transaction timeout on partition map exchange.
+ * <p>
+ * @see TransactionConfiguration#getTxTimeoutOnPartitionMapExchange
+ *
+ * @return Transaction timeout on partition map exchange in milliseconds.
+ */
+ @MXBeanDescription("Returns transaction timeout on partition map exchange in milliseconds.")
+ public long getTxTimeoutOnPartitionMapExchange();
+
+ /**
+ * Sets transaction timeout on partition map exchange.
+ * <p>
+ * If not set, default value is {@link TransactionConfiguration#TX_TIMEOUT_ON_PARTITION_MAP_EXCHANGE} which means
+ * transactions will never be rolled back on partition map exchange.
+ * <p>
+ * @see TransactionConfiguration#setTxTimeoutOnPartitionMapExchange
+ *
+ * @param timeout Transaction timeout on partition map exchange in milliseconds.
+ */
+ @MXBeanDescription("Sets transaction timeout on partition map exchange in milliseconds.")
+ @MXBeanParametersNames(
+ "timeout"
+ )
+ @MXBeanParametersDescriptions(
+ "Transaction timeout on partition map exchange in milliseconds."
+ )
+ public void setTxTimeoutOnPartitionMapExchange(long timeout);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
index a1b4d78..b0f90c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
+++ b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
@@ -24,6 +24,7 @@ import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteAsyncSupported;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
/**
* Ignite cache transaction. Cache transactions have a default 2PC (two-phase-commit) behavior and
@@ -259,6 +260,7 @@ public interface Transaction extends AutoCloseable, IgniteAsyncSupport {
/**
* Rolls back this transaction.
+ * Note, that it's allowed to roll back transaction from any thread at any time.
*
* @throws IgniteException If rollback failed.
*/
@@ -267,6 +269,7 @@ public interface Transaction extends AutoCloseable, IgniteAsyncSupport {
/**
* Asynchronously rolls back this transaction.
+ * Note, that it's allowed to roll back transaction from any thread at any time.
*
* @return a Future representing pending completion of the rollback.
* @throws IgniteException If rollback failed.
@@ -274,16 +277,25 @@ public interface Transaction extends AutoCloseable, IgniteAsyncSupport {
public IgniteFuture<Void> rollbackAsync() throws IgniteException;
/**
- * Resume transaction if it was previously suspended. <strong>Supported only for optimistic transactions.</strong>
+ * Resume a transaction if it was previously suspended. <strong>Supported only for optimistic transactions.</strong>
*
* @throws IgniteException If resume failed.
*/
public void resume() throws IgniteException;
/**
- * Suspends transaction. It could be resumed later. <strong>Supported only for optimistic transactions.</strong>
+ * Suspends a transaction. It could be resumed later. <strong>Supported only for optimistic transactions.</strong>
*
* @throws IgniteException If suspension failed.
*/
public void suspend() throws IgniteException;
-}
\ No newline at end of file
+
+ /**
+ * Returns transaction's label.
+ * <p>
+ * Use {@link IgniteTransactions#withLabel(java.lang.String)} to assign a label to a newly created transaction.
+ *
+ * @return Label.
+ */
+ public @Nullable String label();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 6dc3d85..d939b02 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -2069,6 +2069,13 @@ org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException
org.apache.ignite.internal.visor.util.VisorEventMapper
org.apache.ignite.internal.visor.util.VisorExceptionWrapper
org.apache.ignite.internal.visor.util.VisorTaskUtils$4
+org.apache.ignite.internal.visor.tx.VisorTxInfo
+org.apache.ignite.internal.visor.tx.VisorTxOperation
+org.apache.ignite.internal.visor.tx.VisorTxProjection
+org.apache.ignite.internal.visor.tx.VisorTxSortOrder
+org.apache.ignite.internal.visor.tx.VisorTxTask
+org.apache.ignite.internal.visor.tx.VisorTxTaskArg
+org.apache.ignite.internal.visor.tx.VisorTxTaskResult
org.apache.ignite.internal.websession.WebSessionAttributeProcessor
org.apache.ignite.internal.websession.WebSessionEntity
org.apache.ignite.lang.IgniteBiClosure
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index cf4f059..5d12d9a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -105,7 +105,7 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
}
if (block) {
- ignite.log().info("Block message [node=" + node.id() +
+ ignite.log().info("Block message [node=" + node.id() + ", order=" + node.order() +
", msg=" + ioMsg.message() + ']');
blockedMsgs.add(new T2<>(node, ioMsg));
@@ -273,6 +273,7 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
try {
ignite.log().info("Send blocked message [node=" + msg.get1().id() +
+ ", order=" + msg.get1().order() +
", msg=" + msg.get2().message() + ']');
super.sendMessage(msg.get1(), msg.get2());
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/test/java/org/apache/ignite/internal/TransactionMetricsMxBeanImplTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TransactionMetricsMxBeanImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/TransactionMetricsMxBeanImplTest.java
index 80b0dc0..91bd9bc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TransactionMetricsMxBeanImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TransactionMetricsMxBeanImplTest.java
@@ -100,7 +100,7 @@ public class TransactionMetricsMxBeanImplTest extends GridCommonAbstractTest {
awaitPartitionMapExchange();
- TransactionMetricsMxBean txMXBean = txMXBean(0);
+ TransactionMetricsMxBean txMXBean = txMetricsMXBean(0);
final IgniteCache<Integer, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
@@ -176,7 +176,7 @@ public class TransactionMetricsMxBeanImplTest extends GridCommonAbstractTest {
IgniteEx primaryNode2 = startGrid(1);
IgniteEx nearNode = startGrid(2);
- TransactionMetricsMxBean txMXBeanBackup = txMXBean(2);
+ TransactionMetricsMxBean txMXBeanBackup = txMetricsMXBean(2);
awaitPartitionMapExchange();
@@ -285,10 +285,10 @@ public class TransactionMetricsMxBeanImplTest extends GridCommonAbstractTest {
/**
*
*/
- private TransactionMetricsMxBean txMXBean(int igniteInt) throws Exception {
+ private TransactionMetricsMxBean txMetricsMXBean(int igniteInt) throws Exception {
ObjectName mbeanName = U.makeMBeanName(
getTestIgniteInstanceName(igniteInt),
- "Transactions",
+ "TransactionMetrics",
TransactionMetricsMxBeanImpl.class.getSimpleName()
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/test/java/org/apache/ignite/internal/TransactionsMXBeanImplTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TransactionsMXBeanImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/TransactionsMXBeanImplTest.java
new file mode 100644
index 0000000..d358c72
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TransactionsMXBeanImplTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.lang.management.ManagementFactory;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.mxbean.TransactionsMXBean;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class TransactionsMXBeanImplTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+ final IgniteConfiguration cfg = super.getConfiguration(name);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ final CacheConfiguration cCfg = new CacheConfiguration()
+ .setName(DEFAULT_CACHE_NAME)
+ .setAffinity(new RendezvousAffinityFunction(false, 32))
+ .setBackups(1)
+ .setAtomicityMode(TRANSACTIONAL)
+ .setRebalanceMode(CacheRebalanceMode.ASYNC)
+ .setWriteSynchronizationMode(FULL_SYNC);
+
+ cfg.setCacheConfiguration(cCfg);
+
+ return cfg;
+ }
+
+ /**
+ *
+ */
+ public void testBasic() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ TransactionsMXBean bean = txMXBean(0);
+
+ ignite.transactions().txStart();
+
+ ignite.cache(DEFAULT_CACHE_NAME).put(0, 0);
+
+ String res = bean.getActiveTransactions(null, null, null, null, null, null, null, null, false, false);
+
+ assertEquals("1", res);
+
+ res = bean.getActiveTransactions(null, null, null, null, null, null, null, null, true, false);
+
+ assertTrue(res.indexOf("Tx:") > 0);
+
+ res = bean.getActiveTransactions(null, null, null, null, null, null, null, null, false, true);
+
+ assertEquals("1", res);
+
+ doSleep(500);
+
+ res = bean.getActiveTransactions(null, null, null, null, null, null, null, null, false, false);
+
+ assertEquals("0", res);
+ }
+
+ /**
+ *
+ */
+ private TransactionsMXBean txMXBean(int igniteInt) throws Exception {
+ ObjectName mbeanName = U.makeMBeanName(getTestIgniteInstanceName(igniteInt), "Transactions",
+ TransactionsMXBeanImpl.class.getSimpleName());
+
+ MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer();
+
+ if (!mbeanSrv.isRegistered(mbeanName))
+ fail("MBean is not registered: " + mbeanName.getCanonicalName());
+
+ return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, TransactionsMXBean.class, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
index cfbf1a6..049500a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/commandline/CommandHandlerParsingTest.java
@@ -19,7 +19,11 @@
package org.apache.ignite.internal.commandline;
+import java.util.Arrays;
import junit.framework.TestCase;
+import org.apache.ignite.internal.visor.tx.VisorTxProjection;
+import org.apache.ignite.internal.visor.tx.VisorTxSortOrder;
+import org.apache.ignite.internal.visor.tx.VisorTxTaskArg;
import static java.util.Arrays.asList;
import static org.apache.ignite.internal.commandline.CommandHandler.DFLT_HOST;
@@ -109,4 +113,100 @@ public class CommandHandlerParsingTest extends TestCase {
}
}
}
+
+ /**
+ * test parsing dump transaction arguments
+ */
+ @SuppressWarnings("Null")
+ public void testTransactionArguments() {
+ CommandHandler hnd = new CommandHandler();
+ Arguments args;
+
+ args = hnd.parseAndValidate(asList("--tx"));
+
+ try {
+ hnd.parseAndValidate(asList("--tx", "minDuration"));
+
+ fail("Expected exception");
+ }
+ catch (IllegalArgumentException ignored) {
+ }
+
+ try {
+ hnd.parseAndValidate(asList("--tx", "minDuration", "-1"));
+
+ fail("Expected exception");
+ }
+ catch (IllegalArgumentException ignored) {
+ }
+
+ try {
+ hnd.parseAndValidate(asList("--tx", "minSize"));
+
+ fail("Expected exception");
+ }
+ catch (IllegalArgumentException ignored) {
+ }
+
+ try {
+ hnd.parseAndValidate(asList("--tx", "minSize", "-1"));
+
+ fail("Expected exception");
+ }
+ catch (IllegalArgumentException ignored) {
+ }
+
+ try {
+ hnd.parseAndValidate(asList("--tx", "label"));
+
+ fail("Expected exception");
+ }
+ catch (IllegalArgumentException ignored) {
+ }
+
+ try {
+ hnd.parseAndValidate(asList("--tx", "label", "tx123["));
+
+ fail("Expected exception");
+ }
+ catch (IllegalArgumentException ignored) {
+ }
+
+ try {
+ hnd.parseAndValidate(asList("--tx", "servers", "nodes", "1,2,3"));
+
+ fail("Expected exception");
+ }
+ catch (IllegalArgumentException ignored) {
+ }
+
+ args = hnd.parseAndValidate(asList("--tx", "minDuration", "120", "minSize", "10", "limit", "100", "order", "SIZE",
+ "servers"));
+
+ VisorTxTaskArg arg = args.transactionArguments();
+
+ assertEquals(Long.valueOf(120 * 1000L), arg.getMinDuration());
+ assertEquals(Integer.valueOf(10), arg.getMinSize());
+ assertEquals(Integer.valueOf(100), arg.getLimit());
+ assertEquals(VisorTxSortOrder.SIZE, arg.getSortOrder());
+ assertEquals(VisorTxProjection.SERVER, arg.getProjection());
+
+ args = hnd.parseAndValidate(asList("--tx", "minDuration", "130", "minSize", "1", "limit", "60", "order", "DURATION",
+ "clients"));
+
+ arg = args.transactionArguments();
+
+ assertEquals(Long.valueOf(130 * 1000L), arg.getMinDuration());
+ assertEquals(Integer.valueOf(1), arg.getMinSize());
+ assertEquals(Integer.valueOf(60), arg.getLimit());
+ assertEquals(VisorTxSortOrder.DURATION, arg.getSortOrder());
+ assertEquals(VisorTxProjection.CLIENT, arg.getProjection());
+
+ args = hnd.parseAndValidate(asList("--tx", "nodes", "1,2,3"));
+
+ arg = args.transactionArguments();
+
+ assertNull(arg.getProjection());
+ assertEquals(Arrays.asList("1", "2", "3"), arg.getConsistentIds());
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java
index dac64d9..4aa693c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java
@@ -33,4 +33,8 @@ public class CacheGetEntryPessimisticRepeatableReadSeltTest extends CacheGetEntr
@Override protected TransactionIsolation isolation() {
return TransactionIsolation.REPEATABLE_READ;
}
-}
\ No newline at end of file
+
+ @Override public void testReplicatedTransactional() throws Exception {
+ super.testReplicatedTransactional();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SetTxTimeoutOnPartitionMapExchangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SetTxTimeoutOnPartitionMapExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SetTxTimeoutOnPartitionMapExchangeTest.java
new file mode 100644
index 0000000..3152349
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SetTxTimeoutOnPartitionMapExchangeTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.lang.management.ManagementFactory;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TransactionsMXBeanImpl;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.mxbean.TransactionsMXBean;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class SetTxTimeoutOnPartitionMapExchangeTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Wait condition timeout. */
+ private static final long WAIT_CONDITION_TIMEOUT = 10_000L;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ return cfg;
+ }
+
+ /**
+ *
+ */
+ public void testDefaultTxTimeoutOnPartitionMapExchange() throws Exception {
+ IgniteEx ig1 = startGrid(1);
+ IgniteEx ig2 = startGrid(2);
+
+ TransactionConfiguration txCfg1 = ig1.configuration().getTransactionConfiguration();
+ TransactionConfiguration txCfg2 = ig2.configuration().getTransactionConfiguration();
+
+ final long expDfltTimeout = TransactionConfiguration.TX_TIMEOUT_ON_PARTITION_MAP_EXCHANGE;
+
+ assertEquals(expDfltTimeout, txCfg1.getTxTimeoutOnPartitionMapExchange());
+ assertEquals(expDfltTimeout, txCfg2.getTxTimeoutOnPartitionMapExchange());
+ }
+
+ /**
+ *
+ */
+ public void testJmxSetTxTimeoutOnPartitionMapExchange() throws Exception {
+ startGrid(1);
+ startGrid(2);
+
+ TransactionsMXBean mxBean1 = txMXBean(1);
+ TransactionsMXBean mxBean2 = txMXBean(2);
+
+ final long expTimeout1 = 20_000L;
+ final long expTimeout2 = 30_000L;
+
+ mxBean1.setTxTimeoutOnPartitionMapExchange(expTimeout1);
+ assertTxTimeoutOnPartitionMapExchange(expTimeout1);
+ assertEquals(expTimeout1, mxBean1.getTxTimeoutOnPartitionMapExchange());
+
+ mxBean2.setTxTimeoutOnPartitionMapExchange(expTimeout2);
+ assertTxTimeoutOnPartitionMapExchange(expTimeout2);
+ assertEquals(expTimeout2, mxBean2.getTxTimeoutOnPartitionMapExchange());
+ }
+
+ /**
+ *
+ */
+ public void testClusterSetTxTimeoutOnPartitionMapExchange() throws Exception {
+ Ignite ig1 = startGrid(1);
+ Ignite ig2 = startGrid(2);
+
+ final long expTimeout1 = 20_000L;
+ final long expTimeout2 = 30_000L;
+
+ ig1.cluster().setTxTimeoutOnPartitionMapExchange(expTimeout1);
+ assertTxTimeoutOnPartitionMapExchange(expTimeout1);
+
+ ig2.cluster().setTxTimeoutOnPartitionMapExchange(expTimeout2);
+ assertTxTimeoutOnPartitionMapExchange(expTimeout2);
+ }
+
+ /**
+ *
+ */
+ private TransactionsMXBean txMXBean(int igniteInt) throws Exception {
+ ObjectName mbeanName = U.makeMBeanName(getTestIgniteInstanceName(igniteInt), "Transactions",
+ TransactionsMXBeanImpl.class.getSimpleName());
+
+ MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer();
+
+ if (!mbeanSrv.isRegistered(mbeanName))
+ fail("MBean is not registered: " + mbeanName.getCanonicalName());
+
+ return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, TransactionsMXBean.class, true);
+ }
+
+ /**
+ * Checking the transaction timeout on all grids.
+ *
+ * @param expTimeout Expected timeout.
+ * @throws IgniteInterruptedCheckedException If failed.
+ */
+ private void assertTxTimeoutOnPartitionMapExchange(final long expTimeout)
+ throws IgniteInterruptedCheckedException {
+
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ for (Ignite ignite : G.allGrids()) {
+ long actualTimeout = ignite.configuration()
+ .getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange();
+
+ if (actualTimeout != expTimeout) {
+ log.warning(String.format(
+ "Wrong transaction timeout on partition map exchange [grid=%s, timeout=%d, expected=%d]",
+ ignite.name(), actualTimeout, expTimeout));
+
+ return false;
+ }
+ }
+
+ return true;
+
+ }
+ }, WAIT_CONDITION_TIMEOUT));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index e456047..787095e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -219,11 +219,6 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
parts == null ? RendezvousAffinityFunction.DFLT_PARTITION_COUNT : parts);
}
- @Override
- protected void beforeTest() throws Exception {
- super.beforeTest();
- }
-
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsNearTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsNearTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsNearTest.java
new file mode 100644
index 0000000..fe8b1fb
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsNearTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.configuration.NearCacheConfiguration;
+
+/**
+ * Test correctness of rollback a transaction with timeout during the grid stop.
+ */
+public class IgniteTxRemoveTimeoutObjectsNearTest extends IgniteTxRemoveTimeoutObjectsTest {
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return new NearCacheConfiguration();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsTest.java
index 752d854..5713621 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsTest.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
@@ -205,4 +206,9 @@ public class IgniteTxRemoveTimeoutObjectsTest extends GridCacheAbstractSelfTest
return GridTestUtils.getFieldValue(timeout, timeout.getClass(), "timeoutObjs");
}
+
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index d147d31..00f9729 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -481,7 +481,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
@Override public boolean apply() {
GridDhtTxLocal locTx = null;
- for (IgniteInternalTx tx : tm.txs()) {
+ for (IgniteInternalTx tx : tm.activeTransactions()) {
if (tx instanceof GridDhtTxLocal) {
assertNull("Only one tx is expected.", locTx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
index b644cda..11c4c67 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
@@ -98,7 +98,7 @@ public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest {
@Override protected void afterTest() throws Exception {
try {
for (Ignite node : G.allGrids()) {
- Collection<IgniteInternalTx> txs = ((IgniteKernal)node).context().cache().context().tm().txs();
+ Collection<IgniteInternalTx> txs = ((IgniteKernal)node).context().cache().context().tm().activeTransactions();
assertTrue("Unfinished txs [node=" + node.name() + ", txs=" + txs + ']', txs.isEmpty());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxMultiNodeSelfTest.java
index 0ca10f8..07ee991 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxMultiNodeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxMultiNodeSelfTest.java
@@ -242,7 +242,7 @@ public class GridCacheNearTxMultiNodeSelfTest extends GridCommonAbstractTest {
*/
@SuppressWarnings( {"unchecked"})
private void checkTm(Ignite g, IgniteTxManager tm) {
- Collection<IgniteInternalTx> txs = tm.txs();
+ Collection<IgniteInternalTx> txs = tm.activeTransactions();
info(">>> Number of transactions in the set [size=" + txs.size() +
", nodeId=" + g.cluster().localNode().id() + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java
index 256e8da..2e4ad92 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java
@@ -181,8 +181,8 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes
List<Integer> dhtSizes = new ArrayList<>(GRID_CNT - 1);
for (int i = 1; i < GRID_CNT; i++) {
- nearSizes.add(near(i).context().tm().txs().size());
- dhtSizes.add(dht(i).context().tm().txs().size());
+ nearSizes.add(near(i).context().tm().activeTransactions().size());
+ dhtSizes.add(dht(i).context().tm().activeTransactions().size());
}
stopNodeAndSleep(SALVAGE_TIMEOUT - DELTA_BEFORE);
@@ -247,7 +247,7 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes
* @param ctx Cache context.
*/
private void checkTxsEmpty(GridCacheContext ctx) {
- Collection txs = ctx.tm().txs();
+ Collection txs = ctx.tm().activeTransactions();
assert txs.isEmpty() : "Not all transactions were salvaged: " + txs;
}
@@ -259,7 +259,7 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes
* @param exp Expected amount of transactions.
*/
private void checkTxsNotEmpty(GridCacheContext ctx, int exp) {
- int size = ctx.tm().txs().size();
+ int size = ctx.tm().activeTransactions().size();
assertEquals("Some transactions were salvaged unexpectedly", exp, size);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxMultiThreadedSelfTest.java
index 9e46b40..388a7bf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxMultiThreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxMultiThreadedSelfTest.java
@@ -73,7 +73,7 @@ public class GridCacheLocalTxMultiThreadedSelfTest extends IgniteTxMultiThreaded
/** {@inheritDoc} */
@Override protected int threadCount() {
- return 8;
+ return 2;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxLabelTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxLabelTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxLabelTest.java
new file mode 100644
index 0000000..d89ba0b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxLabelTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
+import org.apache.ignite.transactions.Transaction;
+
+/**
+ * Tests transaction labels.
+ */
+public class TxLabelTest extends GridCacheAbstractSelfTest {
+ /**
+ * Tests transaction labels.
+ */
+ public void testLabel() {
+ testLabel0(grid(0), "lbl0");
+ testLabel0(grid(0), "lbl1");
+
+ try {
+ testLabel0(grid(0), null);
+
+ fail();
+ }
+ catch (Exception e) {
+ // Expected.
+ }
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @param lbl Label.
+ */
+ private void testLabel0(Ignite ignite, String lbl) {
+ try(Transaction tx = ignite.transactions().withLabel(lbl).txStart()) {
+ assertEquals(lbl, tx.label());
+
+ ignite.cache(DEFAULT_CACHE_NAME).put(0, 0);
+
+ tx.commit();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b5d2fc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxMultiCacheAsyncOpsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxMultiCacheAsyncOpsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxMultiCacheAsyncOpsTest.java
new file mode 100644
index 0000000..261af70
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxMultiCacheAsyncOpsTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class TxMultiCacheAsyncOpsTest extends GridCommonAbstractTest {
+ /** Grid count. */
+ public static final int GRID_COUNT = 3;
+
+ /** Caches count. */
+ public static final int CACHES_CNT = 3;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCacheConfiguration(cacheConfigurations());
+
+ return cfg;
+ }
+
+ /** */
+ private CacheConfiguration[] cacheConfigurations() {
+ return IntStream.range(0, CACHES_CNT).mapToObj(
+ this::cacheConfiguration).collect(Collectors.toList()).toArray(new CacheConfiguration[CACHES_CNT]);
+ }
+
+ /**
+ * @param idx Index.
+ */
+ private CacheConfiguration cacheConfiguration(int idx) {
+ CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME + idx);
+
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setBackups(2);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setOnheapCacheEnabled(false);
+
+ return ccfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGridsMultiThreaded(GRID_COUNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ *
+ */
+ public void testCommitAfterAsyncPut() {
+ CacheConfiguration[] caches = cacheConfigurations();
+
+ try {
+ for (int i = 0; i < caches.length; i++)
+ grid(0).cache(caches[i].getName()).put(1, i + 1);
+
+ try (Transaction tx = grid(0).transactions().txStart()) {
+ for (int i = 0; i < caches.length; i++)
+ grid(0).cache(caches[i].getName()).putAsync(1, (i + 1) * 10);
+
+ tx.commit();
+ }
+ catch (Exception e) {
+ System.out.println();
+ }
+
+ for (int i = 0; i < caches.length; i++)
+ assertEquals((i + 1) * 10, grid(0).cache(caches[i].getName()).get(1));
+ }
+ finally {
+ for (int i = 0; i < caches.length; i++)
+ grid(0).cache(caches[i].getName()).removeAll();
+ }
+ }
+
+ /**
+ *
+ */
+ public void testCommitAfterAsyncGet() {
+ CacheConfiguration[] caches = cacheConfigurations();
+
+ try {
+ for (int i = 0; i < caches.length; i++)
+ grid(0).cache(caches[i].getName()).put(1, i + 1);
+
+ List<IgniteFuture> futs = new ArrayList<>();
+
+ try (Transaction tx = grid(0).transactions().txStart()) {
+ for (int i = 0; i < caches.length; i++)
+ futs.add(grid(0).cache(caches[i].getName()).getAsync(1));
+
+ tx.commit();
+ }
+
+ for (int i = 0; i < futs.size(); i++)
+ assertEquals(i + 1, futs.get(i).get());
+ }
+ finally {
+ for (int i = 0; i < caches.length; i++)
+ grid(0).cache(caches[i].getName()).removeAll();
+ }
+ }
+}