You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/01 14:57:27 UTC

ignite git commit: IGNITE-1340: Fixes to exception propagation.

Repository: ignite
Updated Branches:
  refs/heads/master 5f58bbb57 -> 9b06cf3d8


IGNITE-1340: Fixes to exception propagation.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9b06cf3d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9b06cf3d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9b06cf3d

Branch: refs/heads/master
Commit: 9b06cf3d896d17af5a132a8849df41f8822dce96
Parents: 5f58bbb
Author: Pavel Tupitsyn <pt...@gridgain.com>
Authored: Tue Sep 1 15:57:36 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Sep 1 15:57:36 2015 +0300

----------------------------------------------------------------------
 .../platform/PlatformAbstractTarget.java        | 34 +++-----
 .../platform/cache/PlatformCache.java           | 42 ++++++----
 .../PlatformCachePartialUpdateException.java    |  9 ++-
 .../datastreamer/PlatformDataStreamer.java      |  2 +-
 .../transactions/PlatformTransactions.java      |  2 +-
 .../platform/utils/PlatformFutureUtils.java     | 82 ++++++++++++++++++--
 6 files changed, 116 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
index 71d1657..0f46517 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
@@ -18,14 +18,14 @@
 package org.apache.ignite.internal.processors.platform;
 
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.portable.PortableRawReaderEx;
 import org.apache.ignite.internal.portable.PortableRawWriterEx;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
 import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
 import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
-import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.lang.IgniteFuture;
 import org.jetbrains.annotations.Nullable;
 
@@ -171,7 +171,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
      * @param e Exception to convert.
      * @return Converted exception.
      */
-    protected Exception convertException(Exception e) {
+    public Exception convertException(Exception e) {
         return e;
     }
 
@@ -184,12 +184,12 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
 
     /** {@inheritDoc} */
     @Override public void listenFuture(final long futId, int typ) throws Exception {
-        PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, null);
+        PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, null, this);
     }
 
     /** {@inheritDoc} */
     @Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception {
-        PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, futureWriter(opId));
+        PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, futureWriter(opId), this);
     }
 
     /**
@@ -199,26 +199,10 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
-    protected IgniteFuture currentFutureWrapped() throws IgniteCheckedException {
-        return currentFuture().chain(new IgniteClosure<IgniteFuture, Object>() {
-            @Override public Object apply(IgniteFuture o) {
-                try {
-                    return o.get();
-                }
-                catch (RuntimeException e) {
-                    Exception converted = convertException(e);
-
-                    if (converted instanceof RuntimeException)
-                        throw (RuntimeException)converted;
-                    else {
-                        log.error("Interop future result cannot be obtained due to exception.", converted);
-
-                        throw new IgniteException("Interop future result cannot be obtained due to exception " +
-                            "(see log for more details).");
-                    }
-                }
-            }
-        });
+    protected IgniteInternalFuture currentFutureWrapped() throws IgniteCheckedException {
+        IgniteFutureImpl fut = (IgniteFutureImpl)currentFuture(); // NOTE(review): unchecked downcast - confirm.
+
+        return fut.internalFuture(); // Exception conversion now happens in PlatformFutureUtils.listen.
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index a7c741e..e579be7 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -17,18 +17,6 @@
 
 package org.apache.ignite.internal.processors.platform.cache;
 
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import javax.cache.Cache;
-import javax.cache.expiry.Duration;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheEntryProcessor;
@@ -43,6 +31,7 @@ import org.apache.ignite.cache.query.TextQuery;
 import org.apache.ignite.internal.portable.PortableRawReaderEx;
 import org.apache.ignite.internal.portable.PortableRawWriterEx;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
+import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
@@ -58,6 +47,19 @@ import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.lang.IgniteFuture;
 import org.jetbrains.annotations.Nullable;
 
+import javax.cache.Cache;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+
 /**
  * Native cache wrapper implementation.
  */
@@ -618,9 +620,16 @@ public class PlatformCache extends PlatformAbstractTarget {
     }
 
     /** {@inheritDoc} */
-    @Override protected Exception convertException(Exception e) {
+    @Override public Exception convertException(Exception e) {
         if (e instanceof CachePartialUpdateException)
-            return new PlatformCachePartialUpdateException((CachePartialUpdateException)e, platformCtx, keepPortable);
+            return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e.getCause(),
+                platformCtx, keepPortable); // NOTE(review): cause is cast without an instanceof check.
+
+        if (e instanceof CachePartialUpdateCheckedException)
+            return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e, platformCtx, keepPortable);
+
+        if (e.getCause() instanceof EntryProcessorException)
+            return (EntryProcessorException) e.getCause(); // Return the wrapped cause itself, not the wrapper.
 
         return super.convertException(e);
     }
@@ -788,11 +797,10 @@ public class PlatformCache extends PlatformAbstractTarget {
      */
     public void rebalance(long futId) {
         PlatformFutureUtils.listen(platformCtx, cache.rebalance().chain(new C1<IgniteFuture, Object>() {
-            @Override
-            public Object apply(IgniteFuture fut) {
+            @Override public Object apply(IgniteFuture fut) {
                 return null;
             }
-        }), futId, PlatformFutureUtils.TYP_OBJ);
+        }), futId, PlatformFutureUtils.TYP_OBJ, this);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java
index 354cef7..58dfa4c 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java
@@ -17,14 +17,15 @@
 
 package org.apache.ignite.internal.processors.platform.cache;
 
-import java.util.Collection;
-import org.apache.ignite.cache.CachePartialUpdateException;
 import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.PlatformException;
 import org.apache.ignite.internal.processors.platform.PlatformExtendedException;
 import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
 
+import java.util.Collection;
+
 /**
  * Interop cache partial update exception.
  */
@@ -45,7 +46,7 @@ public class PlatformCachePartialUpdateException extends PlatformException imple
      * @param ctx Context.
      * @param keepPortable Keep portable flag.
      */
-    public PlatformCachePartialUpdateException(CachePartialUpdateException cause, PlatformContext ctx,
+    public PlatformCachePartialUpdateException(CachePartialUpdateCheckedException cause, PlatformContext ctx,
         boolean keepPortable) {
         super(cause);
 
@@ -60,7 +61,7 @@ public class PlatformCachePartialUpdateException extends PlatformException imple
 
     /** {@inheritDoc} */
     @Override public void writeData(PortableRawWriterEx writer) {
-        Collection keys = ((CachePartialUpdateException)getCause()).failedKeys();
+        Collection keys = ((CachePartialUpdateCheckedException)getCause()).failedKeys();
 
         writer.writeBoolean(keepPortable);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
index e0e9305..ef64ef9 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -110,7 +110,7 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
                             vals.add(new GridMapEntry(reader.readObjectDetached(), reader.readObjectDetached()));
 
                         PlatformFutureUtils.listen(platformCtx, ldr.addData(vals), futPtr,
-                            PlatformFutureUtils.TYP_OBJ);
+                            PlatformFutureUtils.TYP_OBJ, this);
                     }
 
                     if (plc == PLC_CLOSE) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
index 86942c5..1d2c315 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
@@ -179,7 +179,7 @@ public class PlatformTransactions extends PlatformAbstractTarget {
             }
         });
 
-        PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ);
+        PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ, this);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b06cf3d/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
index 59e5463..0019986 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.processors.platform.utils;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
@@ -66,8 +68,35 @@ public class PlatformFutureUtils {
      * @param futPtr Native future pointer.
      * @param typ Expected return type.
      */
-    public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ) {
-        listen(ctx, new FutureListenable(fut), futPtr, typ, null);
+    public static void listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, final int typ,
+        PlatformAbstractTarget target) {
+        listen(ctx, new InternalFutureListenable(fut), futPtr, typ, null, target);
+    }
+    /**
+     * Listen future without a result writer; errors go through {@code target.convertException}.
+     *
+     * @param ctx Context.
+     * @param fut Java future.
+     * @param futPtr Native future pointer.
+     * @param typ Expected return type.
+     */
+    public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ,
+        PlatformAbstractTarget target) {
+        listen(ctx, new FutureListenable(fut), futPtr, typ, null, target);
+    }
+
+    /**
+     * Listen future.
+     *
+     * @param ctx Context.
+     * @param fut Java future.
+     * @param futPtr Native future pointer.
+     * @param typ Expected return type.
+     * @param writer Writer.
+     */
+    public static void listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, final int typ,
+        Writer writer, PlatformAbstractTarget target) {
+        listen(ctx, new InternalFutureListenable(fut), futPtr, typ, writer, target);
     }
 
     /**
@@ -80,8 +109,8 @@ public class PlatformFutureUtils {
      * @param writer Writer.
      */
     public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ,
-        Writer writer) {
-        listen(ctx, new FutureListenable(fut), futPtr, typ, writer);
+        Writer writer, PlatformAbstractTarget target) {
+        listen(ctx, new FutureListenable(fut), futPtr, typ, writer, target);
     }
 
     /**
@@ -92,8 +121,9 @@ public class PlatformFutureUtils {
      * @param futPtr Native future pointer.
      * @param writer Writer.
      */
-    public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, Writer writer) {
-        listen(ctx, new FutureListenable(fut), futPtr, TYP_OBJ, writer);
+    public static void listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, Writer writer,
+        PlatformAbstractTarget target) {
+        listen(ctx, new InternalFutureListenable(fut), futPtr, TYP_OBJ, writer, target);
     }
 
     /**
@@ -107,13 +137,16 @@ public class PlatformFutureUtils {
      */
     @SuppressWarnings("unchecked")
     private static void listen(final PlatformContext ctx, Listenable listenable, final long futPtr, final int typ,
-        @Nullable final Writer writer) {
+        @Nullable final Writer writer, final PlatformAbstractTarget target) {
         final PlatformCallbackGateway gate = ctx.gateway();
 
         listenable.listen(new IgniteBiInClosure<Object, Throwable>() {
             private static final long serialVersionUID = 0L;
 
             @Override public void apply(Object res, Throwable err) {
+                if (err instanceof Exception)
+                    err = target.convertException((Exception)err);
+
                 if (writer != null && writeToWriter(res, err, ctx, writer, futPtr))
                     return;
 
@@ -326,4 +359,39 @@ public class PlatformFutureUtils {
             });
         }
     }
+
+    /**
+     * Listenable around Ignite internal future ({@link IgniteInternalFuture}).
+     */
+    private static class InternalFutureListenable implements Listenable {
+        /** Internal future being wrapped. */
+        private final IgniteInternalFuture fut;
+
+        /**
+         * Constructor.
+         *
+         * @param fut Internal future to wrap.
+         */
+        public InternalFutureListenable(IgniteInternalFuture fut) {
+            this.fut = fut;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public void listen(final IgniteBiInClosure<Object, Throwable> lsnr) {
+            fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
+                private static final long serialVersionUID = 0L;
+
+                @Override public void apply(IgniteInternalFuture fut0) {
+                    try {
+                        lsnr.apply(fut0.get(), null); // NOTE(review): a throw from lsnr re-enters it via catch.
+                    }
+                    catch (Throwable err) {
+                        lsnr.apply(null, err); // Failure is reported as the error argument, result is null.
+                    }
+                }
+            });
+        }
+    }
+
 }
\ No newline at end of file