You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/01/21 10:04:52 UTC
[07/12] ignite git commit: IGNITE-2236: Optimized GridCompoundFuture:
- Listener is "inlined" into the class; - Removed "ignoreChildFailures" field.
IGNITE-2236: Optimized GridCompoundFuture:
- Listener is "inlined" into the class;
- Removed "ignoreChildFailures" field.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1c302e40
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1c302e40
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1c302e40
Branch: refs/heads/ignite-gg-10837
Commit: 1c302e401b90928d48370757518e505383eb46bf
Parents: 27c9064
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Jan 20 17:17:01 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jan 20 17:17:01 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMvccManager.java | 43 ++-
.../distributed/GridCacheTxRecoveryFuture.java | 2 +-
.../dht/CacheDistributedGetFutureAdapter.java | 2 +-
.../cache/distributed/dht/GridDhtGetFuture.java | 2 +-
.../distributed/dht/GridDhtLockFuture.java | 2 +-
.../distributed/dht/GridDhtTxFinishFuture.java | 2 +-
.../colocated/GridDhtColocatedLockFuture.java | 2 +-
.../distributed/near/GridNearLockFuture.java | 2 +-
...arOptimisticSerializableTxPrepareFuture.java | 47 ++-
.../GridNearPessimisticTxPrepareFuture.java | 6 +-
.../near/GridNearTxFinishFuture.java | 29 +-
.../service/GridServiceProcessor.java | 17 +-
.../util/future/GridCompoundFuture.java | 314 +++++--------------
.../util/future/GridCompoundIdentityFuture.java | 6 +-
.../internal/util/future/GridFutureAdapter.java | 2 +
.../ignite/testframework/GridTestUtils.java | 7 +-
16 files changed, 195 insertions(+), 290 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index dbc6992..c7d1f62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -17,17 +17,6 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.events.DiscoveryEvent;
@@ -63,6 +52,18 @@ import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
@@ -982,9 +983,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
*/
@SuppressWarnings("unchecked")
public IgniteInternalFuture<?> finishAtomicUpdates(AffinityTopologyVersion topVer) {
- GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>();
-
- res.ignoreChildFailures(ClusterTopologyCheckedException.class, CachePartialUpdateCheckedException.class);
+ GridCompoundFuture<Object, Object> res = new FinishAtomicUpdateFuture();
for (GridCacheAtomicFuture<?> fut : atomicFuts.values()) {
IgniteInternalFuture<Void> complete = fut.completeFuture(topVer);
@@ -1221,4 +1220,20 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
return S.toString(FinishLockFuture.class, this, super.toString());
}
}
+
+ /**
+ * Finish atomic update future.
+ */
+ private static class FinishAtomicUpdateFuture extends GridCompoundFuture<Object, Object> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override protected boolean ignoreFailure(Throwable err) {
+ Class cls = err.getClass();
+
+ return ClusterTopologyCheckedException.class.isAssignableFrom(cls) ||
+ CachePartialUpdateCheckedException.class.isAssignableFrom(cls);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 1648de0..5a4a1ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -90,7 +90,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
UUID failedNodeId,
Map<UUID, Collection<UUID>> txNodes)
{
- super(cctx.kernalContext(), CU.boolReducer());
+ super(CU.boolReducer());
this.cctx = cctx;
this.tx = tx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 40eec63..7efaf49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -132,7 +132,7 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
boolean needVer,
boolean keepCacheObjects
) {
- super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
+ super(CU.<K, V>mapsReducer(keys.size()));
assert !F.isEmpty(keys);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index e410228..cb8c842 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -137,7 +137,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean skipVals
) {
- super(cctx.kernalContext(), CU.<GridCacheEntryInfo>collectionsReducer());
+ super(CU.<GridCacheEntryInfo>collectionsReducer());
assert reader != null;
assert !F.isEmpty(keys);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 1c3e052..07755e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -194,7 +194,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
CacheEntryPredicate[] filter,
boolean skipStore,
boolean keepBinary) {
- super(cctx.kernalContext(), CU.boolReducer());
+ super(CU.boolReducer());
assert nearNodeId != null;
assert nearLockVer != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 0e5db05..8c295ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -99,7 +99,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
* @param commit Commit flag.
*/
public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridDhtTxLocalAdapter tx, boolean commit) {
- super(cctx.kernalContext(), F.<IgniteInternalTx>identityReducer(tx));
+ super(F.<IgniteInternalTx>identityReducer(tx));
this.cctx = cctx;
this.tx = tx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index cfeee4b..e4c6b71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -169,7 +169,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
CacheEntryPredicate[] filter,
boolean skipStore,
boolean keepBinary) {
- super(cctx.kernalContext(), CU.boolReducer());
+ super(CU.boolReducer());
assert keys != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 55c5ab6..5d4fc01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -177,7 +177,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
CacheEntryPredicate[] filter,
boolean skipStore,
boolean keepBinary) {
- super(cctx.kernalContext(), CU.boolReducer());
+ super(CU.boolReducer());
assert keys != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 2090e04..4f9f227 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -90,9 +90,11 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
super(cctx, tx);
assert tx.optimistic() && tx.serializable() : tx;
+ }
- // Should wait for all mini futures completion before finishing tx.
- ignoreChildFailures(IgniteCheckedException.class);
+ /** {@inheritDoc} */
+ @Override protected boolean ignoreFailure(Throwable err) {
+ return IgniteCheckedException.class.isAssignableFrom(err.getClass());
}
/** {@inheritDoc} */
@@ -629,32 +631,43 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
/**
- *
+ * Client remap future.
*/
private static class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse, Boolean> {
/** */
- private boolean remap = true;
+ private static final long serialVersionUID = 0L;
/**
- *
+ * Constructor.
*/
public ClientRemapFuture() {
- super();
+ super(new ClientRemapFutureReducer());
+ }
+ }
- reducer(new IgniteReducer<GridNearTxPrepareResponse, Boolean>() {
- @Override public boolean collect(GridNearTxPrepareResponse res) {
- assert res != null;
+ /**
+ * Client remap future reducer.
+ */
+ private static class ClientRemapFutureReducer implements IgniteReducer<GridNearTxPrepareResponse, Boolean> {
+ /** */
+ private static final long serialVersionUID = 0L;
- if (res.clientRemapVersion() == null)
- remap = false;
+ /** Remap flag. */
+ private boolean remap = true;
- return true;
- }
+ /** {@inheritDoc} */
+ @Override public boolean collect(@Nullable GridNearTxPrepareResponse res) {
+ assert res != null;
- @Override public Boolean reduce() {
- return remap;
- }
- });
+ if (res.clientRemapVersion() == null)
+ remap = false;
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean reduce() {
+ return remap;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 9ee9aea..8170008 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -63,9 +63,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
super(cctx, tx);
assert tx.pessimistic() : tx;
+ }
- // Should wait for all mini futures completion before finishing tx.
- ignoreChildFailures(IgniteCheckedException.class);
+ /** {@inheritDoc} */
+ @Override protected boolean ignoreFailure(Throwable err) {
+ return IgniteCheckedException.class.isAssignableFrom(err.getClass());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 26e189b..3c33bc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -28,7 +28,6 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -107,7 +106,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @param commit Commit flag.
*/
public GridNearTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridNearTxLocal tx, boolean commit) {
- super(cctx.kernalContext(), F.<IgniteInternalTx>identityReducer(tx));
+ super(F.<IgniteInternalTx>identityReducer(tx));
this.cctx = cctx;
this.tx = tx;
@@ -644,16 +643,30 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (f.getClass() == FinishMiniFuture.class) {
FinishMiniFuture fut = (FinishMiniFuture)f;
- return "FinishFuture[node=" + fut.node().id() +
- ", loc=" + fut.node().isLocal() +
- ", done=" + fut.isDone() + "]";
+ ClusterNode node = fut.node();
+
+ if (node != null) {
+ return "FinishFuture[node=" + node.id() +
+ ", loc=" + node.isLocal() +
+ ", done=" + fut.isDone() + ']';
+ }
+ else {
+ return "FinishFuture[node=null, done=" + fut.isDone() + ']';
+ }
}
else if (f.getClass() == CheckBackupMiniFuture.class) {
CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f;
- return "CheckBackupFuture[node=" + fut.node().id() +
- ", loc=" + fut.node().isLocal() +
- ", done=" + f.isDone() + "]";
+ ClusterNode node = fut.node();
+
+ if (node != null) {
+ return "CheckBackupFuture[node=" + node.id() +
+ ", loc=" + node.isLocal() +
+ ", done=" + f.isDone() + "]";
+ }
+ else {
+ return "CheckBackupFuture[node=null, done=" + f.isDone() + "]";
+ }
}
else if (f.getClass() == CheckRemoteTxMiniFuture.class) {
CheckRemoteTxMiniFuture fut = (CheckRemoteTxMiniFuture)f;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 6b05edd..2841083 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -515,10 +515,10 @@ public class GridServiceProcessor extends GridProcessorAdapter {
*/
@SuppressWarnings("unchecked")
public IgniteInternalFuture<?> cancelAll() {
- Collection<IgniteInternalFuture<?>> futs = new ArrayList<>();
-
Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE);
+ GridCompoundFuture res = null;
+
while (it.hasNext()) {
Cache.Entry<Object, Object> e = it.next();
@@ -527,11 +527,20 @@ public class GridServiceProcessor extends GridProcessorAdapter {
GridServiceDeployment dep = (GridServiceDeployment)e.getValue();
+ if (res == null)
+ res = new GridCompoundFuture<>();
+
// Cancel each service separately.
- futs.add(cancel(dep.configuration().getName()));
+ res.add(cancel(dep.configuration().getName()));
}
- return futs.isEmpty() ? new GridFinishedFuture<>() : new GridCompoundFuture(null, futs);
+ if (res != null) {
+ res.markInitialized();
+
+ return res;
+ }
+ else
+ return new GridFinishedFuture<>();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 4b2461e..c382497 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -17,15 +17,11 @@
package org.apache.ignite.internal.util.future;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
@@ -35,55 +31,48 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.jetbrains.annotations.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
/**
* Future composed of multiple inner futures.
*/
-public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
+public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements IgniteInClosure<IgniteInternalFuture<T>> {
/** */
private static final long serialVersionUID = 0L;
- /** */
- private static final int INITED = 0b1;
+ /** Initialization flag. */
+ private static final int INIT_FLAG = 0x1;
- /** */
- private static final AtomicIntegerFieldUpdater<GridCompoundFuture> flagsUpd =
- AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "flags");
+ /** Flags updater. */
+ private static final AtomicIntegerFieldUpdater<GridCompoundFuture> FLAGS_UPD =
+ AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "initFlag");
- /** */
- private static final AtomicIntegerFieldUpdater<GridCompoundFuture> lsnrCallsUpd =
+ /** Listener calls updater. */
+ private static final AtomicIntegerFieldUpdater<GridCompoundFuture> LSNR_CALLS_UPD =
AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls");
/** Futures. */
protected final ArrayList<IgniteInternalFuture<T>> futs = new ArrayList<>();
- /** */
- @GridToStringExclude
- private final Listener lsnr = new Listener();
-
/** Reducer. */
@GridToStringInclude
- private IgniteReducer<T, R> rdc;
-
- /** Exceptions to ignore. */
- private Class<? extends Throwable>[] ignoreChildFailures;
+ private final IgniteReducer<T, R> rdc;
- /**
- * Updated via {@link #flagsUpd}.
- *
- * @see #INITED
- */
+ /** Initialization flag. Updated via {@link #FLAGS_UPD}. */
@SuppressWarnings("unused")
- private volatile int flags;
+ private volatile int initFlag;
- /** Updated via {@link #lsnrCallsUpd}. */
+ /** Listener calls. Updated via {@link #LSNR_CALLS_UPD}. */
@SuppressWarnings("unused")
private volatile int lsnrCalls;
/**
- *
+ * Default constructor.
*/
public GridCompoundFuture() {
- // No-op.
+ this(null);
}
/**
@@ -93,19 +82,59 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
this.rdc = rdc;
}
- /**
- * @param rdc Reducer to add.
- * @param futs Futures to add.
- */
- public GridCompoundFuture(
- @Nullable IgniteReducer<T, R> rdc,
- @Nullable Iterable<IgniteInternalFuture<T>> futs
- ) {
- this.rdc = rdc;
+ /** {@inheritDoc} */
+ @Override public void apply(IgniteInternalFuture<T> fut) {
+ try {
+ T t = fut.get();
+
+ try {
+ if (rdc != null && !rdc.collect(t))
+ onDone(rdc.reduce());
+ }
+ catch (RuntimeException e) {
+ U.error(null, "Failed to execute compound future reducer: " + this, e);
- addAll(futs);
+ // Exception in reducer is a bug, so we bypass checkComplete here.
+ onDone(e);
+ }
+ catch (AssertionError e) {
+ U.error(null, "Failed to execute compound future reducer: " + this, e);
- markInitialized();
+ // Bypass checkComplete because need to rethrow.
+ onDone(e);
+
+ throw e;
+ }
+ }
+ catch (IgniteTxOptimisticCheckedException | IgniteFutureCancelledCheckedException |
+ ClusterTopologyCheckedException e) {
+ if (!ignoreFailure(e))
+ onDone(e);
+ }
+ catch (IgniteCheckedException e) {
+ if (!ignoreFailure(e)) {
+ U.error(null, "Failed to execute compound future reducer: " + this, e);
+
+ onDone(e);
+ }
+ }
+ catch (RuntimeException e) {
+ U.error(null, "Failed to execute compound future reducer: " + this, e);
+
+ onDone(e);
+ }
+ catch (AssertionError e) {
+ U.error(null, "Failed to execute compound future reducer: " + this, e);
+
+ // Bypass checkComplete because need to rethrow.
+ onDone(e);
+
+ throw e;
+ }
+
+ LSNR_CALLS_UPD.incrementAndGet(GridCompoundFuture.this);
+
+ checkComplete();
}
/** {@inheritDoc} */
@@ -125,43 +154,20 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
*
* @return Collection of futures.
*/
- private Collection<IgniteInternalFuture<T>> futures(boolean pending) {
+ public Collection<IgniteInternalFuture<T>> futures() {
synchronized (futs) {
- Collection<IgniteInternalFuture<T>> res = new ArrayList<>(futs.size());
-
- for (IgniteInternalFuture<T> fut : futs) {
- if (!pending || !fut.isDone())
- res.add(fut);
- }
-
- return res;
+ return new ArrayList<>(futs);
}
}
/**
- * Gets collection of futures.
- *
- * @return Collection of futures.
- */
- public Collection<IgniteInternalFuture<T>> futures() {
- return futures(false);
- }
-
- /**
- * Gets pending (unfinished) futures.
+ * Checks if this compound future should ignore this particular exception.
*
- * @return Pending futures.
- */
- public Collection<IgniteInternalFuture<T>> pending() {
- return futures(true);
- }
-
- /**
- * @param ignoreChildFailures Flag indicating whether compound future should ignore child futures failures.
+ * @param err Exception to check.
+ * @return {@code True} if this error should be ignored.
*/
- @SafeVarargs
- public final void ignoreChildFailures(Class<? extends Throwable>... ignoreChildFailures) {
- this.ignoreChildFailures = ignoreChildFailures;
+ protected boolean ignoreFailure(Throwable err) {
+ return false;
}
/**
@@ -187,14 +193,6 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
}
/**
- * @return {@code True} if this future was initialized. Initialization happens when
- * {@link #markInitialized()} method is called on future.
- */
- public boolean initialized() {
- return flagSet(INITED);
- }
-
- /**
* Adds a future to this compound future.
*
* @param fut Future to add.
@@ -206,7 +204,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
futs.add(fut);
}
- fut.listen(lsnr);
+ fut.listen(this);
if (isCancelled()) {
try {
@@ -219,76 +217,18 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
}
/**
- * Adds futures to this compound future.
- *
- * @param futs Futures to add.
- */
- @SafeVarargs
- public final void addAll(@Nullable IgniteInternalFuture<T>... futs) {
- addAll(F.asList(futs));
- }
-
- /**
- * Adds futures to this compound future.
- *
- * @param futs Futures to add.
- */
- public void addAll(@Nullable Iterable<IgniteInternalFuture<T>> futs) {
- if (futs != null) {
- for (IgniteInternalFuture<T> fut : futs)
- add(fut);
- }
- }
-
- /**
- * Gets optional reducer.
- *
- * @return Optional reducer.
- */
- @Nullable public IgniteReducer<T, R> reducer() {
- return rdc;
- }
-
- /**
- * Sets optional reducer.
- *
- * @param rdc Optional reducer.
- */
- public void reducer(@Nullable IgniteReducer<T, R> rdc) {
- this.rdc = rdc;
- }
-
- /**
- * @param flag Flag to CAS.
- * @return {@code True} if CAS succeeds.
- */
- private boolean casFlag(int flag) {
- for (;;) {
- int flags0 = flags;
-
- if ((flags0 & flag) != 0)
- return false;
-
- if (flagsUpd.compareAndSet(this, flags0, flags0 | flag))
- return true;
- }
- }
-
- /**
- * @param flag Flag to check.
- * @return {@code True} if set.
+ * @return {@code True} if this future was initialized. Initialization happens when
+ * {@link #markInitialized()} method is called on future.
*/
- private boolean flagSet(int flag) {
- return (flags & flag) != 0;
+ public boolean initialized() {
+ return initFlag == INIT_FLAG;
}
/**
* Mark this future as initialized.
*/
public void markInitialized() {
- if (casFlag(INITED))
- // Check complete to make sure that we take care
- // of all the ignored callbacks.
+ if (FLAGS_UPD.compareAndSet(this, 0, INIT_FLAG))
checkComplete();
}
@@ -296,7 +236,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* Check completeness of the future.
*/
private void checkComplete() {
- if (flagSet(INITED) && !isDone() && lsnrCalls == futuresSize()) {
+ if (initialized() && !isDone() && lsnrCalls == futuresSize()) {
try {
onDone(rdc != null ? rdc.reduce() : null);
}
@@ -324,26 +264,6 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
}
}
- /**
- * Checks if this compound future should ignore this particular exception.
- *
- * @param err Exception to check.
- * @return {@code True} if this error should be ignored.
- */
- private boolean ignoreFailure(@Nullable Throwable err) {
- if (err == null)
- return true;
-
- if (ignoreChildFailures != null) {
- for (Class<? extends Throwable> ignoreCls : ignoreChildFailures) {
- if (ignoreCls.isAssignableFrom(err.getClass()))
- return true;
- }
- }
-
- return false;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCompoundFuture.class, this,
@@ -358,72 +278,4 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
})
);
}
-
- /**
- * Listener for futures.
- */
- private class Listener implements IgniteInClosure<IgniteInternalFuture<T>> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public void apply(IgniteInternalFuture<T> fut) {
- try {
- T t = fut.get();
-
- try {
- if (rdc != null && !rdc.collect(t))
- onDone(rdc.reduce());
- }
- catch (RuntimeException e) {
- U.error(null, "Failed to execute compound future reducer: " + this, e);
-
- // Exception in reducer is a bug, so we bypass checkComplete here.
- onDone(e);
- }
- catch (AssertionError e) {
- U.error(null, "Failed to execute compound future reducer: " + this, e);
-
- // Bypass checkComplete because need to rethrow.
- onDone(e);
-
- throw e;
- }
- }
- catch (IgniteTxOptimisticCheckedException | IgniteFutureCancelledCheckedException |
- ClusterTopologyCheckedException e) {
- if (!ignoreFailure(e))
- onDone(e);
- }
- catch (IgniteCheckedException e) {
- if (!ignoreFailure(e)) {
- U.error(null, "Failed to execute compound future reducer: " + this, e);
-
- onDone(e);
- }
- }
- catch (RuntimeException e) {
- U.error(null, "Failed to execute compound future reducer: " + this, e);
-
- onDone(e);
- }
- catch (AssertionError e) {
- U.error(null, "Failed to execute compound future reducer: " + this, e);
-
- // Bypass checkComplete because need to rethrow.
- onDone(e);
-
- throw e;
- }
-
- lsnrCallsUpd.incrementAndGet(GridCompoundFuture.this);
-
- checkComplete();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return "Compound future listener []";
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundIdentityFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundIdentityFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundIdentityFuture.java
index bb5abf2..4010ccd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundIdentityFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundIdentityFuture.java
@@ -17,13 +17,12 @@
package org.apache.ignite.internal.util.future;
-import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteReducer;
import org.jetbrains.annotations.Nullable;
/**
- * Future composed of multiple inner futures.
+ * Compound future with reducer which accepts and produces results of the same type.
*/
public class GridCompoundIdentityFuture<T> extends GridCompoundFuture<T, T> {
/** */
@@ -37,10 +36,9 @@ public class GridCompoundIdentityFuture<T> extends GridCompoundFuture<T, T> {
}
/**
- * @param ctx Context.
* @param rdc Reducer.
*/
- public GridCompoundIdentityFuture(GridKernalContext ctx, @Nullable IgniteReducer<T, T> rdc) {
+ public GridCompoundIdentityFuture(@Nullable IgniteReducer<T, T> rdc) {
super(rdc);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index a1720d5..c6a6a44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -72,6 +73,7 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
private boolean ignoreInterrupts;
/** */
+ @GridToStringExclude
private IgniteInClosure<? super IgniteInternalFuture<R>> lsnr;
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/1c302e40/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 26a8994..c7940c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -648,10 +648,11 @@ public final class GridTestUtils {
});
// Compound future, that adds cancel() support to execution future.
- GridCompoundFuture<Long, Long> compFut = new GridCompoundFuture<>();
+ GridCompoundFuture<Long, Long> compFut = new GridCompoundFuture<>(F.sumLongReducer());
+
+ compFut.add(cancelFut);
+ compFut.add(runFut);
- compFut.addAll(cancelFut, runFut);
- compFut.reducer(F.sumLongReducer());
compFut.markInitialized();
cancelFut.onDone();