You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/01 14:57:27 UTC
ignite git commit: IGNITE-1340: Fixes to exception propagation.
Repository: ignite
Updated Branches:
refs/heads/master 5f58bbb57 -> 9b06cf3d8
IGNITE-1340: Fixes to exception propagation.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9b06cf3d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9b06cf3d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9b06cf3d
Branch: refs/heads/master
Commit: 9b06cf3d896d17af5a132a8849df41f8822dce96
Parents: 5f58bbb
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Tue Sep 1 15:57:36 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Sep 1 15:57:36 2015 +0300
----------------------------------------------------------------------
.../platform/PlatformAbstractTarget.java | 34 +++-----
.../platform/cache/PlatformCache.java | 42 ++++++----
.../PlatformCachePartialUpdateException.java | 9 ++-
.../datastreamer/PlatformDataStreamer.java | 2 +-
.../transactions/PlatformTransactions.java | 2 +-
.../platform/utils/PlatformFutureUtils.java | 82 ++++++++++++++++++--
6 files changed, 116 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
index 71d1657..0f46517 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
@@ -18,14 +18,14 @@
package org.apache.ignite.internal.processors.platform;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.portable.PortableRawReaderEx;
import org.apache.ignite.internal.portable.PortableRawWriterEx;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
-import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;
@@ -171,7 +171,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
* @param e Exception to convert.
* @return Converted exception.
*/
- protected Exception convertException(Exception e) {
+ public Exception convertException(Exception e) {
return e;
}
@@ -184,12 +184,12 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
/** {@inheritDoc} */
@Override public void listenFuture(final long futId, int typ) throws Exception {
- PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, null);
+ PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, null, this);
}
/** {@inheritDoc} */
@Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception {
- PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, futureWriter(opId));
+ PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, futureWriter(opId), this);
}
/**
@@ -199,26 +199,10 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
- protected IgniteFuture currentFutureWrapped() throws IgniteCheckedException {
- return currentFuture().chain(new IgniteClosure<IgniteFuture, Object>() {
- @Override public Object apply(IgniteFuture o) {
- try {
- return o.get();
- }
- catch (RuntimeException e) {
- Exception converted = convertException(e);
-
- if (converted instanceof RuntimeException)
- throw (RuntimeException)converted;
- else {
- log.error("Interop future result cannot be obtained due to exception.", converted);
-
- throw new IgniteException("Interop future result cannot be obtained due to exception " +
- "(see log for more details).");
- }
- }
- }
- });
+ protected IgniteInternalFuture currentFutureWrapped() throws IgniteCheckedException {
+ IgniteFutureImpl fut = (IgniteFutureImpl)currentFuture();
+
+ return fut.internalFuture();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index a7c741e..e579be7 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -17,18 +17,6 @@
package org.apache.ignite.internal.processors.platform.cache;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import javax.cache.Cache;
-import javax.cache.expiry.Duration;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheEntryProcessor;
@@ -43,6 +31,7 @@ import org.apache.ignite.cache.query.TextQuery;
import org.apache.ignite.internal.portable.PortableRawReaderEx;
import org.apache.ignite.internal.portable.PortableRawWriterEx;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
+import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
@@ -58,6 +47,19 @@ import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;
+import javax.cache.Cache;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+
/**
* Native cache wrapper implementation.
*/
@@ -618,9 +620,16 @@ public class PlatformCache extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Exception convertException(Exception e) {
+ @Override public Exception convertException(Exception e) {
if (e instanceof CachePartialUpdateException)
- return new PlatformCachePartialUpdateException((CachePartialUpdateException)e, platformCtx, keepPortable);
+ return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e.getCause(),
+ platformCtx, keepPortable);
+
+ if (e instanceof CachePartialUpdateCheckedException)
+ return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e, platformCtx, keepPortable);
+
+ if (e.getCause() instanceof EntryProcessorException)
+ return (EntryProcessorException) e.getCause();
return super.convertException(e);
}
@@ -788,11 +797,10 @@ public class PlatformCache extends PlatformAbstractTarget {
*/
public void rebalance(long futId) {
PlatformFutureUtils.listen(platformCtx, cache.rebalance().chain(new C1<IgniteFuture, Object>() {
- @Override
- public Object apply(IgniteFuture fut) {
+ @Override public Object apply(IgniteFuture fut) {
return null;
}
- }), futId, PlatformFutureUtils.TYP_OBJ);
+ }), futId, PlatformFutureUtils.TYP_OBJ, this);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java
index 354cef7..58dfa4c 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java
@@ -17,14 +17,15 @@
package org.apache.ignite.internal.processors.platform.cache;
-import java.util.Collection;
-import org.apache.ignite.cache.CachePartialUpdateException;
import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformException;
import org.apache.ignite.internal.processors.platform.PlatformExtendedException;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import java.util.Collection;
+
/**
* Interop cache partial update exception.
*/
@@ -45,7 +46,7 @@ public class PlatformCachePartialUpdateException extends PlatformException imple
* @param ctx Context.
* @param keepPortable Keep portable flag.
*/
- public PlatformCachePartialUpdateException(CachePartialUpdateException cause, PlatformContext ctx,
+ public PlatformCachePartialUpdateException(CachePartialUpdateCheckedException cause, PlatformContext ctx,
boolean keepPortable) {
super(cause);
@@ -60,7 +61,7 @@ public class PlatformCachePartialUpdateException extends PlatformException imple
/** {@inheritDoc} */
@Override public void writeData(PortableRawWriterEx writer) {
- Collection keys = ((CachePartialUpdateException)getCause()).failedKeys();
+ Collection keys = ((CachePartialUpdateCheckedException)getCause()).failedKeys();
writer.writeBoolean(keepPortable);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
index e0e9305..ef64ef9 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -110,7 +110,7 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
vals.add(new GridMapEntry(reader.readObjectDetached(), reader.readObjectDetached()));
PlatformFutureUtils.listen(platformCtx, ldr.addData(vals), futPtr,
- PlatformFutureUtils.TYP_OBJ);
+ PlatformFutureUtils.TYP_OBJ, this);
}
if (plc == PLC_CLOSE) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
index 86942c5..1d2c315 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
@@ -179,7 +179,7 @@ public class PlatformTransactions extends PlatformAbstractTarget {
}
});
- PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ);
+ PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ, this);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
index 59e5463..0019986 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.processors.platform.utils;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
@@ -66,8 +68,35 @@ public class PlatformFutureUtils {
* @param futPtr Native future pointer.
* @param typ Expected return type.
*/
- public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ) {
- listen(ctx, new FutureListenable(fut), futPtr, typ, null);
+ public static void listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, final int typ,
+ PlatformAbstractTarget target) {
+ listen(ctx, new InternalFutureListenable(fut), futPtr, typ, null, target);
+ }
+ /**
+ * Listen future.
+ *
+ * @param ctx Context.
+ * @param fut Java future.
+ * @param futPtr Native future pointer.
+ * @param typ Expected return type.
+ */
+ public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ,
+ PlatformAbstractTarget target) {
+ listen(ctx, new FutureListenable(fut), futPtr, typ, null, target);
+ }
+
+ /**
+ * Listen future.
+ *
+ * @param ctx Context.
+ * @param fut Java future.
+ * @param futPtr Native future pointer.
+ * @param typ Expected return type.
+ * @param writer Writer.
+ */
+ public static void listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, final int typ,
+ Writer writer, PlatformAbstractTarget target) {
+ listen(ctx, new InternalFutureListenable(fut), futPtr, typ, writer, target);
}
/**
@@ -80,8 +109,8 @@ public class PlatformFutureUtils {
* @param writer Writer.
*/
public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ,
- Writer writer) {
- listen(ctx, new FutureListenable(fut), futPtr, typ, writer);
+ Writer writer, PlatformAbstractTarget target) {
+ listen(ctx, new FutureListenable(fut), futPtr, typ, writer, target);
}
/**
@@ -92,8 +121,9 @@ public class PlatformFutureUtils {
* @param futPtr Native future pointer.
* @param writer Writer.
*/
- public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, Writer writer) {
- listen(ctx, new FutureListenable(fut), futPtr, TYP_OBJ, writer);
+ public static void listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, Writer writer,
+ PlatformAbstractTarget target) {
+ listen(ctx, new InternalFutureListenable(fut), futPtr, TYP_OBJ, writer, target);
}
/**
@@ -107,13 +137,16 @@ public class PlatformFutureUtils {
*/
@SuppressWarnings("unchecked")
private static void listen(final PlatformContext ctx, Listenable listenable, final long futPtr, final int typ,
- @Nullable final Writer writer) {
+ @Nullable final Writer writer, final PlatformAbstractTarget target) {
final PlatformCallbackGateway gate = ctx.gateway();
listenable.listen(new IgniteBiInClosure<Object, Throwable>() {
private static final long serialVersionUID = 0L;
@Override public void apply(Object res, Throwable err) {
+ if (err instanceof Exception)
+ err = target.convertException((Exception)err);
+
if (writer != null && writeToWriter(res, err, ctx, writer, futPtr))
return;
@@ -326,4 +359,39 @@ public class PlatformFutureUtils {
});
}
}
+
+ /**
+ * Listenable around Ignite internal future.
+ */
+ private static class InternalFutureListenable implements Listenable {
+ /** Future. */
+ private final IgniteInternalFuture fut;
+
+ /**
+ * Constructor.
+ *
+ * @param fut Future.
+ */
+ public InternalFutureListenable(IgniteInternalFuture fut) {
+ this.fut = fut;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void listen(final IgniteBiInClosure<Object, Throwable> lsnr) {
+ fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
+ private static final long serialVersionUID = 0L;
+
+ @Override public void apply(IgniteInternalFuture fut0) {
+ try {
+ lsnr.apply(fut0.get(), null);
+ }
+ catch (Throwable err) {
+ lsnr.apply(null, err);
+ }
+ }
+ });
+ }
+ }
+
}
\ No newline at end of file