You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/01/13 14:55:31 UTC
[47/50] [abbrv] ignite git commit: Fixes: - allow 'committing' ->
'marked_rollback' tx state change only for thread committing transaction -
fixed 'full_sync' mode for case when tx primary nodes fail - fixed race
between statically configured cache st
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 291c88a..1b40d6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
@@ -44,6 +47,7 @@ import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -64,6 +68,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
public static final IgniteProductVersion FINISH_NEAR_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.4.0");
/** */
+ public static final IgniteProductVersion WAIT_REMOTE_TXS_SINCE = IgniteProductVersion.fromString("1.5.0");
+
+ /** */
private static final long serialVersionUID = 0L;
/** Logger reference. */
@@ -122,22 +129,23 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override public boolean onNodeLeft(UUID nodeId) {
+ boolean found = false;
+
for (IgniteInternalFuture<?> fut : futures())
if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
+ MinFuture f = (MinFuture)fut;
- if (f.node().id().equals(nodeId)) {
+ if (f.onNodeLeft(nodeId)) {
// Remove previous mapping.
mappings.remove(nodeId);
- f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will fail): " + nodeId));
-
- return true;
+ found = true;
}
}
- return false;
+ return found;
}
/** {@inheritDoc} */
@@ -156,19 +164,32 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @param nodeId Sender.
* @param res Result.
*/
+ @SuppressWarnings("ForLoopReplaceableByForEach")
public void onResult(UUID nodeId, GridNearTxFinishResponse res) {
- if (!isDone())
- for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
+ if (!isDone()) {
+ FinishMiniFuture finishFut = null;
- if (f.futureId().equals(res.miniId())) {
- assert f.node().id().equals(nodeId);
+ synchronized (futs) {
+ for (int i = 0; i < futs.size(); i++) {
+ IgniteInternalFuture<IgniteInternalTx> fut = futs.get(i);
+
+ if (fut.getClass() == FinishMiniFuture.class) {
+ FinishMiniFuture f = (FinishMiniFuture)fut;
- f.onResult(res);
+ if (f.futureId().equals(res.miniId())) {
+ assert f.node().id().equals(nodeId);
+
+ finishFut = f;
+
+ break;
+ }
}
}
}
+
+ if (finishFut != null)
+ finishFut.onNearFinishResponse(res);
+ }
}
/**
@@ -178,15 +199,21 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
public void onResult(UUID nodeId, GridDhtTxFinishResponse res) {
if (!isDone())
for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
+ if (fut.getClass() == CheckBackupMiniFuture.class) {
+ CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut;
if (f.futureId().equals(res.miniId())) {
assert f.node().id().equals(nodeId);
- f.onResult(res);
+ f.onDhtFinishResponse(res);
}
}
+ else if (fut.getClass() == CheckRemoteTxMiniFuture.class) {
+ CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
+
+ if (f.futureId().equals(res.miniId()))
+ f.onDhtFinishResponse(nodeId);
+ }
}
}
@@ -204,9 +231,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
boolean marked = tx.setRollbackOnly();
- if (err instanceof NodeStoppingException)
- return super.onDone(null, err);
-
if (err instanceof IgniteTxRollbackCheckedException) {
if (marked) {
try {
@@ -289,11 +313,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/**
- * @param f Future.
+ * @param fut Future.
* @return {@code True} if mini-future.
*/
- private boolean isMini(IgniteInternalFuture<?> f) {
- return f.getClass().equals(MiniFuture.class);
+ private boolean isMini(IgniteInternalFuture<?> fut) {
+ return fut.getClass() == FinishMiniFuture.class ||
+ fut.getClass() == CheckBackupMiniFuture.class ||
+ fut.getClass() == CheckRemoteTxMiniFuture.class;
}
/**
@@ -393,7 +419,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
ClusterNode backup = cctx.discovery().node(backupId);
- MiniFuture mini = new MiniFuture(backup, mapping);
+ final CheckBackupMiniFuture mini = new CheckBackupMiniFuture(backup, mapping);
add(mini);
@@ -414,8 +440,25 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
readyNearMappingFromBackup(mapping);
- if (committed)
+ if (committed) {
+ if (tx.syncCommit()) {
+ GridCacheVersion nearXidVer = tx.nearXidVersion();
+
+ assert nearXidVer != null : tx;
+
+ IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(nearXidVer);
+
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ mini.onDone(tx);
+ }
+ });
+
+ return;
+ }
+
mini.onDone(tx);
+ }
else {
ClusterTopologyCheckedException cause =
new ClusterTopologyCheckedException("Primary node left grid: " + nodeId);
@@ -427,46 +470,26 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
else {
- GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
- cctx.localNodeId(),
- futureId(),
- mini.futureId(),
- tx.topologyVersion(),
- tx.xidVersion(),
- tx.commitVersion(),
- tx.threadId(),
- tx.isolation(),
- true,
- false,
- tx.system(),
- tx.ioPolicy(),
- false,
- true,
- true,
- null,
- null,
- null,
- null,
- 0,
- null,
- 0,
- tx.activeCachesDeploymentEnabled());
-
- finishReq.checkCommitted(true);
+ GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId());
+
+ // Preserve old behavior, otherwise response is not sent.
+ if (WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) > 0)
+ finishReq.syncCommit(true);
try {
if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(backup.version()) <= 0)
cctx.io().send(backup, finishReq, tx.ioPolicy());
- else
+ else {
mini.onDone(new IgniteTxHeuristicCheckedException("Failed to check for tx commit on " +
"the backup node (node has an old Ignite version) [rmtNodeId=" + backup.id() +
", ver=" + backup.version() + ']'));
+ }
}
catch (ClusterTopologyCheckedException e) {
- mini.onResult(e);
+ mini.onNodeLeft(backupId);
}
catch (IgniteCheckedException e) {
- mini.onResult(e);
+ mini.onDone(e);
}
}
}
@@ -476,7 +499,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/**
- *
+ * @return {@code True} if need to send finish request for one phase commit transaction.
*/
private boolean needFinishOnePhase() {
if (tx.mappings().empty())
@@ -584,7 +607,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
add(fut);
}
else {
- MiniFuture fut = new MiniFuture(m);
+ FinishMiniFuture fut = new FinishMiniFuture(m);
req.miniId(fut.futureId());
@@ -604,11 +627,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
// Remove previous mapping.
mappings.remove(m.node().id());
- fut.onResult(e);
+ fut.onNodeLeft(n.id());
}
catch (IgniteCheckedException e) {
// Fail the whole thing.
- fut.onResult(e);
+ fut.onDone(e);
}
}
}
@@ -618,10 +641,24 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
@SuppressWarnings("unchecked")
@Override public String apply(IgniteInternalFuture<?> f) {
- if (isMini(f)) {
- MiniFuture m = (MiniFuture)f;
+ if (f.getClass() == FinishMiniFuture.class) {
+ FinishMiniFuture fut = (FinishMiniFuture)f;
+
+ return "FinishFuture[node=" + fut.node().id() +
+ ", loc=" + fut.node().isLocal() +
+ ", done=" + fut.isDone() + "]";
+ }
+ else if (f.getClass() == CheckBackupMiniFuture.class) {
+ CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f;
- return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
+ return "CheckBackupFuture[node=" + fut.node().id() +
+ ", loc=" + fut.node().isLocal() +
+ ", done=" + f.isDone() + "]";
+ }
+ else if (f.getClass() == CheckRemoteTxMiniFuture.class) {
+ CheckRemoteTxMiniFuture fut = (CheckRemoteTxMiniFuture)f;
+
+ return "CheckRemoteTxMiniFuture[nodes=" + fut.nodes() + ", done=" + f.isDone() + "]";
}
else
return "[loc=true, done=" + f.isDone() + "]";
@@ -634,108 +671,217 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/**
+ * @param miniId Mini future ID.
+ * @return Finish request.
+ */
+ private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId) {
+ GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
+ cctx.localNodeId(),
+ futureId(),
+ miniId,
+ tx.topologyVersion(),
+ tx.xidVersion(),
+ tx.commitVersion(),
+ tx.threadId(),
+ tx.isolation(),
+ true,
+ false,
+ tx.system(),
+ tx.ioPolicy(),
+ false,
+ tx.syncCommit(),
+ tx.syncRollback(),
+ null,
+ null,
+ null,
+ null,
+ 0,
+ null,
+ 0,
+ tx.activeCachesDeploymentEnabled());
+
+ finishReq.checkCommitted(true);
+
+ return finishReq;
+ }
+
+ /**
+ *
+ */
+ private abstract class MinFuture extends GridFutureAdapter<IgniteInternalTx> {
+ /** */
+ private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+ /**
+ * @param nodeId Node ID.
+ * @return {@code True} if future processed node failure.
+ */
+ abstract boolean onNodeLeft(UUID nodeId);
+
+ /**
+ * @return Future ID.
+ */
+ final IgniteUuid futureId() {
+ return futId;
+ }
+ }
+
+ /**
* Mini-future for get operations. Mini-futures are only waiting on a single
* node as opposed to multiple nodes.
*/
- private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+ private class FinishMiniFuture extends MinFuture {
/** */
private static final long serialVersionUID = 0L;
- /** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
-
/** Keys. */
@GridToStringInclude
private GridDistributedTxMapping m;
- /** Backup check flag. */
- private ClusterNode backup;
-
/**
* @param m Mapping.
*/
- MiniFuture(GridDistributedTxMapping m) {
+ FinishMiniFuture(GridDistributedTxMapping m) {
this.m = m;
}
/**
- * @param backup Backup to check.
- * @param m Mapping associated with the backup.
+ * @return Mapping node.
*/
- MiniFuture(ClusterNode backup, GridDistributedTxMapping m) {
- this.backup = backup;
- this.m = m;
+ ClusterNode node() {
+ return m.node();
}
/**
- * @return Future ID.
+ * @return Keys.
*/
- IgniteUuid futureId() {
- return futId;
+ public GridDistributedTxMapping mapping() {
+ return m;
}
/**
- * @return Node ID.
+ * @param nodeId Failed node ID.
*/
- public ClusterNode node() {
- assert m != null || backup != null;
+ boolean onNodeLeft(UUID nodeId) {
+ if (nodeId.equals(m.node().id())) {
+ if (log.isDebugEnabled())
+ log.debug("Remote node left grid while sending or waiting for reply: " + this);
+
+ if (isSync()) {
+ Map<UUID, Collection<UUID>> txNodes = tx.transactionNodes();
+
+ if (txNodes != null) {
+ Collection<UUID> backups = txNodes.get(nodeId);
+
+ if (!F.isEmpty(backups)) {
+ final CheckRemoteTxMiniFuture mini = new CheckRemoteTxMiniFuture(new HashSet<>(backups));
+
+ add(mini);
+
+ GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId());
+
+ req.waitRemoteTransactions(true);
+
+ for (UUID backupId : backups) {
+ ClusterNode backup = cctx.discovery().node(backupId);
+
+ if (backup != null && WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) <= 0) {
+ if (backup.isLocal()) {
+ IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(tx.nearXidVersion());
+
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ mini.onDhtFinishResponse(cctx.localNodeId());
+ }
+ });
+ }
+ else {
+ try {
+ cctx.io().send(backup, req, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ mini.onNodeLeft(backupId);
+ }
+ catch (IgniteCheckedException e) {
+ mini.onDone(e);
+ }
+ }
+ }
+ else
+ mini.onDhtFinishResponse(backupId);
+ }
+ }
+ }
+ }
- return backup != null ? backup : m.node();
+ onDone(tx);
+
+ return true;
+ }
+
+ return false;
}
/**
- * @return Keys.
+ * @param res Near finish response.
*/
- public GridDistributedTxMapping mapping() {
- return m;
+ void onNearFinishResponse(GridNearTxFinishResponse res) {
+ if (res.error() != null)
+ onDone(res.error());
+ else
+ onDone(tx);
}
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(FinishMiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+ }
+ }
+
+ /**
+ *
+ */
+ private class CheckBackupMiniFuture extends MinFuture {
+ /** Keys. */
+ @GridToStringInclude
+ private GridDistributedTxMapping m;
+
+ /** Backup node to check. */
+ private ClusterNode backup;
+
/**
- * @param e Error.
+ * @param backup Backup to check.
+ * @param m Mapping associated with the backup.
*/
- void onResult(Throwable e) {
- if (log.isDebugEnabled())
- log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
- // Fail.
- onDone(e);
+ CheckBackupMiniFuture(ClusterNode backup, GridDistributedTxMapping m) {
+ this.backup = backup;
+ this.m = m;
}
/**
- * @param e Node failure.
+ * @return Backup node.
*/
- void onResult(ClusterTopologyCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this);
+ public ClusterNode node() {
+ return backup;
+ }
- if (backup != null) {
+ /** {@inheritDoc} */
+ @Override boolean onNodeLeft(UUID nodeId) {
+ if (nodeId.equals(backup.id())) {
readyNearMappingFromBackup(m);
- onDone(e);
- }
- else
- // Complete future with tx.
- onDone(tx);
- }
+ onDone(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
- /**
- * @param res Result callback.
- */
- void onResult(GridNearTxFinishResponse res) {
- assert backup == null;
+ return true;
+ }
- if (res.error() != null)
- onDone(res.error());
- else
- onDone(tx);
+ return false;
}
/**
* @param res Response.
*/
- void onResult(GridDhtTxFinishResponse res) {
- assert backup != null;
-
+ void onDhtFinishResponse(GridDhtTxFinishResponse res) {
readyNearMappingFromBackup(m);
Throwable err = res.checkCommittedError();
@@ -755,9 +901,67 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
onDone(tx);
}
+ }
+
+ /**
+ *
+ */
+ private class CheckRemoteTxMiniFuture extends MinFuture {
+ /** */
+ private Set<UUID> nodes;
+
+ /**
+ * @param nodes Backup nodes.
+ */
+ public CheckRemoteTxMiniFuture(Set<UUID> nodes) {
+ this.nodes = nodes;
+ }
+
+ /**
+ * @return Backup nodes.
+ */
+ Set<UUID> nodes() {
+ synchronized (this) {
+ return new HashSet<>(nodes);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override boolean onNodeLeft(UUID nodeId) {
+ return onResponse(nodeId);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ */
+ void onDhtFinishResponse(UUID nodeId) {
+ onResponse(nodeId);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @return {@code True} if processed node response.
+ */
+ private boolean onResponse(UUID nodeId) {
+ boolean done;
+
+ boolean ret;
+
+ synchronized (this) {
+ ret = nodes.remove(nodeId);
+
+ done = nodes.isEmpty();
+ }
+
+ if (done)
+ onDone(tx);
+
+ return ret;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+ return S.toString(CheckRemoteTxMiniFuture.class, this);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index 3e5e28f..65eac63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -70,6 +70,9 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
* @param commit Commit flag.
* @param invalidate Invalidate flag.
* @param sys System flag.
+ * @param plc IO policy.
+ * @param syncCommit Sync commit flag.
+ * @param syncRollback Sync rollback flag.
* @param explicitLock Explicit lock flag.
* @param storeEnabled Store enabled flag.
* @param topVer Topology version.
@@ -77,6 +80,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
* @param txSize Expected transaction size.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash.
* @param addDepInfo Deployment info flag.
*/
public GridNearTxFinishRequest(
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index 4904ad8..b84d2fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -99,7 +99,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = ctx.marshaller().marshal(err);
}
@@ -107,7 +107,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (errBytes != null)
+ if (errBytes != null && err == null)
err = ctx.marshaller().unmarshal(errBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index aa4e929f..b7b480e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -202,7 +202,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/** {@inheritDoc} */
- @Nullable @Override public GridCacheVersion nearXidVersion() {
+ @Override public GridCacheVersion nearXidVersion() {
return xidVer;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index d886243..8812709 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -264,7 +264,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (ownedVals != null) {
+ if (ownedVals != null && ownedValKeys == null) {
ownedValKeys = ownedVals.keySet();
ownedValVals = ownedVals.values();
@@ -287,7 +287,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
}
if (filterFailedKeys != null) {
- for (IgniteTxKey key :filterFailedKeys) {
+ for (IgniteTxKey key : filterFailedKeys) {
GridCacheContext cctx = ctx.cacheContext(key.cacheId());
key.prepareMarshal(cctx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 59d8b5b..dc98eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -280,28 +280,28 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
GridCacheContext cctx = ctx.cacheContext(cacheId);
- if (keyValFilter != null) {
+ if (keyValFilter != null && keyValFilterBytes == null) {
if (addDepInfo)
prepareObject(keyValFilter, cctx);
keyValFilterBytes = CU.marshal(cctx, keyValFilter);
}
- if (rdc != null) {
+ if (rdc != null && rdcBytes == null) {
if (addDepInfo)
prepareObject(rdc, cctx);
rdcBytes = CU.marshal(cctx, rdc);
}
- if (trans != null) {
+ if (trans != null && transBytes == null) {
if (addDepInfo)
prepareObject(trans, cctx);
transBytes = CU.marshal(cctx, trans);
}
- if (!F.isEmpty(args)) {
+ if (!F.isEmpty(args) && argsBytes == null) {
if (addDepInfo) {
for (Object arg : args)
prepareObject(arg, cctx);
@@ -317,16 +317,16 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
Marshaller mrsh = ctx.marshaller();
- if (keyValFilterBytes != null)
+ if (keyValFilterBytes != null && keyValFilter == null)
keyValFilter = mrsh.unmarshal(keyValFilterBytes, ldr);
- if (rdcBytes != null)
+ if (rdcBytes != null && rdc == null)
rdc = mrsh.unmarshal(rdcBytes, ldr);
- if (transBytes != null)
+ if (transBytes != null && trans == null)
trans = mrsh.unmarshal(transBytes, ldr);
- if (argsBytes != null)
+ if (argsBytes != null && args == null)
args = mrsh.unmarshal(argsBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
index cce465b..ab882d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
@@ -122,11 +122,14 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
GridCacheContext cctx = ctx.cacheContext(cacheId);
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = ctx.marshaller().marshal(err);
- metaDataBytes = marshalCollection(metadata, cctx);
- dataBytes = marshalCollection(data, cctx);
+ if (metaDataBytes == null)
+ metaDataBytes = marshalCollection(metadata, cctx);
+
+ if (dataBytes == null)
+ dataBytes = marshalCollection(data, cctx);
if (addDepInfo && !F.isEmpty(data)) {
for (Object o : data) {
@@ -144,11 +147,14 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (errBytes != null)
+ if (errBytes != null && err == null)
err = ctx.marshaller().unmarshal(errBytes, ldr);
- metadata = unmarshalCollection(metaDataBytes, ctx, ldr);
- data = unmarshalCollection(dataBytes, ctx, ldr);
+ if (metadata == null)
+ metadata = unmarshalCollection(metaDataBytes, ctx, ldr);
+
+ if (data == null)
+ data = unmarshalCollection(dataBytes, ctx, ldr);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index f5f99f5..914b4ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -160,6 +160,12 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
public long timeout(long timeout);
/**
+ * Changes transaction state from COMMITTING to MARKED_ROLLBACK.
+ * Must be called only from the thread that is committing the transaction.
+ */
+ public void errorWhenCommitting();
+
+ /**
* Modify the transaction associated with the current thread such that the
* only possible outcome of the transaction is to roll back the
* transaction.
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 22e27c3..ed44c49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -804,6 +804,22 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
+ public final void errorWhenCommitting() {
+ synchronized (this) {
+ TransactionState prev = state;
+
+ assert prev == COMMITTING : prev;
+
+ state = MARKED_ROLLBACK;
+
+ if (log.isDebugEnabled())
+ log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']');
+
+ notifyAll();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public boolean setRollbackOnly() {
return state(MARKED_ROLLBACK);
}
@@ -1083,7 +1099,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
case MARKED_ROLLBACK: {
- valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED || prev == COMMITTING;
+ valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED;
break;
}
@@ -1705,6 +1721,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
+ @Override public void errorWhenCommitting() {
+ throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
+ }
+
+ /** {@inheritDoc} */
@Override public void commit() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index c42bc7f..f731975 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -828,7 +828,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
val.marshal(ctx, context());
- expiryPlcBytes = transferExpiryPlc ? CU.marshal(this.ctx, new IgniteExternalizableExpiryPolicy(expiryPlc)) : null;
+ if (transferExpiryPlc) {
+ if (expiryPlcBytes == null)
+ expiryPlcBytes = CU.marshal(this.ctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
+ }
+ else
+ expiryPlcBytes = null;
}
/**
@@ -871,8 +876,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
val.unmarshal(this.ctx, clsLdr);
- if (expiryPlcBytes != null)
- expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, clsLdr);
+ if (expiryPlcBytes != null && expiryPlc == null)
+ expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, clsLdr);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index b25baf8..547c018 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -721,14 +721,12 @@ public class IgniteTxHandler {
IgniteInternalFuture<IgniteInternalTx> res = null;
- if (tx != null) {
- IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
+ IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
- // Only for error logging.
- rollbackFut.listen(CU.errorLogger(log));
+ // Only for error logging.
+ rollbackFut.listen(CU.errorLogger(log));
- res = rollbackFut;
- }
+ res = rollbackFut;
if (e instanceof Error)
throw (Error)e;
@@ -875,7 +873,19 @@ public class IgniteTxHandler {
log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']');
if (req.checkCommitted()) {
- sendReply(nodeId, req, !ctx.tm().addRolledbackTx(null, req.version()));
+ boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version());
+
+ if (!committed || !req.syncCommit())
+ sendReply(nodeId, req, committed);
+ else {
+ IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version());
+
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ sendReply(nodeId, req, true);
+ }
+ });
+ }
return;
}
@@ -1044,7 +1054,7 @@ public class IgniteTxHandler {
* @param committed {@code True} if transaction committed on this node.
*/
protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) {
- if (req.replyRequired()) {
+ if (req.replyRequired() || req.checkCommitted()) {
GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
if (req.checkCommitted()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 21ff0cf..926eaf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -809,7 +809,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
catch (IgniteCheckedException ex) {
commitError(ex);
- setRollbackOnly();
+ errorWhenCommitting();
// Safe to remove transaction from committed tx list because nothing was committed yet.
cctx.tm().removeCommittedTx(this);
@@ -819,7 +819,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
catch (Throwable ex) {
commitError(ex);
- setRollbackOnly();
+ errorWhenCommitting();
// Safe to remove transaction from committed tx list because nothing was committed yet.
cctx.tm().removeCommittedTx(this);
@@ -1161,7 +1161,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
// Set operation to NOOP.
txEntry.op(NOOP);
- setRollbackOnly();
+ errorWhenCommitting();
throw ex;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index d384e4e..ca15e20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1063,6 +1063,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
uncommitTx(tx);
+ tx.errorWhenCommitting();
+
throw new IgniteException("Missing commit version (consider increasing " +
IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() +
", tx=" + tx.getClass().getSimpleName() + ']');
@@ -1616,6 +1618,24 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param nearVer Near version.
+ * @return Finish future for related remote transactions.
+ */
+ @SuppressWarnings("unchecked")
+ public IgniteInternalFuture<?> remoteTxFinishFuture(GridCacheVersion nearVer) {
+ GridCompoundFuture<Void, Void> fut = new GridCompoundFuture<>();
+
+ for (final IgniteInternalTx tx : txs()) {
+ if (!tx.local() && nearVer.equals(tx.nearXidVersion()))
+ fut.add((IgniteInternalFuture) tx.finishFuture());
+ }
+
+ fut.markInitialized();
+
+ return fut;
+ }
+
+ /**
* @param nearVer Near version ID.
* @param txNum Number of transactions.
* @param fut Result future.
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
index 3d65304..77c802d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
@@ -105,6 +105,7 @@ public class DataStreamerRequest implements Message {
* @param entries Entries to put.
* @param ignoreDepOwnership Ignore ownership.
* @param skipStore Skip store flag.
+ * @param keepBinary Keep binary flag.
* @param depMode Deployment mode.
* @param sampleClsName Sample class name.
* @param userVer User version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index cd783e4..98848ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -926,8 +926,15 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
CacheConfiguration newCfg = cacheConfiguration(cfg, cacheName);
- if (ctx.cache().cache(cacheName) == null)
- ctx.cache().dynamicStartCache(newCfg, cacheName, null, CacheType.INTERNAL, false, true).get();
+ if (ctx.cache().cache(cacheName) == null) {
+ ctx.cache().dynamicStartCache(newCfg,
+ cacheName,
+ null,
+ CacheType.INTERNAL,
+ false,
+ true,
+ true).get();
+ }
assert ctx.cache().cache(cacheName) != null : cacheName;
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
index f4a8fad..ecb892e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
@@ -90,7 +90,7 @@ public class IgfsAckMessage extends IgfsCommunicationMessage {
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
super.prepareMarshal(marsh);
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = marsh.marshal(err);
}
@@ -98,7 +98,7 @@ public class IgfsAckMessage extends IgfsCommunicationMessage {
@Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(marsh, ldr);
- if (errBytes != null)
+ if (errBytes != null && err == null)
err = marsh.unmarshal(errBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
index 65dca08..a89913f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
@@ -357,7 +357,8 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
switch (cmd) {
case DESTROY_CACHE: {
- fut = ((IgniteKernal)ctx.grid()).destroyCacheAsync(cacheName).chain(
+ // Do not check thread tx here since there can be active system cache txs.
+ fut = ((IgniteKernal)ctx.grid()).destroyCacheAsync(cacheName, false).chain(
new CX1<IgniteInternalFuture<?>, GridRestResponse>() {
@Override public GridRestResponse applyx(IgniteInternalFuture<?> f)
throws IgniteCheckedException {
@@ -369,7 +370,8 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
}
case GET_OR_CREATE_CACHE: {
- fut = ((IgniteKernal)ctx.grid()).getOrCreateCacheAsync(cacheName).chain(
+ // Do not check thread tx here since there can be active system cache txs.
+ fut = ((IgniteKernal)ctx.grid()).getOrCreateCacheAsync(cacheName, false).chain(
new CX1<IgniteInternalFuture<?>, GridRestResponse>() {
@Override public GridRestResponse applyx(IgniteInternalFuture<?> f)
throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 1ea5014..8c23d92 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -104,6 +104,7 @@ public interface DiscoverySpi extends IgniteSpi {
* Sets a handler for initial data exchange between Ignite nodes.
*
* @param exchange Discovery data exchange handler.
+ * @return {@code this} for chaining.
*/
public TcpDiscoverySpi setDataExchange(DiscoverySpiDataExchange exchange);
@@ -113,6 +114,7 @@ public interface DiscoverySpi extends IgniteSpi {
* dynamic metrics between nodes.
*
* @param metricsProvider Provider of metrics data.
+ * @return {@code this} for chaining.
*/
public TcpDiscoverySpi setMetricsProvider(DiscoveryMetricsProvider metricsProvider);
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index 066a5fd..21204c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -204,7 +204,8 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
* Stops streamer.
*/
public void stop() {
- srv.stop();
+ if (srv != null)
+ srv.stop();
if (log.isDebugEnabled())
log.debug("Socket streaming server stopped");
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
index 95ca9b5..9908b87 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
@@ -324,10 +324,14 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
@Override public void applyx(IgniteCache<String, Integer> cache) {
int rnd = random();
+ Set<Integer> ids = new HashSet<>(set);
+
cache.removeAll(rangeKeys(0, rnd));
- for (int i = 0; i < rnd; i++)
- assert cache.localPeek("key" + i, CachePeekMode.ONHEAP) == null;
+ for (int i = 0; i < rnd; i++) {
+ if (ids.contains(i))
+ assertNull(cache.localPeek("key" + i));
+ }
}
});
}
@@ -350,7 +354,7 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
for (int i = 0; i < rnd; i++) {
if (ids.contains(i))
- assert cache.localPeek("key" + i, CachePeekMode.ONHEAP) == null;
+ assertNull(cache.localPeek("key" + i));
}
}
});
@@ -359,6 +363,7 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
/**
* @param cache Cache.
* @param key Key.
+ * @return Removed value.
*/
private <K, V> V removeAsync(IgniteCache<K, V> cache, K key) {
IgniteCache<K, V> cacheAsync = cache.withAsync();
@@ -371,6 +376,8 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
/**
* @param cache Cache.
* @param key Key.
+ * @param val Value.
+ * @return Remove result.
*/
private <K, V> boolean removeAsync(IgniteCache<K, V> cache, K key, V val) {
IgniteCache<K, V> cacheAsync = cache.withAsync();
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 0d9c541..1e0071e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -4337,7 +4337,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
log.info("Set iterators not cleared, will wait");
- Thread.sleep(500);
+ Thread.sleep(1000);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
index a34857f..e70c97b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
@@ -299,7 +299,7 @@ public class GridCacheStopSelfTest extends GridCommonAbstractTest {
return null;
}
- }));
+ }, "cache-thread"));
}
readyLatch.await();
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 8a5dfd4..c9cd750 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -158,7 +158,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
ccfg.setName(DYNAMIC_CACHE_NAME);
- futs.add(kernal.context().cache().dynamicStartCache(ccfg, ccfg.getName(), null, true, true));
+ futs.add(kernal.context().cache().dynamicStartCache(ccfg,
+ ccfg.getName(),
+ null,
+ true,
+ true,
+ true));
return null;
}
@@ -190,7 +195,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
GridTestUtils.runMultiThreaded(new Callable<Object>() {
@Override public Object call() throws Exception {
- futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME));
+ futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true));
return null;
}
@@ -218,7 +223,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount()));
- futs.add(kernal.context().cache().dynamicStartCache(ccfg, ccfg.getName(), null, true, true));
+ futs.add(kernal.context().cache().dynamicStartCache(ccfg,
+ ccfg.getName(),
+ null,
+ true,
+ true,
+ true));
return null;
}
@@ -252,7 +262,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
@Override public Object call() throws Exception {
IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount()));
- futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME));
+ futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true));
return null;
}
@@ -315,7 +325,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (int g = 0; g < nodeCount(); g++)
caches[g] = grid(g).cache(DYNAMIC_CACHE_NAME);
- kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
for (int g = 0; g < nodeCount(); g++) {
final IgniteKernal kernal0 = (IgniteKernal) grid(g);
@@ -368,7 +378,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
}
// Undeploy cache.
- kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
startGrid(nodeCount() + 1);
@@ -445,7 +455,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
}, IllegalArgumentException.class, null);
}
- kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
stopGrid(nodeCount() + 1);
stopGrid(nodeCount());
@@ -512,7 +522,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (int g = 0; g < nodeCount() + 1; g++)
assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1"));
- kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
}
finally {
stopGrid(nodeCount());
@@ -554,7 +564,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (int g = 0; g < nodeCount() + 1; g++)
assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1"));
- kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
}
finally {
stopGrid(nodeCount());
@@ -600,7 +610,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (int g = 0; g < nodeCount() + 1; g++)
assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1"));
- kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
}
finally {
stopGrid(nodeCount());
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
index bf6dcda..34e7080 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -59,6 +60,8 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
return cfg;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
index 84838db..a08d080 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
@@ -188,6 +188,9 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
}
/**
+ * @param conc Transaction concurrency.
+ * @param backup Check backup flag.
+ * @param commit Check commit flag.
* @throws Exception If failed.
*/
private void checkPrimaryNodeFailureBackupCommit(
@@ -197,6 +200,7 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
) throws Exception {
try {
startGrids(gridCount());
+
awaitPartitionMapExchange();
for (int i = 0; i < gridCount(); i++)
@@ -290,7 +294,7 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
return null;
}
- });
+ }, "tx-thread");
commitLatch.await();
@@ -366,6 +370,7 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
/**
* @param ignite Ignite instance to generate key.
+ * @param backup Backup key flag.
* @return Generated key that is not primary nor backup for {@code ignite(0)} and primary for
* {@code ignite(1)}.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java
new file mode 100644
index 0000000..c47401c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class IgniteCacheCommitDelayTxRecoveryTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int SRVS = 4;
+
+ /** */
+ private static volatile boolean commit;
+
+ /** */
+ private static volatile CountDownLatch commitStartedLatch;
+
+ /** */
+ private static volatile CountDownLatch commitFinishLatch;
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRecovery1() throws Exception {
+ checkRecovery(1, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRecovery2() throws Exception {
+ checkRecovery(2, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRecoveryStoreEnabled1() throws Exception {
+ checkRecovery(1, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRecoveryStoreEnabled2() throws Exception {
+ checkRecovery(2, true);
+ }
+
+ /**
+ * @param backups Number of cache backups.
+ * @param useStore If {@code true} tests cache with store configured.
+ * @throws Exception If failed.
+ */
+ private void checkRecovery(int backups, boolean useStore) throws Exception {
+ startGridsMultiThreaded(SRVS, false);
+
+ client = true;
+
+ Ignite clientNode = startGrid(SRVS);
+
+ assertTrue(clientNode.configuration().isClientMode());
+
+ client = false;
+
+ clientNode.createCache(cacheConfiguration(backups, useStore));
+
+ awaitPartitionMapExchange();
+
+ Ignite srv = ignite(0);
+
+ assertFalse(srv.configuration().isClientMode());
+
+ for (Boolean pessimistic : Arrays.asList(false, true)) {
+ checkRecovery(backupKey(srv.cache(null)), srv, pessimistic, useStore);
+
+ checkRecovery(nearKey(srv.cache(null)), srv, pessimistic, useStore);
+
+ checkRecovery(nearKey(clientNode.cache(null)), clientNode, pessimistic, useStore);
+
+ srv = ignite(0);
+
+ assertFalse(srv.configuration().isClientMode());
+ }
+ }
+
+ /**
+ * Updates {@code key} from {@code ignite} with {@link TestEntryProcessor} and stops the key's primary node
+ * while the processor is blocked on the backup nodes, then verifies the cache contents.
+ * <p>
+ * Sequence: an initial value {@code 0} is put; the update runs asynchronously in "update-thread"; once
+ * {@code commitStartedLatch} reaches zero the primary node is closed and {@code commitFinishLatch} is
+ * released; the update future is awaited and value {@code 1} is asserted on every node. Afterwards values
+ * {@code 2} and {@code 3} are put and asserted on all nodes, with the stopped node restarted in between.
+ *
+ * @param key Key; its primary node must differ from {@code ignite} and it must have at least one backup.
+ * @param ignite Node executing update.
+ * @param pessimistic If {@code true} the invoke runs inside an explicit PESSIMISTIC/REPEATABLE_READ
+ * transaction that is committed explicitly, otherwise a single {@code invoke} is executed without
+ * starting a transaction explicitly.
+ * @param useStore {@code True} if store is used; in that case {@code TestEntryProcessor.skipFirst} is set
+ * to the name of {@code ignite}, otherwise to {@code null}.
+ * @throws Exception If failed.
+ */
+ private void checkRecovery(final Integer key,
+ final Ignite ignite,
+ final boolean pessimistic,
+ final boolean useStore) throws Exception {
+ Ignite primary = primaryNode(key, null);
+
+ assertNotSame(ignite, primary);
+
+ List<Ignite> backups = backupNodes(key, null);
+
+ assertFalse(backups.isEmpty());
+
+ final Set<String> backupNames = new HashSet<>();
+
+ for (Ignite node : backups)
+ backupNames.add(node.name());
+
+ log.info("Check recovery [key=" + key +
+ ", pessimistic=" + pessimistic +
+ ", primary=" + primary.name() +
+ ", backups=" + backupNames +
+ ", node=" + ignite.name() + ']');
+
+ final IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+ cache.put(key, 0);
+
+ commitStartedLatch = new CountDownLatch(backupNames.size()); // One count-down is expected per backup node.
+ commitFinishLatch = new CountDownLatch(1);
+
+ commit = false;
+
+ TestEntryProcessor.skipFirst = useStore ? ignite.name() : null;
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ log.info("Start update.");
+
+ if (pessimistic) {
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.invoke(key, new TestEntryProcessor(backupNames));
+
+ commit = true;
+
+ log.info("Start commit.");
+
+ assertEquals(backupNames.size(), commitStartedLatch.getCount()); // Latch must still be untouched before commit.
+
+ tx.commit();
+ }
+ }
+ else {
+ commit = true;
+
+ cache.invoke(key, new TestEntryProcessor(backupNames));
+ }
+
+ log.info("End update, execute get.");
+
+ Integer val = cache.get(key);
+
+ log.info("Get value: " + val);
+
+ assertEquals(1, (Object)val);
+
+ return null;
+ }
+ }, "update-thread");
+
+ assertTrue(commitStartedLatch.await(30, SECONDS));
+
+ log.info("Stop node: " + primary.name());
+
+ primary.close();
+
+ commitFinishLatch.countDown(); // Unblocks processors waiting in TestEntryProcessor.process().
+
+ fut.get();
+
+ for (Ignite node : G.allGrids())
+ assertEquals(1, node.cache(null).get(key));
+
+ cache.put(key, 2);
+
+ for (Ignite node : G.allGrids())
+ assertEquals(2, node.cache(null).get(key));
+
+ startGrid(primary.name());
+
+ for (Ignite node : G.allGrids())
+ assertEquals(2, node.cache(null).get(key));
+
+ cache.put(key, 3);
+
+ for (Ignite node : G.allGrids())
+ assertEquals(3, node.cache(null).get(key));
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+  * Entry processor that sets the entry value to {@code 1}. When the test's {@code commit} flag is set and
+  * the processor runs on one of the given nodes, it first counts down {@code commitStartedLatch} and then
+  * blocks until {@code commitFinishLatch} is released (waiting at most 10 seconds).
+  */
+ static class TestEntryProcessor implements CacheEntryProcessor<Integer, Integer, Void> {
+     /** Names of the nodes on which {@link #process} blocks. */
+     private final Set<String> nodeNames;
+
+     /**
+      * Skips first call for given node (used to skip call for store update).
+      * Volatile because it is assigned in {@code checkRecovery} and read and reset by the threads
+      * that execute {@link #process}.
+      */
+     private static volatile String skipFirst;
+
+     /**
+      * @param nodeNames Node names where sleep will be called.
+      */
+     public TestEntryProcessor(Set<String> nodeNames) {
+         this.nodeNames = nodeNames;
+     }
+
+     /** {@inheritDoc} */
+     @Override public Void process(MutableEntry<Integer, Integer> entry, Object... args) {
+         Ignite ignite = entry.unwrap(Ignite.class);
+
+         System.out.println(Thread.currentThread().getName() + " process [node=" + ignite.name() +
+             ", commit=" + commit + ", skipFirst=" + skipFirst + ']');
+
+         boolean skip = false;
+
+         // The first call made on the 'skipFirst' node after commit has started is not blocked.
+         if (commit && ignite.name().equals(skipFirst)) {
+             skipFirst = null;
+
+             skip = true;
+         }
+
+         if (!skip && commit && nodeNames.contains(ignite.name())) {
+             try {
+                 System.out.println(Thread.currentThread().getName() + " start process invoke.");
+
+                 assertTrue(commitStartedLatch != null && commitStartedLatch.getCount() > 0);
+
+                 commitStartedLatch.countDown();
+
+                 assertTrue(commitFinishLatch.await(10, SECONDS));
+
+                 System.out.println(Thread.currentThread().getName() + " end process invoke.");
+             }
+             catch (InterruptedException e) {
+                 // Restore the interrupt status so that callers up the stack can still observe it.
+                 Thread.currentThread().interrupt();
+
+                 throw new RuntimeException(e);
+             }
+         }
+         else
+             System.out.println(Thread.currentThread().getName() + " invoke set value.");
+
+         entry.setValue(1);
+
+         return null;
+     }
+ }
+
+ /**
+  * Creates a TRANSACTIONAL cache configuration with FULL_SYNC write synchronization and SYNC rebalancing,
+  * optionally backed by a write-through {@link TestStoreFactory} store.
+  *
+  * @param backups Number of backups.
+  * @param useStore If {@code true} adds cache store.
+  * @return Cache configuration.
+  */
+ private CacheConfiguration<Object, Object> cacheConfiguration(int backups, boolean useStore) {
+     CacheConfiguration<Object, Object> cfg = new CacheConfiguration<>();
+
+     cfg.setBackups(backups);
+     cfg.setAtomicityMode(TRANSACTIONAL);
+     cfg.setRebalanceMode(SYNC);
+     cfg.setWriteSynchronizationMode(FULL_SYNC);
+
+     // Nothing more to configure when no store is requested.
+     if (!useStore)
+         return cfg;
+
+     cfg.setWriteThrough(true);
+     cfg.setCacheStoreFactory(new TestStoreFactory());
+
+     return cfg;
+ }
+
+ /**
+ *
+ */
+ private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+ /** {@inheritDoc} */
+ @Override public CacheStore<Object, Object> create() {
+ return new CacheStoreAdapter<Object, Object>() {
+ @Override public Object load(Object key) throws CacheLoaderException {
+ return null;
+ }
+
+ @Override public void write(Cache.Entry entry) throws CacheWriterException {
+ // No-op.
+ }
+
+ @Override public void delete(Object key) throws CacheWriterException {
+ // No-op.
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 4eb8a6b..7532354 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -30,6 +30,8 @@ import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
+
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -238,7 +240,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
* @param store If {@code true} uses cache with store.
* @throws Exception If failed.
*/
- private void checkRetry(Test test, TestMemoryMode memMode, boolean store) throws Exception {
+ protected final void checkRetry(Test test, TestMemoryMode memMode, boolean store) throws Exception {
ignite(0).createCache(cacheConfiguration(memMode, store));
final AtomicBoolean finished = new AtomicBoolean();
@@ -259,7 +261,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
}
});
- IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
+ final IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
int iter = 0;
@@ -309,6 +311,31 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
break;
}
+ case TX_PUT: {
+ while (System.currentTimeMillis() < stopTime) {
+ final Integer val = ++iter;
+
+ Ignite ignite = ignite(0);
+
+ for (int i = 0; i < keysCnt; i++) {
+ final Integer key = i;
+
+ doInTransaction(ignite, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ cache.put(key, val);
+
+ return null;
+ }
+ });
+ }
+
+ for (int i = 0; i < keysCnt; i++)
+ assertEquals(val, cache.get(i));
+ }
+
+ break;
+ }
+
case PUT_ALL: {
while (System.currentTimeMillis() < stopTime) {
Integer val = ++iter;
@@ -541,7 +568,10 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
INVOKE,
/** */
- INVOKE_ALL
+ INVOKE_ALL,
+
+ /** */
+ TX_PUT
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index 7655464..9204bc8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -101,6 +101,20 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
/**
+ * Runs the {@code TX_PUT} retry scenario (each key is put in its own transaction and then read back)
+ * with {@code TestMemoryMode.HEAP} and without a cache store.
+ *
* @throws Exception If failed.
*/
+ public void testExplicitTransactionRetriesSingleValue() throws Exception {
+ checkRetry(Test.TX_PUT, TestMemoryMode.HEAP, false);
+ }
+
+ /**
+ * Runs the {@code TX_PUT} retry scenario (each key is put in its own transaction and then read back)
+ * with {@code TestMemoryMode.HEAP} and a cache with store.
+ *
+ * @throws Exception If failed.
+ */
+ public void testExplicitTransactionRetriesSingleValueStoreEnabled() throws Exception {
+ checkRetry(Test.TX_PUT, TestMemoryMode.HEAP, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testExplicitTransactionRetries() throws Exception {
explicitTransactionRetries(TestMemoryMode.HEAP, false);
}
@@ -108,6 +122,13 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
/**
+ * NOTE(review): this makes exactly the same call as {@code testExplicitTransactionRetries()};
+ * confirm whether a distinct single-operation scenario was intended here.
+ *
* @throws Exception If failed.
*/
+ public void testExplicitTransactionRetriesSingleOperation() throws Exception {
+ explicitTransactionRetries(TestMemoryMode.HEAP, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testExplicitTransactionRetriesStoreEnabled() throws Exception {
explicitTransactionRetries(TestMemoryMode.HEAP, true);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
index d239ea8..91eecbb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -104,6 +105,8 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
cfg.setDiscoverySpi(disc);
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
if (include)
cfg.setUserAttributes(F.asMap("include", true));
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
index 6089795..552dd28 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
@@ -569,9 +569,9 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
GridNioServerListener lsnr,
@Nullable Integer queueLimit) throws Exception {
for (int i = 0; i < 10; i++) {
- try {
- int srvPort = port++;
+ int srvPort = port++;
+ try {
GridNioServer.Builder<?> builder = serverBuilder(srvPort, parser, lsnr);
if (queueLimit != null)
@@ -584,8 +584,11 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
return srvr;
}
catch (IgniteCheckedException e) {
- if (i < 9 && e.hasCause(BindException.class))
- log.error("Failed to start server, will try another port: " + e);
+ if (i < 9 && e.hasCause(BindException.class)) {
+ log.error("Failed to start server, will try another port [err=" + e + ", port=" + srvPort + ']');
+
+ U.sleep(5000);
+ }
else
throw e;
}