You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/05/13 10:43:06 UTC
[01/21] incubator-ignite git commit: # ignite-157
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-gg-9614 925d8a010 -> 506d742cd
# ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/93876df9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/93876df9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/93876df9
Branch: refs/heads/ignite-gg-9614
Commit: 93876df9373c260eaa1e8f8dc9e8edbb82110810
Parents: c3f3dd1
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 7 11:36:38 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 7 11:36:38 2015 +0300
----------------------------------------------------------------------
.../cache/distributed/dht/GridDhtTxMapping.java | 2 +-
.../colocated/GridDhtColocatedLockFuture.java | 25 +-
.../colocated/GridDhtDetachedCacheEntry.java | 4 +-
.../near/GridAbstractNearTxPrepareFuture.java | 219 ++++++
.../distributed/near/GridNearCacheEntry.java | 4 +-
.../distributed/near/GridNearLockFuture.java | 5 -
.../near/GridNearOptimisticTxPrepareFuture.java | 763 +++++++++++++++++++
.../GridNearPessimisticTxPrepareFuture.java | 311 ++++++++
.../cache/distributed/near/GridNearTxLocal.java | 41 +-
.../near/GridNearTxPrepareFuture.java | 12 +-
.../cache/transactions/IgniteTxHandler.java | 2 +-
11 files changed, 1313 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java
index d207d76..ba2c35f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java
@@ -28,7 +28,7 @@ import java.util.*;
/**
* DHT transaction mapping.
*/
-public class GridDhtTxMapping<K, V> {
+public class GridDhtTxMapping {
/** Transaction nodes mapping (primary node -> related backup nodes). */
private final Map<UUID, Collection<UUID>> txNodes = new GridLeanMap<>();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 5b74b31..7da6346 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -1279,25 +1279,18 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
- try {
- if (res.dhtVersion(i) == null) {
- onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
- "(will fail the lock): " + res));
-
- return;
- }
-
- // Set value to detached entry.
- entry.resetFromPrimary(newVal, dhtVer);
-
- if (log.isDebugEnabled())
- log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
- }
- catch (IgniteCheckedException e) {
- onDone(e);
+ if (res.dhtVersion(i) == null) {
+ onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+ "(will fail the lock): " + res));
return;
}
+
+ // Set value to detached entry.
+ entry.resetFromPrimary(newVal, dhtVer);
+
+ if (log.isDebugEnabled())
+ log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
}
else
cctx.mvcc().markExplicitOwner(k, threadId);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index 5c4dd13..2c84bd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -46,10 +46,8 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
*
* @param val Value.
* @param ver Version.
- * @throws IgniteCheckedException If value unmarshalling failed.
*/
- public void resetFromPrimary(CacheObject val, GridCacheVersion ver)
- throws IgniteCheckedException {
+ public void resetFromPrimary(CacheObject val, GridCacheVersion ver) {
value(val);
this.ver = ver;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
new file mode 100644
index 0000000..905f018
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import javax.cache.expiry.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
+
+/**
+ * Common code for tx prepare in optimistic and pessimistic modes.
+ */
+public abstract class GridAbstractNearTxPrepareFuture extends GridCompoundIdentityFuture<IgniteInternalTx>
+ implements GridCacheFuture<IgniteInternalTx> {
+ /** Logger reference. */
+ protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ protected static IgniteLogger log;
+
+ /** Context. */
+ protected GridCacheSharedContext<?, ?> cctx;
+
+ /** Future ID. */
+ protected IgniteUuid futId;
+
+ /** Transaction. */
+ @GridToStringInclude
+ protected GridNearTxLocal tx;
+
+ /** Error. */
+ @GridToStringExclude
+ protected AtomicReference<Throwable> err = new AtomicReference<>(null);
+
+ /** Trackable flag. */
+ protected boolean trackable = true;
+
+ /** Full information about transaction nodes mapping. */
+ protected GridDhtTxMapping txMapping;
+
+ /**
+ * @param cctx Context.
+ * @param tx Transaction.
+ */
+ public GridAbstractNearTxPrepareFuture(GridCacheSharedContext cctx, final GridNearTxLocal tx) {
+ super(cctx.kernalContext(), new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() {
+ @Override public boolean collect(IgniteInternalTx e) {
+ return true;
+ }
+
+ @Override public IgniteInternalTx reduce() {
+ // Nothing to aggregate.
+ return tx;
+ }
+ });
+
+ assert cctx != null;
+ assert tx != null;
+
+ this.cctx = cctx;
+ this.tx = tx;
+
+ futId = IgniteUuid.randomUuid();
+
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, GridAbstractNearTxPrepareFuture.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion version() {
+ return tx.xidVersion();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void markNotTrackable() {
+ trackable = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean trackable() {
+ return trackable;
+ }
+
+ /**
+ * Prepares transaction.
+ */
+ public abstract void prepare();
+
+ /**
+ * @param nodeId Sender.
+ * @param res Result.
+ */
+ public abstract void onResult(UUID nodeId, GridNearTxPrepareResponse res);
+
+ /**
+ * Checks if mapped transaction can be committed on one phase.
+ * One-phase commit can be done if transaction maps to one primary node and not more than one backup.
+ */
+ protected final void checkOnePhase() {
+ if (tx.storeUsed())
+ return;
+
+ Map<UUID, Collection<UUID>> map = txMapping.transactionNodes();
+
+ if (map.size() == 1) {
+ Map.Entry<UUID, Collection<UUID>> entry = F.firstEntry(map);
+
+ assert entry != null;
+
+ Collection<UUID> backups = entry.getValue();
+
+ if (backups.size() <= 1)
+ tx.onePhaseCommit(true);
+ }
+ }
+
+ /**
+ * @param m Mapping.
+ * @param res Response.
+ */
+ protected final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
+ assert res.error() == null : res;
+ assert F.isEmpty(res.invalidPartitions()) : res;
+
+ for (Map.Entry<IgniteTxKey, CacheVersionedValue> entry : res.ownedValues().entrySet()) {
+ IgniteTxEntry txEntry = tx.entry(entry.getKey());
+
+ assert txEntry != null;
+
+ GridCacheContext cacheCtx = txEntry.context();
+
+ while (true) {
+ try {
+ if (cacheCtx.isNear()) {
+ GridNearCacheEntry nearEntry = (GridNearCacheEntry)txEntry.cached();
+
+ CacheVersionedValue tup = entry.getValue();
+
+ nearEntry.resetFromPrimary(tup.value(), tx.xidVersion(),
+ tup.version(), m.node().id(), tx.topologyVersion());
+ }
+ else if (txEntry.cached().detached()) {
+ GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached();
+
+ CacheVersionedValue tup = entry.getValue();
+
+ detachedEntry.resetFromPrimary(tup.value(), tx.xidVersion());
+ }
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // Retry.
+ }
+ }
+ }
+
+ tx.implicitSingleResult(res.returnValue());
+
+ for (IgniteTxKey key : res.filterFailedKeys()) {
+ IgniteTxEntry txEntry = tx.entry(key);
+
+ assert txEntry != null : "Missing tx entry for write key: " + key;
+
+ txEntry.op(NOOP);
+
+ assert txEntry.context() != null;
+
+ ExpiryPolicy expiry = txEntry.context().expiryForTxEntry(txEntry);
+
+ if (expiry != null)
+ txEntry.ttl(CU.toTtl(expiry.getExpiryForAccess()));
+ }
+
+ if (!m.empty()) {
+ // Register DHT version.
+ tx.addDhtVersion(m.node().id(), res.dhtVersion());
+
+ m.dhtVersion(res.dhtVersion());
+
+ if (m.near())
+ tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index c7fa4ab..797fd32 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -204,15 +204,13 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
* @param topVer Topology version.
* @return {@code True} if reset was done.
* @throws GridCacheEntryRemovedException If obsolete.
- * @throws IgniteCheckedException If failed.
*/
- @SuppressWarnings( {"RedundantTypeArguments"})
public boolean resetFromPrimary(CacheObject val,
GridCacheVersion ver,
GridCacheVersion dhtVer,
UUID primaryNodeId,
AffinityTopologyVersion topVer)
- throws GridCacheEntryRemovedException, IgniteCheckedException
+ throws GridCacheEntryRemovedException
{
assert dhtVer != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index a427b65..25bd76b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -1450,11 +1450,6 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
// Replace old entry with new one.
entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
}
- catch (IgniteCheckedException e) {
- onDone(e);
-
- return;
- }
}
i++;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
new file mode 100644
index 0000000..2fbca7b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.transactions.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
+import static org.apache.ignite.transactions.TransactionState.*;
+
+/**
+ *
+ */
+public class GridNearOptimisticTxPrepareFuture extends GridAbstractNearTxPrepareFuture
+ implements GridCacheMvccFuture<IgniteInternalTx> {
+ /** */
+ private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+
+ /**
+ * @param cctx Context.
+ * @param tx Transaction.
+ */
+ public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) {
+ super(cctx, tx);
+
+ assert tx.optimistic() : tx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
+ if (log.isDebugEnabled())
+ log.debug("Transaction future received owner changed callback: " + entry);
+
+ if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) {
+ lockKeys.remove(entry.txKey());
+
+ // This will check for locks.
+ onDone();
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<? extends ClusterNode> nodes() {
+ return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
+ @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
+ if (isMini(f))
+ return ((MiniFuture)f).node();
+
+ return cctx.discovery().localNode();
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ boolean found = false;
+
+ for (IgniteInternalFuture<?> fut : futures()) {
+ if (isMini(fut)) {
+ MiniFuture f = (MiniFuture) fut;
+
+ if (f.node().id().equals(nodeId)) {
+ f.onResult(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
+
+ found = true;
+ }
+ }
+ }
+
+ return found;
+ }
+
+ /**
+ * @param nodeId Failed node ID.
+ * @param mappings Remaining mappings.
+ * @param e Error.
+ */
+ void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping> mappings, Throwable e) {
+ if (err.compareAndSet(null, e)) {
+ boolean marked = tx.setRollbackOnly();
+
+ if (e instanceof IgniteTxOptimisticCheckedException) {
+ assert nodeId != null : "Missing node ID for optimistic failure exception: " + e;
+
+ tx.removeKeysMapping(nodeId, mappings);
+ }
+
+ if (e instanceof IgniteTxRollbackCheckedException) {
+ if (marked) {
+ try {
+ tx.rollback();
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
+ }
+ }
+ }
+
+ onComplete();
+ }
+ }
+
+ /**
+ * @return {@code True} if all locks are owned.
+ */
+ private boolean checkLocks() {
+ boolean locked = lockKeys.isEmpty();
+
+ if (locked) {
+ if (log.isDebugEnabled())
+ log.debug("All locks are acquired for near prepare future: " + this);
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']');
+ }
+
+ return locked;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+ if (!isDone()) {
+ for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
+ if (isMini(fut)) {
+ MiniFuture f = (MiniFuture)fut;
+
+ if (f.futureId().equals(res.miniId())) {
+ assert f.node().id().equals(nodeId);
+
+ f.onResult(nodeId, res);
+ }
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(IgniteInternalTx t, Throwable err) {
+ // If locks were not acquired yet, delay completion.
+ if (isDone() || (err == null && !checkLocks()))
+ return false;
+
+ this.err.compareAndSet(null, err);
+
+ if (err == null)
+ tx.state(PREPARED);
+
+ if (super.onDone(tx, err)) {
+ // Don't forget to clean up.
+ cctx.mvcc().removeFuture(this);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param f Future.
+ * @return {@code True} if mini-future.
+ */
+ private boolean isMini(IgniteInternalFuture<?> f) {
+ return f.getClass().equals(MiniFuture.class);
+ }
+
+ /**
+ * Completeness callback.
+ */
+ private void onComplete() {
+ if (super.onDone(tx, err.get()))
+ // Don't forget to clean up.
+ cctx.mvcc().removeFuture(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepare() {
+ // Obtain the topology version to use.
+ AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+
+ if (topVer != null) {
+ tx.topologyVersion(topVer);
+
+ prepare0();
+
+ return;
+ }
+
+ prepareOnTopology();
+ }
+
+ /**
+ *
+ */
+ private void prepareOnTopology() {
+ GridDhtTopologyFuture topFut = topologyReadLock();
+
+ try {
+ if (topFut == null) {
+ assert isDone();
+
+ return;
+ }
+
+ if (topFut.isDone()) {
+ StringBuilder invalidCaches = new StringBuilder();
+
+ boolean cacheInvalid = false;
+
+ for (GridCacheContext ctx : cctx.cacheContexts()) {
+ if (tx.activeCacheIds().contains(ctx.cacheId()) && !topFut.isCacheTopologyValid(ctx)) {
+ if (cacheInvalid)
+ invalidCaches.append(", ");
+
+ invalidCaches.append(U.maskName(ctx.name()));
+
+ cacheInvalid = true;
+ }
+ }
+
+ if (cacheInvalid) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
+ invalidCaches.toString()));
+
+ return;
+ }
+
+ tx.topologyVersion(topFut.topologyVersion());
+
+ prepare0();
+ }
+ else {
+ topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
+ @Override public void run() {
+ prepareOnTopology();
+ }
+ });
+ }
+ });
+ }
+ }
+ finally {
+ topologyReadUnlock();
+ }
+ }
+
+ /**
+ * Acquires topology read lock.
+ *
+ * @return Topology ready future.
+ */
+ private GridDhtTopologyFuture topologyReadLock() {
+ if (tx.activeCacheIds().isEmpty())
+ return cctx.exchange().lastTopologyFuture();
+
+ GridCacheContext<?, ?> nonLocCtx = null;
+
+ for (int cacheId : tx.activeCacheIds()) {
+ GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+ if (!cacheCtx.isLocal()) {
+ nonLocCtx = cacheCtx;
+
+ break;
+ }
+ }
+
+ if (nonLocCtx == null)
+ return cctx.exchange().lastTopologyFuture();
+
+ nonLocCtx.topology().readLock();
+
+ if (nonLocCtx.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ nonLocCtx.name()));
+
+ return null;
+ }
+
+ return nonLocCtx.topology().topologyVersionFuture();
+ }
+
+ /**
+ * Releases topology read lock.
+ */
+ private void topologyReadUnlock() {
+ if (!tx.activeCacheIds().isEmpty()) {
+ GridCacheContext<?, ?> nonLocCtx = null;
+
+ for (int cacheId : tx.activeCacheIds()) {
+ GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+ if (!cacheCtx.isLocal()) {
+ nonLocCtx = cacheCtx;
+
+ break;
+ }
+ }
+
+ if (nonLocCtx != null)
+ nonLocCtx.topology().readUnlock();
+ }
+ }
+
+ /**
+ * Initializes future.
+ */
+ private void prepare0() {
+ try {
+ if (!tx.state(PREPARING)) {
+ if (tx.setRollbackOnly()) {
+ if (tx.timedOut())
+ onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
+ "was rolled back: " + this));
+ else
+ onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " +
+ "[state=" + tx.state() + ", tx=" + this + ']'));
+ }
+ else
+ onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
+ "prepare [state=" + tx.state() + ", tx=" + this + ']'));
+
+ return;
+ }
+
+ // Make sure to add future before calling prepare.
+ cctx.mvcc().addFuture(this);
+
+ prepare(
+ tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
+ tx.writeEntries());
+
+ markInitialized();
+ }
+ catch (TransactionTimeoutException | TransactionOptimisticException e) {
+ onError(cctx.localNodeId(), null, e);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+
+ /**
+ * @param reads Read entries.
+ * @param writes Write entries.
+ * @throws IgniteCheckedException If transaction is group-lock and some key was mapped to to the local node.
+ */
+ private void prepare(
+ Iterable<IgniteTxEntry> reads,
+ Iterable<IgniteTxEntry> writes
+ ) throws IgniteCheckedException {
+ AffinityTopologyVersion topVer = tx.topologyVersion();
+
+ assert topVer.topologyVersion() > 0;
+
+ txMapping = new GridDhtTxMapping();
+
+ ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings =
+ new ConcurrentLinkedDeque8<>();
+
+ if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
+ for (int cacheId : tx.activeCacheIds()) {
+ GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+ if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) {
+ onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all " +
+ "partition nodes left the grid): " + cacheCtx.name()));
+
+ return;
+ }
+ }
+ }
+
+ // Assign keys to primary nodes.
+ GridDistributedTxMapping cur = null;
+
+ for (IgniteTxEntry read : reads) {
+ GridDistributedTxMapping updated = map(read, topVer, cur, false);
+
+ if (cur != updated) {
+ mappings.offer(updated);
+
+ if (updated.node().isLocal()) {
+ if (read.context().isNear())
+ tx.nearLocallyMapped(true);
+ else if (read.context().isColocated())
+ tx.colocatedLocallyMapped(true);
+ }
+
+ cur = updated;
+ }
+ }
+
+ for (IgniteTxEntry write : writes) {
+ GridDistributedTxMapping updated = map(write, topVer, cur, true);
+
+ if (cur != updated) {
+ mappings.offer(updated);
+
+ if (updated.node().isLocal()) {
+ if (write.context().isNear())
+ tx.nearLocallyMapped(true);
+ else if (write.context().isColocated())
+ tx.colocatedLocallyMapped(true);
+ }
+
+ cur = updated;
+ }
+ }
+
+ if (isDone()) {
+ if (log.isDebugEnabled())
+ log.debug("Abandoning (re)map because future is done: " + this);
+
+ return;
+ }
+
+ tx.addEntryMapping(mappings);
+
+ cctx.mvcc().recheckPendingLocks();
+
+ txMapping.initLast(mappings);
+
+ tx.transactionNodes(txMapping.transactionNodes());
+
+ checkOnePhase();
+
+ proceedPrepare(mappings);
+ }
+
+ /**
+ * Continues prepare after previous mapping successfully finished.
+ *
+ * @param mappings Queue of mappings.
+ */
+ private void proceedPrepare(final ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings) {
+ if (isDone())
+ return;
+
+ final GridDistributedTxMapping m = mappings.poll();
+
+ if (m == null)
+ return;
+
+ assert !m.empty();
+
+ final ClusterNode n = m.node();
+
+ GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+ futId,
+ tx.topologyVersion(),
+ tx,
+ tx.optimistic() && tx.serializable() ? m.reads() : null,
+ m.writes(),
+ tx.groupLockKey(),
+ tx.partitionLock(),
+ m.near(),
+ txMapping.transactionNodes(),
+ m.last(),
+ m.lastBackups(),
+ tx.onePhaseCommit(),
+ tx.needReturnValue() && tx.implicit(),
+ tx.implicitSingle(),
+ m.explicitLock(),
+ tx.subjectId(),
+ tx.taskNameHash());
+
+ for (IgniteTxEntry txEntry : m.writes()) {
+ if (txEntry.op() == TRANSFORM)
+ req.addDhtVersion(txEntry.txKey(), null);
+ }
+
+ // Must lock near entries separately.
+ if (m.near()) {
+ try {
+ tx.optimisticLockEntries(req.writes());
+
+ tx.userPrepare();
+ }
+ catch (IgniteCheckedException e) {
+ onError(null, null, e);
+ }
+ }
+
+ final MiniFuture fut = new MiniFuture(m, mappings);
+
+ req.miniId(fut.futureId());
+
+ add(fut); // Append new future.
+
+ // If this is the primary node for the keys.
+ if (n.isLocal()) {
+ // At this point, if any new node joined, then it is
+ // waiting for this transaction to complete, so
+ // partition reassignments are not possible here.
+ cctx.tm().txHandler().prepareTx(n.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
+ @Override public void apply(GridNearTxPrepareResponse res) {
+ fut.onResult(n.id(), res);
+ }
+ });
+ }
+ else {
+ assert !tx.groupLock() : "Got group lock transaction that is mapped on remote node [tx=" + tx +
+ ", nodeId=" + n.id() + ']';
+
+ try {
+ cctx.io().send(n, req, tx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ // Fail the whole thing.
+ fut.onResult(e);
+ }
+ }
+ }
+
+ /**
+ * @param entry Transaction entry.
+ * @param topVer Topology version.
+ * @param cur Current mapping.
+ * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key.
+ * @return Mapping.
+ */
+ private GridDistributedTxMapping map(
+ IgniteTxEntry entry,
+ AffinityTopologyVersion topVer,
+ GridDistributedTxMapping cur,
+ boolean waitLock
+ ) throws IgniteCheckedException {
+ GridCacheContext cacheCtx = entry.context();
+
+ List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
+
+ txMapping.addMapping(nodes);
+
+ ClusterNode primary = F.first(nodes);
+
+ assert primary != null;
+
+ if (log.isDebugEnabled()) {
+ log.debug("Mapped key to primary node [key=" + entry.key() +
+ ", part=" + cacheCtx.affinity().partition(entry.key()) +
+ ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']');
+ }
+
+ if (tx.groupLock() && !primary.isLocal())
+ throw new IgniteCheckedException("Failed to prepare group lock transaction (local node is not primary for " +
+ " key)[key=" + entry.key() + ", primaryNodeId=" + primary.id() + ']');
+
+ // Must re-initialize cached entry while holding topology lock.
+ if (cacheCtx.isNear())
+ entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer));
+ else if (!cacheCtx.isLocal())
+ entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true));
+ else
+ entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
+
+ if (cacheCtx.isNear() || cacheCtx.isLocal()) {
+ if (waitLock && entry.explicitVersion() == null) {
+ if (!tx.groupLock() || tx.groupLockKey().equals(entry.txKey()))
+ lockKeys.add(entry.txKey());
+ }
+ }
+
+ if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
+ cur = new GridDistributedTxMapping(primary);
+
+ // Initialize near flag right away.
+ cur.near(cacheCtx.isNear());
+ }
+
+ cur.add(entry);
+
+ if (entry.explicitVersion() != null) {
+ tx.markExplicit(primary.id());
+
+ cur.markExplicitLock();
+ }
+
+ entry.nodeId(primary.id());
+
+ if (cacheCtx.isNear()) {
+ while (true) {
+ try {
+ GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached();
+
+ cached.dhtNodeId(tx.xidVersion(), primary.id());
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ entry.cached(cacheCtx.near().entryEx(entry.key()));
+ }
+ }
+ }
+
+ return cur;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearOptimisticTxPrepareFuture.class, this, super.toString());
+ }
+
+ /**
+ *
+ */
+ private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+ /** Keys. */
+ @GridToStringInclude
+ private GridDistributedTxMapping m;
+
+ /** Flag to signal some result being processed. */
+ private AtomicBoolean rcvRes = new AtomicBoolean(false);
+
+ /** Mappings to proceed prepare. */
+ private ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings;
+
+ /**
+ * @param m Mapping.
+ * @param mappings Queue of mappings to proceed with.
+ */
+ MiniFuture(
+ GridDistributedTxMapping m,
+ ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings
+ ) {
+ this.m = m;
+ this.mappings = mappings;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ public ClusterNode node() {
+ return m.node();
+ }
+
+ /**
+ * @return Keys.
+ */
+ public GridDistributedTxMapping mapping() {
+ return m;
+ }
+
+ /**
+ * @param e Error.
+ */
+ void onResult(Throwable e) {
+ if (rcvRes.compareAndSet(false, true)) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
+
+ // Fail.
+ onDone(e);
+ }
+ else
+ U.warn(log, "Received error after another result has been processed [fut=" +
+ GridNearOptimisticTxPrepareFuture.this + ", mini=" + this + ']', e);
+ }
+
+ /**
+ * @param e Node failure.
+ */
+ void onResult(ClusterTopologyCheckedException e) {
+ if (isDone())
+ return;
+
+ if (rcvRes.compareAndSet(false, true)) {
+ if (log.isDebugEnabled())
+ log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this);
+
+ // Fail the whole future (make sure not to remap on different primary node
+ // to prevent multiple lock coordinators).
+ onError(null, null, e);
+ }
+ }
+
+ /**
+ * @param nodeId Failed node ID.
+ * @param res Result callback.
+ */
+ void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+ if (isDone())
+ return;
+
+ if (rcvRes.compareAndSet(false, true)) {
+ if (res.error() != null) {
+ // Fail the whole compound future.
+ onError(nodeId, mappings, res.error());
+ }
+ else {
+ onPrepareResponse(m, res);
+
+ // Proceed prepare before finishing mini future.
+ if (mappings != null)
+ proceedPrepare(mappings);
+
+ // Finish this mini future.
+ onDone(tx);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
new file mode 100644
index 0000000..84a4ab8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.transactions.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
+import static org.apache.ignite.transactions.TransactionState.*;
+
+/**
+ *
+ */
+public class GridNearPessimisticTxPrepareFuture extends GridAbstractNearTxPrepareFuture {
+ /**
+ * @param cctx Context.
+ * @param tx Transaction.
+ */
+ public GridNearPessimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) {
+ super(cctx, tx);
+
+ assert tx.pessimistic() : tx;
+
+ // Should wait for all mini futures completion before finishing tx.
+ ignoreChildFailures(IgniteCheckedException.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<? extends ClusterNode> nodes() {
+ return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
+ @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
+ return ((MiniFuture)f).node();
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ boolean found = false;
+
+ for (IgniteInternalFuture<?> fut : futures()) {
+ MiniFuture f = (MiniFuture)fut;
+
+ if (f.node().id().equals(nodeId)) {
+ f.onError(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
+
+ found = true;
+ }
+ }
+
+ return found;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+ if (!isDone()) {
+ for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
+ MiniFuture f = (MiniFuture)fut;
+
+ if (f.futureId().equals(res.miniId())) {
+ assert f.node().id().equals(nodeId);
+
+ if (log.isDebugEnabled())
+ log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + f);
+
+ f.onResult(res);
+ }
+ }
+ }
+ }
+ /** {@inheritDoc} */
+ @Override public void prepare() {
+ if (!tx.state(PREPARING)) {
+ if (tx.setRollbackOnly()) {
+ if (tx.timedOut())
+ onDone(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + tx));
+ else
+ onDone(new IgniteCheckedException("Invalid transaction state for prepare " +
+ "[state=" + tx.state() + ", tx=" + this + ']'));
+ }
+ else
+ onDone(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare " +
+ "[state=" + tx.state() + ", tx=" + this + ']'));
+
+ return;
+ }
+
+ try {
+ tx.userPrepare();
+
+ cctx.mvcc().addFuture(this);
+
+ preparePessimistic();
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+
+ /**
+ *
+ */
+ private void preparePessimistic() {
+ Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
+
+ AffinityTopologyVersion topVer = tx.topologyVersion();
+
+ txMapping = new GridDhtTxMapping();
+
+ for (IgniteTxEntry txEntry : tx.allEntries()) {
+ GridCacheContext cacheCtx = txEntry.context();
+
+ List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer);
+
+ ClusterNode primary = F.first(nodes);
+
+ boolean near = cacheCtx.isNear();
+
+ IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, near);
+
+ GridDistributedTxMapping nodeMapping = mappings.get(key);
+
+ if (nodeMapping == null) {
+ nodeMapping = new GridDistributedTxMapping(primary);
+
+ nodeMapping.near(cacheCtx.isNear());
+
+ mappings.put(key, nodeMapping);
+ }
+
+ txEntry.nodeId(primary.id());
+
+ nodeMapping.add(txEntry);
+
+ txMapping.addMapping(nodes);
+ }
+
+ tx.transactionNodes(txMapping.transactionNodes());
+
+ checkOnePhase();
+
+ for (final GridDistributedTxMapping m : mappings.values()) {
+ final ClusterNode node = m.node();
+
+ GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+ futId,
+ tx.topologyVersion(),
+ tx,
+ m.reads(),
+ m.writes(),
+ /*grp lock key*/null,
+ /*part lock*/false,
+ m.near(),
+ txMapping.transactionNodes(),
+ true,
+ txMapping.transactionNodes().get(node.id()),
+ tx.onePhaseCommit(),
+ tx.needReturnValue() && tx.implicit(),
+ tx.implicitSingle(),
+ m.explicitLock(),
+ tx.subjectId(),
+ tx.taskNameHash());
+
+ for (IgniteTxEntry txEntry : m.writes()) {
+ if (txEntry.op() == TRANSFORM)
+ req.addDhtVersion(txEntry.txKey(), null);
+ }
+
+ final MiniFuture fut = new MiniFuture(m);
+
+ req.miniId(fut.futureId());
+
+ add(fut);
+
+ if (node.isLocal()) {
+ cctx.tm().txHandler().prepareTx(node.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
+ @Override public void apply(GridNearTxPrepareResponse res) {
+ fut.onResult(res);
+ }
+ });
+ }
+ else {
+ try {
+ cctx.io().send(node, req, tx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ // Fail the whole thing.
+ fut.onError(e);
+ }
+ }
+ }
+
+ markInitialized();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable IgniteInternalTx res, @Nullable Throwable err) {
+ if (err != null)
+ this.err.compareAndSet(null, err);
+
+ err = this.err.get();
+
+ if (err == null)
+ tx.state(PREPARED);
+
+ if (super.onDone(res, err)) {
+ cctx.mvcc().removeFuture(this);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearPessimisticTxPrepareFuture.class, this, super.toString());
+ }
+
+ /**
+ *
+ */
+ private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+ /** */
+ private GridDistributedTxMapping m;
+
+ /**
+ * @param m Mapping.
+ */
+ MiniFuture(GridDistributedTxMapping m) {
+ this.m = m;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ public ClusterNode node() {
+ return m.node();
+ }
+
+ /**
+ * @param res Response.
+ */
+ void onResult(GridNearTxPrepareResponse res) {
+ if (res.error() != null)
+ onError(res.error());
+ else {
+ onPrepareResponse(m, res);
+
+ onDone(tx);
+ }
+ }
+
+ /**
+ * @param e Error.
+ */
+ void onError(Throwable e) {
+ err.compareAndSet(null, e);
+
+ onDone(e);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c665354..f7a43bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -683,11 +683,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() {
- GridNearTxPrepareFuture fut = (GridNearTxPrepareFuture)prepFut.get();
+ GridAbstractNearTxPrepareFuture fut = (GridAbstractNearTxPrepareFuture)prepFut.get();
if (fut == null) {
// Future must be created before any exception can be thrown.
- fut = new GridNearTxPrepareFuture<>(cctx, this);
+ fut = optimistic() ? new GridNearOptimisticTxPrepareFuture(cctx, this) :
+ new GridNearPessimisticTxPrepareFuture(cctx, this);
if (!prepFut.compareAndSet(null, fut))
return prepFut.get();
@@ -698,41 +699,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
mapExplicitLocks();
- // For pessimistic mode we don't distribute prepare request and do not lock topology version
- // as it was fixed on first lock.
- if (pessimistic()) {
- if (!state(PREPARING)) {
- if (setRollbackOnly()) {
- if (timedOut())
- fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was " +
- "rolled back: " + this));
- else
- fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" +
- state() + ", tx=" + this + ']'));
- }
- else
- fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare " +
- "[state=" + state() + ", tx=" + this + ']'));
-
- return fut;
- }
-
- try {
- userPrepare();
-
- // Make sure to add future before calling prepare.
- cctx.mvcc().addFuture(fut);
-
- fut.prepare();
- }
- catch (IgniteCheckedException e) {
- fut.onError(e);
- }
- }
- else {
- // In optimistic mode we must wait for topology map update.
- fut.prepare();
- }
+ fut.prepare();
return fut;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index f573187..962d973 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -79,7 +79,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
private boolean trackable = true;
/** Full information about transaction nodes mapping. */
- private GridDhtTxMapping<K, V> txMapping;
+ private GridDhtTxMapping txMapping;
/** */
private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
@@ -497,7 +497,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
assert topVer.topologyVersion() > 0;
- txMapping = new GridDhtTxMapping<>();
+ txMapping = new GridDhtTxMapping();
ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings =
new ConcurrentLinkedDeque8<>();
@@ -580,7 +580,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
AffinityTopologyVersion topVer = tx.topologyVersion();
- txMapping = new GridDhtTxMapping<>();
+ txMapping = new GridDhtTxMapping();
for (IgniteTxEntry txEntry : tx.allEntries()) {
GridCacheContext cacheCtx = txEntry.context();
@@ -996,12 +996,6 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
catch (GridCacheEntryRemovedException ignored) {
// Retry.
}
- catch (IgniteCheckedException e) {
- // Fail the whole compound future.
- onError(nodeId, mappings, e);
-
- return;
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index af75fb8..d98b4ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -362,7 +362,7 @@ public class IgniteTxHandler {
* @param res Response.
*/
private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse res) {
- GridNearTxPrepareFuture fut = (GridNearTxPrepareFuture)ctx.mvcc()
+ GridAbstractNearTxPrepareFuture fut = (GridAbstractNearTxPrepareFuture)ctx.mvcc()
.<IgniteInternalTx>future(res.version(), res.futureId());
if (fut == null) {
[19/21] incubator-ignite git commit: ignite-676: Fix ignite Start
Nodes on public TC
Posted by vo...@apache.org.
ignite-676: Fix ignite Start Nodes on public TC
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3861fbda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3861fbda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3861fbda
Branch: refs/heads/ignite-gg-9614
Commit: 3861fbda0e54be505edc6dc5993edc4d70f16d59
Parents: f027ac5
Author: Artem Shutak <as...@gridgain.com>
Authored: Tue May 12 20:32:46 2015 +0300
Committer: Artem Shutak <as...@gridgain.com>
Committed: Tue May 12 20:32:46 2015 +0300
----------------------------------------------------------------------
bin/include/functions.sh | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3861fbda/bin/include/functions.sh
----------------------------------------------------------------------
diff --git a/bin/include/functions.sh b/bin/include/functions.sh
index f0430c1..b18b150 100755
--- a/bin/include/functions.sh
+++ b/bin/include/functions.sh
@@ -34,7 +34,7 @@
checkJava() {
# Check JAVA_HOME.
if [ "$JAVA_HOME" = "" ]; then
- JAVA=`which java`
+ JAVA=`type -p java`
RETCODE=$?
if [ $RETCODE -ne 0 ]; then
[06/21] incubator-ignite git commit: Merge remote-tracking branch
'origin/ignite-sprint-4' into ignite-157
Posted by vo...@apache.org.
Merge remote-tracking branch 'origin/ignite-sprint-4' into ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3ddbff92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3ddbff92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3ddbff92
Branch: refs/heads/ignite-gg-9614
Commit: 3ddbff9233f8a6e51e3192287881e189b4d97c56
Parents: a2fb8f6 0c13a08
Author: sboikov <se...@inria.fr>
Authored: Thu May 7 22:24:00 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Thu May 7 22:24:00 2015 +0300
----------------------------------------------------------------------
examples/pom.xml | 2 +-
modules/aop/pom.xml | 2 +-
modules/aws/pom.xml | 2 +-
modules/clients/pom.xml | 2 +-
modules/cloud/pom.xml | 2 +-
modules/codegen/pom.xml | 2 +-
modules/core/pom.xml | 2 +-
modules/extdata/p2p/pom.xml | 2 +-
modules/extdata/uri/pom.xml | 2 +-
modules/gce/pom.xml | 2 +-
modules/geospatial/pom.xml | 2 +-
modules/hadoop/pom.xml | 2 +-
modules/hibernate/pom.xml | 2 +-
modules/indexing/pom.xml | 2 +-
modules/jcl/pom.xml | 2 +-
modules/jta/pom.xml | 2 +-
modules/log4j/pom.xml | 2 +-
modules/rest-http/pom.xml | 2 +-
modules/scalar/pom.xml | 2 +-
modules/schedule/pom.xml | 2 +-
modules/schema-import/pom.xml | 2 +-
modules/slf4j/pom.xml | 2 +-
modules/spring/pom.xml | 2 +-
modules/ssh/pom.xml | 2 +-
modules/tools/pom.xml | 2 +-
modules/urideploy/pom.xml | 2 +-
modules/visor-console/pom.xml | 2 +-
modules/visor-plugins/pom.xml | 2 +-
modules/web/pom.xml | 2 +-
modules/yardstick/pom.xml | 2 +-
pom.xml | 14 ++++----------
31 files changed, 34 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
[10/21] incubator-ignite git commit: # ignite-157
Posted by vo...@apache.org.
# ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/46dda3dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/46dda3dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/46dda3dd
Branch: refs/heads/ignite-gg-9614
Commit: 46dda3dd4c893c811f51e3e491e459422c0ddf06
Parents: 41d1a14
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 8 12:12:30 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 8 12:15:51 2015 +0300
----------------------------------------------------------------------
.../GridCacheAbstractNodeRestartSelfTest.java | 11 +++++++----
.../GridCacheAtomicReplicatedFailoverSelfTest.java | 6 ++++++
.../replicated/GridCacheReplicatedFailoverSelfTest.java | 6 ++++++
.../ignite/testsuites/IgniteCacheRestartTestSuite.java | 4 ++--
4 files changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/46dda3dd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
index 85e2c7c..76020b8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
-import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
@@ -106,6 +105,10 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
disco.setIpFinder(ipFinder);
+ disco.setSocketTimeout(10_000);
+ disco.setAckTimeout(10_000);
+ disco.setNetworkTimeout(10_000);
+
c.setDiscoverySpi(disco);
return c;
@@ -512,7 +515,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
try {
cache.put(key, Integer.toString(key));
}
- catch (TransactionRollbackException | ClusterTopologyException | CacheException ignored) {
+ catch (IgniteException | CacheException ignored) {
// It is ok if primary node leaves grid.
}
@@ -668,7 +671,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
tx.commit();
}
}
- catch (ClusterTopologyException | CacheException ignored) {
+ catch (IgniteException | CacheException ignored) {
// It is ok if primary node leaves grid.
}
@@ -814,7 +817,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
tx.commit();
}
- catch (ClusterTopologyException | CacheException ignored) {
+ catch (IgniteException | CacheException ignored) {
// It is ok if primary node leaves grid.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/46dda3dd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java
index 0a2781b..1e57c09 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
import static org.apache.ignite.cache.CacheMode.*;
@@ -29,4 +30,9 @@ public class GridCacheAtomicReplicatedFailoverSelfTest extends GridCacheAtomicFa
@Override protected CacheMode cacheMode() {
return REPLICATED;
}
+
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/46dda3dd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFailoverSelfTest.java
index 326f57d..3461dd4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFailoverSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.replicated;
import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.processors.cache.*;
import static org.apache.ignite.cache.CacheMode.*;
@@ -30,4 +31,9 @@ public class GridCacheReplicatedFailoverSelfTest extends GridCacheAbstractFailov
@Override protected CacheMode cacheMode() {
return REPLICATED;
}
+
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/46dda3dd/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
index a8f15dc..e5372e8 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
@@ -35,9 +35,9 @@ public class IgniteCacheRestartTestSuite extends TestSuite {
suite.addTestSuite(GridCachePartitionedTxSalvageSelfTest.class);
+ suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
+ suite.addTestSuite(GridCachePartitionedOptimisticTxNodeRestartTest.class);
// TODO IGNITE-157.
- //suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
- //suite.addTestSuite(GridCachePartitionedOptimisticTxNodeRestartTest.class);
// suite.addTestSuite(GridCacheReplicatedNodeRestartSelfTest.class);
suite.addTestSuite(IgniteCacheAtomicNodeRestartTest.class);
[17/21] incubator-ignite git commit: # ignite-157
Posted by vo...@apache.org.
# ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/250dd8ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/250dd8ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/250dd8ea
Branch: refs/heads/ignite-gg-9614
Commit: 250dd8ea6ec9c3298eed28f0f2d5c9e40508c8f9
Parents: d245dc8
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 8 15:35:40 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 8 15:35:40 2015 +0300
----------------------------------------------------------------------
.../processors/cache/distributed/dht/GridDhtTxLocalAdapter.java | 2 +-
.../processors/cache/GridCacheAbstractFailoverSelfTest.java | 1 -
2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/250dd8ea/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index d886989..444085f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -887,7 +887,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
protected abstract void clearPrepareFuture(GridDhtTxPrepareFuture fut);
/**
- * @return {@code True} if transaction if finished on prepare step.
+ * @return {@code True} if transaction is finished on prepare step.
*/
protected final boolean commitOnPrepare() {
return onePhaseCommit() && !near();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/250dd8ea/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
index 6f6355a..5d9ad35 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
@@ -31,7 +31,6 @@ import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;
import javax.cache.*;
-import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.cache.CacheRebalanceMode.*;
[15/21] incubator-ignite git commit: # ignite-157
Posted by vo...@apache.org.
# ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f9c5fbec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f9c5fbec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f9c5fbec
Branch: refs/heads/ignite-gg-9614
Commit: f9c5fbec3e4cf36c720320e4ee7ef0b91d9f014b
Parents: 50ec7f3
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 8 15:20:27 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 8 15:20:27 2015 +0300
----------------------------------------------------------------------
.../ignite/testsuites/IgniteCacheFailoverTestSuite.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9c5fbec/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index 2cc6a5a..dd3ce27 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -61,9 +61,10 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheTxNearDisabledPutGetRestartTest.class);
suite.addTestSuite(IgniteCacheTxNearDisabledFairAffinityPutGetRestartTest.class);
- suite.addTestSuite(GridCachePartitionedFailoverSelfTest.class);
- suite.addTestSuite(GridCacheColocatedFailoverSelfTest.class);
- suite.addTestSuite(GridCacheReplicatedFailoverSelfTest.class);
+ // TODO IGNITE-882.
+ //suite.addTestSuite(GridCachePartitionedFailoverSelfTest.class);
+ //suite.addTestSuite(GridCacheColocatedFailoverSelfTest.class);
+ //suite.addTestSuite(GridCacheReplicatedFailoverSelfTest.class);
return suite;
}
[03/21] incubator-ignite git commit: # ignite-157
Posted by vo...@apache.org.
# ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dff3fc68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dff3fc68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dff3fc68
Branch: refs/heads/ignite-gg-9614
Commit: dff3fc683214f8036e8e0db8d0ed497a4fe729e4
Parents: a238ce3
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 7 17:01:32 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 7 17:01:32 2015 +0300
----------------------------------------------------------------------
.../near/GridNearTxPrepareFuture.java | 1044 ------------------
.../cache/GridCacheAbstractFullApiSelfTest.java | 2 +-
2 files changed, 1 insertion(+), 1045 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dff3fc68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
deleted file mode 100644
index 9cf4aca..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ /dev/null
@@ -1,1044 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.transactions.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.transactions.*;
-import org.jetbrains.annotations.*;
-import org.jsr166.*;
-
-import javax.cache.expiry.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
-import static org.apache.ignite.transactions.TransactionState.*;
-
-/**
- *
- */
-public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx>
- implements GridCacheMvccFuture<IgniteInternalTx> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Logger reference. */
- private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Logger. */
- private static IgniteLogger log;
-
- /** Context. */
- private GridCacheSharedContext<K, V> cctx;
-
- /** Future ID. */
- private IgniteUuid futId;
-
- /** Transaction. */
- @GridToStringInclude
- private GridNearTxLocal tx;
-
- /** Error. */
- @GridToStringExclude
- private AtomicReference<Throwable> err = new AtomicReference<>(null);
-
- /** Trackable flag. */
- private boolean trackable = true;
-
- /** Full information about transaction nodes mapping. */
- private GridDhtTxMapping txMapping;
-
- /** */
- private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
-
- /**
- * @param cctx Context.
- * @param tx Transaction.
- */
- public GridNearTxPrepareFuture(GridCacheSharedContext<K, V> cctx, final GridNearTxLocal tx) {
- super(cctx.kernalContext(), new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() {
- @Override public boolean collect(IgniteInternalTx e) {
- return true;
- }
-
- @Override public IgniteInternalTx reduce() {
- // Nothing to aggregate.
- return tx;
- }
- });
-
- assert cctx != null;
- assert tx != null;
-
- this.cctx = cctx;
- this.tx = tx;
-
- futId = IgniteUuid.randomUuid();
-
- if (log == null)
- log = U.logger(cctx.kernalContext(), logRef, GridNearTxPrepareFuture.class);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- return futId;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return tx.xidVersion();
- }
-
- /** {@inheritDoc} */
- @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
- if (log.isDebugEnabled())
- log.debug("Transaction future received owner changed callback: " + entry);
-
- if (tx.optimistic()) {
- if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) {
- lockKeys.remove(entry.txKey());
-
- // This will check for locks.
- onDone();
-
- return true;
- }
- }
-
- return false;
- }
-
- /**
- * @return Involved nodes.
- */
- @Override public Collection<? extends ClusterNode> nodes() {
- return
- F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
- if (isMini(f))
- return ((MiniFuture)f).node();
-
- return cctx.discovery().localNode();
- }
- });
- }
-
- /** {@inheritDoc} */
- @Override public boolean trackable() {
- return trackable;
- }
-
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- trackable = false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- boolean found = false;
-
- for (IgniteInternalFuture<?> fut : futures())
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
-
- if (f.node().id().equals(nodeId)) {
- f.onResult(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
-
- found = true;
- }
- }
-
- return found;
- }
-
- /**
- * @param nodeId Failed node ID.
- * @param mappings Remaining mappings.
- * @param e Error.
- */
- void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping> mappings, Throwable e) {
- if (err.compareAndSet(null, e)) {
- boolean marked = tx.setRollbackOnly();
-
- if (e instanceof IgniteTxOptimisticCheckedException) {
- assert nodeId != null : "Missing node ID for optimistic failure exception: " + e;
-
- tx.removeKeysMapping(nodeId, mappings);
- }
-
- if (e instanceof IgniteTxRollbackCheckedException) {
- if (marked) {
- try {
- tx.rollback();
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
- }
- }
- }
-
- onComplete();
- }
- }
-
- /**
- * @return {@code True} if all locks are owned.
- */
- private boolean checkLocks() {
- boolean locked = lockKeys.isEmpty();
-
- if (locked) {
- if (log.isDebugEnabled())
- log.debug("All locks are acquired for near prepare future: " + this);
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']');
- }
-
- return locked;
- }
-
- /**
- * @param e Error.
- */
- void onError(Throwable e) {
- onError(null, null, e);
- }
-
- /**
- * @param nodeId Sender.
- * @param res Result.
- */
- public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
- if (!isDone()) {
- for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
-
- if (f.futureId().equals(res.miniId())) {
- assert f.node().id().equals(nodeId);
-
- f.onResult(nodeId, res);
- }
- }
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean onDone(IgniteInternalTx t, Throwable err) {
- // If locks were not acquired yet, delay completion.
- if (isDone() || (err == null && !checkLocks()))
- return false;
-
- this.err.compareAndSet(null, err);
-
- if (err == null)
- tx.state(PREPARED);
-
- if (super.onDone(tx, err)) {
- // Don't forget to clean up.
- cctx.mvcc().removeFuture(this);
-
- return true;
- }
-
- return false;
- }
-
- /**
- * @param f Future.
- * @return {@code True} if mini-future.
- */
- private boolean isMini(IgniteInternalFuture<?> f) {
- return f.getClass().equals(MiniFuture.class);
- }
-
- /**
- * Completeness callback.
- */
- private void onComplete() {
- if (super.onDone(tx, err.get()))
- // Don't forget to clean up.
- cctx.mvcc().removeFuture(this);
- }
-
- /**
- * Completes this future.
- */
- void complete() {
- onComplete();
- }
-
- /**
- * Waits for topology exchange future to be ready and then prepares user transaction.
- */
- public void prepare() {
- if (tx.optimistic()) {
- // Obtain the topology version to use.
- AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
-
- if (topVer != null) {
- tx.topologyVersion(topVer);
-
- prepare0();
-
- return;
- }
-
- prepareOnTopology();
-
- }
- else
- preparePessimistic();
- }
-
- /**
- *
- */
- private void prepareOnTopology() {
- GridDhtTopologyFuture topFut = topologyReadLock();
-
- try {
- if (topFut == null) {
- assert isDone();
-
- return;
- }
-
- if (topFut.isDone()) {
- StringBuilder invalidCaches = new StringBuilder();
- Boolean cacheInvalid = false;
- for (GridCacheContext ctx : cctx.cacheContexts()) {
- if (tx.activeCacheIds().contains(ctx.cacheId()) && !topFut.isCacheTopologyValid(ctx)) {
- if (cacheInvalid)
- invalidCaches.append(", ");
-
- invalidCaches.append(U.maskName(ctx.name()));
-
- cacheInvalid = true;
- }
- }
-
- if (cacheInvalid) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- invalidCaches.toString()));
-
- return;
- }
-
- tx.topologyVersion(topFut.topologyVersion());
-
- prepare0();
- }
- else {
- topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override
- public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
- @Override
- public void run() {
- prepareOnTopology();
- }
- });
- }
- });
- }
- }
- finally {
- topologyReadUnlock();
- }
- }
-
- /**
- * Acquires topology read lock.
- *
- * @return Topology ready future.
- */
- private GridDhtTopologyFuture topologyReadLock() {
- if (tx.activeCacheIds().isEmpty())
- return cctx.exchange().lastTopologyFuture();
-
- GridCacheContext<K, V> nonLocCtx = null;
-
- for (int cacheId : tx.activeCacheIds()) {
- GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
-
- if (!cacheCtx.isLocal()) {
- nonLocCtx = cacheCtx;
-
- break;
- }
- }
-
- if (nonLocCtx == null)
- return cctx.exchange().lastTopologyFuture();
-
- nonLocCtx.topology().readLock();
-
- if (nonLocCtx.topology().stopping()) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
- nonLocCtx.name()));
-
- return null;
- }
-
- return nonLocCtx.topology().topologyVersionFuture();
- }
-
- /**
- * Releases topology read lock.
- */
- private void topologyReadUnlock() {
- if (!tx.activeCacheIds().isEmpty()) {
- GridCacheContext<K, V> nonLocalCtx = null;
-
- for (int cacheId : tx.activeCacheIds()) {
- GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
-
- if (!cacheCtx.isLocal()) {
- nonLocalCtx = cacheCtx;
-
- break;
- }
- }
-
- if (nonLocalCtx != null)
- nonLocalCtx.topology().readUnlock();
- }
- }
-
- /**
- * Initializes future.
- */
- private void prepare0() {
- assert tx.optimistic();
-
- try {
- if (!tx.state(PREPARING)) {
- if (tx.setRollbackOnly()) {
- if (tx.timedOut())
- onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
- "was rolled back: " + this));
- else
- onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " +
- "[state=" + tx.state() + ", tx=" + this + ']'));
- }
- else
- onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
- "prepare [state=" + tx.state() + ", tx=" + this + ']'));
-
- return;
- }
-
- // Make sure to add future before calling prepare.
- cctx.mvcc().addFuture(this);
-
- prepare(
- tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
- tx.writeEntries());
-
- markInitialized();
- }
- catch (TransactionTimeoutException | TransactionOptimisticException e) {
- onError(cctx.localNodeId(), null, e);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
- }
-
- /**
- * @param reads Read entries.
- * @param writes Write entries.
- * @throws IgniteCheckedException If transaction is group-lock and some key was mapped to to the local node.
- */
- private void prepare(
- Iterable<IgniteTxEntry> reads,
- Iterable<IgniteTxEntry> writes
- ) throws IgniteCheckedException {
- assert tx.optimistic();
-
- AffinityTopologyVersion topVer = tx.topologyVersion();
-
- assert topVer.topologyVersion() > 0;
-
- txMapping = new GridDhtTxMapping();
-
- ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings =
- new ConcurrentLinkedDeque8<>();
-
- if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
- for (int cacheId : tx.activeCacheIds()) {
- GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
-
- if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) {
- onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all " +
- "partition nodes left the grid): " + cacheCtx.name()));
-
- return;
- }
- }
- }
-
- // Assign keys to primary nodes.
- GridDistributedTxMapping cur = null;
-
- for (IgniteTxEntry read : reads) {
- GridDistributedTxMapping updated = map(read, topVer, cur, false);
-
- if (cur != updated) {
- mappings.offer(updated);
-
- if (updated.node().isLocal()) {
- if (read.context().isNear())
- tx.nearLocallyMapped(true);
- else if (read.context().isColocated())
- tx.colocatedLocallyMapped(true);
- }
-
- cur = updated;
- }
- }
-
- for (IgniteTxEntry write : writes) {
- GridDistributedTxMapping updated = map(write, topVer, cur, true);
-
- if (cur != updated) {
- mappings.offer(updated);
-
- if (updated.node().isLocal()) {
- if (write.context().isNear())
- tx.nearLocallyMapped(true);
- else if (write.context().isColocated())
- tx.colocatedLocallyMapped(true);
- }
-
- cur = updated;
- }
- }
-
- if (isDone()) {
- if (log.isDebugEnabled())
- log.debug("Abandoning (re)map because future is done: " + this);
-
- return;
- }
-
- tx.addEntryMapping(mappings);
-
- cctx.mvcc().recheckPendingLocks();
-
- txMapping.initLast(mappings);
-
- tx.transactionNodes(txMapping.transactionNodes());
-
- checkOnePhase();
-
- proceedPrepare(mappings);
- }
-
- /**
- *
- */
- private void preparePessimistic() {
- Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
-
- AffinityTopologyVersion topVer = tx.topologyVersion();
-
- txMapping = new GridDhtTxMapping();
-
- for (IgniteTxEntry txEntry : tx.allEntries()) {
- GridCacheContext cacheCtx = txEntry.context();
-
- List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer);
-
- ClusterNode primary = F.first(nodes);
-
- boolean near = cacheCtx.isNear();
-
- IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, near);
-
- GridDistributedTxMapping nodeMapping = mappings.get(key);
-
- if (nodeMapping == null) {
- nodeMapping = new GridDistributedTxMapping(primary);
-
- nodeMapping.near(cacheCtx.isNear());
-
- mappings.put(key, nodeMapping);
- }
-
- txEntry.nodeId(primary.id());
-
- nodeMapping.add(txEntry);
-
- txMapping.addMapping(nodes);
- }
-
- tx.transactionNodes(txMapping.transactionNodes());
-
- checkOnePhase();
-
- for (final GridDistributedTxMapping m : mappings.values()) {
- final ClusterNode node = m.node();
-
- GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
- futId,
- tx.topologyVersion(),
- tx,
- m.reads(),
- m.writes(),
- /*grp lock key*/null,
- /*part lock*/false,
- m.near(),
- txMapping.transactionNodes(),
- true,
- txMapping.transactionNodes().get(node.id()),
- tx.onePhaseCommit(),
- tx.needReturnValue() && tx.implicit(),
- tx.implicitSingle(),
- m.explicitLock(),
- tx.subjectId(),
- tx.taskNameHash());
-
- for (IgniteTxEntry txEntry : m.writes()) {
- if (txEntry.op() == TRANSFORM)
- req.addDhtVersion(txEntry.txKey(), null);
- }
-
- final MiniFuture fut = new MiniFuture(m, null);
-
- req.miniId(fut.futureId());
-
- add(fut);
-
- if (node.isLocal()) {
-// cctx.tm().txHandler().prepareTx(node.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
-// @Override public void apply(GridNearTxPrepareResponse res) {
-// fut.onResult(node.id(), res);
-// }
-// });
- }
- else {
- try {
- cctx.io().send(node, req, tx.ioPolicy());
- }
- catch (IgniteCheckedException e) {
- // Fail the whole thing.
- fut.onResult(e);
- }
- }
- }
-
- markInitialized();
- }
-
- /**
- * Checks if mapped transaction can be committed on one phase.
- * One-phase commit can be done if transaction maps to one primary node and not more than one backup.
- */
- private void checkOnePhase() {
- if (tx.storeUsed())
- return;
-
- Map<UUID, Collection<UUID>> map = txMapping.transactionNodes();
-
- if (map.size() == 1) {
- Map.Entry<UUID, Collection<UUID>> entry = F.firstEntry(map);
-
- assert entry != null;
-
- Collection<UUID> backups = entry.getValue();
-
- if (backups.size() <= 1)
- tx.onePhaseCommit(true);
- }
- }
-
- /**
- * Continues prepare after previous mapping successfully finished.
- *
- * @param mappings Queue of mappings.
- */
- private void proceedPrepare(final ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings) {
- if (isDone())
- return;
-
- final GridDistributedTxMapping m = mappings.poll();
-
- if (m == null)
- return;
-
- assert !m.empty();
-
- final ClusterNode n = m.node();
-
- GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
- futId,
- tx.topologyVersion(),
- tx,
- tx.optimistic() && tx.serializable() ? m.reads() : null,
- m.writes(),
- tx.groupLockKey(),
- tx.partitionLock(),
- m.near(),
- txMapping.transactionNodes(),
- m.last(),
- m.lastBackups(),
- tx.onePhaseCommit(),
- tx.needReturnValue() && tx.implicit(),
- tx.implicitSingle(),
- m.explicitLock(),
- tx.subjectId(),
- tx.taskNameHash());
-
- for (IgniteTxEntry txEntry : m.writes()) {
- if (txEntry.op() == TRANSFORM)
- req.addDhtVersion(txEntry.txKey(), null);
- }
-
- // Must lock near entries separately.
- if (m.near()) {
- try {
- tx.optimisticLockEntries(req.writes());
-
- tx.userPrepare();
- }
- catch (IgniteCheckedException e) {
- onError(null, null, e);
- }
- }
-
- final MiniFuture fut = new MiniFuture(m, mappings);
-
- req.miniId(fut.futureId());
-
- add(fut); // Append new future.
-
- // If this is the primary node for the keys.
- if (n.isLocal()) {
- // At this point, if any new node joined, then it is
- // waiting for this transaction to complete, so
- // partition reassignments are not possible here.
-// cctx.tm().txHandler().prepareTx(n.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
-// @Override public void apply(GridNearTxPrepareResponse res) {
-// fut.onResult(n.id(), res);
-// }
-// });
- }
- else {
- assert !tx.groupLock() : "Got group lock transaction that is mapped on remote node [tx=" + tx +
- ", nodeId=" + n.id() + ']';
-
- try {
- cctx.io().send(n, req, tx.ioPolicy());
- }
- catch (IgniteCheckedException e) {
- // Fail the whole thing.
- fut.onResult(e);
- }
- }
- }
-
- /**
- * @param entry Transaction entry.
- * @param topVer Topology version.
- * @param cur Current mapping.
- * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key.
- * @return Mapping.
- */
- private GridDistributedTxMapping map(
- IgniteTxEntry entry,
- AffinityTopologyVersion topVer,
- GridDistributedTxMapping cur,
- boolean waitLock
- ) throws IgniteCheckedException {
- GridCacheContext cacheCtx = entry.context();
-
- List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
-
- txMapping.addMapping(nodes);
-
- ClusterNode primary = F.first(nodes);
-
- assert primary != null;
-
- if (log.isDebugEnabled()) {
- log.debug("Mapped key to primary node [key=" + entry.key() +
- ", part=" + cacheCtx.affinity().partition(entry.key()) +
- ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']');
- }
-
- if (tx.groupLock() && !primary.isLocal())
- throw new IgniteCheckedException("Failed to prepare group lock transaction (local node is not primary for " +
- " key)[key=" + entry.key() + ", primaryNodeId=" + primary.id() + ']');
-
- // Must re-initialize cached entry while holding topology lock.
- if (cacheCtx.isNear())
- entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer));
- else if (!cacheCtx.isLocal())
- entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true));
- else
- entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
-
- if (cacheCtx.isNear() || cacheCtx.isLocal()) {
- if (waitLock && entry.explicitVersion() == null) {
- if (!tx.groupLock() || tx.groupLockKey().equals(entry.txKey()))
- lockKeys.add(entry.txKey());
- }
- }
-
- if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
- cur = new GridDistributedTxMapping(primary);
-
- // Initialize near flag right away.
- cur.near(cacheCtx.isNear());
- }
-
- cur.add(entry);
-
- if (entry.explicitVersion() != null) {
- tx.markExplicit(primary.id());
-
- cur.markExplicitLock();
- }
-
- entry.nodeId(primary.id());
-
- if (cacheCtx.isNear()) {
- while (true) {
- try {
- GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached();
-
- cached.dhtNodeId(tx.xidVersion(), primary.id());
-
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- entry.cached(cacheCtx.near().entryEx(entry.key()));
- }
- }
- }
-
- return cur;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearTxPrepareFuture.class, this, super.toString());
- }
-
- /**
- * Mini-future for get operations. Mini-futures are only waiting on a single
- * node as opposed to multiple nodes.
- */
- private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
-
- /** Keys. */
- @GridToStringInclude
- private GridDistributedTxMapping m;
-
- /** Flag to signal some result being processed. */
- private AtomicBoolean rcvRes = new AtomicBoolean(false);
-
- /** Mappings to proceed prepare. */
- private ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings;
-
- /**
- * @param m Mapping.
- * @param mappings Queue of mappings to proceed with.
- */
- MiniFuture(
- GridDistributedTxMapping m,
- ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings
- ) {
- this.m = m;
- this.mappings = mappings;
- }
-
- /**
- * @return Future ID.
- */
- IgniteUuid futureId() {
- return futId;
- }
-
- /**
- * @return Node ID.
- */
- public ClusterNode node() {
- return m.node();
- }
-
- /**
- * @return Keys.
- */
- public GridDistributedTxMapping mapping() {
- return m;
- }
-
- /**
- * @param e Error.
- */
- void onResult(Throwable e) {
- if (rcvRes.compareAndSet(false, true)) {
- if (log.isDebugEnabled())
- log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
- // Fail.
- onDone(e);
- }
- else
- U.warn(log, "Received error after another result has been processed [fut=" +
- GridNearTxPrepareFuture.this + ", mini=" + this + ']', e);
- }
-
- /**
- * @param e Node failure.
- */
- void onResult(ClusterTopologyCheckedException e) {
- if (isDone())
- return;
-
- if (rcvRes.compareAndSet(false, true)) {
- if (log.isDebugEnabled())
- log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this);
-
- // Fail the whole future (make sure not to remap on different primary node
- // to prevent multiple lock coordinators).
- onError(null, null, e);
- }
- }
-
- /**
- * @param nodeId Failed node ID.
- * @param res Result callback.
- */
- void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
- if (isDone())
- return;
-
- if (rcvRes.compareAndSet(false, true)) {
- if (res.error() != null) {
- // Fail the whole compound future.
- onError(nodeId, mappings, res.error());
- }
- else {
- assert F.isEmpty(res.invalidPartitions());
-
- for (Map.Entry<IgniteTxKey, CacheVersionedValue> entry : res.ownedValues().entrySet()) {
- IgniteTxEntry txEntry = tx.entry(entry.getKey());
-
- assert txEntry != null;
-
- GridCacheContext cacheCtx = txEntry.context();
-
- while (true) {
- try {
- if (cacheCtx.isNear()) {
- GridNearCacheEntry nearEntry = (GridNearCacheEntry)txEntry.cached();
-
- CacheVersionedValue tup = entry.getValue();
-
- nearEntry.resetFromPrimary(tup.value(), tx.xidVersion(),
- tup.version(), m.node().id(), tx.topologyVersion());
- }
- else if (txEntry.cached().detached()) {
- GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached();
-
- CacheVersionedValue tup = entry.getValue();
-
- detachedEntry.resetFromPrimary(tup.value(), tx.xidVersion());
- }
-
- break;
- }
- catch (GridCacheEntryRemovedException ignored) {
- // Retry.
- }
- }
- }
-
- tx.implicitSingleResult(res.returnValue());
-
- for (IgniteTxKey key : res.filterFailedKeys()) {
- IgniteTxEntry txEntry = tx.entry(key);
-
- assert txEntry != null : "Missing tx entry for write key: " + key;
-
- txEntry.op(NOOP);
-
- assert txEntry.context() != null;
-
- ExpiryPolicy expiry = txEntry.context().expiryForTxEntry(txEntry);
-
- if (expiry != null)
- txEntry.ttl(CU.toTtl(expiry.getExpiryForAccess()));
- }
-
- if (!m.empty()) {
- // Register DHT version.
- tx.addDhtVersion(m.node().id(), res.dhtVersion());
-
- m.dhtVersion(res.dhtVersion());
-
- if (m.near())
- tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
- }
-
- // Proceed prepare before finishing mini future.
- if (mappings != null)
- proceedPrepare(mappings);
-
- // Finish this mini future.
- onDone(tx);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dff3fc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index a346b65..4dc371c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -4343,7 +4343,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertEquals(val1, cacheSkipStore.invoke(key, new SetValueProcessor(val2)));
assertEquals(i, map.get(key));
- assertEquals("For key " + key, val2, cacheSkipStore.get(key));
+ assertEquals(val2, cacheSkipStore.get(key));
}
for (String key : keys) {
[14/21] incubator-ignite git commit: # ignite-157
Posted by vo...@apache.org.
# ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/50ec7f34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/50ec7f34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/50ec7f34
Branch: refs/heads/ignite-gg-9614
Commit: 50ec7f34c4f3b9527bb2eb068b4662042a376856
Parents: 30d306a
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 8 15:11:30 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 8 15:11:30 2015 +0300
----------------------------------------------------------------------
.../distributed/near/GridNearOptimisticTxPrepareFuture.java | 8 +++++---
.../distributed/near/GridNearPessimisticTxPrepareFuture.java | 8 +++++---
.../ignite/testsuites/IgniteCacheRestartTestSuite.java | 8 ++++----
3 files changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50ec7f34/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 1f2c439..51c7ccd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -646,14 +646,16 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
/** {@inheritDoc} */
@Override public String toString() {
- Collection<String> pendingFuts = F.viewReadOnly(pending(), new C1<IgniteInternalFuture<?>, String>() {
+ Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
@Override public String apply(IgniteInternalFuture<?> f) {
- return "[node=" + ((MiniFuture)f).node().id() + ", loc=" + ((MiniFuture)f).node().isLocal() + "]";
+ return "[node=" + ((MiniFuture)f).node().id() +
+ ", loc=" + ((MiniFuture)f).node().isLocal() +
+ ", done=" + f.isDone() + "]";
}
});
return S.toString(GridNearOptimisticTxPrepareFuture.class, this,
- "pendingFuts", pendingFuts,
+ "futs", futs,
"super", super.toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50ec7f34/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index ba8b92c..998df9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -255,14 +255,16 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
/** {@inheritDoc} */
@Override public String toString() {
- Collection<String> pendingFuts = F.viewReadOnly(pending(), new C1<IgniteInternalFuture<?>, String>() {
+ Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
@Override public String apply(IgniteInternalFuture<?> f) {
- return "[node=" + ((MiniFuture)f).node().id() + ", loc=" + ((MiniFuture)f).node().isLocal() + "]";
+ return "[node=" + ((MiniFuture)f).node().id() +
+ ", loc=" + ((MiniFuture)f).node().isLocal() +
+ ", done=" + f.isDone() + "]";
}
});
return S.toString(GridNearPessimisticTxPrepareFuture.class, this,
- "pendingFuts", pendingFuts,
+ "futs", futs,
"super", super.toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/50ec7f34/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
index 983d447..24945dd 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
@@ -21,7 +21,6 @@ import junit.framework.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
/**
* In-Memory Data Grid stability test suite on changing topology.
@@ -36,9 +35,10 @@ public class IgniteCacheRestartTestSuite extends TestSuite {
suite.addTestSuite(GridCachePartitionedTxSalvageSelfTest.class);
- suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
- suite.addTestSuite(GridCachePartitionedOptimisticTxNodeRestartTest.class);
- suite.addTestSuite(GridCacheReplicatedNodeRestartSelfTest.class);
+ // TODO IGNITE-882.
+ //suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
+ //suite.addTestSuite(GridCachePartitionedOptimisticTxNodeRestartTest.class);
+ //suite.addTestSuite(GridCacheReplicatedNodeRestartSelfTest.class);
suite.addTestSuite(IgniteCacheAtomicNodeRestartTest.class);
// suite.addTestSuite(IgniteCacheAtomicReplicatedNodeRestartSelfTest.class); // TODO IGNITE-747
[16/21] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-sprint-5' into ignite-157
Posted by vo...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d245dc81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d245dc81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d245dc81
Branch: refs/heads/ignite-gg-9614
Commit: d245dc818d18f9474854cd9550e60efc87732ce0
Parents: f9c5fbe 2361640
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 8 15:21:01 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 8 15:21:01 2015 +0300
----------------------------------------------------------------------
examples/pom.xml | 2 +-
modules/aop/pom.xml | 2 +-
modules/aws/pom.xml | 2 +-
modules/clients/pom.xml | 2 +-
modules/cloud/pom.xml | 2 +-
.../TcpDiscoveryCloudIpFinderSelfTest.java | 2 -
modules/codegen/pom.xml | 2 +-
modules/core/pom.xml | 2 +-
.../processors/cache/GridCacheAdapter.java | 26 ++-
.../cache/GridCacheEvictionManager.java | 2 +-
.../processors/cache/GridCacheProxyImpl.java | 24 --
.../processors/cache/GridCacheSwapManager.java | 215 +++++++++++++-----
.../processors/cache/IgniteInternalCache.java | 27 ---
.../colocated/GridDhtColocatedLockFuture.java | 2 +
.../distributed/near/GridNearCacheAdapter.java | 10 -
.../processors/cache/local/GridLocalCache.java | 8 +-
.../local/atomic/GridLocalAtomicCache.java | 27 ++-
.../cache/query/GridCacheQueryManager.java | 21 +-
.../transactions/IgniteTxLocalAdapter.java | 12 +-
.../processors/igfs/IgfsDeleteWorker.java | 4 +
.../offheap/GridOffHeapProcessor.java | 17 ++
.../util/offheap/GridOffHeapPartitionedMap.java | 9 +
.../unsafe/GridUnsafePartitionedMap.java | 155 ++++++-------
.../core/src/main/resources/ignite.properties | 2 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 227 +++++++++++--------
.../cache/GridCacheAbstractSelfTest.java | 4 +-
.../cache/OffHeapTieredTransactionSelfTest.java | 127 +++++++++++
...icOffHeapTieredMultiNodeFullApiSelfTest.java | 43 ++++
...ionedNearDisabledOffHeapFullApiSelfTest.java | 8 +-
...DisabledOffHeapMultiNodeFullApiSelfTest.java | 8 +-
...abledOffHeapTieredAtomicFullApiSelfTest.java | 56 +++++
...earDisabledOffHeapTieredFullApiSelfTest.java | 33 +++
...edOffHeapTieredMultiNodeFullApiSelfTest.java | 33 +++
...CacheAtomicOffHeapTieredFullApiSelfTest.java | 32 +++
...icOffHeapTieredMultiNodeFullApiSelfTest.java | 33 +++
...yWriteOrderOffHeapTieredFullApiSelfTest.java | 33 +++
...erOffHeapTieredMultiNodeFullApiSelfTest.java | 33 +++
...achePartitionedMultiNodeFullApiSelfTest.java | 15 +-
...dCachePartitionedOffHeapFullApiSelfTest.java | 8 +-
...titionedOffHeapMultiNodeFullApiSelfTest.java | 8 +-
...PartitionedOffHeapTieredFullApiSelfTest.java | 32 +++
...edOffHeapTieredMultiNodeFullApiSelfTest.java | 72 ++++++
...idCacheReplicatedOffHeapFullApiSelfTest.java | 8 +-
...plicatedOffHeapMultiNodeFullApiSelfTest.java | 8 +-
...eReplicatedOffHeapTieredFullApiSelfTest.java | 33 +++
...edOffHeapTieredMultiNodeFullApiSelfTest.java | 33 +++
...LocalAtomicOffHeapTieredFullApiSelfTest.java | 32 +++
.../GridCacheLocalOffHeapFullApiSelfTest.java | 6 +-
...dCacheLocalOffHeapTieredFullApiSelfTest.java | 32 +++
.../ignite/testsuites/IgniteBasicTestSuite.java | 1 +
.../IgniteCacheFullApiSelfTestSuite.java | 18 ++
.../testsuites/IgniteCacheTestSuite3.java | 3 -
modules/extdata/p2p/pom.xml | 2 +-
modules/extdata/uri/pom.xml | 2 +-
modules/gce/pom.xml | 2 +-
modules/geospatial/pom.xml | 2 +-
modules/hadoop/pom.xml | 2 +-
modules/hibernate/pom.xml | 2 +-
modules/indexing/pom.xml | 2 +-
...eQueryMultiThreadedOffHeapTiredSelfTest.java | 37 +++
.../IgniteCacheQueryMultiThreadedSelfTest.java | 29 ++-
.../IgniteCacheQuerySelfTestSuite.java | 1 +
modules/jcl/pom.xml | 2 +-
modules/jta/pom.xml | 2 +-
modules/log4j/pom.xml | 2 +-
modules/rest-http/pom.xml | 2 +-
modules/scalar/pom.xml | 2 +-
modules/schedule/pom.xml | 2 +-
modules/schema-import/pom.xml | 2 +-
modules/slf4j/pom.xml | 2 +-
modules/spring/pom.xml | 2 +-
modules/ssh/pom.xml | 2 +-
modules/tools/pom.xml | 2 +-
modules/urideploy/pom.xml | 2 +-
modules/visor-console/pom.xml | 2 +-
modules/visor-plugins/pom.xml | 2 +-
modules/web/pom.xml | 2 +-
modules/yardstick/pom.xml | 2 +-
pom.xml | 6 +-
79 files changed, 1269 insertions(+), 406 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d245dc81/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d245dc81/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d245dc81/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
[12/21] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-sprint-5' into ignite-157
Posted by vo...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4dd6a63b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4dd6a63b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4dd6a63b
Branch: refs/heads/ignite-gg-9614
Commit: 4dd6a63ba0e63d9f6b72264983807ab577900b8f
Parents: c548136 17bf271
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 8 13:32:05 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 8 13:32:05 2015 +0300
----------------------------------------------------------------------
DEVNOTES.txt | 32 +-
dev-tools/.gitignore | 2 +
dev-tools/build.gradle | 45 +
dev-tools/src/main/groovy/jiraslurp.groovy | 146 +
modules/cloud/pom.xml | 2 +-
.../processors/cache/GridCacheIoManager.java | 314 +-
.../processors/cache/GridCacheMessage.java | 8 +-
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 8 +
.../dht/atomic/GridNearAtomicUpdateFuture.java | 2 +-
.../atomic/GridNearAtomicUpdateResponse.java | 18 +-
.../dht/preloader/GridDhtForceKeysFuture.java | 6 +
.../dht/preloader/GridDhtForceKeysResponse.java | 54 +-
.../distributed/near/GridNearGetResponse.java | 8 +-
.../util/lang/GridFilteredIterator.java | 2 +-
.../ignite/internal/util/lang/GridFunc.java | 7218 +++++-------------
.../spi/discovery/tcp/TcpDiscoverySpi.java | 22 +-
.../cache/GridCacheSwapReloadSelfTest.java | 20 +-
.../IgniteCacheP2pUnmarshallingErrorTest.java | 189 +
...gniteCacheP2pUnmarshallingNearErrorTest.java | 56 +
...CacheP2pUnmarshallingRebalanceErrorTest.java | 80 +
.../IgniteCacheP2pUnmarshallingTxErrorTest.java | 109 +
.../cache/IgniteCachePeekModesAbstractTest.java | 15 +-
...CacheLoadingConcurrentGridStartSelfTest.java | 49 +-
...idFileSwapSpaceSpiMultithreadedLoadTest.java | 4 +-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 65 +-
.../ignite/testsuites/IgniteBasicTestSuite.java | 1 +
...gniteCacheP2pUnmarshallingErrorTestSuit.java | 41 +
.../ignite/testsuites/IgniteCacheTestSuite.java | 293 -
.../testsuites/IgniteCacheTestSuite2.java | 141 +
.../testsuites/IgniteCacheTestSuite3.java | 143 +
.../testsuites/IgniteCacheTestSuite4.java | 131 +
modules/gce/pom.xml | 2 +-
.../cache/GridCacheOffHeapAndSwapSelfTest.java | 11 +-
.../cache/GridCacheOffHeapSelfTest.java | 11 +-
...niteCacheP2pUnmarshallingQueryErrorTest.java | 56 +
.../IgniteCacheQuerySelfTestSuite.java | 3 +
.../ignite/scalar/ScalarConversions.scala | 8 -
.../ignite/schema/generator/CodeGenerator.java | 6 +-
pom.xml | 22 +-
39 files changed, 3469 insertions(+), 5874 deletions(-)
----------------------------------------------------------------------
[20/21] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-157' into ignite-sprint-5
Posted by vo...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-157' into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bbc21a6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bbc21a6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bbc21a6a
Branch: refs/heads/ignite-gg-9614
Commit: bbc21a6acba0193695e8b924b540bae4a647f3b8
Parents: 3861fbd ab1f9dd
Author: sboikov <sb...@gridgain.com>
Authored: Wed May 13 10:04:14 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed May 13 10:04:14 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMvccManager.java | 4 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 32 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 27 +
.../cache/distributed/dht/GridDhtTxMapping.java | 2 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 81 +-
.../colocated/GridDhtColocatedLockFuture.java | 25 +-
.../colocated/GridDhtDetachedCacheEntry.java | 4 +-
.../distributed/near/GridNearCacheEntry.java | 4 +-
.../distributed/near/GridNearLockFuture.java | 5 -
.../near/GridNearOptimisticTxPrepareFuture.java | 779 +++++++++++++
.../GridNearPessimisticTxPrepareFuture.java | 349 ++++++
.../cache/distributed/near/GridNearTxLocal.java | 84 +-
.../near/GridNearTxPrepareFuture.java | 1050 ------------------
.../near/GridNearTxPrepareFutureAdapter.java | 226 ++++
.../cache/transactions/IgniteInternalTx.java | 4 +-
.../cache/transactions/IgniteTxAdapter.java | 2 +-
.../cache/transactions/IgniteTxHandler.java | 68 +-
.../transactions/IgniteTxLocalAdapter.java | 2 +-
.../cache/transactions/IgniteTxManager.java | 12 +-
.../GridCacheAbstractFailoverSelfTest.java | 8 +-
.../GridCacheAbstractNodeRestartSelfTest.java | 11 +-
.../distributed/IgniteTxGetAfterStopTest.java | 131 +++
...ePrimaryNodeFailureRecoveryAbstractTest.java | 4 +-
...idCacheAtomicReplicatedFailoverSelfTest.java | 6 +
.../GridCacheReplicatedFailoverSelfTest.java | 6 +
.../GridCacheReplicatedNodeRestartSelfTest.java | 80 ++
.../IgniteCacheFailoverTestSuite.java | 10 +-
.../testsuites/IgniteCacheRestartTestSuite.java | 8 +-
.../testsuites/IgniteCacheTestSuite3.java | 2 +
29 files changed, 1759 insertions(+), 1267 deletions(-)
----------------------------------------------------------------------
[09/21] incubator-ignite git commit: Merge remote-tracking branch
'origin/ignite-157' into ignite-157
Posted by vo...@apache.org.
Merge remote-tracking branch 'origin/ignite-157' into ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/41d1a143
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/41d1a143
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/41d1a143
Branch: refs/heads/ignite-gg-9614
Commit: 41d1a143548b4ab09aa1b97bca0d7cdf9f3e4a59
Parents: b3dcbf1 fcb45bb
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 8 11:33:51 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 8 11:33:51 2015 +0300
----------------------------------------------------------------------
examples/pom.xml | 2 +-
modules/aop/pom.xml | 2 +-
modules/aws/pom.xml | 2 +-
modules/clients/pom.xml | 2 +-
modules/cloud/pom.xml | 2 +-
modules/codegen/pom.xml | 2 +-
modules/core/pom.xml | 2 +-
.../testsuites/IgniteCacheRestartTestSuite.java | 4 ++--
modules/extdata/p2p/pom.xml | 2 +-
modules/extdata/uri/pom.xml | 2 +-
modules/gce/pom.xml | 2 +-
modules/geospatial/pom.xml | 2 +-
modules/hadoop/pom.xml | 2 +-
modules/hibernate/pom.xml | 2 +-
modules/indexing/pom.xml | 2 +-
modules/jcl/pom.xml | 2 +-
modules/jta/pom.xml | 2 +-
modules/log4j/pom.xml | 2 +-
modules/rest-http/pom.xml | 2 +-
modules/scalar/pom.xml | 2 +-
modules/schedule/pom.xml | 2 +-
modules/schema-import/pom.xml | 2 +-
modules/slf4j/pom.xml | 2 +-
modules/spring/pom.xml | 2 +-
modules/ssh/pom.xml | 2 +-
modules/tools/pom.xml | 2 +-
modules/urideploy/pom.xml | 2 +-
modules/visor-console/pom.xml | 2 +-
modules/visor-plugins/pom.xml | 2 +-
modules/web/pom.xml | 2 +-
modules/yardstick/pom.xml | 2 +-
pom.xml | 14 ++++----------
32 files changed, 36 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
[05/21] incubator-ignite git commit: # ignite-157
Posted by vo...@apache.org.
# ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a2fb8f6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a2fb8f6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a2fb8f6e
Branch: refs/heads/ignite-gg-9614
Commit: a2fb8f6e5903356a10ec8c6e17ebeaadaafe552f
Parents: c19402e
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 7 18:01:16 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 7 18:01:16 2015 +0300
----------------------------------------------------------------------
.../GridCacheReplicatedNodeRestartSelfTest.java | 80 ++++++++++++++++++++
.../testsuites/IgniteCacheRestartTestSuite.java | 4 +-
2 files changed, 82 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2fb8f6e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java
index 0023160..8ce96cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java
@@ -54,4 +54,84 @@ public class GridCacheReplicatedNodeRestartSelfTest extends GridCacheAbstractNod
return c;
}
+
+ /** {@inheritDoc} */
+ @Override public void testRestartWithPutTwoNodesNoBackups() throws Throwable {
+ super.testRestartWithPutTwoNodesNoBackups();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testRestartWithPutTwoNodesOneBackup() throws Throwable {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testRestartWithPutFourNodesOneBackups() throws Throwable {
+ super.testRestartWithPutFourNodesOneBackups();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testRestartWithPutFourNodesNoBackups() throws Throwable {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testRestartWithPutSixNodesTwoBackups() throws Throwable {
+ super.testRestartWithPutSixNodesTwoBackups();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testRestartWithPutEightNodesTwoBackups() throws Throwable {
+ super.testRestartWithPutEightNodesTwoBackups();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testRestartWithPutTenNodesTwoBackups() throws Throwable {
+ super.testRestartWithPutTenNodesTwoBackups();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testRestartWithTxTwoNodesNoBackups() throws Throwable {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testRestartWithTxTwoNodesOneBackup() throws Throwable {
+ super.testRestartWithTxTwoNodesOneBackup();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testRestartWithTxFourNodesOneBackups() throws Throwable {
+ super.testRestartWithTxFourNodesOneBackups();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testRestartWithTxFourNodesNoBackups() throws Throwable {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testRestartWithTxSixNodesTwoBackups() throws Throwable {
+ super.testRestartWithTxSixNodesTwoBackups();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testRestartWithTxEightNodesTwoBackups() throws Throwable {
+ super.testRestartWithTxEightNodesTwoBackups();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testRestartWithTxTenNodesTwoBackups() throws Throwable {
+ super.testRestartWithTxTenNodesTwoBackups();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testRestartWithTxPutAllTenNodesTwoBackups() throws Throwable {
+ super.testRestartWithTxPutAllTenNodesTwoBackups();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testRestartWithTxPutAllFourNodesTwoBackups() throws Throwable {
+ super.testRestartWithTxPutAllFourNodesTwoBackups();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a2fb8f6e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
index 983d447..e5372e8 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
@@ -21,7 +21,6 @@ import junit.framework.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
/**
* In-Memory Data Grid stability test suite on changing topology.
@@ -38,7 +37,8 @@ public class IgniteCacheRestartTestSuite extends TestSuite {
suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
suite.addTestSuite(GridCachePartitionedOptimisticTxNodeRestartTest.class);
- suite.addTestSuite(GridCacheReplicatedNodeRestartSelfTest.class);
+ // TODO IGNITE-157.
+ // suite.addTestSuite(GridCacheReplicatedNodeRestartSelfTest.class);
suite.addTestSuite(IgniteCacheAtomicNodeRestartTest.class);
// suite.addTestSuite(IgniteCacheAtomicReplicatedNodeRestartSelfTest.class); // TODO IGNITE-747
[21/21] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-sprint-5' into ignite-gg-9614
Posted by vo...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-gg-9614
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/506d742c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/506d742c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/506d742c
Branch: refs/heads/ignite-gg-9614
Commit: 506d742cd41184ba98887d4948013821ed6c711d
Parents: 925d8a0 bbc21a6
Author: ptupitsyn <pt...@gridgain.com>
Authored: Wed May 13 11:42:30 2015 +0300
Committer: ptupitsyn <pt...@gridgain.com>
Committed: Wed May 13 11:42:30 2015 +0300
----------------------------------------------------------------------
bin/include/functions.sh | 2 +-
.../processors/cache/GridCacheMvccManager.java | 4 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 32 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 27 +
.../cache/distributed/dht/GridDhtTxMapping.java | 2 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 81 +-
.../colocated/GridDhtColocatedLockFuture.java | 25 +-
.../colocated/GridDhtDetachedCacheEntry.java | 4 +-
.../distributed/near/GridNearCacheEntry.java | 4 +-
.../distributed/near/GridNearLockFuture.java | 5 -
.../near/GridNearOptimisticTxPrepareFuture.java | 779 +++++++++++++
.../GridNearPessimisticTxPrepareFuture.java | 349 ++++++
.../cache/distributed/near/GridNearTxLocal.java | 84 +-
.../near/GridNearTxPrepareFuture.java | 1050 ------------------
.../near/GridNearTxPrepareFutureAdapter.java | 226 ++++
.../cache/transactions/IgniteInternalTx.java | 4 +-
.../cache/transactions/IgniteTxAdapter.java | 2 +-
.../cache/transactions/IgniteTxHandler.java | 68 +-
.../transactions/IgniteTxLocalAdapter.java | 2 +-
.../cache/transactions/IgniteTxManager.java | 12 +-
.../GridCacheAbstractFailoverSelfTest.java | 8 +-
.../GridCacheAbstractNodeRestartSelfTest.java | 11 +-
.../distributed/IgniteTxGetAfterStopTest.java | 131 +++
...ePrimaryNodeFailureRecoveryAbstractTest.java | 4 +-
...idCacheAtomicReplicatedFailoverSelfTest.java | 6 +
.../GridCacheReplicatedFailoverSelfTest.java | 6 +
.../GridCacheReplicatedNodeRestartSelfTest.java | 80 ++
.../IgniteCacheFailoverTestSuite.java | 10 +-
.../testsuites/IgniteCacheRestartTestSuite.java | 8 +-
.../testsuites/IgniteCacheTestSuite3.java | 2 +
30 files changed, 1760 insertions(+), 1268 deletions(-)
----------------------------------------------------------------------
[11/21] incubator-ignite git commit: # ignite-157
Posted by vo...@apache.org.
# ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c5481362
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c5481362
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c5481362
Branch: refs/heads/ignite-gg-9614
Commit: c54813625103ea98591acfb8fe15d5cb4dea7f1c
Parents: 46dda3d
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 8 13:31:41 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 8 13:31:41 2015 +0300
----------------------------------------------------------------------
.../transactions/IgniteTxLocalAdapter.java | 2 +-
.../GridCacheAbstractNodeRestartSelfTest.java | 6 +-
.../distributed/IgniteTxGetAfterStopTest.java | 131 +++++++++++++++++++
.../testsuites/IgniteCacheRestartTestSuite.java | 4 +-
4 files changed, 137 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5481362/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index fc3efba..bd3a4ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1760,7 +1760,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
}
- if (!missed.isEmpty() && (cacheCtx.isReplicated() || cacheCtx.isLocal())) {
+ if (!missed.isEmpty() && cacheCtx.isLocal()) {
return checkMissed(cacheCtx,
retMap,
missed,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5481362/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
index 76020b8..7e65f23 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
@@ -105,9 +105,9 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
disco.setIpFinder(ipFinder);
- disco.setSocketTimeout(10_000);
- disco.setAckTimeout(10_000);
- disco.setNetworkTimeout(10_000);
+ disco.setSocketTimeout(30_000);
+ disco.setAckTimeout(30_000);
+ disco.setNetworkTimeout(30_000);
c.setDiscoverySpi(disco);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5481362/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxGetAfterStopTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxGetAfterStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxGetAfterStopTest.java
new file mode 100644
index 0000000..469f513
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxGetAfterStopTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class IgniteTxGetAfterStopTest extends IgniteCacheAbstractTest {
+ /** */
+ private CacheMode cacheMode;
+
+ /** */
+ private NearCacheConfiguration nearCfg;
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return cacheMode;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return nearCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicated() throws Exception {
+ getAfterStop(REPLICATED, null);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitioned() throws Exception {
+ getAfterStop(PARTITIONED, new NearCacheConfiguration());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionedNearDisabled() throws Exception {
+ getAfterStop(PARTITIONED, null);
+ }
+
+ /**
+ * @param cacheMode Cache mode.
+ * @param nearCfg Near cache configuration.
+ * @throws Exception If failed.
+ */
+ private void getAfterStop(CacheMode cacheMode, @Nullable NearCacheConfiguration nearCfg) throws Exception {
+ this.cacheMode = cacheMode;
+ this.nearCfg = nearCfg;
+
+ startGrids();
+
+ IgniteCache<Integer, Integer> cache0 = jcache(0);
+ IgniteCache<Integer, Integer> cache1 = jcache(1);
+
+ Integer key0 = primaryKey(cache0);
+ Integer key1 = primaryKey(cache1);
+
+ try (Transaction tx = ignite(0).transactions().txStart()) {
+ log.info("Put: " + key0);
+
+ cache0.put(key0, key0);
+
+ log.info("Stop node.");
+
+ stopGrid(3);
+
+ log.info("Get: " + key1);
+
+ cache0.get(key1);
+
+ log.info("Commit.");
+
+ tx.commit();
+ }
+
+ assertEquals(key0, cache0.get(key0));
+ assertNull(cache1.get(key1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c5481362/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
index e5372e8..983d447 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
@@ -21,6 +21,7 @@ import junit.framework.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
/**
* In-Memory Data Grid stability test suite on changing topology.
@@ -37,8 +38,7 @@ public class IgniteCacheRestartTestSuite extends TestSuite {
suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
suite.addTestSuite(GridCachePartitionedOptimisticTxNodeRestartTest.class);
- // TODO IGNITE-157.
- // suite.addTestSuite(GridCacheReplicatedNodeRestartSelfTest.class);
+ suite.addTestSuite(GridCacheReplicatedNodeRestartSelfTest.class);
suite.addTestSuite(IgniteCacheAtomicNodeRestartTest.class);
// suite.addTestSuite(IgniteCacheAtomicReplicatedNodeRestartSelfTest.class); // TODO IGNITE-747
[18/21] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-sprint-5' into ignite-157
Posted by vo...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ab1f9dd2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ab1f9dd2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ab1f9dd2
Branch: refs/heads/ignite-gg-9614
Commit: ab1f9dd278a1c1e42cf36b21a2bedcd2f7e4129e
Parents: 250dd8e f6012f1
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 12 09:05:47 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 12 09:05:47 2015 +0300
----------------------------------------------------------------------
.../processors/resource/GridResourceField.java | 11 +
.../processors/resource/GridResourceIoc.java | 387 ++++++-------------
.../processors/resource/GridResourceMethod.java | 13 +
.../resource/GridResourceProcessor.java | 4 +-
.../ignite/internal/util/IgniteUtils.java | 15 +
.../spi/discovery/tcp/TcpDiscoverySpi.java | 24 +-
parent/pom.xml | 2 +
pom.xml | 33 --
8 files changed, 193 insertions(+), 296 deletions(-)
----------------------------------------------------------------------
[04/21] incubator-ignite git commit: # ignite-157
Posted by vo...@apache.org.
# ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c19402e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c19402e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c19402e0
Branch: refs/heads/ignite-gg-9614
Commit: c19402e0d6085e88da4164cbb9b838e886849c6b
Parents: dff3fc6
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 7 17:43:57 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 7 17:43:57 2015 +0300
----------------------------------------------------------------------
.../ignite/testsuites/IgniteCacheRestartTestSuite.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c19402e0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
index a449cfc..983d447 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
@@ -21,6 +21,7 @@ import junit.framework.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
/**
* In-Memory Data Grid stability test suite on changing topology.
@@ -35,10 +36,9 @@ public class IgniteCacheRestartTestSuite extends TestSuite {
suite.addTestSuite(GridCachePartitionedTxSalvageSelfTest.class);
- // TODO: IGNITE-157.
- // suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
- // suite.addTestSuite(GridCachePartitionedOptimisticTxNodeRestartTest.class);
- // suite.addTestSuite(GridCacheReplicatedNodeRestartSelfTest.class);
+ suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
+ suite.addTestSuite(GridCachePartitionedOptimisticTxNodeRestartTest.class);
+ suite.addTestSuite(GridCacheReplicatedNodeRestartSelfTest.class);
suite.addTestSuite(IgniteCacheAtomicNodeRestartTest.class);
// suite.addTestSuite(IgniteCacheAtomicReplicatedNodeRestartSelfTest.class); // TODO IGNITE-747
[02/21] incubator-ignite git commit: # ignite-157-1
Posted by vo...@apache.org.
# ignite-157-1
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a238ce35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a238ce35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a238ce35
Branch: refs/heads/ignite-gg-9614
Commit: a238ce357fb0cb0c5378fbfc64341c3167843db5
Parents: 93876df
Author: sboikov <sb...@gridgain.com>
Authored: Thu May 7 14:49:33 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu May 7 16:19:26 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMvccManager.java | 4 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 32 ++++----
.../distributed/dht/GridDhtTxLocalAdapter.java | 27 +++++++
.../distributed/dht/GridDhtTxPrepareFuture.java | 81 ++++++++++----------
.../near/GridAbstractNearTxPrepareFuture.java | 3 +
.../near/GridNearOptimisticTxPrepareFuture.java | 13 +++-
.../GridNearPessimisticTxPrepareFuture.java | 15 +++-
.../cache/distributed/near/GridNearTxLocal.java | 43 +++++------
.../near/GridNearTxPrepareFuture.java | 20 ++---
.../cache/transactions/IgniteInternalTx.java | 4 +-
.../cache/transactions/IgniteTxAdapter.java | 2 +-
.../cache/transactions/IgniteTxHandler.java | 65 +++++++---------
.../cache/transactions/IgniteTxManager.java | 12 +--
.../cache/GridCacheAbstractFullApiSelfTest.java | 2 +-
...ePrimaryNodeFailureRecoveryAbstractTest.java | 4 +-
.../IgniteCacheFailoverTestSuite.java | 7 +-
16 files changed, 178 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 0bb97a9..c05e4b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -510,7 +510,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @return Future.
*/
@SuppressWarnings({"unchecked"})
- @Nullable public <T> GridCacheFuture<T> future(GridCacheVersion ver, IgniteUuid futId) {
+ @Nullable public GridCacheFuture future(GridCacheVersion ver, IgniteUuid futId) {
Collection<? extends GridCacheFuture> futs = this.futs.get(ver);
if (futs != null)
@@ -519,7 +519,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
if (log.isDebugEnabled())
log.debug("Found future in futures map: " + fut);
- return (GridCacheFuture<T>)fut;
+ return fut;
}
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 07ced0d..614f520 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -284,7 +284,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() {
+ @Override public IgniteInternalFuture<?> prepareAsync() {
if (optimistic()) {
assert isSystemInvalidate();
@@ -296,7 +296,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
nearMiniId,
null,
true,
- null,
null);
}
@@ -305,14 +304,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
if (fut == null) {
// Future must be created before any exception can be thrown.
- if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>(
+ if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture(
cctx,
this,
nearMiniId,
Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
true,
needReturnValue(),
- null,
null)))
return prepFut.get();
}
@@ -371,7 +369,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
* @param lastBackups IDs of backup nodes receiving last prepare request.
* @return Future that will be completed when locks are acquired.
*/
- public IgniteInternalFuture<IgniteInternalTx> prepareAsync(
+ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
@Nullable Iterable<IgniteTxEntry> reads,
@Nullable Iterable<IgniteTxEntry> writes,
Map<IgniteTxKey, GridCacheVersion> verMap,
@@ -379,8 +377,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
IgniteUuid nearMiniId,
Map<UUID, Collection<UUID>> txNodes,
boolean last,
- Collection<UUID> lastBackups,
- IgniteInClosure<GridNearTxPrepareResponse> completeCb
+ Collection<UUID> lastBackups
) {
// In optimistic mode prepare still can be called explicitly from salvageTx.
GridDhtTxPrepareFuture fut = prepFut.get();
@@ -389,21 +386,20 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
init();
// Future must be created before any exception can be thrown.
- if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>(
+ if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture(
cctx,
this,
nearMiniId,
verMap,
last,
needReturnValue(),
- lastBackups,
- completeCb))) {
+ lastBackups))) {
GridDhtTxPrepareFuture f = prepFut.get();
assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " +
"[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']';
- return f;
+ return chainOnePhasePrepare(f);
}
}
else {
@@ -411,7 +407,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
"[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']';
// Prepare was called explicitly.
- return fut;
+ return chainOnePhasePrepare(fut);
}
if (state() != PREPARING) {
@@ -475,7 +471,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
}
- return fut;
+ return chainOnePhasePrepare(fut);
}
/** {@inheritDoc} */
@@ -517,8 +513,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
}
else
- prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+ prep.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
try {
f.get(); // Check for errors of a parent future.
@@ -605,8 +601,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
else {
prepFut.complete();
- prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+ prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
try {
f.get(); // Check for errors of a parent future.
}
@@ -686,7 +682,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() {
+ @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() {
return prepFut.get();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 08fcaf6..d886989 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.util.*;
@@ -885,6 +886,32 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
*/
protected abstract void clearPrepareFuture(GridDhtTxPrepareFuture fut);
+ /**
+ * @return {@code True} if transaction if finished on prepare step.
+ */
+ protected final boolean commitOnPrepare() {
+ return onePhaseCommit() && !near();
+ }
+
+ /**
+ * @param prepFut Prepare future.
+ * @return If transaction if finished on prepare step returns future which is completed after transaction finish.
+ */
+ protected final IgniteInternalFuture<GridNearTxPrepareResponse> chainOnePhasePrepare(
+ final GridDhtTxPrepareFuture prepFut) {
+ if (commitOnPrepare()) {
+ return finishFuture().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridNearTxPrepareResponse>() {
+ @Override public GridNearTxPrepareResponse applyx(IgniteInternalFuture<IgniteInternalTx> finishFut)
+ throws IgniteCheckedException
+ {
+ return prepFut.get();
+ }
+ });
+ }
+
+ return prepFut;
+ }
+
/** {@inheritDoc} */
@Override public void rollback() throws IgniteCheckedException {
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 3a1a80a..0e64726 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -50,19 +50,32 @@ import static org.apache.ignite.transactions.TransactionState.*;
*
*/
@SuppressWarnings("unchecked")
-public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx>
- implements GridCacheMvccFuture<IgniteInternalTx> {
+public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse>
+ implements GridCacheMvccFuture<GridNearTxPrepareResponse> {
/** */
private static final long serialVersionUID = 0L;
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+ /** */
+ private static final IgniteReducer<IgniteInternalTx, GridNearTxPrepareResponse> REDUCER =
+ new IgniteReducer<IgniteInternalTx, GridNearTxPrepareResponse>() {
+ @Override public boolean collect(IgniteInternalTx e) {
+ return true;
+ }
+
+ @Override public GridNearTxPrepareResponse reduce() {
+ // Nothing to aggregate.
+ return null;
+ }
+ };
+
/** Logger. */
private static IgniteLogger log;
/** Context. */
- private GridCacheSharedContext<K, V> cctx;
+ private GridCacheSharedContext<?, ?> cctx;
/** Future ID. */
private IgniteUuid futId;
@@ -128,15 +141,13 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
/** */
private boolean invoke;
- /** */
- private IgniteInClosure<GridNearTxPrepareResponse> completeCb;
-
/**
* @param cctx Context.
* @param tx Transaction.
* @param nearMiniId Near mini future id.
* @param dhtVerMap DHT versions map.
* @param last {@code True} if this is last prepare operation for node.
+ * @param retVal Return value flag.
* @param lastBackups IDs of backup nodes receiving last prepare request during this prepare.
*/
public GridDhtTxPrepareFuture(
@@ -146,19 +157,9 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
Map<IgniteTxKey, GridCacheVersion> dhtVerMap,
boolean last,
boolean retVal,
- Collection<UUID> lastBackups,
- IgniteInClosure<GridNearTxPrepareResponse> completeCb
+ Collection<UUID> lastBackups
) {
- super(cctx.kernalContext(), new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() {
- @Override public boolean collect(IgniteInternalTx e) {
- return true;
- }
-
- @Override public IgniteInternalTx reduce() {
- // Nothing to aggregate.
- return tx;
- }
- });
+ super(REDUCER);
this.cctx = cctx;
this.tx = tx;
@@ -178,8 +179,6 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
this.retVal = retVal;
- this.completeCb = completeCb;
-
assert dhtMap != null;
assert nearMap != null;
}
@@ -382,7 +381,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
* @param t Error.
*/
public void onError(Throwable t) {
- onDone(tx, t);
+ onDone(null, t);
}
/**
@@ -479,7 +478,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
}
/** {@inheritDoc} */
- @Override public boolean onDone(IgniteInternalTx tx0, Throwable err) {
+ @Override public boolean onDone(GridNearTxPrepareResponse res0, Throwable err) {
assert err != null || (initialized() && !hasPending()) : "On done called for prepare future that has " +
"pending mini futures: " + this;
@@ -495,16 +494,15 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
// Must create prepare response before transaction is committed to grab correct return value.
final GridNearTxPrepareResponse res = createPrepareResponse();
- onComplete();
+ onComplete(res);
- if (!tx.near()) {
+ if (tx.commitOnPrepare()) {
if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) {
IgniteInternalFuture<IgniteInternalTx> fut = this.err.get() == null ?
tx.commitAsync() : tx.rollbackAsync();
fut.listen(new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override
- public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
+ @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
try {
if (replied.compareAndSet(false, true))
sendPrepareResponse(res);
@@ -530,15 +528,17 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
}
else {
if (replied.compareAndSet(false, true)) {
+ GridNearTxPrepareResponse res = createPrepareResponse();
+
try {
- sendPrepareResponse(createPrepareResponse());
+ sendPrepareResponse(res);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send prepare response for transaction: " + tx, e);
}
finally {
// Will call super.onDone().
- onComplete();
+ onComplete(res);
}
return true;
@@ -562,16 +562,12 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
}
/**
+ * @param res Response.
* @throws IgniteCheckedException If failed to send response.
*/
private void sendPrepareResponse(GridNearTxPrepareResponse res) throws IgniteCheckedException {
if (!tx.nearNodeId().equals(cctx.localNodeId()))
cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
- else {
- assert completeCb != null;
-
- completeCb.apply(res);
- }
}
/**
@@ -616,10 +612,10 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
for (IgniteTxEntry e : writes) {
IgniteTxEntry txEntry = tx.entry(e.txKey());
- GridCacheContext cacheCtx = txEntry.context();
-
assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']';
+ GridCacheContext cacheCtx = txEntry.context();
+
while (true) {
try {
GridCacheEntryEx entry = txEntry.cached();
@@ -682,13 +678,14 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
/**
* Completeness callback.
*
+ * @param res Response.
* @return {@code True} if {@code done} flag was changed as a result of this call.
*/
- private boolean onComplete() {
+ private boolean onComplete(@Nullable GridNearTxPrepareResponse res) {
if (last || tx.isSystemInvalidate())
tx.state(PREPARED);
- if (super.onDone(tx, err.get())) {
+ if (super.onDone(res, err.get())) {
// Don't forget to clean up.
cctx.mvcc().removeFuture(this);
@@ -702,7 +699,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
* Completes this future.
*/
public void complete() {
- onComplete();
+ onComplete(null);
}
/**
@@ -717,7 +714,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
if (tx.empty()) {
tx.setRollbackOnly();
- onDone(tx);
+ onDone((GridNearTxPrepareResponse)null);
}
this.reads = reads;
@@ -821,7 +818,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
try {
GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
- GridCacheContext<K, V> cacheCtx = cached.context();
+ GridCacheContext<?, ?> cacheCtx = cached.context();
if (entry.explicitVersion() == null) {
GridCacheMvccCandidate added = cached.candidate(version());
@@ -977,7 +974,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
GridCacheContext cacheCtx = entry.context();
- GridDhtCacheAdapter<K, V> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
+ GridDhtCacheAdapter<?, ?> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(entry);
@@ -1234,7 +1231,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
boolean rec = cctx.gridEvents().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED);
for (GridCacheEntryInfo info : res.preloadEntries()) {
- GridCacheContext<K, V> cacheCtx = cctx.cacheContext(info.cacheId());
+ GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(info.cacheId());
while (true) {
GridCacheEntryEx entry = cacheCtx.cache().entryEx(info.key());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
index 905f018..6f94f21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
@@ -153,6 +153,9 @@ public abstract class GridAbstractNearTxPrepareFuture extends GridCompoundIdenti
* @param res Response.
*/
protected final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
+ if (res == null)
+ return;
+
assert res.error() == null : res;
assert F.isEmpty(res.invalidPartitions()) : res;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 2fbca7b..110cca4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -535,9 +535,16 @@ public class GridNearOptimisticTxPrepareFuture extends GridAbstractNearTxPrepare
// At this point, if any new node joined, then it is
// waiting for this transaction to complete, so
// partition reassignments are not possible here.
- cctx.tm().txHandler().prepareTx(n.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
- @Override public void apply(GridNearTxPrepareResponse res) {
- fut.onResult(n.id(), res);
+ IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req);
+
+ prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+ @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+ try {
+ fut.onResult(n.id(), prepFut.get());
+ }
+ catch (IgniteCheckedException e) {
+ fut.onResult(e);
+ }
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 84a4ab8..e3f24f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -203,9 +203,18 @@ public class GridNearPessimisticTxPrepareFuture extends GridAbstractNearTxPrepar
add(fut);
if (node.isLocal()) {
- cctx.tm().txHandler().prepareTx(node.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
- @Override public void apply(GridNearTxPrepareResponse res) {
- fut.onResult(res);
+ IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(node.id(),
+ tx,
+ req);
+
+ prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+ @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+ try {
+ fut.onResult(prepFut.get());
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+ }
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index f7a43bb..a003d19 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -61,8 +61,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** Future. */
@GridToStringExclude
- private final AtomicReference<IgniteInternalFuture<IgniteInternalTx>> prepFut =
- new AtomicReference<>();
+ private final AtomicReference<IgniteInternalFuture<?>> prepFut = new AtomicReference<>();
/** */
@GridToStringExclude
@@ -682,7 +681,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() {
+ @Override public IgniteInternalFuture<?> prepareAsync() {
GridAbstractNearTxPrepareFuture fut = (GridAbstractNearTxPrepareFuture)prepFut.get();
if (fut == null) {
@@ -719,10 +718,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
cctx.mvcc().addFuture(fut);
- IgniteInternalFuture<IgniteInternalTx> prepareFut = prepFut.get();
+ IgniteInternalFuture<?> prepareFut = prepFut.get();
- prepareFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+ prepareFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
GridNearTxFinishFuture fut0 = commitFut.get();
try {
@@ -766,7 +765,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
cctx.mvcc().addFuture(fut);
- IgniteInternalFuture<IgniteInternalTx> prepFut = this.prepFut.get();
+ IgniteInternalFuture<?> prepFut = this.prepFut.get();
if (prepFut == null || prepFut.isDone()) {
try {
@@ -790,8 +789,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
}
else {
- prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+ prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
try {
// Check for errors in prepare future.
f.get();
@@ -834,12 +833,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
* @return Future that will be completed when locks are acquired.
*/
@SuppressWarnings("TypeMayBeWeakened")
- public IgniteInternalFuture<IgniteInternalTx> prepareAsyncLocal(
+ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(
@Nullable Collection<IgniteTxEntry> reads,
@Nullable Collection<IgniteTxEntry> writes,
Map<UUID, Collection<UUID>> txNodes, boolean last,
- Collection<UUID> lastBackups,
- IgniteInClosure<GridNearTxPrepareResponse> completeCb
+ Collection<UUID> lastBackups
) {
if (state() != PREPARING) {
if (timedOut())
@@ -854,15 +852,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
init();
- GridDhtTxPrepareFuture fut = new GridDhtTxPrepareFuture<>(
+ GridDhtTxPrepareFuture fut = new GridDhtTxPrepareFuture(
cctx,
this,
IgniteUuid.randomUuid(),
Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
last,
needReturnValue() && implicit(),
- lastBackups,
- completeCb);
+ lastBackups);
try {
// At this point all the entries passed in must be enlisted in transaction because this is an
@@ -901,7 +898,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
}
- return fut;
+ return chainOnePhasePrepare(fut);
}
/**
@@ -917,7 +914,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
if (pessimistic())
prepareAsync();
- IgniteInternalFuture<IgniteInternalTx> prep = prepFut.get();
+ IgniteInternalFuture<?> prep = prepFut.get();
// Do not create finish future if there are no remote nodes.
if (F.isEmpty(dhtMap) && F.isEmpty(nearMap)) {
@@ -953,8 +950,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
}
else
- prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+ prep.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
try {
f.get(); // Check for errors of a parent future.
@@ -990,7 +987,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
cctx.mvcc().addFuture(fut);
- IgniteInternalFuture<IgniteInternalTx> prep = prepFut.get();
+ IgniteInternalFuture<?> prep = prepFut.get();
if (prep == null || prep.isDone()) {
try {
@@ -1006,8 +1003,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
fut.finish();
}
else
- prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) {
+ prep.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
try {
f.get(); // Check for errors of a parent future.
}
@@ -1200,7 +1197,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() {
+ @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() {
return prepFut.get();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 962d973..9cf4aca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -648,11 +648,11 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
add(fut);
if (node.isLocal()) {
- cctx.tm().txHandler().prepareTx(node.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
- @Override public void apply(GridNearTxPrepareResponse res) {
- fut.onResult(node.id(), res);
- }
- });
+// cctx.tm().txHandler().prepareTx(node.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
+// @Override public void apply(GridNearTxPrepareResponse res) {
+// fut.onResult(node.id(), res);
+// }
+// });
}
else {
try {
@@ -755,11 +755,11 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
// At this point, if any new node joined, then it is
// waiting for this transaction to complete, so
// partition reassignments are not possible here.
- cctx.tm().txHandler().prepareTx(n.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
- @Override public void apply(GridNearTxPrepareResponse res) {
- fut.onResult(n.id(), res);
- }
- });
+// cctx.tm().txHandler().prepareTx(n.id(), tx, req, new CI1<GridNearTxPrepareResponse>() {
+// @Override public void apply(GridNearTxPrepareResponse res) {
+// fut.onResult(n.id(), res);
+// }
+// });
}
else {
assert !tx.groupLock() : "Got group lock transaction that is mapped on remote node [tx=" + tx +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 8dc07cc..2bed843 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -551,7 +551,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
*
* @return Future for prepare step.
*/
- public IgniteInternalFuture<IgniteInternalTx> prepareAsync();
+ public IgniteInternalFuture<?> prepareAsync();
/**
* @param endVer End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)
@@ -580,7 +580,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
/**
* @return Future for transaction prepare if prepare is in progress.
*/
- @Nullable public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture();
+ @Nullable public IgniteInternalFuture<?> currentPrepareFuture();
/**
* @param state Transaction state.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 82d68b3..64cc77f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1008,7 +1008,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
- @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() {
+ @Nullable @Override public IgniteInternalFuture<?> currentPrepareFuture() {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index d98b4ff..a403f28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -58,9 +58,9 @@ public class IgniteTxHandler {
* @param req Request.
* @return Prepare future.
*/
- public IgniteInternalFuture<IgniteInternalTx> processNearTxPrepareRequest(final UUID nearNodeId,
+ public IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId,
final GridNearTxPrepareRequest req) {
- return prepareTx(nearNodeId, null, req, null);
+ return prepareTx(nearNodeId, null, req);
}
/**
@@ -138,32 +138,28 @@ public class IgniteTxHandler {
* @param nearNodeId Near node ID that initiated transaction.
* @param locTx Optional local transaction.
* @param req Near prepare request.
- * @param completeCb Completion callback.
* @return Future for transaction.
*/
- public IgniteInternalFuture<IgniteInternalTx> prepareTx(
+ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareTx(
UUID nearNodeId,
@Nullable GridNearTxLocal locTx,
- GridNearTxPrepareRequest req,
- @Nullable IgniteInClosure<GridNearTxPrepareResponse> completeCb
+ GridNearTxPrepareRequest req
) {
assert nearNodeId != null;
assert req != null;
if (locTx != null) {
- assert completeCb != null;
-
if (req.near()) {
// Make sure not to provide Near entries to DHT cache.
req.cloneEntries();
- return prepareNearTx(nearNodeId, req, completeCb);
+ return prepareNearTx(nearNodeId, req);
}
else
- return prepareColocatedTx(locTx, req, completeCb);
+ return prepareColocatedTx(locTx, req);
}
else
- return prepareNearTx(nearNodeId, req, null);
+ return prepareNearTx(nearNodeId, req);
}
/**
@@ -171,30 +167,27 @@ public class IgniteTxHandler {
*
* @param locTx Local transaction.
* @param req Near prepare request.
- * @param completeCb Completion callback.
* @return Prepare future.
*/
- private IgniteInternalFuture<IgniteInternalTx> prepareColocatedTx(
+ private IgniteInternalFuture<GridNearTxPrepareResponse> prepareColocatedTx(
final GridNearTxLocal locTx,
- final GridNearTxPrepareRequest req,
- final IgniteInClosure<GridNearTxPrepareResponse> completeCb
+ final GridNearTxPrepareRequest req
) {
IgniteInternalFuture<Object> fut = new GridFinishedFuture<>(); // TODO force preload keys.
return new GridEmbeddedFuture<>(
fut,
- new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception ex) {
+ new C2<Object, Exception, IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+ @Override public IgniteInternalFuture<GridNearTxPrepareResponse> apply(Object o, Exception ex) {
if (ex != null)
throw new GridClosureException(ex);
- IgniteInternalFuture<IgniteInternalTx> fut = locTx.prepareAsyncLocal(
+ IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(
req.reads(),
req.writes(),
req.transactionNodes(),
req.last(),
- req.lastBackups(),
- completeCb);
+ req.lastBackups());
if (locTx.isRollbackOnly())
locTx.rollbackAsync();
@@ -202,18 +195,16 @@ public class IgniteTxHandler {
return fut;
}
},
- new C2<IgniteInternalTx, Exception, IgniteInternalTx>() {
- @Nullable @Override public IgniteInternalTx apply(IgniteInternalTx tx, Exception e) {
+ new C2<GridNearTxPrepareResponse, Exception, GridNearTxPrepareResponse>() {
+ @Nullable @Override public GridNearTxPrepareResponse apply(GridNearTxPrepareResponse res, Exception e) {
if (e != null) {
- // tx can be null of exception occurred.
- if (tx != null)
- tx.setRollbackOnly(); // Just in case.
+ locTx.setRollbackOnly(); // Just in case.
if (!(e instanceof IgniteTxOptimisticCheckedException))
- U.error(log, "Failed to prepare DHT transaction: " + tx, e);
+ U.error(log, "Failed to prepare transaction: " + locTx, e);
}
- return tx;
+ return res;
}
}
);
@@ -224,13 +215,11 @@ public class IgniteTxHandler {
*
* @param nearNodeId Near node ID that initiated transaction.
* @param req Near prepare request.
- * @param completeCb Completion callback.
* @return Prepare future.
*/
- private IgniteInternalFuture<IgniteInternalTx> prepareNearTx(
+ private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
final UUID nearNodeId,
- final GridNearTxPrepareRequest req,
- IgniteInClosure<GridNearTxPrepareResponse> completeCb
+ final GridNearTxPrepareRequest req
) {
ClusterNode nearNode = ctx.node(nearNodeId);
@@ -315,7 +304,7 @@ public class IgniteTxHandler {
if (req.returnValue())
tx.needReturnValue(true);
- IgniteInternalFuture<IgniteInternalTx> fut = tx.prepareAsync(
+ IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(
req.reads(),
req.writes(),
req.dhtVersions(),
@@ -323,8 +312,7 @@ public class IgniteTxHandler {
req.miniId(),
req.transactionNodes(),
req.last(),
- req.lastBackups(),
- completeCb);
+ req.lastBackups());
if (tx.isRollbackOnly()) {
try {
@@ -337,8 +325,8 @@ public class IgniteTxHandler {
final GridDhtTxLocal tx0 = tx;
- fut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> txFut) {
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> txFut) {
try {
txFut.get();
}
@@ -354,7 +342,7 @@ public class IgniteTxHandler {
return fut;
}
else
- return new GridFinishedFuture<>((IgniteInternalTx)null);
+ return new GridFinishedFuture<>((GridNearTxPrepareResponse)null);
}
/**
@@ -399,8 +387,7 @@ public class IgniteTxHandler {
* @param res Response.
*/
private void processDhtTxPrepareResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
- GridDhtTxPrepareFuture fut = (GridDhtTxPrepareFuture)ctx.mvcc().
- <IgniteInternalTx>future(res.version(), res.futureId());
+ GridDhtTxPrepareFuture fut = (GridDhtTxPrepareFuture)ctx.mvcc().future(res.version(), res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 8a1d490..2122602 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1816,7 +1816,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (nearVer.equals(tx.nearXidVersion())) {
TransactionState state = tx.state();
- IgniteInternalFuture<IgniteInternalTx> prepFut = tx.currentPrepareFuture();
+ IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
if (prepFut != null && !prepFut.isDone()) {
if (log.isDebugEnabled())
@@ -1828,8 +1828,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
final Collection<GridCacheVersion> processedVers0 = processedVers;
- prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> prepFut) {
+ prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> prepFut) {
if (log.isDebugEnabled())
log.debug("Transaction prepare future finished: " + tx);
@@ -2029,11 +2029,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (tx.state() == PREPARED)
commitIfPrepared(tx);
else {
- IgniteInternalFuture<IgniteInternalTx> prepFut = tx.currentPrepareFuture();
+ IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
if (prepFut != null) {
- prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+ prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
if (tx.state() == PREPARED)
commitIfPrepared(tx);
else if (tx.setRollbackOnly())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 4dc371c..a346b65 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -4343,7 +4343,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertEquals(val1, cacheSkipStore.invoke(key, new SetValueProcessor(val2)));
assertEquals(i, map.get(key));
- assertEquals(val2, cacheSkipStore.get(key));
+ assertEquals("For key " + key, val2, cacheSkipStore.get(key));
}
for (String key : keys) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index ee2f16b..f996877 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -199,7 +199,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
- IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync();
+ IgniteInternalFuture<?> prepFut = txEx.prepareAsync();
waitPrepared(ignite(1));
@@ -360,7 +360,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
- IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync();
+ IgniteInternalFuture<?> prepFut = txEx.prepareAsync();
waitPrepared(ignite(1));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a238ce35/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index 529bd23..2acd6a3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -54,10 +54,9 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
suite.addTestSuite(GridCacheAtomicPrimaryWriteOrderFailoverSelfTest.class);
suite.addTestSuite(GridCacheAtomicReplicatedFailoverSelfTest.class);
- // TODO IGNITE-157.
- // suite.addTestSuite(GridCachePartitionedFailoverSelfTest.class);
- // suite.addTestSuite(GridCacheColocatedFailoverSelfTest.class);
- // suite.addTestSuite(GridCacheReplicatedFailoverSelfTest.class);
+ suite.addTestSuite(GridCachePartitionedFailoverSelfTest.class);
+ suite.addTestSuite(GridCacheColocatedFailoverSelfTest.class);
+ suite.addTestSuite(GridCacheReplicatedFailoverSelfTest.class);
suite.addTestSuite(IgniteCacheAtomicNodeJoinTest.class);
suite.addTestSuite(IgniteCacheTxNodeJoinTest.class);
[08/21] incubator-ignite git commit: # ignite-157
Posted by vo...@apache.org.
# ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b3dcbf18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b3dcbf18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b3dcbf18
Branch: refs/heads/ignite-gg-9614
Commit: b3dcbf18407318e7c824a7260fed8c1ec3d3d844
Parents: a2fb8f6
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 8 10:20:30 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 8 11:31:51 2015 +0300
----------------------------------------------------------------------
.../near/GridAbstractNearTxPrepareFuture.java | 222 ------------------
.../near/GridNearOptimisticTxPrepareFuture.java | 15 +-
.../GridNearPessimisticTxPrepareFuture.java | 39 +++-
.../cache/distributed/near/GridNearTxLocal.java | 2 +-
.../near/GridNearTxPrepareFutureAdapter.java | 226 +++++++++++++++++++
.../cache/transactions/IgniteTxHandler.java | 3 +-
.../GridCacheAbstractFailoverSelfTest.java | 7 +
.../IgniteCacheFailoverTestSuite.java | 8 +-
8 files changed, 283 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3dcbf18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
deleted file mode 100644
index 6f94f21..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.near;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-
-import javax.cache.expiry.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
-
-/**
- * Common code for tx prepare in optimistic and pessimistic modes.
- */
-public abstract class GridAbstractNearTxPrepareFuture extends GridCompoundIdentityFuture<IgniteInternalTx>
- implements GridCacheFuture<IgniteInternalTx> {
- /** Logger reference. */
- protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Logger. */
- protected static IgniteLogger log;
-
- /** Context. */
- protected GridCacheSharedContext<?, ?> cctx;
-
- /** Future ID. */
- protected IgniteUuid futId;
-
- /** Transaction. */
- @GridToStringInclude
- protected GridNearTxLocal tx;
-
- /** Error. */
- @GridToStringExclude
- protected AtomicReference<Throwable> err = new AtomicReference<>(null);
-
- /** Trackable flag. */
- protected boolean trackable = true;
-
- /** Full information about transaction nodes mapping. */
- protected GridDhtTxMapping txMapping;
-
- /**
- * @param cctx Context.
- * @param tx Transaction.
- */
- public GridAbstractNearTxPrepareFuture(GridCacheSharedContext cctx, final GridNearTxLocal tx) {
- super(cctx.kernalContext(), new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() {
- @Override public boolean collect(IgniteInternalTx e) {
- return true;
- }
-
- @Override public IgniteInternalTx reduce() {
- // Nothing to aggregate.
- return tx;
- }
- });
-
- assert cctx != null;
- assert tx != null;
-
- this.cctx = cctx;
- this.tx = tx;
-
- futId = IgniteUuid.randomUuid();
-
- if (log == null)
- log = U.logger(cctx.kernalContext(), logRef, GridAbstractNearTxPrepareFuture.class);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- return futId;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return tx.xidVersion();
- }
-
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- trackable = false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean trackable() {
- return trackable;
- }
-
- /**
- * Prepares transaction.
- */
- public abstract void prepare();
-
- /**
- * @param nodeId Sender.
- * @param res Result.
- */
- public abstract void onResult(UUID nodeId, GridNearTxPrepareResponse res);
-
- /**
- * Checks if mapped transaction can be committed on one phase.
- * One-phase commit can be done if transaction maps to one primary node and not more than one backup.
- */
- protected final void checkOnePhase() {
- if (tx.storeUsed())
- return;
-
- Map<UUID, Collection<UUID>> map = txMapping.transactionNodes();
-
- if (map.size() == 1) {
- Map.Entry<UUID, Collection<UUID>> entry = F.firstEntry(map);
-
- assert entry != null;
-
- Collection<UUID> backups = entry.getValue();
-
- if (backups.size() <= 1)
- tx.onePhaseCommit(true);
- }
- }
-
- /**
- * @param m Mapping.
- * @param res Response.
- */
- protected final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
- if (res == null)
- return;
-
- assert res.error() == null : res;
- assert F.isEmpty(res.invalidPartitions()) : res;
-
- for (Map.Entry<IgniteTxKey, CacheVersionedValue> entry : res.ownedValues().entrySet()) {
- IgniteTxEntry txEntry = tx.entry(entry.getKey());
-
- assert txEntry != null;
-
- GridCacheContext cacheCtx = txEntry.context();
-
- while (true) {
- try {
- if (cacheCtx.isNear()) {
- GridNearCacheEntry nearEntry = (GridNearCacheEntry)txEntry.cached();
-
- CacheVersionedValue tup = entry.getValue();
-
- nearEntry.resetFromPrimary(tup.value(), tx.xidVersion(),
- tup.version(), m.node().id(), tx.topologyVersion());
- }
- else if (txEntry.cached().detached()) {
- GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached();
-
- CacheVersionedValue tup = entry.getValue();
-
- detachedEntry.resetFromPrimary(tup.value(), tx.xidVersion());
- }
-
- break;
- }
- catch (GridCacheEntryRemovedException ignored) {
- // Retry.
- }
- }
- }
-
- tx.implicitSingleResult(res.returnValue());
-
- for (IgniteTxKey key : res.filterFailedKeys()) {
- IgniteTxEntry txEntry = tx.entry(key);
-
- assert txEntry != null : "Missing tx entry for write key: " + key;
-
- txEntry.op(NOOP);
-
- assert txEntry.context() != null;
-
- ExpiryPolicy expiry = txEntry.context().expiryForTxEntry(txEntry);
-
- if (expiry != null)
- txEntry.ttl(CU.toTtl(expiry.getExpiryForAccess()));
- }
-
- if (!m.empty()) {
- // Register DHT version.
- tx.addDhtVersion(m.node().id(), res.dhtVersion());
-
- m.dhtVersion(res.dhtVersion());
-
- if (m.near())
- tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3dcbf18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 110cca4..1f2c439 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -47,7 +47,7 @@ import static org.apache.ignite.transactions.TransactionState.*;
/**
*
*/
-public class GridNearOptimisticTxPrepareFuture extends GridAbstractNearTxPrepareFuture
+public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAdapter
implements GridCacheMvccFuture<IgniteInternalTx> {
/** */
private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
@@ -397,8 +397,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridAbstractNearTxPrepare
txMapping = new GridDhtTxMapping();
- ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings =
- new ConcurrentLinkedDeque8<>();
+ ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = new ConcurrentLinkedDeque8<>();
if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
for (int cacheId : tx.activeCacheIds()) {
@@ -647,7 +646,15 @@ public class GridNearOptimisticTxPrepareFuture extends GridAbstractNearTxPrepare
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridNearOptimisticTxPrepareFuture.class, this, super.toString());
+ Collection<String> pendingFuts = F.viewReadOnly(pending(), new C1<IgniteInternalFuture<?>, String>() {
+ @Override public String apply(IgniteInternalFuture<?> f) {
+ return "[node=" + ((MiniFuture)f).node().id() + ", loc=" + ((MiniFuture)f).node().isLocal() + "]";
+ }
+ });
+
+ return S.toString(GridNearOptimisticTxPrepareFuture.class, this,
+ "pendingFuts", pendingFuts,
+ "super", super.toString());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3dcbf18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index e3f24f5..ba8b92c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -41,7 +41,7 @@ import static org.apache.ignite.transactions.TransactionState.*;
/**
*
*/
-public class GridNearPessimisticTxPrepareFuture extends GridAbstractNearTxPrepareFuture {
+public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureAdapter {
/**
* @param cctx Context.
* @param tx Transaction.
@@ -72,7 +72,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridAbstractNearTxPrepar
MiniFuture f = (MiniFuture)fut;
if (f.node().id().equals(nodeId)) {
- f.onError(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
+ f.onNodeLeft(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
found = true;
}
@@ -222,8 +222,10 @@ public class GridNearPessimisticTxPrepareFuture extends GridAbstractNearTxPrepar
try {
cctx.io().send(node, req, tx.ioPolicy());
}
+ catch (ClusterTopologyCheckedException e) {
+ fut.onNodeLeft(e);
+ }
catch (IgniteCheckedException e) {
- // Fail the whole thing.
fut.onError(e);
}
}
@@ -242,7 +244,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridAbstractNearTxPrepar
if (err == null)
tx.state(PREPARED);
- if (super.onDone(res, err)) {
+ if (super.onDone(tx, err)) {
cctx.mvcc().removeFuture(this);
return true;
@@ -253,7 +255,15 @@ public class GridNearPessimisticTxPrepareFuture extends GridAbstractNearTxPrepar
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridNearPessimisticTxPrepareFuture.class, this, super.toString());
+ Collection<String> pendingFuts = F.viewReadOnly(pending(), new C1<IgniteInternalFuture<?>, String>() {
+ @Override public String apply(IgniteInternalFuture<?> f) {
+ return "[node=" + ((MiniFuture)f).node().id() + ", loc=" + ((MiniFuture)f).node().isLocal() + "]";
+ }
+ });
+
+ return S.toString(GridNearPessimisticTxPrepareFuture.class, this,
+ "pendingFuts", pendingFuts,
+ "super", super.toString());
}
/**
@@ -306,8 +316,25 @@ public class GridNearPessimisticTxPrepareFuture extends GridAbstractNearTxPrepar
/**
* @param e Error.
*/
+ void onNodeLeft(ClusterTopologyCheckedException e) {
+ onError(e);
+ }
+
+ /**
+ * @param e Error.
+ */
void onError(Throwable e) {
- err.compareAndSet(null, e);
+ if (isDone()) {
+ U.warn(log, "Received error when future is done [fut=" + this + ", err=" + e + ", tx=" + tx + ']');
+
+ return;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Error on tx prepare [fut=" + this + ", err=" + e + ", tx=" + tx + ']');
+
+ if (err.compareAndSet(null, e))
+ tx.setRollbackOnly();
onDone(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3dcbf18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index a003d19..50d3f3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -682,7 +682,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> prepareAsync() {
- GridAbstractNearTxPrepareFuture fut = (GridAbstractNearTxPrepareFuture)prepFut.get();
+ GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)prepFut.get();
if (fut == null) {
// Future must be created before any exception can be thrown.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3dcbf18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
new file mode 100644
index 0000000..60b918c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import javax.cache.expiry.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
+
+/**
+ * Common code for tx prepare in optimistic and pessimistic modes.
+ */
+public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundIdentityFuture<IgniteInternalTx>
+ implements GridCacheFuture<IgniteInternalTx> {
+ /** Logger reference. */
+ protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** */
+ private static final IgniteReducer<IgniteInternalTx, IgniteInternalTx> REDUCER =
+ new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() {
+ @Override public boolean collect(IgniteInternalTx e) {
+ return true;
+ }
+
+ @Override public IgniteInternalTx reduce() {
+ // Nothing to aggregate.
+ return null;
+ }
+ };
+
+ /** Logger. */
+ protected static IgniteLogger log;
+
+ /** Context. */
+ protected GridCacheSharedContext<?, ?> cctx;
+
+ /** Future ID. */
+ protected IgniteUuid futId;
+
+ /** Transaction. */
+ @GridToStringInclude
+ protected GridNearTxLocal tx;
+
+ /** Error. */
+ @GridToStringExclude
+ protected AtomicReference<Throwable> err = new AtomicReference<>(null);
+
+ /** Trackable flag. */
+ protected boolean trackable = true;
+
+ /** Full information about transaction nodes mapping. */
+ protected GridDhtTxMapping txMapping;
+
+ /**
+ * @param cctx Context.
+ * @param tx Transaction.
+ */
+ public GridNearTxPrepareFutureAdapter(GridCacheSharedContext cctx, final GridNearTxLocal tx) {
+ super(cctx.kernalContext(), REDUCER);
+
+ assert cctx != null;
+ assert tx != null;
+
+ this.cctx = cctx;
+ this.tx = tx;
+
+ futId = IgniteUuid.randomUuid();
+
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, GridNearTxPrepareFutureAdapter.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion version() {
+ return tx.xidVersion();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void markNotTrackable() {
+ trackable = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean trackable() {
+ return trackable;
+ }
+
+ /**
+ * Prepares transaction.
+ */
+ public abstract void prepare();
+
+ /**
+ * @param nodeId Sender.
+ * @param res Result.
+ */
+ public abstract void onResult(UUID nodeId, GridNearTxPrepareResponse res);
+
+ /**
+ * Checks if mapped transaction can be committed on one phase.
+ * One-phase commit can be done if transaction maps to one primary node and not more than one backup.
+ */
+ protected final void checkOnePhase() {
+ if (tx.storeUsed())
+ return;
+
+ Map<UUID, Collection<UUID>> map = txMapping.transactionNodes();
+
+ if (map.size() == 1) {
+ Map.Entry<UUID, Collection<UUID>> entry = F.firstEntry(map);
+
+ assert entry != null;
+
+ Collection<UUID> backups = entry.getValue();
+
+ if (backups.size() <= 1)
+ tx.onePhaseCommit(true);
+ }
+ }
+
+ /**
+ * @param m Mapping.
+ * @param res Response.
+ */
+ protected final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) {
+ if (res == null)
+ return;
+
+ assert res.error() == null : res;
+ assert F.isEmpty(res.invalidPartitions()) : res;
+
+ for (Map.Entry<IgniteTxKey, CacheVersionedValue> entry : res.ownedValues().entrySet()) {
+ IgniteTxEntry txEntry = tx.entry(entry.getKey());
+
+ assert txEntry != null;
+
+ GridCacheContext cacheCtx = txEntry.context();
+
+ while (true) {
+ try {
+ if (cacheCtx.isNear()) {
+ GridNearCacheEntry nearEntry = (GridNearCacheEntry)txEntry.cached();
+
+ CacheVersionedValue tup = entry.getValue();
+
+ nearEntry.resetFromPrimary(tup.value(), tx.xidVersion(),
+ tup.version(), m.node().id(), tx.topologyVersion());
+ }
+ else if (txEntry.cached().detached()) {
+ GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached();
+
+ CacheVersionedValue tup = entry.getValue();
+
+ detachedEntry.resetFromPrimary(tup.value(), tx.xidVersion());
+ }
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ // Retry.
+ }
+ }
+ }
+
+ tx.implicitSingleResult(res.returnValue());
+
+ for (IgniteTxKey key : res.filterFailedKeys()) {
+ IgniteTxEntry txEntry = tx.entry(key);
+
+ assert txEntry != null : "Missing tx entry for write key: " + key;
+
+ txEntry.op(NOOP);
+
+ assert txEntry.context() != null;
+
+ ExpiryPolicy expiry = txEntry.context().expiryForTxEntry(txEntry);
+
+ if (expiry != null)
+ txEntry.ttl(CU.toTtl(expiry.getExpiryForAccess()));
+ }
+
+ if (!m.empty()) {
+ // Register DHT version.
+ tx.addDhtVersion(m.node().id(), res.dhtVersion());
+
+ m.dhtVersion(res.dhtVersion());
+
+ if (m.near())
+ tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3dcbf18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index a403f28..826f392 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -31,7 +31,6 @@ import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -350,7 +349,7 @@ public class IgniteTxHandler {
* @param res Response.
*/
private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse res) {
- GridAbstractNearTxPrepareFuture fut = (GridAbstractNearTxPrepareFuture)ctx.mvcc()
+ GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)ctx.mvcc()
.<IgniteInternalTx>future(res.version(), res.futureId());
if (fut == null) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3dcbf18/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
index 5389ef9..6f6355a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.testframework.*;
import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;
@@ -70,6 +71,12 @@ public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstrac
cfg.getTransactionConfiguration().setTxSerializableEnabled(true);
+ TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+
+ discoSpi.setSocketTimeout(10_000);
+ discoSpi.setAckTimeout(10_000);
+ discoSpi.setNetworkTimeout(10_000);
+
return cfg;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b3dcbf18/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index 2acd6a3..2cc6a5a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -54,10 +54,6 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
suite.addTestSuite(GridCacheAtomicPrimaryWriteOrderFailoverSelfTest.class);
suite.addTestSuite(GridCacheAtomicReplicatedFailoverSelfTest.class);
- suite.addTestSuite(GridCachePartitionedFailoverSelfTest.class);
- suite.addTestSuite(GridCacheColocatedFailoverSelfTest.class);
- suite.addTestSuite(GridCacheReplicatedFailoverSelfTest.class);
-
suite.addTestSuite(IgniteCacheAtomicNodeJoinTest.class);
suite.addTestSuite(IgniteCacheTxNodeJoinTest.class);
suite.addTestSuite(IgniteCacheTxFairAffinityNodeJoinTest.class);
@@ -65,6 +61,10 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheTxNearDisabledPutGetRestartTest.class);
suite.addTestSuite(IgniteCacheTxNearDisabledFairAffinityPutGetRestartTest.class);
+ suite.addTestSuite(GridCachePartitionedFailoverSelfTest.class);
+ suite.addTestSuite(GridCacheColocatedFailoverSelfTest.class);
+ suite.addTestSuite(GridCacheReplicatedFailoverSelfTest.class);
+
return suite;
}
}
[13/21] incubator-ignite git commit: # ignite-157
Posted by vo...@apache.org.
# ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/30d306a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/30d306a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/30d306a4
Branch: refs/heads/ignite-gg-9614
Commit: 30d306a4a14e42fe93047947f7346271c81bf98f
Parents: 4dd6a63
Author: sboikov <sb...@gridgain.com>
Authored: Fri May 8 13:36:26 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri May 8 13:36:26 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/30d306a4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index 2e56b7a..574680a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -138,6 +138,8 @@ public class IgniteCacheTestSuite3 extends TestSuite {
// Cache interceptor tests.
suite.addTest(IgniteCacheInterceptorSelfTestSuite.suite());
+ suite.addTestSuite(IgniteTxGetAfterStopTest.class);
+
return suite;
}
}
[07/21] incubator-ignite git commit: # ignite-157
Posted by vo...@apache.org.
# ignite-157
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fcb45bb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fcb45bb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fcb45bb5
Branch: refs/heads/ignite-gg-9614
Commit: fcb45bb50533802a5299f67419bd76bfba4e2db7
Parents: 3ddbff9
Author: sboikov <se...@inria.fr>
Authored: Thu May 7 22:26:33 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Thu May 7 22:26:33 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/testsuites/IgniteCacheRestartTestSuite.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fcb45bb5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
index e5372e8..a8f15dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
@@ -35,9 +35,9 @@ public class IgniteCacheRestartTestSuite extends TestSuite {
suite.addTestSuite(GridCachePartitionedTxSalvageSelfTest.class);
- suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
- suite.addTestSuite(GridCachePartitionedOptimisticTxNodeRestartTest.class);
// TODO IGNITE-157.
+ //suite.addTestSuite(GridCachePartitionedNodeRestartTest.class);
+ //suite.addTestSuite(GridCachePartitionedOptimisticTxNodeRestartTest.class);
// suite.addTestSuite(GridCacheReplicatedNodeRestartSelfTest.class);
suite.addTestSuite(IgniteCacheAtomicNodeRestartTest.class);