You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/02/02 15:02:08 UTC
[1/2] incubator-ignite git commit: # ignite-26
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-26 6e104b80a -> addef7ebf
# ignite-26
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cf72dda0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cf72dda0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cf72dda0
Branch: refs/heads/ignite-26
Commit: cf72dda0913a86f0cf4c0a7eb7d477572e4371fa
Parents: 0693fa9
Author: sboikov <sb...@gridgain.com>
Authored: Mon Feb 2 17:01:38 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Feb 2 17:01:38 2015 +0300
----------------------------------------------------------------------
.../ignite/compute/ComputeTaskSession.java | 2 +-
.../internal/ComputeTaskInternalFuture.java | 139 +++++++++++++
.../ignite/internal/GridJobSessionImpl.java | 4 +-
.../ignite/internal/GridTaskSessionImpl.java | 10 +-
.../closure/GridClosureProcessor.java | 104 +++++-----
.../util/future/IgniteFinishedFutureImpl.java | 7 +
.../org/apache/ignite/GridTestTaskSession.java | 2 +-
.../internal/GridSpiExceptionSelfTest.java | 8 +-
.../IgniteComputeEmptyClusterGroupTest.java | 198 +++++++++++++++++++
.../collision/GridTestCollisionTaskSession.java | 2 +-
.../testsuites/IgniteComputeGridTestSuite.java | 1 +
11 files changed, 416 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java
index 9dc83fc..3e5f805 100644
--- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java
@@ -440,5 +440,5 @@ public interface ComputeTaskSession {
*
* @return Future that will be completed when task "<tt>map</tt>" step has completed.
*/
- public IgniteInternalFuture<?> mapFuture();
+ public IgniteFuture<?> mapFuture();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java
index a6a6004..a5a4574 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java
@@ -21,9 +21,11 @@ import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.security.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
import java.io.*;
import java.util.*;
@@ -70,6 +72,143 @@ public class ComputeTaskInternalFuture<R> extends GridFutureAdapter<R> {
}
/**
+ * @param ctx Context.
+ * @param taskCls Task class.
+ * @param e Error.
+ * @return Finished task future.
+ */
+ public static <R> ComputeTaskInternalFuture<R> finishedFuture(final GridKernalContext ctx,
+ final Class<?> taskCls,
+ IgniteCheckedException e) {
+ assert ctx != null;
+ assert taskCls != null;
+ assert e != null;
+
+ final long time = U.currentTimeMillis();
+
+ final IgniteUuid id = IgniteUuid.fromUuid(ctx.localNodeId());
+
+ ComputeTaskSession ses = new ComputeTaskSession() {
+ @Override public String getTaskName() {
+ return taskCls.getName();
+ }
+
+ @Override public UUID getTaskNodeId() {
+ return ctx.localNodeId();
+ }
+
+ @Override public long getStartTime() {
+ return time;
+ }
+
+ @Override public long getEndTime() {
+ return time;
+ }
+
+ @Override public IgniteUuid getId() {
+ return id;
+ }
+
+ @Override public ClassLoader getClassLoader() {
+ return null;
+ }
+
+ @Override public Collection<ComputeJobSibling> getJobSiblings() throws IgniteException {
+ return Collections.emptyList();
+ }
+
+ @Override public Collection<ComputeJobSibling> refreshJobSiblings() throws IgniteException {
+ return Collections.emptyList();
+ }
+
+ @Nullable @Override public ComputeJobSibling getJobSibling(IgniteUuid jobId) throws IgniteException {
+ return null;
+ }
+
+ @Override public void setAttribute(Object key, @Nullable Object val) throws IgniteException {
+ }
+
+ @Nullable @Override public <K, V> V getAttribute(K key) {
+ return null;
+ }
+
+ @Override public void setAttributes(Map<?, ?> attrs) throws IgniteException {
+ // No-op.
+ }
+
+ @Override public Map<?, ?> getAttributes() {
+ return Collections.emptyMap();
+ }
+
+ @Override public void addAttributeListener(ComputeTaskSessionAttributeListener lsnr, boolean rewind) {
+ // No-op.
+ }
+
+ @Override public boolean removeAttributeListener(ComputeTaskSessionAttributeListener lsnr) {
+ return false;
+ }
+
+ @Override public <K, V> V waitForAttribute(K key, long timeout) throws InterruptedException {
+ throw new InterruptedException("Session was closed.");
+ }
+
+ @Override public <K, V> boolean waitForAttribute(K key, @Nullable V val, long timeout) throws InterruptedException {
+ throw new InterruptedException("Session was closed.");
+ }
+
+ @Override public Map<?, ?> waitForAttributes(Collection<?> keys, long timeout) throws InterruptedException {
+ throw new InterruptedException("Session was closed.");
+ }
+
+ @Override public boolean waitForAttributes(Map<?, ?> attrs, long timeout) throws InterruptedException {
+ throw new InterruptedException("Session was closed.");
+ }
+
+ @Override public void saveCheckpoint(String key, Object state) {
+ throw new IgniteException("Session was closed.");
+ }
+
+ @Override public void saveCheckpoint(String key,
+ Object state,
+ ComputeTaskSessionScope scope,
+ long timeout)
+ {
+ throw new IgniteException("Session was closed.");
+ }
+
+ @Override public void saveCheckpoint(String key,
+ Object state,
+ ComputeTaskSessionScope scope,
+ long timeout,
+ boolean overwrite) {
+ throw new IgniteException("Session was closed.");
+ }
+
+ @Nullable @Override public <T> T loadCheckpoint(String key) throws IgniteException {
+ throw new IgniteException("Session was closed.");
+ }
+
+ @Override public boolean removeCheckpoint(String key) throws IgniteException {
+ throw new IgniteException("Session was closed.");
+ }
+
+ @Override public Collection<UUID> getTopology() {
+ return Collections.emptyList();
+ }
+
+ @Override public IgniteFuture<?> mapFuture() {
+ return new IgniteFinishedFutureImpl<Object>(ctx);
+ }
+ };
+
+ ComputeTaskInternalFuture<R> fut = new ComputeTaskInternalFuture<>(ses, ctx);
+
+ fut.onDone(e);
+
+ return fut;
+ }
+
+ /**
* @return Future returned by public API.
*/
public ComputeTaskFuture<R> publicFuture() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java
index ef90408..5b26961 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java
@@ -291,8 +291,8 @@ public class GridJobSessionImpl implements GridTaskSessionInternal {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> mapFuture() {
- return new GridFinishedFuture<>(ctx);
+ @Override public IgniteFuture<?> mapFuture() {
+ return new IgniteFinishedFutureImpl<>(ctx, null);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
index 10283f8..be9ade4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
@@ -97,7 +97,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
private final UUID subjId;
/** */
- private final GridFutureAdapter mapFut;
+ private final IgniteFutureImpl mapFut;
/**
* @param taskNodeId Task node ID.
@@ -156,7 +156,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
this.fullSup = fullSup;
this.subjId = subjId;
- mapFut = new GridFutureAdapter(ctx);
+ mapFut = new IgniteFutureImpl(new GridFutureAdapter(ctx));
}
/** {@inheritDoc} */
@@ -832,18 +832,18 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
* Task map callback.
*/
public void onMapped() {
- mapFut.onDone();
+ ((GridFutureAdapter)mapFut.internalFuture()).onDone();
}
/**
* Finish task callback.
*/
public void onDone() {
- mapFut.onDone();
+ ((GridFutureAdapter)mapFut.internalFuture()).onDone();
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> mapFuture() {
+ @Override public IgniteFuture<?> mapFuture() {
return mapFut;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index f521161..19d15b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -124,7 +124,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param nodes Grid nodes.
* @return Task execution future.
*/
- public IgniteInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs,
+ public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs,
@Nullable Collection<ClusterNode> nodes) {
return runAsync(mode, jobs, nodes, false);
}
@@ -136,18 +136,19 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param sys If {@code true}, then system pool will be used.
* @return Task execution future.
*/
- public IgniteInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs,
- @Nullable Collection<ClusterNode> nodes, boolean sys) {
+ public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
+ Collection<? extends Runnable> jobs,
+ @Nullable Collection<ClusterNode> nodes,
+ boolean sys)
+ {
assert mode != null;
+ assert !F.isEmpty(jobs) : jobs;
enterBusy();
try {
- if (F.isEmpty(jobs))
- return new GridFinishedFuture(ctx);
-
if (F.isEmpty(nodes))
- return new GridFinishedFuture(ctx, U.emptyTopologyException());
+ return ComputeTaskInternalFuture.finishedFuture(ctx, T1.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
@@ -164,7 +165,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param nodes Grid nodes.
* @return Task execution future.
*/
- public IgniteInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Runnable job,
+ public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, Runnable job,
@Nullable Collection<ClusterNode> nodes) {
return runAsync(mode, job, nodes, false);
}
@@ -176,18 +177,19 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param sys If {@code true}, then system pool will be used.
* @return Task execution future.
*/
- public IgniteInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Runnable job,
- @Nullable Collection<ClusterNode> nodes, boolean sys) {
+ public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
+ Runnable job,
+ @Nullable Collection<ClusterNode> nodes,
+ boolean sys)
+ {
assert mode != null;
+ assert job != null;
enterBusy();
try {
- if (job == null)
- return new GridFinishedFuture(ctx);
-
if (F.isEmpty(nodes))
- return new GridFinishedFuture(ctx, U.emptyTopologyException());
+ return ComputeTaskInternalFuture.finishedFuture(ctx, T2.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
@@ -314,19 +316,20 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param <R2> Type.
* @return Reduced result.
*/
- public <R1, R2> IgniteInternalFuture<R2> forkjoinAsync(GridClosureCallMode mode,
- @Nullable Collection<? extends Callable<R1>> jobs,
- @Nullable IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) {
+ public <R1, R2> ComputeTaskInternalFuture<R2> forkjoinAsync(GridClosureCallMode mode,
+ Collection<? extends Callable<R1>> jobs,
+ IgniteReducer<R1, R2> rdc,
+ @Nullable Collection<ClusterNode> nodes)
+ {
assert mode != null;
+ assert rdc != null;
+ assert !F.isEmpty(jobs);
enterBusy();
try {
- if (F.isEmpty(jobs) || rdc == null)
- return new GridFinishedFuture<>(ctx);
-
if (F.isEmpty(nodes))
- return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+ return ComputeTaskInternalFuture.finishedFuture(ctx, T3.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
@@ -344,7 +347,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param <R> Type.
* @return Grid future for collection of closure results.
*/
- public <R> IgniteInternalFuture<Collection<R>> callAsync(
+ public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(
GridClosureCallMode mode,
@Nullable Collection<? extends Callable<R>> jobs,
@Nullable Collection<ClusterNode> nodes) {
@@ -359,19 +362,19 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param <R> Type.
* @return Grid future for collection of closure results.
*/
- public <R> IgniteInternalFuture<Collection<R>> callAsync(GridClosureCallMode mode,
- @Nullable Collection<? extends Callable<R>> jobs, @Nullable Collection<ClusterNode> nodes,
- boolean sys) {
+ public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(GridClosureCallMode mode,
+ Collection<? extends Callable<R>> jobs,
+ @Nullable Collection<ClusterNode> nodes,
+ boolean sys)
+ {
assert mode != null;
+ assert !F.isEmpty(jobs);
enterBusy();
try {
- if (F.isEmpty(jobs))
- return new GridFinishedFuture<>(ctx);
-
if (F.isEmpty(nodes))
- return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+ return ComputeTaskInternalFuture.finishedFuture(ctx, T6.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
@@ -390,7 +393,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param <R> Type.
* @return Grid future for collection of closure results.
*/
- public <R> IgniteInternalFuture<R> callAsync(GridClosureCallMode mode,
+ public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
@Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes) {
return callAsync(mode, job, nodes, false);
}
@@ -402,13 +405,13 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param nodes Grid nodes.
* @return Job future.
*/
- public <R> IgniteInternalFuture<R> affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job,
+ public <R> ComputeTaskInternalFuture<R> affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job,
@Nullable Collection<ClusterNode> nodes) {
enterBusy();
try {
if (F.isEmpty(nodes))
- return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+ return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, U.emptyTopologyException());
// In case cache key is passed instead of affinity key.
final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
@@ -418,7 +421,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
return ctx.task().execute(new T5<>(cacheName, affKey0, job), null, false);
}
catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(ctx, e);
+ return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, e);
}
finally {
leaveBusy();
@@ -432,13 +435,13 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param nodes Grid nodes.
* @return Job future.
*/
- public IgniteInternalFuture<?> affinityRun(@Nullable String cacheName, Object affKey, Runnable job,
+ public ComputeTaskInternalFuture<?> affinityRun(@Nullable String cacheName, Object affKey, Runnable job,
@Nullable Collection<ClusterNode> nodes) {
enterBusy();
try {
if (F.isEmpty(nodes))
- return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+ return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, U.emptyTopologyException());
// In case cache key is passed instead of affinity key.
final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
@@ -448,7 +451,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
return ctx.task().execute(new T4(cacheName, affKey0, job), null, false);
}
catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(ctx, e);
+ return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, e);
}
finally {
leaveBusy();
@@ -526,18 +529,19 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param <R> Type.
* @return Grid future for collection of closure results.
*/
- public <R> IgniteInternalFuture<R> callAsync(GridClosureCallMode mode,
- @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes, boolean sys) {
+ public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
+ Callable<R> job,
+ @Nullable Collection<ClusterNode> nodes,
+ boolean sys)
+ {
assert mode != null;
+ assert job != null;
enterBusy();
try {
- if (job == null)
- return new GridFinishedFuture<>(ctx);
-
if (F.isEmpty(nodes))
- return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+ return ComputeTaskInternalFuture.finishedFuture(ctx, T7.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
@@ -554,13 +558,13 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param nodes Grid nodes.
* @return Grid future for execution result.
*/
- public <T, R> IgniteInternalFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg,
+ public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg,
@Nullable Collection<ClusterNode> nodes) {
enterBusy();
try {
if (F.isEmpty(nodes))
- return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+ return ComputeTaskInternalFuture.finishedFuture(ctx, T8.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
@@ -624,13 +628,15 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param nodes Grid nodes.
* @return Grid future for execution result.
*/
- public <T, R> IgniteInternalFuture<Collection<R>> callAsync(IgniteClosure<T, R> job, @Nullable Collection<? extends T> args,
- @Nullable Collection<ClusterNode> nodes) {
+ public <T, R> ComputeTaskInternalFuture<Collection<R>> callAsync(IgniteClosure<T, R> job,
+ @Nullable Collection<? extends T> args,
+ @Nullable Collection<ClusterNode> nodes)
+ {
enterBusy();
try {
if (F.isEmpty(nodes))
- return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+ return ComputeTaskInternalFuture.finishedFuture(ctx, T9.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
@@ -648,13 +654,13 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param nodes Grid nodes.
* @return Grid future for execution result.
*/
- public <T, R1, R2> IgniteInternalFuture<R2> callAsync(IgniteClosure<T, R1> job,
+ public <T, R1, R2> ComputeTaskInternalFuture<R2> callAsync(IgniteClosure<T, R1> job,
Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) {
enterBusy();
try {
if (F.isEmpty(nodes))
- return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+ return ComputeTaskInternalFuture.finishedFuture(ctx, T10.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java
index 3aa9f4d..6af2d2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java
@@ -30,4 +30,11 @@ public class IgniteFinishedFutureImpl<V> extends IgniteFutureImpl<V> {
public IgniteFinishedFutureImpl(GridKernalContext ctx, Throwable err) {
super(new GridFinishedFuture<V>(ctx, err));
}
+
+ /**
+ * @param ctx Context.
+ */
+ public IgniteFinishedFutureImpl(GridKernalContext ctx) {
+ super(new GridFinishedFuture<>(ctx, (V)null));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/test/java/org/apache/ignite/GridTestTaskSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/GridTestTaskSession.java b/modules/core/src/test/java/org/apache/ignite/GridTestTaskSession.java
index 5147b3d..2cb0ee9 100644
--- a/modules/core/src/test/java/org/apache/ignite/GridTestTaskSession.java
+++ b/modules/core/src/test/java/org/apache/ignite/GridTestTaskSession.java
@@ -212,7 +212,7 @@ public class GridTestTaskSession implements ComputeTaskSession {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> mapFuture() {
+ @Override public IgniteFuture<?> mapFuture() {
assert false : "Not implemented";
return null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java
index 50cff92..559b31c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java
@@ -78,9 +78,13 @@ public class GridSpiExceptionSelfTest extends GridCommonAbstractTest {
assert false : "Exception should be thrown";
}
catch (IgniteException e) {
- assert e.getCause() instanceof GridTestSpiException : "Wrong cause exception type. " + e;
+ assertTrue(e.getCause() instanceof IgniteCheckedException);
- assert e.getCause().getMessage().startsWith(TEST_MSG) : "Wrong exception message." + e.getMessage();
+ Throwable err = e.getCause().getCause();
+
+ assert err instanceof GridTestSpiException : "Wrong cause exception type. " + e;
+
+ assert err.getMessage().startsWith(TEST_MSG) : "Wrong exception message." + e.getMessage();
}
}
finally {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
new file mode 100644
index 0000000..bdbdd86
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
+
+ CacheConfiguration ccfg = defaultCacheConfiguration();
+
+ ccfg.setCacheMode(PARTITIONED);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrids(2);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAsync() throws Exception {
+ ClusterGroup empty = ignite(0).cluster().forNodeId(UUID.randomUUID());
+
+ assertEquals(0, empty.nodes().size());
+
+ IgniteCompute comp = ignite(0).compute(empty).withAsync();
+
+ comp.affinityRun(null, 1, new FailRunnable());
+
+ checkFutureFails(comp);
+
+ comp.apply(new FailClosure(), new Object());
+
+ checkFutureFails(comp);
+
+ comp.affinityCall(null, 1, new FailCallable());
+
+ checkFutureFails(comp);
+
+ comp.broadcast(new FailCallable());
+
+ checkFutureFails(comp);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSync() throws Exception {
+ ClusterGroup empty = ignite(0).cluster().forNodeId(UUID.randomUUID());
+
+ assertEquals(0, empty.nodes().size());
+
+ final IgniteCompute comp = ignite(0).compute(empty);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ comp.affinityRun(null, 1, new FailRunnable());
+
+ return null;
+ }
+ }, ClusterGroupEmptyException.class, null);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ comp.apply(new FailClosure(), new Object());
+
+ return null;
+ }
+ }, ClusterGroupEmptyException.class, null);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ comp.affinityCall(null, 1, new FailCallable());
+
+ return null;
+ }
+ }, ClusterGroupEmptyException.class, null);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ comp.broadcast(new FailCallable());
+
+ return null;
+ }
+ }, ClusterGroupEmptyException.class, null);
+ }
+
+ /**
+ * @param comp Compute.
+ */
+ private void checkFutureFails(IgniteCompute comp) {
+ final ComputeTaskFuture fut = comp.future();
+
+ assertNotNull(fut);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ fut.get();
+
+ return null;
+ }
+ }, ClusterGroupEmptyException.class, null);
+ }
+
+ /**
+ *
+ */
+ private static class FailClosure implements IgniteClosure<Object, Object> {
+ /** {@inheritDoc} */
+ @Override public Object apply(Object o) {
+ fail();
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class FailRunnable implements Runnable {
+ /** {@inheritDoc} */
+ @Override public void run() {
+ fail();
+ }
+ }
+
+ /**
+ *
+ */
+ private static class FailCallable implements Callable<Object> {
+ /** {@inheritDoc} */
+ @Override public Object call() throws Exception {
+ fail();
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java b/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java
index fc2cd42..d15b048 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java
@@ -199,7 +199,7 @@ public class GridTestCollisionTaskSession implements ComputeTaskSession {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> mapFuture() {
+ @Override public IgniteFuture<?> mapFuture() {
assert false : "Not implemented";
return null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 0e83e66..efe7e5f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -92,6 +92,7 @@ public class IgniteComputeGridTestSuite {
suite.addTestSuite(GridMultinodeRedeploySharedModeSelfTest.class);
suite.addTestSuite(GridMultinodeRedeployPrivateModeSelfTest.class);
suite.addTestSuite(GridMultinodeRedeployIsolatedModeSelfTest.class);
+ suite.addTestSuite(IgniteComputeEmptyClusterGroupTest.class);
return suite;
}
[2/2] incubator-ignite git commit: Merge remote-tracking branch
'origin/ignite-26' into ignite-26
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-26' into ignite-26
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/addef7eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/addef7eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/addef7eb
Branch: refs/heads/ignite-26
Commit: addef7ebf59ca061c8de7421d445a5c8ede3eca0
Parents: cf72dda 6e104b8
Author: sboikov <sb...@gridgain.com>
Authored: Mon Feb 2 17:02:04 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Feb 2 17:02:04 2015 +0300
----------------------------------------------------------------------
.../processors/fs/GridGgfsMetaManager.java | 69 +++++++++++---------
.../fs/GridGgfsMetaManagerSelfTest.java | 22 ++++---
.../processors/fs/GridGgfsModesSelfTest.java | 4 +-
.../fs/hadoop/v1/GridGgfsHadoopFileSystem.java | 4 +-
4 files changed, 57 insertions(+), 42 deletions(-)
----------------------------------------------------------------------