You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text rendering.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/03/04 19:16:25 UTC
[3/5] incubator-ignite git commit: futures: api cleanup
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index eed273d..c09e51b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -420,7 +420,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
}
catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(ctx, e);
+ return new GridFinishedFuture<>(e);
}
// Register per-routine notifications listener if ordered messaging is used.
@@ -459,7 +459,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
// Stop with exception if projection is empty.
if (nodes.isEmpty() && !locIncluded) {
- return new GridFinishedFuture<>(ctx,
+ return new GridFinishedFuture<>(
new ClusterTopologyCheckedException("Failed to register remote continuous listener (projection is empty)."));
}
@@ -553,7 +553,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
hnd.onListenerRegistered(routineId, ctx);
}
catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(ctx,
+ return new GridFinishedFuture<>(
new IgniteCheckedException("Failed to register handler locally: " + hnd, e));
}
}
@@ -1617,6 +1617,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ private GridKernalContext ctx;
+
/** Consume ID. */
private UUID routineId;
@@ -1641,7 +1644,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param routineId Consume ID.
*/
StartFuture(GridKernalContext ctx, UUID routineId) {
- super(ctx);
+ this.ctx = ctx;
this.routineId = routineId;
}
@@ -1701,18 +1704,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** Timeout object. */
private volatile GridTimeoutObject timeoutObj;
- /**
- * Required by {@link Externalizable}.
- */
- public StopFuture() {
- // No-op.
- }
+ /** */
+ private GridKernalContext ctx;
/**
* @param ctx Kernal context.
*/
StopFuture(GridKernalContext ctx) {
- super(ctx);
+ super();
+ this.ctx = ctx;
}
/**
@@ -1762,7 +1762,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param nodeId Master node ID.
*/
SyncMessageAckFuture(GridKernalContext ctx, UUID nodeId) {
- super(ctx);
+ super();
this.nodeId = nodeId;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
index 5efcfe9..ae7052a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
@@ -48,7 +48,7 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> {
* @param dataLdr Data loader.
*/
GridDataLoaderFuture(GridKernalContext ctx, IgniteDataLoaderImpl dataLdr) {
- super(ctx);
+ super();
assert dataLdr != null;
@@ -57,8 +57,6 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> {
/** {@inheritDoc} */
@Override public boolean cancel() throws IgniteCheckedException {
- checkValid();
-
if (onCancelled()) {
dataLdr.closeEx(true);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
index ed3bbcb..1e20486 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
@@ -377,7 +377,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
enterBusy();
try {
- GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx);
+ GridFutureAdapter<Object> resFut = new GridFutureAdapter<>();
resFut.listenAsync(rmvActiveFut);
@@ -397,7 +397,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
return new IgniteFutureImpl<>(resFut);
}
catch (IgniteException e) {
- return new IgniteFinishedFutureImpl<>(ctx, e);
+ return new IgniteFinishedFutureImpl<>(e);
}
finally {
leaveBusy();
@@ -849,7 +849,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
isLocNode = node.equals(ctx.discovery().localNode());
entries = newEntries();
- curFut = new GridFutureAdapter<>(ctx);
+ curFut = new GridFutureAdapter<>();
curFut.listenAsync(signalC);
sem = new Semaphore(parallelOps);
@@ -878,7 +878,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
entries0 = entries;
entries = newEntries();
- curFut = new GridFutureAdapter<>(ctx);
+ curFut = new GridFutureAdapter<>();
curFut.listenAsync(signalC);
}
}
@@ -915,7 +915,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
curFut0 = curFut;
entries = newEntries();
- curFut = new GridFutureAdapter<>(ctx);
+ curFut = new GridFutureAdapter<>();
curFut.listenAsync(signalC);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index e61deee..52f762a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -303,7 +303,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
locVal += l;
- return new GridFinishedFuture<>(ctx.kernalContext(), updated ? locVal : curVal);
+ return new GridFinishedFuture<>(updated ? locVal : curVal);
}
}
finally {
@@ -347,7 +347,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
locVal += l;
- return new GridFinishedFuture<>(ctx.kernalContext(), updated ? locVal : curVal);
+ return new GridFinishedFuture<>(updated ? locVal : curVal);
}
}
finally {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java
index d0ef4ce..7dc9567 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java
@@ -49,7 +49,7 @@ public class IgniteHadoopNoopProcessor extends IgniteHadoopProcessorAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) {
- return new GridFinishedFutureEx<>(new IgniteCheckedException("Hadoop is not available."));
+ return new GridFinishedFuture<>(new IgniteCheckedException("Hadoop is not available."));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index e960422..c5ac658 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -406,7 +406,7 @@ public class IgfsDataManager extends IgfsManager {
byte[] res = fut.get();
if (res == null) {
- GridFutureAdapter<byte[]> rmtReadFut = new GridFutureAdapter<>(igfsCtx.kernalContext());
+ GridFutureAdapter<byte[]> rmtReadFut = new GridFutureAdapter<>();
IgniteInternalFuture<byte[]> oldRmtReadFut = rmtReadFuts.putIfAbsent(key, rmtReadFut);
@@ -599,7 +599,7 @@ public class IgfsDataManager extends IgfsManager {
if (log.isDebugEnabled())
log.debug("Cannot delete content of not-data file: " + fileInfo);
- return new GridFinishedFuture<>(igfsCtx.kernalContext());
+ return new GridFinishedFuture<>();
}
else
return delWorker.deleteAsync(fileInfo);
@@ -1256,14 +1256,14 @@ public class IgfsDataManager extends IgfsManager {
// Additional size check.
if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax())
- return new GridFinishedFuture<Object>(igfsCtx.kernalContext(),
+ return new GridFinishedFuture<Object>(
new IgfsOutOfSpaceException("Failed to write data block (IGFS maximum data size " +
"exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']'));
}
catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(igfsCtx.kernalContext(), new IgniteCheckedException("Failed to store data " +
+ return new GridFinishedFuture<>(new IgniteCheckedException("Failed to store data " +
"block due to unexpected exception.", e));
}
}
@@ -1674,7 +1674,7 @@ public class IgfsDataManager extends IgfsManager {
* Gracefully stops worker by adding STOP_INFO to queue.
*/
private void stop() {
- delReqs.offer(F.t(new GridFutureAdapter<>(igfsCtx.kernalContext()), stopInfo));
+ delReqs.offer(F.t(new GridFutureAdapter<>(), stopInfo));
}
/**
@@ -1682,7 +1682,7 @@ public class IgfsDataManager extends IgfsManager {
* @return Future which completes when entry is actually removed.
*/
private IgniteInternalFuture<Object> deleteAsync(IgfsFileInfo info) {
- GridFutureAdapter<Object> fut = new GridFutureAdapter<>(igfsCtx.kernalContext());
+ GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
delReqs.offer(F.t(fut, info));
@@ -1795,7 +1795,7 @@ public class IgfsDataManager extends IgfsManager {
* @param fileId File id.
*/
private WriteCompletionFuture(GridKernalContext ctx, IgniteUuid fileId) {
- super(ctx);
+ super();
assert fileId != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index eff987e..4fd5f87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -1604,9 +1604,9 @@ public final class IgfsImpl implements IgfsEx {
IgniteUuid id = meta.softDelete(null, null, ROOT_ID);
if (id == null)
- return new GridFinishedFuture<Object>(igfsCtx.kernalContext());
+ return new GridFinishedFuture<Object>();
else {
- GridFutureAdapter<Object> fut = new GridFutureAdapter<>(igfsCtx.kernalContext());
+ GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
GridFutureAdapter<Object> oldFut = delFuts.putIfAbsent(id, fut);
@@ -1625,7 +1625,7 @@ public final class IgfsImpl implements IgfsEx {
}
}
catch (IgniteCheckedException e) {
- return new GridFinishedFuture<Object>(igfsCtx.kernalContext(), e);
+ return new GridFinishedFuture<Object>(e);
}
}
@@ -1640,7 +1640,7 @@ public final class IgfsImpl implements IgfsEx {
GridCompoundFuture<Object, Object> resFut = new GridCompoundFuture<>(igfsCtx.kernalContext());
for (IgniteUuid id : ids) {
- GridFutureAdapter<Object> fut = new GridFutureAdapter<>(igfsCtx.kernalContext());
+ GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
IgniteInternalFuture<Object> oldFut = delFuts.putIfAbsent(id, fut);
@@ -1662,7 +1662,7 @@ public final class IgfsImpl implements IgfsEx {
return resFut;
}
else
- return new GridFinishedFuture<>(igfsCtx.kernalContext());
+ return new GridFinishedFuture<>();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
index 1d5ba1a..8a8b858 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
@@ -118,7 +118,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
case LIST_PATHS: {
IgfsMessage res = execute(ses, cmd, msg, in);
- fut = res == null ? null : new GridFinishedFuture<>(ctx, res);
+ fut = res == null ? null : new GridFinishedFuture<>(res);
break;
}
@@ -138,7 +138,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
return fut;
}
catch (Exception e) {
- return new GridFinishedFuture<>(ctx, e);
+ return new GridFinishedFuture<>(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index f503161..92b39f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -193,10 +193,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
*/
private IgniteInternalFuture<?> rebuildIndexes(@Nullable final String space, @Nullable final TypeDescriptor desc) {
if (idx == null)
- return new GridFinishedFuture<>(ctx, new IgniteCheckedException("Indexing is disabled."));
+ return new GridFinishedFuture<>(new IgniteCheckedException("Indexing is disabled."));
if (desc == null || !desc.registered())
- return new GridFinishedFuture<Void>(ctx);
+ return new GridFinishedFuture<Void>();
final GridWorkerFuture<?> fut = new GridWorkerFuture<Void>();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
index 8c9ef1d..6c1cedb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
@@ -90,7 +90,7 @@ public class GridRestProcessor extends GridProcessorAdapter {
*/
private IgniteInternalFuture<GridRestResponse> handleAsync0(final GridRestRequest req) {
if (!busyLock.tryReadLock())
- return new GridFinishedFuture<>(ctx,
+ return new GridFinishedFuture<>(
new IgniteCheckedException("Failed to handle request (received request while stopping grid)."));
try {
@@ -156,7 +156,7 @@ public class GridRestProcessor extends GridProcessorAdapter {
startLatch.await();
}
catch (InterruptedException e) {
- return new GridFinishedFuture<>(ctx, new IgniteCheckedException("Failed to handle request " +
+ return new GridFinishedFuture<>(new IgniteCheckedException("Failed to handle request " +
"(protocol handler was interrupted when awaiting grid start).", e));
}
}
@@ -185,10 +185,10 @@ public class GridRestProcessor extends GridProcessorAdapter {
U.warn(log, "Cannot update response session token: " + e1.getMessage());
}
- return new GridFinishedFuture<>(ctx, res);
+ return new GridFinishedFuture<>(res);
}
catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(ctx, new GridRestResponse(STATUS_AUTH_FAILED, e.getMessage()));
+ return new GridFinishedFuture<>(new GridRestResponse(STATUS_AUTH_FAILED, e.getMessage()));
}
}
@@ -199,7 +199,7 @@ public class GridRestProcessor extends GridProcessorAdapter {
IgniteInternalFuture<GridRestResponse> res = hnd == null ? null : hnd.handleAsync(req);
if (res == null)
- return new GridFinishedFuture<>(ctx,
+ return new GridFinishedFuture<>(
new IgniteCheckedException("Failed to find registered handler for command: " + req.command()));
final SecurityContext subjCtx0 = subjCtx;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
index 983dd55..c63e414 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
@@ -297,12 +297,12 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
catch (IgniteException e) {
U.error(log, "Failed to execute cache command: " + req, e);
- return new GridFinishedFuture<>(ctx, e);
+ return new GridFinishedFuture<>(e);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to execute cache command: " + req, e);
- return new GridFinishedFuture<>(ctx, e);
+ return new GridFinishedFuture<>(e);
}
finally {
if (log.isDebugEnabled())
@@ -1043,7 +1043,7 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
assert metrics != null;
- return new GridFinishedFuture<Object>(ctx, new GridCacheRestMetrics(
+ return new GridFinishedFuture<Object>(new GridCacheRestMetrics(
(int)metrics.getCacheGets(),
(int)(metrics.getCacheRemovals() + metrics.getCachePuts()),
(int)metrics.getCacheHits(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java
index 50d82e6..5ee6418 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java
@@ -92,7 +92,7 @@ public class GridCacheQueryCommandHandler extends GridRestCommandHandlerAdapter
}
default:
- return new GridFinishedFutureEx<>(new IgniteCheckedException("Unsupported query command: " + req.command()));
+ return new GridFinishedFuture<>(new IgniteCheckedException("Unsupported query command: " + req.command()));
}
}
@@ -124,7 +124,7 @@ public class GridCacheQueryCommandHandler extends GridRestCommandHandlerAdapter
return ctx.closure().callLocalSafe(c, false);
else {
if (ctx.discovery().node(destId) == null)
- return new GridFinishedFutureEx<>(new IgniteCheckedException("Destination node ID has left the grid " +
+ return new GridFinishedFuture<>(new IgniteCheckedException("Destination node ID has left the grid " +
"(retry the query): " + destId));
ctx.task().setThreadContext(TC_NO_FAILOVER, true);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java
index c45f2c7..5abbb77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java
@@ -83,13 +83,13 @@ public class DataStructuresCommandHandler extends GridRestCommandHandlerAdapter
IgniteCheckedException err =
new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("key"));
- return new GridFinishedFuture(ctx, err);
+ return new GridFinishedFuture(err);
}
else if (req.delta() == null) {
IgniteCheckedException err =
new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("delta"));
- return new GridFinishedFuture(ctx, err);
+ return new GridFinishedFuture(err);
}
return ctx.closure().callLocalSafe(new Callable<Object>() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
index 78b6bd1..87f0fc3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
@@ -136,7 +136,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
catch (IgniteCheckedException e) {
U.error(log, "Failed to execute task command: " + req, e);
- return new GridFinishedFuture<>(ctx, e);
+ return new GridFinishedFuture<>(e);
}
finally {
if (log.isDebugEnabled())
@@ -159,7 +159,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
GridRestTaskRequest req0 = (GridRestTaskRequest) req;
- final GridFutureAdapter<GridRestResponse> fut = new GridFutureAdapter<>(ctx);
+ final GridFutureAdapter<GridRestResponse> fut = new GridFutureAdapter<>();
final GridRestResponse res = new GridRestResponse();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
index 7c2a15b..d563f9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
@@ -94,7 +94,7 @@ public class GridTopologyCommandHandler extends GridRestCommandHandlerAdapter {
final String ip = req0.nodeIp();
if (id == null && ip == null)
- return new GridFinishedFuture<>(ctx, new IgniteCheckedException(
+ return new GridFinishedFuture<>(new IgniteCheckedException(
"Failed to handle request (either id or ip should be specified)."));
ClusterNode node;
@@ -131,7 +131,7 @@ public class GridTopologyCommandHandler extends GridRestCommandHandlerAdapter {
if (log.isDebugEnabled())
log.debug("Handled topology REST request [res=" + res + ", req=" + req + ']');
- return new GridFinishedFuture<>(ctx, res);
+ return new GridFinishedFuture<>(res);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java
index c66de86..2bfb704 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java
@@ -54,6 +54,6 @@ public class GridVersionCommandHandler extends GridRestCommandHandlerAdapter {
assert SUPPORTED_COMMANDS.contains(req.command());
- return new GridFinishedFuture<>(ctx, new GridRestResponse(VER_STR));
+ return new GridFinishedFuture<>(new GridRestResponse(VER_STR));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
index 43e47cd..aa1abd8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java
@@ -135,8 +135,8 @@ public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<Gr
@Override public IgniteInternalFuture<GridRestResponse> apply(GridRestResponse res, Exception e) {
return handleRequest0(ses, req, cmd);
}
- },
- ctx);
+ }
+ );
}
if (f != null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentFuture.java
index da4733b..f05fdf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentFuture.java
@@ -37,7 +37,7 @@ public class GridServiceDeploymentFuture extends GridFutureAdapter<Object> {
* @param cfg Configuration.
*/
public GridServiceDeploymentFuture(GridKernalContext ctx, ServiceConfiguration cfg) {
- super(ctx);
+ super();
this.cfg = cfg;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 1793beb..aac9d90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -386,7 +386,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
U.error(log, "Failed to deploy service: " + cfg.getName(), e);
- return new GridFinishedFuture<>(ctx, e);
+ return new GridFinishedFuture<>(e);
}
}
}
@@ -398,7 +398,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
public IgniteInternalFuture<?> cancel(String name) {
while (true) {
try {
- GridFutureAdapter<?> fut = new GridFutureAdapter<>(ctx);
+ GridFutureAdapter<?> fut = new GridFutureAdapter<>();
GridFutureAdapter<?> old;
@@ -424,7 +424,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
catch (IgniteCheckedException e) {
log.error("Failed to undeploy service: " + name, e);
- return new GridFinishedFuture<>(ctx, e);
+ return new GridFinishedFuture<>(e);
}
}
}
@@ -446,7 +446,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
futs.add(cancel(dep.configuration().getName()));
}
- return futs.isEmpty() ? new GridFinishedFuture<>(ctx) : new GridCompoundFuture(ctx, null, futs);
+ return futs.isEmpty() ? new GridFinishedFuture<>() : new GridCompoundFuture(ctx, null, futs);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java
index 31c4c0e..beb2820 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java
@@ -114,8 +114,6 @@ public class GridStreamerStageExecutionFuture extends GridFutureAdapter<Object>
String stageName,
Collection<?> evts
) {
- super(streamer.kernalContext());
-
assert streamer != null;
assert stageName != null;
assert evts != null;
@@ -226,7 +224,7 @@ public class GridStreamerStageExecutionFuture extends GridFutureAdapter<Object>
}
}
catch (IgniteCheckedException e) {
- onFailed(ctx.localNodeId(), e);
+ onFailed(streamer.kernalContext().localNodeId(), e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 0750259..7139564 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -78,7 +78,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* @param ctx Context.
*/
public GridCompoundFuture(GridKernalContext ctx) {
- super(ctx);
+ super();
}
/**
@@ -86,7 +86,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
* @param rdc Reducer.
*/
public GridCompoundFuture(GridKernalContext ctx, @Nullable IgniteReducer<T, R> rdc) {
- super(ctx);
+ super();
this.rdc = rdc;
}
@@ -98,7 +98,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
*/
public GridCompoundFuture(GridKernalContext ctx, @Nullable IgniteReducer<T, R> rdc,
@Nullable Iterable<IgniteInternalFuture<T>> futs) {
- super(ctx);
+ super();
this.rdc = rdc;
@@ -248,14 +248,14 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
res.compareAndSet(null, rdc.reduce(), false, true);
}
catch (RuntimeException e) {
- U.error(log, "Failed to execute compound future reducer: " + this, e);
+ U.error(null, "Failed to execute compound future reducer: " + this, e);
onDone(e);
return;
}
catch (AssertionError e) {
- U.error(log, "Failed to execute compound future reducer: " + this, e);
+ U.error(null, "Failed to execute compound future reducer: " + this, e);
onDone(e);
@@ -320,13 +320,13 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
res.compareAndSet(null, rdc.reduce(), false, true);
}
catch (RuntimeException e) {
- U.error(log, "Failed to execute compound future reducer: " + this, e);
+ U.error(null, "Failed to execute compound future reducer: " + this, e);
// Exception in reducer is a bug, so we bypass checkComplete here.
onDone(e);
}
catch (AssertionError e) {
- U.error(log, "Failed to execute compound future reducer: " + this, e);
+ U.error(null, "Failed to execute compound future reducer: " + this, e);
// Bypass checkComplete because need to rethrow.
onDone(e);
@@ -335,36 +335,36 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
}
}
catch (IgniteTxOptimisticCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Optimistic failure [fut=" + GridCompoundFuture.this + ", err=" + e + ']');
+// if (log.isDebugEnabled())
+// log.debug("Optimistic failure [fut=" + GridCompoundFuture.this + ", err=" + e + ']');
err.compareAndSet(null, e);
}
catch (ClusterTopologyCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Topology exception [fut=" + GridCompoundFuture.this + ", err=" + e + ']');
+// if (log.isDebugEnabled())
+// log.debug("Topology exception [fut=" + GridCompoundFuture.this + ", err=" + e + ']');
err.compareAndSet(null, e);
}
catch (IgniteFutureCancelledCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to execute compound future reducer [lsnr=" + this + ", e=" + e + ']');
+// if (log.isDebugEnabled())
+// log.debug("Failed to execute compound future reducer [lsnr=" + this + ", e=" + e + ']');
err.compareAndSet(null, e);
}
catch (IgniteCheckedException e) {
- if (!ignoreFailure(e))
- U.error(log, "Failed to execute compound future reducer: " + this, e);
+// if (!ignoreFailure(e))
+// U.error(log, "Failed to execute compound future reducer: " + this, e);
err.compareAndSet(null, e);
}
catch (RuntimeException e) {
- U.error(log, "Failed to execute compound future reducer: " + this, e);
+ U.error(null, "Failed to execute compound future reducer: " + this, e);
err.compareAndSet(null, e);
}
catch (AssertionError e) {
- U.error(log, "Failed to execute compound future reducer: " + this, e);
+ U.error(null, "Failed to execute compound future reducer: " + this, e);
// Bypass checkComplete because need to rethrow.
onDone(e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java
index 41f0ba4..b3ec22d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java
@@ -45,13 +45,14 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> {
}
/**
- * @param ctx Context.
* @param embedded Embedded future.
* @param c Closure to execute upon completion of embedded future.
*/
- public GridEmbeddedFuture(GridKernalContext ctx, IgniteInternalFuture<B> embedded, final IgniteBiClosure<B, Exception, A> c) {
- super(ctx);
-
+ public GridEmbeddedFuture(
+ IgniteInternalFuture<B> embedded,
+ final IgniteBiClosure<B, Exception, A> c,
+ boolean fake
+ ) {
assert embedded != null;
assert c != null;
@@ -77,30 +78,11 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> {
/**
* Embeds futures. Specific change order of arguments to avoid conflicts.
- *
- * @param syncNotify Synchronous notify flag.
- * @param embedded Closure.
- * @param c Closure which runs upon completion of embedded closure and which returns another future.
- * @param ctx Context.
- */
- public GridEmbeddedFuture(boolean syncNotify, IgniteInternalFuture<B> embedded, IgniteBiClosure<B, Exception, IgniteInternalFuture<A>> c,
- GridKernalContext ctx) {
- this(embedded, c, ctx);
-
- syncNotify(syncNotify);
- }
-
- /**
- * Embeds futures. Specific change order of arguments to avoid conflicts.
- *
- * @param ctx Context.
- * @param embedded Closure.
+ * @param embedded Embedded future.
* @param c Closure which runs upon completion of embedded closure and which returns another future.
*/
- public GridEmbeddedFuture(IgniteInternalFuture<B> embedded, final IgniteBiClosure<B, Exception, IgniteInternalFuture<A>> c,
- GridKernalContext ctx) {
- super(ctx);
-
+ public GridEmbeddedFuture(
+ IgniteInternalFuture<B> embedded, final IgniteBiClosure<B, Exception, IgniteInternalFuture<A>> c) {
assert embedded != null;
assert c != null;
@@ -158,15 +140,15 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> {
/**
* Embeds futures.
*
- * @param ctx Context.
* @param embedded Future.
* @param c1 Closure which runs upon completion of embedded future and which returns another future.
* @param c2 Closure which runs upon completion of future returned by {@code c1} closure.
*/
- public GridEmbeddedFuture(GridKernalContext ctx, IgniteInternalFuture<B> embedded, final IgniteBiClosure<B, Exception,
- IgniteInternalFuture<A>> c1, final IgniteBiClosure<A, Exception, A> c2) {
- super(ctx);
-
+ public GridEmbeddedFuture(
+ IgniteInternalFuture<B> embedded, final IgniteBiClosure<B, Exception,
+ IgniteInternalFuture<A>> c1,
+ final IgniteBiClosure<A, Exception, A> c2
+ ) {
assert embedded != null;
assert c1 != null;
assert c2 != null;
@@ -268,7 +250,7 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> {
applyx(f);
}
catch (IgniteIllegalStateException ignore) {
- U.warn(log, "Will not execute future listener (grid is stopping): " + ctx.gridName());
+ U.warn(null, "Will not execute future listener (grid is stopping): " + this);
}
catch (Exception e) {
onDone(e);
@@ -300,7 +282,7 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> {
applyx(f);
}
catch (IgniteIllegalStateException ignore) {
- U.warn(log, "Will not execute future listener (grid is stopping): " + ctx.gridName());
+ U.warn(null, "Will not execute future listener (grid is stopping): " + this);
}
catch (Exception e) {
onDone(e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
index c2f1b89..31f6734 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
@@ -21,40 +21,24 @@ import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-import java.io.*;
import java.util.concurrent.*;
-import static org.apache.ignite.IgniteSystemProperties.*;
-
/**
* Future that is completed at creation time.
*/
-public class GridFinishedFuture<T> implements IgniteInternalFuture<T>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Synchronous notification flag. */
- private static final boolean SYNC_NOTIFY = IgniteSystemProperties.getBoolean(IGNITE_FUT_SYNC_NOTIFICATION, true);
-
+public class GridFinishedFuture<T> implements IgniteInternalFuture<T> {
/** Complete value. */
private T t;
/** Error. */
private Throwable err;
- /** Context. */
- protected GridKernalContext ctx;
-
/** Start time. */
private final long startTime = U.currentTimeMillis();
- /** Synchronous notification flag. */
- private volatile boolean syncNotify = SYNC_NOTIFY;
-
/**
- * Empty constructor required for {@link Externalizable}.
+ * Creates finished future with {@code null} value.
*/
public GridFinishedFuture() {
// No-op.
@@ -63,43 +47,31 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T>, Externali
/**
* Creates finished future with complete value.
*
- * @param ctx Context.
- */
- public GridFinishedFuture(GridKernalContext ctx) {
- assert ctx != null;
-
- this.ctx = ctx;
-
- t = null;
- err = null;
- }
-
- /**
- * Creates finished future with complete value.
- *
- * @param ctx Context.
* @param t Finished value.
*/
- public GridFinishedFuture(GridKernalContext ctx, T t) {
- assert ctx != null;
-
- this.ctx = ctx;
+ public GridFinishedFuture(T t) {
this.t = t;
-
- err = null;
}
/**
- * @param ctx Context.
* @param err Future error.
*/
- public GridFinishedFuture(GridKernalContext ctx, Throwable err) {
- assert ctx != null;
-
- this.ctx = ctx;
+ public GridFinishedFuture(Throwable err) {
this.err = err;
+ }
- t = null;
+ /**
+ * @return Value of error.
+ */
+ protected Throwable error() {
+ return err;
+ }
+
+ /**
+ * @return Value of result.
+ */
+ protected T result() {
+ return t;
}
/** {@inheritDoc} */
@@ -113,26 +85,6 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T>, Externali
}
/** {@inheritDoc} */
- @Override public boolean concurrentNotify() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public void concurrentNotify(boolean concurNotify) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void syncNotify(boolean syncNotify) {
- this.syncNotify = syncNotify;
- }
-
- /** {@inheritDoc} */
- @Override public boolean syncNotify() {
- return syncNotify;
- }
-
- /** {@inheritDoc} */
@Override public boolean cancel() {
return false;
}
@@ -167,52 +119,24 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T>, Externali
/** {@inheritDoc} */
@Override public void listenAsync(final IgniteInClosure<? super IgniteInternalFuture<T>> lsnr) {
- if (ctx == null)
- throw new IllegalStateException("Cannot attach listener to deserialized future (context is null): " + this);
-
- if (lsnr != null) {
- if (syncNotify)
- lsnr.apply(this);
- else
- ctx.closure().runLocalSafe(new GPR() {
- @Override public void run() {
- lsnr.apply(GridFinishedFuture.this);
- }
- }, true);
- }
+ if (lsnr != null)
+ lsnr.apply(this);
}
/** {@inheritDoc} */
@Override public <R> IgniteInternalFuture<R> chain(final IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb) {
- GridFutureAdapter<R> fut = new GridFutureAdapter<R>(ctx, syncNotify) {
+ GridFutureAdapter<R> fut = new GridFutureAdapter<R>() {
@Override public String toString() {
return "ChainFuture[orig=" + GridFinishedFuture.this + ", doneCb=" + doneCb + ']';
}
};
- listenAsync(new GridFutureChainListener<>(ctx, fut, doneCb));
+ listenAsync(new GridFutureChainListener<>(fut, doneCb));
return fut;
}
/** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(t);
- out.writeObject(err);
- out.writeObject(ctx);
- out.writeBoolean(syncNotify);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- t = (T)in.readObject();
- err = (Throwable)in.readObject();
- ctx = (GridKernalContext)in.readObject();
- syncNotify = in.readBoolean();
- }
-
- /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridFinishedFuture.class, this);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFutureEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFutureEx.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFutureEx.java
deleted file mode 100644
index 84323be..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFutureEx.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util.future;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.concurrent.*;
-
-/**
- * Future that is completed at creation time. This future is different from
- * {@link GridFinishedFuture} as it does not take context as a parameter and
- * performs notifications in the same thread.
- */
-public class GridFinishedFutureEx<T> implements IgniteInternalFuture<T>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Complete value. */
- private T t;
-
- /** Error. */
- private Throwable err;
-
- /** Start time. */
- private final long startTime = U.currentTimeMillis();
-
- /**
- * Created finished future with {@code null} value.
- */
- public GridFinishedFutureEx() {
- this(null, null);
- }
-
- /**
- * Creates finished future with complete value.
- *
- * @param t Finished value.
- */
- public GridFinishedFutureEx(T t) {
- this(t, null);
- }
-
- /**
- * @param err Future error.
- */
- public GridFinishedFutureEx(Throwable err) {
- this(null, err);
- }
-
- /**
- * Creates finished future with complete value and error.
- *
- * @param t Finished value.
- * @param err Future error.
- */
- public GridFinishedFutureEx(T t, Throwable err) {
- this.err = err;
-
- this.t = t;
- }
-
- /** {@inheritDoc} */
- @Override public long startTime() {
- return startTime;
- }
-
- /** {@inheritDoc} */
- @Override public long duration() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public boolean concurrentNotify() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public void concurrentNotify(boolean concurNotify) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void syncNotify(boolean syncNotify) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean syncNotify() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean cancel() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isCancelled() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isDone() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public T get() throws IgniteCheckedException {
- if (err != null)
- throw U.cast(err);
-
- return t;
- }
-
- /** {@inheritDoc} */
- @Override public T get(long timeout) throws IgniteCheckedException {
- return get();
- }
-
- /** {@inheritDoc} */
- @Override public T get(long timeout, TimeUnit unit) throws IgniteCheckedException {
- return get();
- }
-
- /** {@inheritDoc} */
- @Override public <R> IgniteInternalFuture<R> chain(IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb) {
- try {
- return new GridFinishedFutureEx<>(doneCb.apply(this));
- }
- catch (GridClosureException e) {
- return new GridFinishedFutureEx<>(U.unwrap(e));
- }
- catch (RuntimeException | Error e) {
- U.warn(null, "Failed to notify chained future [doneCb=" + doneCb + ", err=" + e.getMessage() + ']');
-
- throw e;
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public void listenAsync(IgniteInClosure<? super IgniteInternalFuture<T>> lsnr) {
- if (lsnr != null)
- lsnr.apply(this);
- }
-
- /**
- * @return {@code True} if future failed.
- */
- protected boolean failed() {
- return err != null;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(t);
- out.writeObject(err);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- t = (T)in.readObject();
- err = (Throwable)in.readObject();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridFinishedFutureEx.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 4beb89a..b107c15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -24,35 +24,17 @@ import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
-import java.io.*;
import java.util.*;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
-import static org.apache.ignite.IgniteSystemProperties.*;
-
/**
* Future adapter.
*/
-public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements IgniteInternalFuture<R>,
- Externalizable {
+public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements IgniteInternalFuture<R> {
/** */
private static final long serialVersionUID = 0L;
- /** Logger reference. */
- private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Logger. */
- protected static IgniteLogger log;
-
- /** Synchronous notification flag. */
- private static final boolean SYNC_NOTIFY = IgniteSystemProperties.getBoolean(IGNITE_FUT_SYNC_NOTIFICATION, true);
-
- /** Concurrent notification flag. */
- private static final boolean CONCUR_NOTIFY =
- IgniteSystemProperties.getBoolean(IGNITE_FUT_CONCURRENT_NOTIFICATION, false);
-
/** Initial state. */
private static final int INIT = 0;
@@ -75,52 +57,12 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
/** Future end time. */
private volatile long endTime;
- /** Set to {@code false} on deserialization whenever incomplete future is serialized. */
- private boolean valid = true;
-
/** Asynchronous listeners. */
private Collection<IgniteInClosure<? super IgniteInternalFuture<R>>> lsnrs;
- /** Context. */
- protected GridKernalContext ctx;
-
- /** Synchronous notification flag. */
- private volatile boolean syncNotify = SYNC_NOTIFY;
-
- /** Concurrent notification flag. */
- private volatile boolean concurNotify = CONCUR_NOTIFY;
-
/** */
private final Object mux = new Object();
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public GridFutureAdapter() {
- // No-op.
- }
-
- /**
- * @param ctx Kernal context.
- */
- public GridFutureAdapter(GridKernalContext ctx) {
- this(ctx, SYNC_NOTIFY);
- }
-
- /**
- * @param syncNotify Synchronous notify flag.
- * @param ctx Kernal context.
- */
- public GridFutureAdapter(GridKernalContext ctx, boolean syncNotify) {
- assert ctx != null;
-
- this.syncNotify = syncNotify;
-
- this.ctx = ctx;
-
- log = U.logger(ctx, logRef, GridFutureAdapter.class);
- }
-
/** {@inheritDoc} */
@Override public long startTime() {
return startTime;
@@ -140,48 +82,10 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
return endTime;
}
- /** {@inheritDoc} */
- @Override public boolean concurrentNotify() {
- return concurNotify;
- }
-
- /** {@inheritDoc} */
- @Override public void concurrentNotify(boolean concurNotify) {
- this.concurNotify = concurNotify;
- }
-
- /** {@inheritDoc} */
- @Override public boolean syncNotify() {
- return syncNotify;
- }
-
- /** {@inheritDoc} */
- @Override public void syncNotify(boolean syncNotify) {
- this.syncNotify = syncNotify;
- }
-
- /**
- * Checks that future is in usable state.
- */
- protected void checkValid() {
- if (!valid)
- throw new IllegalStateException("Incomplete future was serialized and cannot " +
- "be used after deserialization.");
- }
-
- /**
- * @return Valid flag.
- */
- protected boolean isValid() {
- return valid;
- }
-
/**
* @return Value of error.
*/
protected Throwable error() {
- checkValid();
-
return err;
}
@@ -189,15 +93,11 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
* @return Value of result.
*/
protected R result() {
- checkValid();
-
return res;
}
/** {@inheritDoc} */
@Override public R get() throws IgniteCheckedException {
- checkValid();
-
try {
if (endTime == 0)
acquireSharedInterruptibly(0);
@@ -228,8 +128,6 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
A.ensure(timeout >= 0, "timeout cannot be negative: " + timeout);
A.notNull(unit, "unit");
- checkValid();
-
try {
return get0(unit.toNanos(timeout));
}
@@ -263,8 +161,6 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
/** {@inheritDoc} */
@Override public void listenAsync(@Nullable final IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) {
if (lsnr != null) {
- checkValid();
-
boolean done = isDone();
if (!done) {
@@ -281,27 +177,14 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
}
if (done) {
- try {
- if (syncNotify)
- notifyListener(lsnr);
- else {
- ctx.closure().runLocalSafe(new GPR() {
- @Override public void run() {
- notifyListener(lsnr);
- }
- }, true);
- }
- }
- catch (IllegalStateException ignore) {
- U.warn(null, "Future notification will not proceed because grid is stopped: " + ctx.gridName());
- }
+ notifyListener(lsnr);
}
}
}
/** {@inheritDoc} */
@Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) {
- return new ChainFuture<>(ctx, syncNotify, this, doneCb);
+ return new ChainFuture<>(this, doneCb);
}
/**
@@ -321,30 +204,8 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
assert !lsnrs0.isEmpty();
- if (concurNotify) {
- for (final IgniteInClosure<? super IgniteInternalFuture<R>> lsnr : lsnrs0)
- ctx.closure().runLocalSafe(new GPR() {
- @Override public void run() {
- notifyListener(lsnr);
- }
- }, true);
- }
- else {
- // Always notify in the thread different from start thread.
- if (!syncNotify) {
- ctx.closure().runLocalSafe(new GPR() {
- @Override public void run() {
- // Since concurrent notifications are off, we notify
- // all listeners in one thread.
- for (IgniteInClosure<? super IgniteInternalFuture<R>> lsnr : lsnrs0)
- notifyListener(lsnr);
- }
- }, true);
- }
- else
- for (IgniteInClosure<? super IgniteInternalFuture<R>> lsnr : lsnrs0)
- notifyListener(lsnr);
- }
+ for (IgniteInClosure<? super IgniteInternalFuture<R>> lsnr : lsnrs0)
+ notifyListener(lsnr);
}
/**
@@ -359,11 +220,11 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
lsnr.apply(this);
}
catch (IllegalStateException e) {
- U.warn(null, "Failed to notify listener (is grid stopped?) [grid=" + ctx.gridName() +
+ U.warn(null, "Failed to notify listener (is grid stopped?) [fut=" + this +
", lsnr=" + lsnr + ", err=" + e.getMessage() + ']');
}
catch (RuntimeException | Error e) {
- U.error(log, "Failed to notify listener: " + lsnr, e);
+ U.error(null, "Failed to notify listener: " + lsnr, e);
throw e;
}
@@ -376,8 +237,6 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
* indeed did happen.
*/
@Override public boolean cancel() throws IgniteCheckedException {
- checkValid();
-
return false;
}
@@ -398,8 +257,6 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
/** {@inheritDoc} */
@Override public boolean isCancelled() {
- checkValid();
-
return getState() == CANCELLED;
}
@@ -454,8 +311,6 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
* @return {@code True} if result was set by this call.
*/
private boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
- checkValid();
-
boolean notify = false;
try {
@@ -510,54 +365,6 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
}
/** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- int state0 = getState();
-
- out.writeByte(state0);
- out.writeBoolean(syncNotify);
- out.writeBoolean(concurNotify);
-
- // Don't write any further if not done, as deserialized future
- // will be invalid anyways.
- if (state0 != INIT) {
- try {
- acquireSharedInterruptibly(0);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IOException("Thread has been interrupted.", e);
- }
-
- out.writeObject(res);
- out.writeObject(err);
- out.writeObject(ctx);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- int state0 = in.readByte();
-
- setState(state0);
-
- syncNotify = in.readBoolean();
- concurNotify = in.readBoolean();
-
- if (state0 == INIT)
- valid = false;
- else {
- res = (R)in.readObject();
- err = (Throwable)in.readObject();
- ctx = (GridKernalContext)in.readObject();
-
- // Prevent any thread from being locked on deserialized future.
- // This will also set 'endTime'.
- releaseShared(0);
- }
- }
-
- /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridFutureAdapter.class, this, "state", state());
}
@@ -583,19 +390,19 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
}
/**
- * @param ctx Context.
- * @param syncNotify Sync notify flag.
* @param fut Future.
* @param doneCb Closure.
*/
- ChainFuture(GridKernalContext ctx, boolean syncNotify,
- GridFutureAdapter<R> fut, IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) {
- super(ctx, syncNotify);
+ ChainFuture(
+ GridFutureAdapter<R> fut,
+ IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb
+ ) {
+ super();
this.fut = fut;
this.doneCb = doneCb;
- fut.listenAsync(new GridFutureChainListener<>(ctx, this, doneCb));
+ fut.listenAsync(new GridFutureChainListener<>(this, doneCb));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapterEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapterEx.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapterEx.java
deleted file mode 100644
index ccce6e7..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapterEx.java
+++ /dev/null
@@ -1,500 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util.future;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-
-/**
- * Future adapter without kernal context.
- */
-public class GridFutureAdapterEx<R> extends AbstractQueuedSynchronizer implements IgniteInternalFuture<R>,
- Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Initial state. */
- private static final int INIT = 0;
-
- /** Cancelled state. */
- private static final int CANCELLED = 1;
-
- /** Done state. */
- private static final int DONE = 2;
-
- /** Result. */
- @GridToStringInclude
- private R res;
-
- /** Error. */
- private Throwable err;
-
- /** Future start time. */
- private final long startTime = U.currentTimeMillis();
-
- /** Future end time. */
- private volatile long endTime;
-
- /** Set to {@code false} on deserialization whenever incomplete future is serialized. */
- private boolean valid = true;
-
- /** Asynchronous listener. */
- private final ConcurrentLinkedDeque8<IgniteInClosure<? super IgniteInternalFuture<R>>>
- lsnrs = new ConcurrentLinkedDeque8<>();
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- @SuppressWarnings("RedundantNoArgConstructor")
- public GridFutureAdapterEx() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public long startTime() {
- return startTime;
- }
-
- /** {@inheritDoc} */
- @Override public long duration() {
- long endTime = this.endTime;
-
- return endTime == 0 ? U.currentTimeMillis() - startTime : endTime - startTime;
- }
-
- /** {@inheritDoc} */
- @Override public boolean concurrentNotify() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public void concurrentNotify(boolean concurNotify) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean syncNotify() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public void syncNotify(boolean syncNotify) {
- // No-op
- }
-
- /**
- * Checks that future is in usable state.
- */
- protected void checkValid() {
- if (!valid)
- throw new IllegalStateException("Incomplete future was serialized and cannot " +
- "be used after deserialization.");
- }
-
- /**
- * @return Valid flag.
- */
- protected boolean isValid() {
- return valid;
- }
-
- /**
- * @return Value of error.
- */
- protected Throwable error() {
- checkValid();
-
- return err;
- }
-
- /**
- * @return Value of result.
- */
- protected R result() {
- checkValid();
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public R get() throws IgniteCheckedException {
- checkValid();
-
- try {
- if (endTime == 0)
- acquireSharedInterruptibly(0);
-
- if (getState() == CANCELLED)
- throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
-
- if (err != null)
- throw U.cast(err);
-
- return res;
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteInterruptedCheckedException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public R get(long timeout) throws IgniteCheckedException {
- // Do not replace with static import, as it may not compile.
- return get(timeout, TimeUnit.MILLISECONDS);
- }
-
- /** {@inheritDoc} */
- @Override public R get(long timeout, TimeUnit unit) throws IgniteCheckedException {
- A.ensure(timeout >= 0, "timeout cannot be negative: " + timeout);
- A.notNull(unit, "unit");
-
- checkValid();
-
- try {
- return get0(unit.toNanos(timeout));
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.", e);
- }
- }
-
- /**
- * @param nanosTimeout Timeout (nanoseconds).
- * @return Result.
- * @throws InterruptedException If interrupted.
- * @throws org.apache.ignite.internal.IgniteFutureTimeoutCheckedException If timeout reached before computation completed.
- * @throws IgniteCheckedException If error occurred.
- */
- @Nullable protected R get0(long nanosTimeout) throws InterruptedException, IgniteCheckedException {
- if (endTime == 0 && !tryAcquireSharedNanos(0, nanosTimeout))
- throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed.");
-
- if (getState() == CANCELLED)
- throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this);
-
- if (err != null)
- throw U.cast(err);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked", "TooBroadScope"})
- @Override public void listenAsync(@Nullable final IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) {
- if (lsnr != null) {
- checkValid();
-
- boolean done;
-
- IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0 = lsnr;
-
- done = isDone();
-
- if (!done) {
- lsnr0 = new IgniteInClosure<IgniteInternalFuture<R>>() {
- private final AtomicBoolean called = new AtomicBoolean();
-
- @Override public void apply(IgniteInternalFuture<R> t) {
- if (called.compareAndSet(false, true))
- lsnr.apply(t);
- }
-
- @Override public boolean equals(Object o) {
- return o != null && (o == this || o == lsnr || o.equals(lsnr));
- }
-
- @Override public String toString() {
- return lsnr.toString();
- }
- };
-
- lsnrs.add(lsnr0);
-
- done = isDone(); // Double check.
- }
-
- if (done)
- notifyListener(lsnr0);
- }
- }
-
- /** {@inheritDoc} */
- @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) {
- final GridFutureAdapterEx<T> fut = new GridFutureAdapterEx<T>() {
- @Override public String toString() {
- return "ChainFuture[orig=" + GridFutureAdapterEx.this + ", doneCb=" + doneCb + ']';
- }
- };
-
- listenAsync(new IgniteInClosure<IgniteInternalFuture<R>>() {
- @Override public void apply(IgniteInternalFuture<R> t) {
- try {
- fut.onDone(doneCb.apply(t));
- }
- catch (GridClosureException e) {
- fut.onDone(e.unwrap());
- }
- catch (RuntimeException e) {
- U.warn(null, "Failed to notify chained future (is grid stopped?) [, doneCb=" + doneCb +
- ", err=" + e.getMessage() + ']');
-
- fut.onDone(e);
-
- throw e;
- }
- catch (Error e) {
- U.warn(null, "Failed to notify chained future (is grid stopped?) [doneCb=" + doneCb +
- ", err=" + e.getMessage() + ']');
-
- fut.onDone(e);
-
- throw e;
- }
- }
- });
-
- return fut;
- }
-
- /**
- * Notifies all registered listeners.
- */
- private void notifyListeners() {
- if (lsnrs.isEmptyx())
- return;
-
- for (IgniteInClosure<? super IgniteInternalFuture<R>> lsnr : lsnrs)
- notifyListener(lsnr);
- }
-
- /**
- * Notifies single listener.
- *
- * @param lsnr Listener.
- */
- private void notifyListener(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) {
- assert lsnr != null;
-
- try {
- lsnr.apply(this);
- }
- catch (IllegalStateException e) {
- U.warn(null, "Failed to notify listener (is grid stopped?) [lsnr=" + lsnr +
- ", err=" + e.getMessage() + ']');
- }
- catch (RuntimeException | Error e) {
- U.error(null, "Failed to notify listener: " + lsnr, e);
-
- throw e;
- }
- }
-
- /**
- * Default no-op implementation that always returns {@code false}.
- * Futures that do support cancellation should override this method
- * and call {@link #onCancelled()} callback explicitly if cancellation
- * indeed did happen.
- */
- @Override public boolean cancel() throws IgniteCheckedException {
- checkValid();
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isDone() {
- // Don't check for "valid" here, as "done" flag can be read
- // even in invalid state.
- return endTime != 0;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isCancelled() {
- checkValid();
-
- return getState() == CANCELLED;
- }
-
- /**
- * Callback to notify that future is finished with {@code null} result.
- * This method must delegate to {@link #onDone(Object, Throwable)} method.
- *
- * @return {@code True} if result was set by this call.
- */
- public final boolean onDone() {
- return onDone(null, null);
- }
-
- /**
- * Callback to notify that future is finished.
- * This method must delegate to {@link #onDone(Object, Throwable)} method.
- *
- * @param res Result.
- * @return {@code True} if result was set by this call.
- */
- public final boolean onDone(@Nullable R res) {
- return onDone(res, null);
- }
-
- /**
- * Callback to notify that future is finished.
- * This method must delegate to {@link #onDone(Object, Throwable)} method.
- *
- * @param err Error.
- * @return {@code True} if result was set by this call.
- */
- public final boolean onDone(@Nullable Throwable err) {
- return onDone(null, err);
- }
-
- /**
- * Callback to notify that future is finished. Note that if non-{@code null} exception is passed in
- * the result value will be ignored.
- *
- * @param res Optional result.
- * @param err Optional error.
- * @return {@code True} if result was set by this call.
- */
- public boolean onDone(@Nullable R res, @Nullable Throwable err) {
- return onDone(res, err, false);
- }
-
- /**
- * @param res Result.
- * @param err Error.
- * @param cancel {@code True} if future is being cancelled.
- * @return {@code True} if result was set by this call.
- */
- private boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
- checkValid();
-
- boolean notify = false;
-
- try {
- if (compareAndSetState(INIT, cancel ? CANCELLED : DONE)) {
- this.res = res;
- this.err = err;
-
- notify = true;
-
- releaseShared(0);
-
- return true;
- }
-
- return false;
- }
- finally {
- if (notify)
- notifyListeners();
- }
- }
-
- /**
- * Callback to notify that future is cancelled.
- *
- * @return {@code True} if cancel flag was set by this call.
- */
- public boolean onCancelled() {
- return onDone(null, null, true);
- }
-
- /** {@inheritDoc} */
- @Override protected final int tryAcquireShared(int ignore) {
- return endTime != 0 ? 1 : -1;
- }
-
- /** {@inheritDoc} */
- @Override protected final boolean tryReleaseShared(int ignore) {
- endTime = U.currentTimeMillis();
-
- // Always signal after setting final done status.
- return true;
- }
-
- /**
- * @return String representation of state.
- */
- private String state() {
- int s = getState();
-
- return s == INIT ? "INIT" : s == CANCELLED ? "CANCELLED" : "DONE";
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- int state0 = getState();
-
- out.writeByte(state0);
-
- // Don't write any further if not done, as deserialized future
- // will be invalid anyways.
- if (state0 != INIT) {
- try {
- acquireSharedInterruptibly(0);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IOException("Thread has been interrupted.", e);
- }
-
- out.writeObject(res);
- out.writeObject(err);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- int state0 = in.readByte();
-
- setState(state0);
-
- if (state0 == INIT)
- valid = false;
- else {
- res = (R)in.readObject();
- err = (Throwable)in.readObject();
-
- // Prevent any thread from being locked on deserialized future.
- // This will also set 'endTime'.
- releaseShared(0);
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridFutureAdapterEx.class, this, "state", state());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
index 0599053..d98538e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
@@ -29,9 +29,6 @@ public class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInte
/** */
private static final long serialVersionUID = 0L;
- /** Context. */
- private final GridKernalContext ctx;
-
/** Target future. */
private final GridFutureAdapter<R> fut;
@@ -40,14 +37,13 @@ public class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInte
/**
* Constructs chain listener.
- *
- * @param ctx Kernal context.
- * @param fut Target future.
+ * @param fut Target future.
* @param doneCb Done callback.
*/
- public GridFutureChainListener(GridKernalContext ctx, GridFutureAdapter<R> fut,
- IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb) {
- this.ctx = ctx;
+ public GridFutureChainListener(
+ GridFutureAdapter<R> fut,
+ IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb
+ ) {
this.fut = fut;
this.doneCb = doneCb;
}
@@ -61,8 +57,8 @@ public class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInte
fut.onDone(e.unwrap());
}
catch (RuntimeException | Error e) {
- U.warn(null, "Failed to notify chained future (is grid stopped?) [grid=" + ctx.gridName() +
- ", doneCb=" + doneCb + ", err=" + e.getMessage() + ']');
+ U.warn(null, "Failed to notify chained future (is grid stopped?) [doneCb=" + doneCb +
+ ", err=" + e.getMessage() + ']');
fut.onDone(e);