You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/29 15:00:57 UTC
[2/2] ignite git commit: ignite-1.5 debug
ignite-1.5 debug
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ad658eb2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ad658eb2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ad658eb2
Branch: refs/heads/ignite-1537
Commit: ad658eb2d58c83a71e41d70eebf1aee0b4aba2ff
Parents: 474e394
Author: sboikov <sb...@gridgain.com>
Authored: Tue Dec 29 17:00:49 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Dec 29 17:00:49 2015 +0300
----------------------------------------------------------------------
.../GridDistributedTxFinishRequest.java | 4 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 27 +-
.../near/GridNearTxFinishFuture.java | 384 +++++++++++++------
3 files changed, 295 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad658eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index 34b3112..692c70c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -20,8 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.lang.IgniteUuid;
@@ -85,6 +83,8 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage {
* @param invalidate Invalidate flag.
* @param sys System transaction flag.
* @param plc IO policy.
+ * @param syncCommit Sync commit flag.
+ * @param syncRollback Sync rollback flag.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad658eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 65f1cb4..74325e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -20,9 +20,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.Collections;
import java.util.UUID;
import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -44,6 +44,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
+
/** Near node ID. */
private UUID nearNodeId;
@@ -64,7 +67,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
@GridDirectCollection(GridCacheVersion.class)
private Collection<GridCacheVersion> pendingVers;
- /** Check comitted flag. */
+ /** Check committed flag. */
private boolean checkCommitted;
/** Partition update counter. */
@@ -81,6 +84,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** Task name hash. */
private int taskNameHash;
+ /** */
+ private byte flags;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -100,6 +106,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @param commit Commit flag.
* @param invalidate Invalidate flag.
* @param sys System flag.
+ * @param plc IO policy.
* @param sysInvalidate System invalidation flag.
* @param syncCommit Synchronous commit flag.
* @param syncRollback Synchronous rollback flag.
@@ -180,6 +187,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @param commit Commit flag.
* @param invalidate Invalidate flag.
* @param sys System flag.
+ * @param plc IO policy.
* @param sysInvalidate System invalidation flag.
* @param syncCommit Synchronous commit flag.
* @param syncRollback Synchronous rollback flag.
@@ -302,20 +310,17 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
}
/**
- * Gets versions of not acquired locks with version less then one of transaction being committed.
- *
- * @return Versions of locks for entries participating in transaction that have not been acquired yet
- * have version less then one of transaction being committed.
+ * @return Check committed flag.
*/
- public Collection<GridCacheVersion> pendingVersions() {
- return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers;
+ public boolean checkCommitted() {
+ return checkCommitted;
}
/**
- * @return Check committed flag.
+ * @return {@code True} if the wait remote transactions flag is set.
*/
- public boolean checkCommitted() {
- return checkCommitted;
+ public boolean waitRemoteTxs() {
+ return (flags & WAIT_REMOTE_TX_FLAG_MASK) != 0;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad658eb2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 00f5141..f225d6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
@@ -65,6 +68,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
public static final IgniteProductVersion FINISH_NEAR_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.4.0");
/** */
+ public static final IgniteProductVersion WAIT_REMOTE_TXS_SINCE = IgniteProductVersion.fromString("1.5.0");
+
+ /** */
private static final long serialVersionUID = 0L;
/** Logger reference. */
@@ -123,22 +129,22 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override public boolean onNodeLeft(UUID nodeId) {
- for (IgniteInternalFuture<?> fut : futures())
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
+ boolean found = false;
- if (f.node().id().equals(nodeId)) {
- // Remove previous mapping.
- mappings.remove(nodeId);
+ ClusterTopologyCheckedException err =
+ new ClusterTopologyCheckedException("Remote node left grid: " + nodeId);
- f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will fail): " + nodeId));
+ for (IgniteInternalFuture<?> fut : futures())
+ if (isMini(fut)) {
+ MinFuture f = (MinFuture)fut;
- return true;
- }
+ if (f.onNodeLeft(nodeId, err))
+ found = true;
}
- return false;
+ return found;
}
/** {@inheritDoc} */
@@ -160,13 +166,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
public void onResult(UUID nodeId, GridNearTxFinishResponse res) {
if (!isDone())
for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
+ if (fut.getClass() == FinishMiniFuture.class) {
+ FinishMiniFuture f = (FinishMiniFuture)fut;
if (f.futureId().equals(res.miniId())) {
assert f.node().id().equals(nodeId);
- f.onResult(res);
+ f.onNearFinishResponse(res);
}
}
}
@@ -179,15 +185,21 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
public void onResult(UUID nodeId, GridDhtTxFinishResponse res) {
if (!isDone())
for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
+ if (fut.getClass() == CheckBackupMiniFuture.class) {
+ CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut;
if (f.futureId().equals(res.miniId())) {
assert f.node().id().equals(nodeId);
- f.onResult(res);
+ f.onDhtFinishResponse(res);
}
}
+ else if (fut.getClass() == CheckRemoteTxMiniFuture.class) {
+ CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
+
+ if (f.futureId().equals(res.miniId()))
+ f.onDhtFinishResponse(nodeId);
+ }
}
}
@@ -290,11 +302,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/**
- * @param f Future.
+ * @param fut Future.
* @return {@code True} if mini-future.
*/
- private boolean isMini(IgniteInternalFuture<?> f) {
- return f.getClass().equals(MiniFuture.class);
+ private boolean isMini(IgniteInternalFuture<?> fut) {
+ return fut.getClass() == FinishMiniFuture.class ||
+ fut.getClass() == CheckBackupMiniFuture.class ||
+ fut.getClass() == CheckRemoteTxMiniFuture.class;
}
/**
@@ -394,7 +408,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
ClusterNode backup = cctx.discovery().node(backupId);
- final MiniFuture mini = new MiniFuture(backup, mapping);
+ final CheckBackupMiniFuture mini = new CheckBackupMiniFuture(backup, mapping);
add(mini);
@@ -445,32 +459,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
else {
- GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
- cctx.localNodeId(),
- futureId(),
- mini.futureId(),
- tx.topologyVersion(),
- tx.xidVersion(),
- tx.commitVersion(),
- tx.threadId(),
- tx.isolation(),
- true,
- false,
- tx.system(),
- tx.ioPolicy(),
- false,
- tx.syncCommit(),
- tx.syncRollback(),
- null,
- null,
- null,
- null,
- 0,
- null,
- 0,
- tx.activeCachesDeploymentEnabled());
-
- finishReq.checkCommitted(true);
+ GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId());
try {
if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(backup.version()) <= 0)
@@ -482,10 +471,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
catch (ClusterTopologyCheckedException e) {
- mini.onResult(e);
+ mini.onNodeLeft(backupId, e);
}
catch (IgniteCheckedException e) {
- mini.onResult(e);
+ mini.onDone(e);
}
}
}
@@ -603,7 +592,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
add(fut);
}
else {
- MiniFuture fut = new MiniFuture(m);
+ FinishMiniFuture fut = new FinishMiniFuture(m);
req.miniId(fut.futureId());
@@ -623,11 +612,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
// Remove previous mapping.
mappings.remove(m.node().id());
- fut.onResult(e);
+ fut.onNodeLeft(n.id(), e);
}
catch (IgniteCheckedException e) {
// Fail the whole thing.
- fut.onResult(e);
+ fut.onDone(e);
}
}
}
@@ -637,10 +626,24 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
@SuppressWarnings("unchecked")
@Override public String apply(IgniteInternalFuture<?> f) {
- if (isMini(f)) {
- MiniFuture m = (MiniFuture)f;
+ if (f.getClass().equals(FinishMiniFuture.class)) {
+ FinishMiniFuture fut = (FinishMiniFuture)f;
- return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
+ return "FinishFuture[node=" + fut.node().id() +
+ ", loc=" + fut.node().isLocal() +
+ ", done=" + fut.isDone() + "]";
+ }
+ else if (f.getClass().equals(CheckBackupMiniFuture.class)) {
+ CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f;
+
+ return "CheckBackupFuture[node=" + fut.node().id() +
+ ", loc=" + fut.node().isLocal() +
+ ", done=" + f.isDone() + "]";
+ }
+ else if (f.getClass().equals(CheckRemoteTxMiniFuture.class)) {
+ CheckRemoteTxMiniFuture fut = (CheckRemoteTxMiniFuture)f;
+
+ return "CheckRemoteTxMiniFuture[nodes=" + fut.nodes() + ", done=" + f.isDone() + "]";
}
else
return "[loc=true, done=" + f.isDone() + "]";
@@ -653,108 +656,219 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/**
+ * @param miniId Mini future ID.
+ * @return Finish request.
+ */
+ private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId) {
+ GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
+ cctx.localNodeId(),
+ futureId(),
+ miniId,
+ tx.topologyVersion(),
+ tx.xidVersion(),
+ tx.commitVersion(),
+ tx.threadId(),
+ tx.isolation(),
+ true,
+ false,
+ tx.system(),
+ tx.ioPolicy(),
+ false,
+ tx.syncCommit(),
+ tx.syncRollback(),
+ null,
+ null,
+ null,
+ null,
+ 0,
+ null,
+ 0,
+ tx.activeCachesDeploymentEnabled());
+
+ finishReq.checkCommitted(true);
+
+ return finishReq;
+ }
+
+ /**
+ *
+ */
+ private abstract class MinFuture extends GridFutureAdapter<IgniteInternalTx> {
+ /** */
+ private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+ /**
+ * @param nodeId Node ID.
+ * @param err Error.
+ * @return {@code True} if future processed node failure.
+ */
+ abstract boolean onNodeLeft(UUID nodeId, ClusterTopologyCheckedException err);
+
+ /**
+ * @return Future ID.
+ */
+ final IgniteUuid futureId() {
+ return futId;
+ }
+ }
+
+ /**
* Mini-future for get operations. Mini-futures are only waiting on a single
* node as opposed to multiple nodes.
*/
- private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+ private class FinishMiniFuture extends MinFuture {
/** */
private static final long serialVersionUID = 0L;
- /** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
-
/** Keys. */
@GridToStringInclude
private GridDistributedTxMapping m;
- /** Backup check flag. */
- private ClusterNode backup;
-
/**
* @param m Mapping.
*/
- MiniFuture(GridDistributedTxMapping m) {
+ FinishMiniFuture(GridDistributedTxMapping m) {
this.m = m;
}
/**
- * @param backup Backup to check.
- * @param m Mapping associated with the backup.
+ * @return Node.
*/
- MiniFuture(ClusterNode backup, GridDistributedTxMapping m) {
- this.backup = backup;
- this.m = m;
+ ClusterNode node() {
+ return m.node();
}
/**
- * @return Future ID.
+ * @return Keys.
*/
- IgniteUuid futureId() {
- return futId;
+ public GridDistributedTxMapping mapping() {
+ return m;
}
/**
- * @return Node ID.
+ * @param e Node failure.
*/
- public ClusterNode node() {
- assert m != null || backup != null;
+ boolean onNodeLeft(UUID nodeId, ClusterTopologyCheckedException e) {
+ if (nodeId.equals(m.node().id())) {
+ if (log.isDebugEnabled())
+ log.debug("Remote node left grid while sending or waiting for reply: " + this);
+
+ // Remove previous mapping.
+ mappings.remove(nodeId);
+
+ if (isSync()) {
+ Map<UUID, Collection<UUID>> txNodes = tx.transactionNodes();
+
+ assert txNodes != null : tx;
+
+ Collection<UUID> backups = txNodes.get(nodeId);
+
+ if (!F.isEmpty(backups)) {
+ final CheckRemoteTxMiniFuture mini = new CheckRemoteTxMiniFuture(new HashSet<>(backups));
+
+ add(mini);
+
+ GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId());
- return backup != null ? backup : m.node();
+ for (UUID backupId : backups) {
+ ClusterNode backup = cctx.discovery().node(backupId);
+
+ if (backup != null && WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) <= 0) {
+ if (backup.isLocal()) {
+ IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(tx.nearXidVersion());
+
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ mini.onDhtFinishResponse(cctx.localNodeId());
+ }
+ });
+ }
+ else {
+ try {
+ cctx.io().send(backup, req, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException e0) {
+ mini.onNodeLeft(backupId, e0);
+ }
+ catch (IgniteCheckedException e0) {
+ mini.onDone(e0);
+ }
+ }
+ }
+ else
+ mini.onDhtFinishResponse(backupId);
+ }
+ }
+ }
+
+ onDone(tx);
+
+ return true;
+ }
+
+ return false;
}
/**
- * @return Keys.
+ * @param res Result callback.
*/
- public GridDistributedTxMapping mapping() {
- return m;
+ void onNearFinishResponse(GridNearTxFinishResponse res) {
+ if (res.error() != null)
+ onDone(res.error());
+ else
+ onDone(tx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(FinishMiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
}
+ }
+
+ /**
+ *
+ */
+ private class CheckBackupMiniFuture extends MinFuture {
+ /** Keys. */
+ @GridToStringInclude
+ private GridDistributedTxMapping m;
+
+ /** Backup node to check. */
+ private ClusterNode backup;
/**
- * @param e Error.
+ * @param backup Backup to check.
+ * @param m Mapping associated with the backup.
*/
- void onResult(Throwable e) {
- if (log.isDebugEnabled())
- log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
- // Fail.
- onDone(e);
+ CheckBackupMiniFuture(ClusterNode backup, GridDistributedTxMapping m) {
+ this.backup = backup;
+ this.m = m;
}
/**
- * @param e Node failure.
+ * @return Backup node.
*/
- void onResult(ClusterTopologyCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this);
+ public ClusterNode node() {
+ return backup;
+ }
- if (backup != null) {
+ /** {@inheritDoc} */
+ @Override boolean onNodeLeft(UUID nodeId, ClusterTopologyCheckedException e) {
+ if (nodeId.equals(backup.id())) {
readyNearMappingFromBackup(m);
onDone(e);
- }
- else
- // Complete future with tx.
- onDone(tx);
- }
- /**
- * @param res Result callback.
- */
- void onResult(GridNearTxFinishResponse res) {
- assert backup == null;
+ return true;
+ }
- if (res.error() != null)
- onDone(res.error());
- else
- onDone(tx);
+ return false;
}
/**
* @param res Response.
*/
- void onResult(GridDhtTxFinishResponse res) {
- assert backup != null;
-
+ void onDhtFinishResponse(GridDhtTxFinishResponse res) {
readyNearMappingFromBackup(m);
Throwable err = res.checkCommittedError();
@@ -774,9 +888,65 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
onDone(tx);
}
+ }
+
+ /**
+ *
+ */
+ private class CheckRemoteTxMiniFuture extends MinFuture {
+ /** */
+ private Set<UUID> nodes;
+
+ /**
+ * @param nodes Backup nodes.
+ */
+ public CheckRemoteTxMiniFuture(Set<UUID> nodes) {
+ this.nodes = nodes;
+ }
+
+ Set<UUID> nodes() {
+ synchronized (this) {
+ return new HashSet<>(nodes);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override boolean onNodeLeft(UUID nodeId, ClusterTopologyCheckedException err) {
+ boolean done;
+
+ boolean ret;
+
+ synchronized (this) {
+ ret = nodes.remove(nodeId);
+
+ done = nodes.isEmpty();
+ }
+
+ if (done)
+ onDone(tx);
+
+ return ret;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ */
+ void onDhtFinishResponse(UUID nodeId) {
+ boolean done;
+
+ synchronized (this) {
+ nodes.remove(nodeId);
+
+ done = nodes.isEmpty();
+ }
+
+ if (done)
+ onDone(tx);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+ return S.toString(CheckRemoteTxMiniFuture.class, this);
}
}
}