You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/08/19 11:49:05 UTC
[48/53] [abbrv] ignite git commit: IGNITE-2943 .NET: Improve cache error propagation and interop performance
IGNITE-2943 .NET: Improve cache error propagation and interop performance
This closes #672
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6899e06f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6899e06f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6899e06f
Branch: refs/heads/ignite-3299
Commit: 6899e06fbcfebff7b6de7e43b0f0b1107d649e00
Parents: 314eec5
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Thu Aug 18 18:14:12 2016 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Thu Aug 18 18:14:12 2016 +0300
----------------------------------------------------------------------
.../platform/PlatformAbstractTarget.java | 17 +-
.../platform/cache/PlatformCache.java | 335 ++++++++++---------
.../dotnet/PlatformDotNetCacheStore.java | 12 +-
.../platform/utils/PlatformFutureUtils.java | 6 +-
.../platform/utils/PlatformUtils.java | 25 ++
.../include/ignite/impl/binary/binary_utils.h | 87 +++++
.../src/impl/binary/binary_reader_impl.cpp | 30 +-
.../ignite/impl/interop/interop_target.h | 15 +-
.../cpp/core/include/ignite/impl/operations.h | 47 ++-
.../cpp/core/src/impl/cache/cache_impl.cpp | 14 +-
.../core/src/impl/interop/interop_target.cpp | 41 ++-
.../src/impl/transactions/transactions_impl.cpp | 5 +
.../Apache.Ignite.Benchmarks/BenchmarkRunner.cs | 5 +-
.../Interop/PlatformBenchmarkBase.cs | 2 +-
.../Cache/CacheAbstractTest.cs | 2 +-
.../Cache/Store/CacheStoreTest.cs | 39 ++-
.../Cache/Store/CacheTestStore.cs | 50 ++-
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 239 ++++++-------
.../Apache.Ignite.Core/Impl/ExceptionUtils.cs | 10 +-
.../Apache.Ignite.Core/Impl/PlatformTarget.cs | 80 ++++-
.../Impl/Unmanaged/UnmanagedCallbacks.cs | 20 +-
21 files changed, 745 insertions(+), 336 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
index 0cd683d..0ca4453 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
@@ -38,6 +38,9 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
/** Constant: FALSE. */
protected static final int FALSE = 0;
+ /** Constant: ERROR. */
+ protected static final int ERROR = -1;
+
/** */
private static final int OP_META = -1;
@@ -69,7 +72,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
return TRUE;
}
else
- return processInStreamOutLong(type, reader);
+ return processInStreamOutLong(type, reader, mem);
}
catch (Exception e) {
throw convertException(e);
@@ -235,6 +238,18 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
}
/**
+ * Process IN operation.
+ *
+ * @param type Type.
+ * @param reader Binary reader.
+ * @return Result.
+ * @throws IgniteCheckedException In case of exception.
+ */
+ protected long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException {
+ return processInStreamOutLong(type, reader);
+ }
+
+ /**
* Process IN-OUT operation.
*
* @param type Type.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index d572e8b..a7b6e41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -42,10 +42,13 @@ import org.apache.ignite.internal.processors.platform.PlatformNativeException;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformFieldsQueryCursor;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformQueryCursor;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformConfigurationUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.C1;
@@ -290,109 +293,207 @@ public class PlatformCache extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
- switch (type) {
- case OP_PUT:
- cache.put(reader.readObjectDetached(), reader.readObjectDetached());
+ @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException {
+ try {
+ switch (type) {
+ case OP_PUT:
+ cache.put(reader.readObjectDetached(), reader.readObjectDetached());
- return TRUE;
+ return TRUE;
- case OP_REMOVE_BOOL:
- return cache.remove(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
+ case OP_GET:
+ return writeResult(mem, cache.get(reader.readObjectDetached()));
- case OP_REMOVE_ALL:
- cache.removeAll(PlatformUtils.readSet(reader));
+ case OP_REMOVE_BOOL:
+ return cache.remove(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
- return TRUE;
+ case OP_REMOVE_ALL:
+ cache.removeAll(PlatformUtils.readSet(reader));
- case OP_PUT_ALL:
- cache.putAll(PlatformUtils.readMap(reader));
+ return TRUE;
- return TRUE;
+ case OP_PUT_ALL:
+ cache.putAll(PlatformUtils.readMap(reader));
- case OP_LOC_EVICT:
- cache.localEvict(PlatformUtils.readCollection(reader));
+ return TRUE;
- return TRUE;
+ case OP_LOC_EVICT:
+ cache.localEvict(PlatformUtils.readCollection(reader));
- case OP_CONTAINS_KEY:
- return cache.containsKey(reader.readObjectDetached()) ? TRUE : FALSE;
+ return TRUE;
- case OP_CONTAINS_KEYS:
- return cache.containsKeys(PlatformUtils.readSet(reader)) ? TRUE : FALSE;
+ case OP_CONTAINS_KEY:
+ return cache.containsKey(reader.readObjectDetached()) ? TRUE : FALSE;
- case OP_LOC_PROMOTE: {
- cache.localPromote(PlatformUtils.readSet(reader));
+ case OP_CONTAINS_KEYS:
+ return cache.containsKeys(PlatformUtils.readSet(reader)) ? TRUE : FALSE;
- break;
- }
+ case OP_LOC_PROMOTE: {
+ cache.localPromote(PlatformUtils.readSet(reader));
- case OP_REPLACE_3:
- return cache.replace(reader.readObjectDetached(), reader.readObjectDetached(),
- reader.readObjectDetached()) ? TRUE : FALSE;
+ return TRUE;
+ }
- case OP_LOC_LOAD_CACHE:
- loadCache0(reader, true);
+ case OP_REPLACE_3:
+ return cache.replace(reader.readObjectDetached(), reader.readObjectDetached(),
+ reader.readObjectDetached()) ? TRUE : FALSE;
- break;
+ case OP_LOC_LOAD_CACHE:
+ loadCache0(reader, true);
- case OP_LOAD_CACHE:
- loadCache0(reader, false);
+ return TRUE;
- break;
+ case OP_LOAD_CACHE:
+ loadCache0(reader, false);
- case OP_CLEAR:
- cache.clear(reader.readObjectDetached());
+ return TRUE;
- break;
+ case OP_CLEAR:
+ cache.clear(reader.readObjectDetached());
- case OP_CLEAR_ALL:
- cache.clearAll(PlatformUtils.readSet(reader));
+ return TRUE;
- break;
+ case OP_CLEAR_ALL:
+ cache.clearAll(PlatformUtils.readSet(reader));
- case OP_LOCAL_CLEAR:
- cache.localClear(reader.readObjectDetached());
+ return TRUE;
- break;
+ case OP_LOCAL_CLEAR:
+ cache.localClear(reader.readObjectDetached());
- case OP_LOCAL_CLEAR_ALL:
- cache.localClearAll(PlatformUtils.readSet(reader));
+ return TRUE;
- break;
+ case OP_LOCAL_CLEAR_ALL:
+ cache.localClearAll(PlatformUtils.readSet(reader));
- case OP_PUT_IF_ABSENT: {
- return cache.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
- }
+ return TRUE;
- case OP_REPLACE_2: {
- return cache.replace(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
- }
+ case OP_PUT_IF_ABSENT:
+ return cache.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
- case OP_REMOVE_OBJ: {
- return cache.remove(reader.readObjectDetached()) ? TRUE : FALSE;
- }
+ case OP_REPLACE_2:
+ return cache.replace(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
+
+ case OP_REMOVE_OBJ:
+ return cache.remove(reader.readObjectDetached()) ? TRUE : FALSE;
+
+ case OP_IS_LOCAL_LOCKED:
+ return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? TRUE : FALSE;
+
+ case OP_LOAD_ALL: {
+ long futId = reader.readLong();
+ boolean replaceExisting = reader.readBoolean();
+
+ CompletionListenable fut = new CompletionListenable();
+
+ PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ, null, this);
+
+ cache.loadAll(PlatformUtils.readSet(reader), replaceExisting, fut);
+
+ return TRUE;
+ }
+
+ case OP_GET_AND_PUT:
+ return writeResult(mem, cache.getAndPut(reader.readObjectDetached(), reader.readObjectDetached()));
+
+ case OP_GET_AND_REPLACE:
+ return writeResult(mem, cache.getAndReplace(reader.readObjectDetached(), reader.readObjectDetached()));
+
+ case OP_GET_AND_REMOVE:
+ return writeResult(mem, cache.getAndRemove(reader.readObjectDetached()));
+
+ case OP_GET_AND_PUT_IF_ABSENT:
+ return writeResult(mem, cache.getAndPutIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()));
+
+ case OP_PEEK: {
+ Object key = reader.readObjectDetached();
+
+ CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt());
+
+ return writeResult(mem, cache.localPeek(key, modes));
+ }
+
+ case OP_GET_ALL: {
+ Set keys = PlatformUtils.readSet(reader);
+
+ Map entries = cache.getAll(keys);
+
+ return writeResult(mem, entries, new PlatformWriterClosure<Map>() {
+ @Override public void write(BinaryRawWriterEx writer, Map val) {
+ PlatformUtils.writeNullableMap(writer, val);
+ }
+ });
+ }
+
+ case OP_INVOKE: {
+ Object key = reader.readObjectDetached();
- case OP_IS_LOCAL_LOCKED:
- return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? TRUE : FALSE;
+ CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
- case OP_LOAD_ALL: {
- long futId = reader.readLong();
- boolean replaceExisting = reader.readBoolean();
+ return writeResult(mem, cache.invoke(key, proc));
+ }
- CompletionListenable fut = new CompletionListenable();
+ case OP_INVOKE_ALL: {
+ Set<Object> keys = PlatformUtils.readSet(reader);
- PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ, null, this);
+ CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
- cache.loadAll(PlatformUtils.readSet(reader), replaceExisting, fut);
+ Map results = cache.invokeAll(keys, proc);
- return TRUE;
+ return writeResult(mem, results, new PlatformWriterClosure<Map>() {
+ @Override public void write(BinaryRawWriterEx writer, Map val) {
+ writeInvokeAllResult(writer, val);
+ }
+ });
+ }
+
+ case OP_LOCK:
+ return registerLock(cache.lock(reader.readObjectDetached()));
+
+ case OP_LOCK_ALL:
+ return registerLock(cache.lockAll(PlatformUtils.readCollection(reader)));
}
+ }
+ catch (Exception e) {
+ PlatformOutputStream out = mem.output();
+ BinaryRawWriterEx writer = platformCtx.writer(out);
- default:
- return super.processInStreamOutLong(type, reader);
+ Exception err = convertException(e);
+
+ PlatformUtils.writeError(err, writer);
+ PlatformUtils.writeErrorData(err, writer);
+
+ out.synchronize();
+
+ return ERROR;
}
+ return super.processInStreamOutLong(type, reader, mem);
+ }
+
+ /**
+ * Writes the result to reused stream, if any.
+ */
+ private long writeResult(PlatformMemory mem, Object obj) {
+ return writeResult(mem, obj, null);
+ }
+
+ /**
+ * Writes the result to reused stream, if any.
+ */
+ private long writeResult(PlatformMemory mem, Object obj, PlatformWriterClosure clo) {
+ if (obj == null)
+ return FALSE;
+
+ PlatformOutputStream out = mem.output();
+ BinaryRawWriterEx writer = platformCtx.writer(out);
+
+ if (clo == null)
+ writer.writeObjectDetached(obj);
+ else
+ clo.write(writer, obj);
+
+ out.synchronize();
return TRUE;
}
@@ -555,106 +656,6 @@ public class PlatformCache extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"})
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
- throws IgniteCheckedException {
- switch (type) {
- case OP_GET: {
- writer.writeObjectDetached(cache.get(reader.readObjectDetached()));
-
- break;
- }
-
- case OP_GET_AND_PUT: {
- writer.writeObjectDetached(cache.getAndPut(reader.readObjectDetached(), reader.readObjectDetached()));
-
- break;
- }
-
- case OP_GET_AND_REPLACE: {
- writer.writeObjectDetached(cache.getAndReplace(reader.readObjectDetached(),
- reader.readObjectDetached()));
-
- break;
- }
-
- case OP_GET_AND_REMOVE: {
- writer.writeObjectDetached(cache.getAndRemove(reader.readObjectDetached()));
-
- break;
- }
-
- case OP_GET_AND_PUT_IF_ABSENT: {
- writer.writeObjectDetached(cache.getAndPutIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()));
-
- break;
- }
-
- case OP_PEEK: {
- Object key = reader.readObjectDetached();
-
- CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt());
-
- writer.writeObjectDetached(cache.localPeek(key, modes));
-
- break;
- }
-
- case OP_GET_ALL: {
- Set keys = PlatformUtils.readSet(reader);
-
- Map entries = cache.getAll(keys);
-
- PlatformUtils.writeNullableMap(writer, entries);
-
- break;
- }
-
- case OP_INVOKE: {
- Object key = reader.readObjectDetached();
-
- CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
-
- try {
- writer.writeObjectDetached(cache.invoke(key, proc));
- }
- catch (EntryProcessorException ex)
- {
- if (ex.getCause() instanceof PlatformNativeException)
- writer.writeObjectDetached(((PlatformNativeException)ex.getCause()).cause());
- else
- throw ex;
- }
-
- break;
- }
-
- case OP_INVOKE_ALL: {
- Set<Object> keys = PlatformUtils.readSet(reader);
-
- CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
-
- writeInvokeAllResult(writer, cache.invokeAll(keys, proc));
-
- break;
- }
-
- case OP_LOCK:
- writer.writeLong(registerLock(cache.lock(reader.readObjectDetached())));
-
- break;
-
- case OP_LOCK_ALL:
- writer.writeLong(registerLock(cache.lockAll(PlatformUtils.readCollection(reader))));
-
- break;
-
- default:
- super.processInStreamOutStream(type, reader, writer);
- }
- }
-
- /** {@inheritDoc} */
@Override public Exception convertException(Exception e) {
if (e instanceof CachePartialUpdateException)
return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e.getCause(),
@@ -699,7 +700,7 @@ public class PlatformCache extends PlatformAbstractTarget {
catch (Exception ex) {
writer.writeBoolean(true); // Exception
- writeError(writer, ex);
+ PlatformUtils.writeError(ex, writer);
}
}
}
@@ -1033,7 +1034,7 @@ public class PlatformCache extends PlatformAbstractTarget {
else {
writer.writeBoolean(true); // Error.
- writeError(writer, (Exception) err);
+ PlatformUtils.writeError(err, writer);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
index 1c60a42..d38fd8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
@@ -389,10 +389,9 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
*
* @param task Task.
* @param cb Optional callback.
- * @return Result.
* @throws org.apache.ignite.IgniteCheckedException If failed.
*/
- protected int doInvoke(IgniteInClosureX<BinaryRawWriterEx> task, @Nullable PlatformCacheStoreCallback cb)
+ protected void doInvoke(IgniteInClosureX<BinaryRawWriterEx> task, @Nullable PlatformCacheStoreCallback cb)
throws IgniteCheckedException{
try (PlatformMemory mem = platformCtx.memory().allocate()) {
PlatformOutputStream out = mem.output();
@@ -403,7 +402,14 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
out.synchronize();
- return platformCtx.gateway().cacheStoreInvoke(ptr, mem.pointer(), cb);
+ int res = platformCtx.gateway().cacheStoreInvoke(ptr, mem.pointer(), cb);
+
+ if (res != 0) {
+ // Read error
+ Object nativeErr = platformCtx.reader(mem.input()).readObjectDetached();
+
+ throw platformCtx.createNativeException(nativeErr);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
index 6692a23..5985d22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
@@ -25,7 +25,6 @@ import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
@@ -293,10 +292,7 @@ public class PlatformFutureUtils {
BinaryRawWriterEx outWriter = ctx.writer(out);
- outWriter.writeString(err.getClass().getName());
- outWriter.writeString(err.getMessage());
- outWriter.writeString(X.getFullStackTrace(err));
-
+ PlatformUtils.writeError(err, outWriter);
PlatformUtils.writeErrorData(err, outWriter);
out.synchronize();
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index dd90fda..ccdd59d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -575,6 +575,31 @@ public class PlatformUtils {
}
/**
+ * Writes error.
+ *
+ * @param ex Error.
+ * @param writer Writer.
+ */
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+ public static void writeError(Throwable ex, BinaryRawWriterEx writer) {
+ writer.writeObjectDetached(ex.getClass().getName());
+
+ writer.writeObjectDetached(ex.getMessage());
+
+ writer.writeObjectDetached(X.getFullStackTrace(ex));
+
+ PlatformNativeException nativeCause = X.cause(ex, PlatformNativeException.class);
+
+ if (nativeCause != null) {
+ writer.writeBoolean(true);
+
+ writer.writeObjectDetached(nativeCause.cause());
+ }
+ else
+ writer.writeBoolean(false);
+ }
+
+ /**
* Writer error data.
*
* @param err Error.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h b/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h
index 88130d8..3abd651 100644
--- a/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h
+++ b/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h
@@ -26,6 +26,8 @@
#include "ignite/date.h"
#include "ignite/timestamp.h"
+#include "ignite/binary/binary_type.h"
+
namespace ignite
{
namespace impl
@@ -538,7 +540,92 @@ namespace ignite
*/
static Timestamp MakeTimestampLocal(int year = 1900, int month = 1,
int day = 1, int hour = 0, int min = 0, int sec = 0, long ns = 0);
+
+ /**
+ * Get default value for the type.
+ *
+ * @return Null value for non primitive types and zeroes for primitives.
+ */
+ template<typename T>
+ static T GetDefaultValue()
+ {
+ ignite::binary::BinaryType<T> binType;
+
+ return binType.GetNull();
+ }
};
+
+ template<>
+ inline int8_t BinaryUtils::GetDefaultValue<int8_t>()
+ {
+ return 0;
+ }
+
+ template<>
+ inline int16_t BinaryUtils::GetDefaultValue<int16_t>()
+ {
+ return 0;
+ }
+
+ template<>
+ inline uint16_t BinaryUtils::GetDefaultValue<uint16_t>()
+ {
+ return 0;
+ }
+
+ template<>
+ inline int32_t BinaryUtils::GetDefaultValue<int32_t>()
+ {
+ return 0;
+ }
+
+ template<>
+ inline int64_t BinaryUtils::GetDefaultValue<int64_t>()
+ {
+ return 0;
+ }
+
+ template<>
+ inline bool BinaryUtils::GetDefaultValue<bool>()
+ {
+ return false;
+ }
+
+ template<>
+ inline float BinaryUtils::GetDefaultValue<float>()
+ {
+ return 0.0f;
+ }
+
+ template<>
+ inline double BinaryUtils::GetDefaultValue<double>()
+ {
+ return 0.0;
+ }
+
+ template<>
+ inline Guid BinaryUtils::GetDefaultValue<Guid>()
+ {
+ return Guid();
+ }
+
+ template<>
+ inline Date BinaryUtils::GetDefaultValue<Date>()
+ {
+ return Date();
+ }
+
+ template<>
+ inline Timestamp BinaryUtils::GetDefaultValue<Timestamp>()
+ {
+ return Timestamp();
+ }
+
+ template<>
+ inline std::string BinaryUtils::GetDefaultValue<std::string>()
+ {
+ return std::string();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp b/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp
index 33205e4..c3f4fcc 100644
--- a/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp
+++ b/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp
@@ -676,49 +676,57 @@ namespace ignite
template <>
int8_t BinaryReaderImpl::ReadTopObject<int8_t>()
{
- return ReadTopObject0(IGNITE_TYPE_BYTE, BinaryUtils::ReadInt8, static_cast<int8_t>(0));
+ return ReadTopObject0(IGNITE_TYPE_BYTE, BinaryUtils::ReadInt8,
+ BinaryUtils::GetDefaultValue<int8_t>());
}
template <>
bool BinaryReaderImpl::ReadTopObject<bool>()
{
- return ReadTopObject0(IGNITE_TYPE_BOOL, BinaryUtils::ReadBool, static_cast<bool>(0));
+ return ReadTopObject0(IGNITE_TYPE_BOOL, BinaryUtils::ReadBool,
+ BinaryUtils::GetDefaultValue<bool>());
}
template <>
int16_t BinaryReaderImpl::ReadTopObject<int16_t>()
{
- return ReadTopObject0(IGNITE_TYPE_SHORT, BinaryUtils::ReadInt16, static_cast<int16_t>(0));
+ return ReadTopObject0(IGNITE_TYPE_SHORT, BinaryUtils::ReadInt16,
+ BinaryUtils::GetDefaultValue<int16_t>());
}
template <>
uint16_t BinaryReaderImpl::ReadTopObject<uint16_t>()
{
- return ReadTopObject0(IGNITE_TYPE_CHAR, BinaryUtils::ReadUInt16, static_cast<uint16_t>(0));
+ return ReadTopObject0(IGNITE_TYPE_CHAR, BinaryUtils::ReadUInt16,
+ BinaryUtils::GetDefaultValue<uint16_t>());
}
template <>
int32_t BinaryReaderImpl::ReadTopObject<int32_t>()
{
- return ReadTopObject0(IGNITE_TYPE_INT, BinaryUtils::ReadInt32, static_cast<int32_t>(0));
+ return ReadTopObject0(IGNITE_TYPE_INT, BinaryUtils::ReadInt32,
+ BinaryUtils::GetDefaultValue<int32_t>());
}
template <>
int64_t BinaryReaderImpl::ReadTopObject<int64_t>()
{
- return ReadTopObject0(IGNITE_TYPE_LONG, BinaryUtils::ReadInt64, static_cast<int64_t>(0));
+ return ReadTopObject0(IGNITE_TYPE_LONG, BinaryUtils::ReadInt64,
+ BinaryUtils::GetDefaultValue<int64_t>());
}
template <>
float BinaryReaderImpl::ReadTopObject<float>()
{
- return ReadTopObject0(IGNITE_TYPE_FLOAT, BinaryUtils::ReadFloat, static_cast<float>(0));
+ return ReadTopObject0(IGNITE_TYPE_FLOAT, BinaryUtils::ReadFloat,
+ BinaryUtils::GetDefaultValue<float>());
}
template <>
double BinaryReaderImpl::ReadTopObject<double>()
{
- return ReadTopObject0(IGNITE_TYPE_DOUBLE, BinaryUtils::ReadDouble, static_cast<double>(0));
+ return ReadTopObject0(IGNITE_TYPE_DOUBLE, BinaryUtils::ReadDouble,
+ BinaryUtils::GetDefaultValue<double>());
}
template <>
@@ -729,7 +737,7 @@ namespace ignite
if (typeId == IGNITE_TYPE_UUID)
return BinaryUtils::ReadGuid(stream);
else if (typeId == IGNITE_HDR_NULL)
- return Guid();
+ return BinaryUtils::GetDefaultValue<Guid>();
else {
int32_t pos = stream->Position() - 1;
@@ -747,7 +755,7 @@ namespace ignite
else if (typeId == IGNITE_TYPE_TIMESTAMP)
return Date(BinaryUtils::ReadTimestamp(stream).GetMilliseconds());
else if (typeId == IGNITE_HDR_NULL)
- return Date();
+ return BinaryUtils::GetDefaultValue<Date>();
else {
int32_t pos = stream->Position() - 1;
@@ -763,7 +771,7 @@ namespace ignite
if (typeId == IGNITE_TYPE_TIMESTAMP)
return BinaryUtils::ReadTimestamp(stream);
else if (typeId == IGNITE_HDR_NULL)
- return Timestamp();
+ return BinaryUtils::GetDefaultValue<Timestamp>();
else {
int32_t pos = stream->Position() - 1;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h b/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h
index 8b6ebb9..4042fa2 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h
@@ -68,14 +68,25 @@ namespace ignite
/**
* Internal out-in operation.
+ * Uses two independent memory pieces to write and read data.
*
* @param opType Operation type.
* @param inOp Input.
* @param outOp Output.
* @param err Error.
*/
- void OutInOp(int32_t opType, InputOperation& inOp, OutputOperation& outOp,
- IgniteError* err);
+ void OutInOp(int32_t opType, InputOperation& inOp, OutputOperation& outOp, IgniteError* err);
+
+ /**
+ * Internal out-in operation.
+ * Uses single memory piece to write and read data.
+ *
+ * @param opType Operation type.
+ * @param inOp Input.
+ * @param outOp Output.
+ * @param err Error.
+ */
+ void OutInOpX(int32_t opType, InputOperation& inOp, OutputOperation& outOp, IgniteError* err);
/**
* Get environment shared pointer.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/cpp/core/include/ignite/impl/operations.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/operations.h b/modules/platforms/cpp/core/include/ignite/impl/operations.h
index ed01ece..a8fef93 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/operations.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/operations.h
@@ -27,6 +27,7 @@
#include "ignite/cache/cache_entry.h"
#include "ignite/impl/binary/binary_reader_impl.h"
#include "ignite/impl/binary/binary_writer_impl.h"
+#include "ignite/impl/binary/binary_utils.h"
#include "ignite/binary/binary.h"
namespace ignite
@@ -270,7 +271,12 @@ namespace ignite
*
* @param reader Reader.
*/
- virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader) = 0;
+ virtual void ProcessOutput(binary::BinaryReaderImpl& reader) = 0;
+
+ /**
+ * Sets result to null value.
+ */
+ virtual void SetNull() = 0;
};
/**
@@ -288,11 +294,16 @@ namespace ignite
// No-op.
}
- virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader)
+ virtual void ProcessOutput(binary::BinaryReaderImpl& reader)
{
val = reader.ReadTopObject<T>();
}
+ virtual void SetNull()
+ {
+ val = binary::BinaryUtils::GetDefaultValue<T>();
+ }
+
/**
* Get value.
*
@@ -324,12 +335,18 @@ namespace ignite
// No-op.
}
- virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader)
+ virtual void ProcessOutput(binary::BinaryReaderImpl& reader)
{
val1 = reader.ReadTopObject<T1>();
val2 = reader.ReadTopObject<T2>();
}
+ virtual void SetNull()
+ {
+ val1 = binary::BinaryUtils::GetDefaultValue<T1>();
+ val2 = binary::BinaryUtils::GetDefaultValue<T2>();
+ }
+
/**
* Get value 1.
*
@@ -375,7 +392,7 @@ namespace ignite
// No-op.
}
- virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader)
+ virtual void ProcessOutput(binary::BinaryReaderImpl& reader)
{
val1 = reader.ReadTopObject<T1>();
val2 = reader.ReadTopObject<T2>();
@@ -383,6 +400,14 @@ namespace ignite
val4 = reader.ReadTopObject<T4>();
}
+ virtual void SetNull()
+ {
+ val1 = binary::BinaryUtils::GetDefaultValue<T1>();
+ val2 = binary::BinaryUtils::GetDefaultValue<T2>();
+ val3 = binary::BinaryUtils::GetDefaultValue<T3>();
+ val4 = binary::BinaryUtils::GetDefaultValue<T4>();
+ }
+
/**
* Get value 1.
*
@@ -454,7 +479,7 @@ namespace ignite
// No-op.
}
- virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader)
+ virtual void ProcessOutput(binary::BinaryReaderImpl& reader)
{
bool exists = reader.GetStream()->ReadBool();
@@ -475,6 +500,11 @@ namespace ignite
}
}
+ virtual void SetNull()
+ {
+ // No-op.
+ }
+
/**
* Get value.
*
@@ -506,7 +536,7 @@ namespace ignite
// No-op.
}
- virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader)
+ virtual void ProcessOutput(binary::BinaryReaderImpl& reader)
{
int32_t cnt = reader.ReadInt32();
@@ -519,6 +549,11 @@ namespace ignite
}
}
+ virtual void SetNull()
+ {
+ res->clear();
+ }
+
private:
/** Entries. */
std::vector<ignite::cache::CacheEntry<K, V>>* res;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
index e728f55..8197187 100644
--- a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp
@@ -156,17 +156,17 @@ namespace ignite
void CacheImpl::LocalPeek(InputOperation& inOp, OutputOperation& outOp, int32_t peekModes, IgniteError* err)
{
- OutInOp(OP_LOCAL_PEEK, inOp, outOp, err);
+ OutInOpX(OP_LOCAL_PEEK, inOp, outOp, err);
}
void CacheImpl::Get(InputOperation& inOp, OutputOperation& outOp, IgniteError* err)
{
- OutInOp(OP_GET, inOp, outOp, err);
+ OutInOpX(OP_GET, inOp, outOp, err);
}
void CacheImpl::GetAll(InputOperation& inOp, OutputOperation& outOp, IgniteError* err)
{
- OutInOp(OP_GET_ALL, inOp, outOp, err);
+ OutInOpX(OP_GET_ALL, inOp, outOp, err);
}
void CacheImpl::Put(InputOperation& inOp, IgniteError* err)
@@ -181,17 +181,17 @@ namespace ignite
void CacheImpl::GetAndPut(InputOperation& inOp, OutputOperation& outOp, IgniteError* err)
{
- OutInOp(OP_GET_AND_PUT, inOp, outOp, err);
+ OutInOpX(OP_GET_AND_PUT, inOp, outOp, err);
}
void CacheImpl::GetAndReplace(InputOperation& inOp, OutputOperation& outOp, IgniteError* err)
{
- OutInOp(OP_GET_AND_REPLACE, inOp, outOp, err);
+ OutInOpX(OP_GET_AND_REPLACE, inOp, outOp, err);
}
void CacheImpl::GetAndRemove(InputOperation& inOp, OutputOperation& outOp, IgniteError* err)
{
- OutInOp(OP_GET_AND_REMOVE, inOp, outOp, err);
+ OutInOpX(OP_GET_AND_REMOVE, inOp, outOp, err);
}
bool CacheImpl::PutIfAbsent(InputOperation& inOp, IgniteError* err)
@@ -201,7 +201,7 @@ namespace ignite
void CacheImpl::GetAndPutIfAbsent(InputOperation& inOp, OutputOperation& outOp, IgniteError* err)
{
- OutInOp(OP_GET_AND_PUT_IF_ABSENT, inOp, outOp, err);
+ OutInOpX(OP_GET_AND_PUT_IF_ABSENT, inOp, outOp, err);
}
bool CacheImpl::Replace(InputOperation& inOp, IgniteError* err)
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp b/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp
index 05764c7..5d17214 100644
--- a/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp
+++ b/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp
@@ -31,6 +31,21 @@ namespace ignite
{
namespace interop
{
+ /**
+ * Operation result.
+ */
+ enum OperationResult
+ {
+ /** Null. */
+ ResultNull = 0,
+
+ /** Object. */
+ ResultObject = 1,
+
+ /** Error. */
+ ResultError = -1
+ };
+
InteropTarget::InteropTarget(SharedPointer<IgniteEnvironment> env, jobject javaRef) :
env(env), javaRef(javaRef)
{
@@ -116,8 +131,7 @@ namespace ignite
return false;
}
- void InteropTarget::OutInOp(int32_t opType, InputOperation& inOp, OutputOperation& outOp,
- IgniteError* err)
+ void InteropTarget::OutInOp(int32_t opType, InputOperation& inOp, OutputOperation& outOp, IgniteError* err)
{
JniErrorInfo jniErr;
@@ -137,6 +151,29 @@ namespace ignite
ReadFrom(inMem.Get(), outOp);
}
}
+
+ void InteropTarget::OutInOpX(int32_t opType, InputOperation& inOp, OutputOperation& outOp, IgniteError* err)
+ {
+ JniErrorInfo jniErr;
+
+ SharedPointer<InteropMemory> outInMem = env.Get()->AllocateMemory();
+
+ int64_t outInPtr = WriteTo(outInMem.Get(), inOp, err);
+
+ if (outInPtr)
+ {
+ int64_t res = env.Get()->Context()->TargetInStreamOutLong(javaRef, opType, outInPtr, &jniErr);
+
+ IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+
+ if (jniErr.code == IGNITE_JNI_ERR_SUCCESS && res == ResultObject)
+ ReadFrom(outInMem.Get(), outOp);
+ else if (res == ResultNull)
+ outOp.SetNull();
+
+ // TODO: read and process the error here if res == ResultError (not handled yet).
+ }
+ }
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp b/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp
index 6c01332..fed43fc 100644
--- a/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp
@@ -145,6 +145,11 @@ namespace ignite
val = TransactionMetrics(commitTime, rollbackTime, commits, rollbacks);
}
+ virtual void SetNull()
+ {
+ // No-op.
+ }
+
/**
* Get value.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkRunner.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkRunner.cs b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkRunner.cs
index 5d8e78a..40ae01e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkRunner.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkRunner.cs
@@ -21,6 +21,7 @@ namespace Apache.Ignite.Benchmarks
using System.Diagnostics;
using System.Text;
using Apache.Ignite.Benchmarks.Binary;
+ using Apache.Ignite.Benchmarks.Interop;
/// <summary>
/// Benchmark runner.
@@ -35,8 +36,8 @@ namespace Apache.Ignite.Benchmarks
public static void Main(string[] args)
{
args = new[] {
- typeof(BinarizableReadBenchmark).FullName,
- "-ConfigPath", @"modules\platforms\dotnet\Apache.Ignite.Benchmarks\Config\benchmark.xml",
+ typeof(GetBenchmark).FullName,
+ "-ConfigPath", @"C:\W\incubator-ignite\modules\platforms\dotnet\Apache.Ignite.Benchmarks\Config\benchmark.xml",
"-Threads", "1",
"-Warmup", "0",
"-Duration", "60",
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Interop/PlatformBenchmarkBase.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Interop/PlatformBenchmarkBase.cs b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Interop/PlatformBenchmarkBase.cs
index eeebed0..f437eb8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Interop/PlatformBenchmarkBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Interop/PlatformBenchmarkBase.cs
@@ -66,7 +66,7 @@ namespace Apache.Ignite.Benchmarks.Interop
"-DIGNITE_QUIET=false",
"-DIGNITE_NO_SHUTDOWN_HOOK=true"
},
- JvmClasspath = Classpath ?? Core.Impl.Common.Classpath.CreateClasspath(),
+ JvmClasspath = Classpath ?? Core.Impl.Common.Classpath.CreateClasspath(forceTestClasspath: true),
JvmDllPath = DllPath,
SpringConfigUrl = ConfigPath
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
index 5fb2cdd..9fd1f1d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
@@ -2952,7 +2952,7 @@ namespace Apache.Ignite.Core.Tests.Cache
Assert.IsInstanceOf<CacheEntryProcessorException>(ex);
if (string.IsNullOrEmpty(containsText))
- Assert.AreEqual(ex.InnerException.Message, AddArgCacheEntryProcessor.ExceptionText);
+ Assert.AreEqual(AddArgCacheEntryProcessor.ExceptionText, ex.InnerException.Message);
else
Assert.IsTrue(ex.ToString().Contains(containsText));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
index 8061e9f..d39ccde 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
@@ -178,13 +178,15 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
[TearDown]
public void AfterTest()
{
+ CacheTestStore.Reset();
+
var cache = GetCache();
cache.Clear();
- Assert.IsTrue(cache.IsEmpty(), "Cache is not empty: " + cache.GetSize());
-
- CacheTestStore.Reset();
+ Assert.IsTrue(cache.IsEmpty(),
+ "Cache is not empty: " +
+ string.Join(", ", cache.Select(x => string.Format("[{0}:{1}]", x.Key, x.Value))));
TestUtils.AssertHandleRegistryHasItems(300, _storeCount, Ignition.GetIgnite(GridName));
@@ -210,6 +212,11 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
// Test exception in filter
Assert.Throws<CacheStoreException>(() => cache.LoadCache(new ExceptionalEntryFilter(), 100, 10));
+
+ // Test exception in store
+ CacheTestStore.ThrowError = true;
+ CheckCustomStoreError(Assert.Throws<CacheStoreException>(() =>
+ cache.LoadCache(new CacheEntryFilter(), 100, 10)).InnerException);
}
[Test]
@@ -262,6 +269,13 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
{
Assert.AreEqual("val_" + i, cache.GetAsync(i).Result);
}
+
+ // Test errors
+ CacheTestStore.ThrowError = true;
+ CheckCustomStoreError(
+ Assert.Throws<AggregateException>(
+ () => cache.LocalLoadCacheAsync(new CacheEntryFilter(), 100, 10).Wait())
+ .InnerException);
}
[Test]
@@ -282,6 +296,13 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
Assert.AreEqual("val", cache.Get(1));
Assert.AreEqual(1, cache.GetSize());
+
+ // Test errors
+ CacheTestStore.ThrowError = true;
+ CheckCustomStoreError(Assert.Throws<CacheStoreException>(() => cache.Put(-2, "fail")).InnerException);
+
+ cache.LocalEvict(new[] { 1 });
+ CheckCustomStoreError(Assert.Throws<CacheStoreException>(() => cache.Get(1)).InnerException);
}
[Test]
@@ -418,8 +439,6 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
using (var tx = cache.Ignite.GetTransactions().TxStart())
{
- CacheTestStore.ExpCommit = true;
-
tx.AddMeta("meta", 100);
cache.Put(1, "val");
@@ -549,6 +568,16 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
return Ignition.GetIgnite(GridName).GetOrCreateCache<int, string>(cacheName);
}
+
+ private static void CheckCustomStoreError(Exception err)
+ {
+ var customErr = err as CacheTestStore.CustomStoreException ??
+ err.InnerException as CacheTestStore.CustomStoreException;
+
+ Assert.IsNotNull(customErr);
+
+ Assert.AreEqual(customErr.Message, customErr.Details);
+ }
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
index b4b1670..4224835 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
@@ -23,6 +23,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
+ using System.Runtime.Serialization;
using System.Threading;
using Apache.Ignite.Core.Cache.Store;
using Apache.Ignite.Core.Resource;
@@ -32,12 +33,12 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
{
public static readonly IDictionary Map = new ConcurrentDictionary<object, object>();
- public static bool ExpCommit;
-
public static bool LoadMultithreaded;
public static bool LoadObjects;
+ public static bool ThrowError;
+
[InstanceResource]
private IIgnite _grid = null;
@@ -54,13 +55,15 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
{
Map.Clear();
- ExpCommit = false;
LoadMultithreaded = false;
LoadObjects = false;
+ ThrowError = false;
}
public void LoadCache(Action<object, object> act, params object[] args)
{
+ ThrowIfNeeded();
+
Debug.Assert(_grid != null);
if (LoadMultithreaded)
@@ -91,6 +94,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
public object Load(object key)
{
+ ThrowIfNeeded();
+
Debug.Assert(_grid != null);
return Map[key];
@@ -98,6 +103,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
public IDictionary LoadAll(ICollection keys)
{
+ ThrowIfNeeded();
+
Debug.Assert(_grid != null);
return keys.OfType<object>().ToDictionary(key => key, key => "val_" + key);
@@ -105,6 +112,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
public void Write(object key, object val)
{
+ ThrowIfNeeded();
+
Debug.Assert(_grid != null);
Map[key] = val;
@@ -112,6 +121,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
public void WriteAll(IDictionary map)
{
+ ThrowIfNeeded();
+
Debug.Assert(_grid != null);
foreach (DictionaryEntry e in map)
@@ -120,6 +131,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
public void Delete(object key)
{
+ ThrowIfNeeded();
+
Debug.Assert(_grid != null);
Map.Remove(key);
@@ -127,6 +140,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
public void DeleteAll(ICollection keys)
{
+ ThrowIfNeeded();
+
Debug.Assert(_grid != null);
foreach (object key in keys)
@@ -151,5 +166,34 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
get { return stringProperty; }
set { stringProperty = value; }
}
+
+ private static void ThrowIfNeeded()
+ {
+ if (ThrowError)
+ throw new CustomStoreException("Exception in cache store");
+ }
+
+ [Serializable]
+ public class CustomStoreException : Exception, ISerializable
+ {
+ public string Details { get; private set; }
+
+ public CustomStoreException(string message) : base(message)
+ {
+ Details = message;
+ }
+
+ protected CustomStoreException(SerializationInfo info, StreamingContext ctx) : base(info, ctx)
+ {
+ Details = info.GetString("details");
+ }
+
+ public override void GetObjectData(SerializationInfo info, StreamingContext context)
+ {
+ info.AddValue("details", Details);
+
+ base.GetObjectData(info, context);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
index 32c59de..8ba3e29 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
@@ -271,7 +271,7 @@ namespace Apache.Ignite.Core.Impl.Cache
/// </summary>
private void LoadCache0(ICacheEntryFilter<TK, TV> p, object[] args, int opId)
{
- DoOutOp(opId, writer =>
+ DoOutInOpX(opId, writer =>
{
if (p != null)
{
@@ -284,7 +284,7 @@ namespace Apache.Ignite.Core.Impl.Cache
writer.WriteObject<CacheEntryFilterHolder>(null);
writer.WriteArray(args);
- });
+ }, ReadException);
}
/** <inheritDoc /> */
@@ -296,7 +296,7 @@ namespace Apache.Ignite.Core.Impl.Cache
/** <inheritDoc /> */
public Task LoadAllAsync(IEnumerable<TK> keys, bool replaceExistingValues)
{
- return GetFuture<object>((futId, futTyp) => DoOutOp((int) CacheOp.LoadAll, writer =>
+ return GetFuture<object>((futId, futTyp) => DoOutOp(CacheOp.LoadAll, writer =>
{
writer.WriteLong(futId);
writer.WriteBoolean(replaceExistingValues);
@@ -309,7 +309,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutOp((int)CacheOp.ContainsKey, key) == True;
+ return DoOutOp(CacheOp.ContainsKey, key);
}
/** <inheritDoc /> */
@@ -325,7 +325,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutOp((int)CacheOp.ContainsKeys, writer => WriteEnumerable(writer, keys)) == True;
+ return DoOutOp(CacheOp.ContainsKeys, writer => WriteEnumerable(writer, keys));
}
/** <inheritDoc /> */
@@ -354,11 +354,14 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- var res = DoOutInOpNullable<TV>((int)CacheOp.Peek, writer =>
- {
- writer.Write(key);
- writer.WriteInt(EncodePeekModes(modes));
- });
+ var res = DoOutInOpX((int)CacheOp.Peek,
+ w =>
+ {
+ w.Write(key);
+ w.WriteInt(EncodePeekModes(modes));
+ },
+ (s, r) => r == True ? new CacheResult<TV>(Unmarshal<TV>(s)) : new CacheResult<TV>(),
+ ReadException);
value = res.Success ? res.Value : default(TV);
@@ -389,19 +392,22 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- var result = DoOutInOpNullable<TK, TV>((int) CacheOp.Get, key);
-
- if (!IsAsync)
- {
- if (!result.Success)
- throw GetKeyNotFoundException();
+ return DoOutInOpX((int) CacheOp.Get,
+ w => w.Write(key),
+ (stream, res) =>
+ {
+ if (res == True) // Not null
+ {
+ Debug.Assert(!IsAsync);
- return result.Value;
- }
+ return Unmarshal<TV>(stream);
+ }
- Debug.Assert(!result.Success);
+ if (!IsAsync)
+ throw GetKeyNotFoundException();
- return default(TV);
+ return default(TV);
+ }, ReadException);
}
/** <inheritDoc /> */
@@ -426,7 +432,7 @@ namespace Apache.Ignite.Core.Impl.Cache
if (IsAsync)
throw new InvalidOperationException("TryGet can't be used in async mode.");
- var res = DoOutInOpNullable<TK, TV>((int) CacheOp.Get, key);
+ var res = DoOutInOpNullable(CacheOp.Get, key);
value = res.Value;
@@ -448,14 +454,10 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutInOp((int)CacheOp.GetAll,
+ return DoOutInOpX((int) CacheOp.GetAll,
writer => WriteEnumerable(writer, keys),
- input =>
- {
- var reader = Marshaller.StartUnmarshal(input, _flagKeepBinary);
-
- return ReadGetAllDictionary(reader);
- });
+ (s, r) => r == True ? ReadGetAllDictionary(Marshaller.StartUnmarshal(s, _flagKeepBinary)) : null,
+ ReadException);
}
/** <inheritDoc /> */
@@ -473,7 +475,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(val, "val");
- DoOutOp((int)CacheOp.Put, key, val);
+ DoOutOp(CacheOp.Put, key, val);
}
/** <inheritDoc /> */
@@ -491,7 +493,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOpNullable<TK, TV, TV>((int)CacheOp.GetAndPut, key, val);
+ return DoOutInOpNullable(CacheOp.GetAndPut, key, val);
}
/** <inheritDoc /> */
@@ -509,7 +511,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOpNullable<TK, TV, TV>((int) CacheOp.GetAndReplace, key, val);
+ return DoOutInOpNullable(CacheOp.GetAndReplace, key, val);
}
/** <inheritDoc /> */
@@ -525,7 +527,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOpNullable<TK, TV>((int)CacheOp.GetAndRemove, key);
+ return DoOutInOpNullable(CacheOp.GetAndRemove, key);
}
/** <inheritDoc /> */
@@ -543,7 +545,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutOp((int) CacheOp.PutIfAbsent, key, val) == True;
+ return DoOutOp(CacheOp.PutIfAbsent, key, val);
}
/** <inheritDoc /> */
@@ -561,7 +563,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOpNullable<TK, TV, TV>((int)CacheOp.GetAndPutIfAbsent, key, val);
+ return DoOutInOpNullable(CacheOp.GetAndPutIfAbsent, key, val);
}
/** <inheritDoc /> */
@@ -579,7 +581,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutOp((int) CacheOp.Replace2, key, val) == True;
+ return DoOutOp(CacheOp.Replace2, key, val);
}
/** <inheritDoc /> */
@@ -599,7 +601,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(newVal, "newVal");
- return DoOutOp((int)CacheOp.Replace3, key, oldVal, newVal) == True;
+ return DoOutOp(CacheOp.Replace3, key, oldVal, newVal);
}
/** <inheritDoc /> */
@@ -615,7 +617,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(vals, "vals");
- DoOutOp((int) CacheOp.PutAll, writer => WriteDictionary(writer, vals));
+ DoOutOp(CacheOp.PutAll, writer => WriteDictionary(writer, vals));
}
/** <inheritDoc /> */
@@ -631,7 +633,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- DoOutOp((int) CacheOp.LocEvict, writer => WriteEnumerable(writer, keys));
+ DoOutOp(CacheOp.LocEvict, writer => WriteEnumerable(writer, keys));
}
/** <inheritdoc /> */
@@ -653,7 +655,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- DoOutOp((int) CacheOp.Clear, key);
+ DoOutOp(CacheOp.Clear, key);
}
/** <inheritDoc /> */
@@ -669,7 +671,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- DoOutOp((int)CacheOp.ClearAll, writer => WriteEnumerable(writer, keys));
+ DoOutOp(CacheOp.ClearAll, writer => WriteEnumerable(writer, keys));
}
/** <inheritDoc /> */
@@ -685,7 +687,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- DoOutOp((int) CacheOp.LocalClear, key);
+ DoOutOp(CacheOp.LocalClear, key);
}
/** <inheritdoc /> */
@@ -693,7 +695,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- DoOutOp((int)CacheOp.LocalClearAll, writer => WriteEnumerable(writer, keys));
+ DoOutOp(CacheOp.LocalClearAll, writer => WriteEnumerable(writer, keys));
}
/** <inheritdoc /> */
@@ -701,7 +703,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutOp((int) CacheOp.RemoveObj, key) == True;
+ return DoOutOp(CacheOp.RemoveObj, key);
}
/** <inheritDoc /> */
@@ -719,7 +721,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutOp((int)CacheOp.RemoveBool, key, val) == True;
+ return DoOutOp(CacheOp.RemoveBool, key, val);
}
/** <inheritDoc /> */
@@ -735,7 +737,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- DoOutOp((int)CacheOp.RemoveAll, writer => WriteEnumerable(writer, keys));
+ DoOutOp(CacheOp.RemoveAll, writer => WriteEnumerable(writer, keys));
}
/** <inheritDoc /> */
@@ -798,7 +800,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- DoOutOp((int)CacheOp.LocPromote, writer => WriteEnumerable(writer, keys));
+ DoOutOp(CacheOp.LocPromote, writer => WriteEnumerable(writer, keys));
}
/** <inheritdoc /> */
@@ -811,12 +813,14 @@ namespace Apache.Ignite.Core.Impl.Cache
var holder = new CacheEntryProcessorHolder(processor, arg,
(e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
- return DoOutInOp((int)CacheOp.Invoke, writer =>
- {
- writer.Write(key);
- writer.Write(holder);
- },
- input => GetResultOrThrow<TRes>(Unmarshal<object>(input)));
+ return DoOutInOpX((int) CacheOp.Invoke,
+ writer =>
+ {
+ writer.Write(key);
+ writer.Write(holder);
+ },
+ (input, res) => res == True ? Unmarshal<TRes>(input) : default(TRes),
+ ReadException);
}
/** <inheritDoc /> */
@@ -849,17 +853,19 @@ namespace Apache.Ignite.Core.Impl.Cache
var holder = new CacheEntryProcessorHolder(processor, arg,
(e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
- return DoOutInOp((int) CacheOp.InvokeAll,
+ return DoOutInOpX((int) CacheOp.InvokeAll,
writer =>
{
WriteEnumerable(writer, keys);
writer.Write(holder);
},
- input => ReadInvokeAllResults<TRes>(input));
+ (input, res) => res == True ? ReadInvokeAllResults<TRes>(input) : null,
+ ReadException);
}
/** <inheritDoc /> */
- public Task<IDictionary<TK, ICacheEntryProcessorResult<TRes>>> InvokeAllAsync<TArg, TRes>(IEnumerable<TK> keys, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
+ public Task<IDictionary<TK, ICacheEntryProcessorResult<TRes>>> InvokeAllAsync<TArg, TRes>(IEnumerable<TK> keys,
+ ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
{
AsyncInstance.InvokeAll(keys, processor, arg);
@@ -871,10 +877,8 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOp((int)CacheOp.Lock, writer =>
- {
- writer.Write(key);
- }, input => new CacheLock(input.ReadInt(), Target));
+ return DoOutInOpX((int) CacheOp.Lock, w => w.Write(key),
+ (stream, res) => new CacheLock(res, Target), ReadException);
}
/** <inheritdoc /> */
@@ -882,10 +886,8 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutInOp((int)CacheOp.LockAll, writer =>
- {
- WriteEnumerable(writer, keys);
- }, input => new CacheLock(input.ReadInt(), Target));
+ return DoOutInOpX((int) CacheOp.LockAll, w => WriteEnumerable(w, keys),
+ (stream, res) => new CacheLock(res, Target), ReadException);
}
/** <inheritdoc /> */
@@ -893,11 +895,11 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutOp((int)CacheOp.IsLocalLocked, writer =>
+ return DoOutOp(CacheOp.IsLocalLocked, writer =>
{
writer.Write(key);
writer.WriteBoolean(byCurrentThread);
- }) == True;
+ });
}
/** <inheritDoc /> */
@@ -1159,22 +1161,6 @@ namespace Apache.Ignite.Core.Impl.Cache
}
/// <summary>
- /// Unwraps an exception.
- /// </summary>
- /// <typeparam name="T">Result type.</typeparam>
- /// <param name="obj">Object.</param>
- /// <returns>Result.</returns>
- private static T GetResultOrThrow<T>(object obj)
- {
- var err = obj as Exception;
-
- if (err != null)
- throw err as CacheEntryProcessorException ?? new CacheEntryProcessorException(err);
-
- return obj == null ? default(T) : (T) obj;
- }
-
- /// <summary>
/// Reads results of InvokeAll operation.
/// </summary>
/// <typeparam name="T">The type of the result.</typeparam>
@@ -1208,9 +1194,11 @@ namespace Apache.Ignite.Core.Impl.Cache
/// </summary>
/// <param name="inStream">The stream.</param>
/// <returns>Exception.</returns>
- private CacheEntryProcessorException ReadException(IBinaryStream inStream)
+ private Exception ReadException(IBinaryStream inStream)
{
- var item = Unmarshal<object>(inStream);
+ var reader = Marshaller.StartUnmarshal(inStream, _flagKeepBinary);
+
+ var item = reader.ReadObject<object>();
var clsName = item as string;
@@ -1219,8 +1207,9 @@ namespace Apache.Ignite.Core.Impl.Cache
var msg = Unmarshal<string>(inStream);
var trace = Unmarshal<string>(inStream);
-
- return new CacheEntryProcessorException(ExceptionUtils.GetException(_ignite, clsName, msg, trace));
+ var inner = reader.ReadBoolean() ? reader.ReadObject<Exception>() : null;
+
+ return ExceptionUtils.GetException(_ignite, clsName, msg, trace, reader, inner);
}
/// <summary>
@@ -1272,49 +1261,73 @@ namespace Apache.Ignite.Core.Impl.Cache
}
/// <summary>
- /// Perform simple out-in operation accepting single argument.
+ /// Performs an out operation that writes one argument; returns true if the result equals True.
/// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="val">Value.</param>
- /// <returns>Result.</returns>
- private CacheResult<TR> DoOutInOpNullable<T1, TR>(int type, T1 val)
+ private bool DoOutOp<T1>(CacheOp op, T1 x)
{
- var res = DoOutInOp<T1, object>(type, val);
+ return DoOutInOpX((int) op, w =>
+ {
+ w.Write(x);
+ }, ReadException);
+ }
- return res == null
- ? new CacheResult<TR>()
- : new CacheResult<TR>((TR)res);
+ /// <summary>
+ /// Performs an out operation that writes two arguments; returns true if the result equals True.
+ /// </summary>
+ private bool DoOutOp<T1, T2>(CacheOp op, T1 x, T2 y)
+ {
+ return DoOutInOpX((int) op, w =>
+ {
+ w.Write(x);
+ w.Write(y);
+ }, ReadException);
}
/// <summary>
- /// Perform out-in operation.
+ /// Performs an out operation that writes three arguments; returns true if the result equals True.
/// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="outAction">Out action.</param>
- /// <returns>Result.</returns>
- private CacheResult<TR> DoOutInOpNullable<TR>(int type, Action<BinaryWriter> outAction)
+ private bool DoOutOp<T1, T2, T3>(CacheOp op, T1 x, T2 y, T3 z)
{
- var res = DoOutInOp<object>(type, outAction);
+ return DoOutInOpX((int) op, w =>
+ {
+ w.Write(x);
+ w.Write(y);
+ w.Write(z);
+ }, ReadException);
+ }
- return res == null
- ? new CacheResult<TR>()
- : new CacheResult<TR>((TR)res);
+ /// <summary>
+ /// Performs an out operation using a custom write action; returns true if the result equals True.
+ /// </summary>
+ private bool DoOutOp(CacheOp op, Action<BinaryWriter> write)
+ {
+ return DoOutInOpX((int) op, write, ReadException);
}
/// <summary>
- /// Perform simple out-in operation accepting single argument.
+ /// Performs an out-in operation that writes one key; the CacheResult holds a value only if the result equals True.
/// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="val1">Value.</param>
- /// <param name="val2">Value.</param>
- /// <returns>Result.</returns>
- private CacheResult<TR> DoOutInOpNullable<T1, T2, TR>(int type, T1 val1, T2 val2)
+ private CacheResult<TV> DoOutInOpNullable(CacheOp cacheOp, TK x)
{
- var res = DoOutInOp<T1, T2, object>(type, val1, val2);
+ return DoOutInOpX((int)cacheOp,
+ w => w.Write(x),
+ (stream, res) => res == True ? new CacheResult<TV>(Unmarshal<TV>(stream)) : new CacheResult<TV>(),
+ ReadException);
+ }
- return res == null
- ? new CacheResult<TR>()
- : new CacheResult<TR>((TR)res);
+ /// <summary>
+ /// Performs an out-in operation that writes two arguments; the CacheResult holds a value only if the result equals True.
+ /// </summary>
+ private CacheResult<TV> DoOutInOpNullable<T1, T2>(CacheOp cacheOp, T1 x, T2 y)
+ {
+ return DoOutInOpX((int)cacheOp,
+ w =>
+ {
+ w.Write(x);
+ w.Write(y);
+ },
+ (stream, res) => res == True ? new CacheResult<TV>(Unmarshal<TV>(stream)) : new CacheResult<TV>(),
+ ReadException);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs
index a8c1471..a59ca5f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs
@@ -115,10 +115,14 @@ namespace Apache.Ignite.Core.Impl
/// <param name="msg">Exception message.</param>
/// <param name="stackTrace">Native stack trace.</param>
/// <param name="reader">Error data reader.</param>
+ /// <param name="innerException">Inner exception.</param>
/// <returns>Exception.</returns>
- public static Exception GetException(IIgnite ignite, string clsName, string msg, string stackTrace, BinaryReader reader = null)
+ public static Exception GetException(IIgnite ignite, string clsName, string msg, string stackTrace,
+ BinaryReader reader = null, Exception innerException = null)
{
- Exception innerException = string.IsNullOrEmpty(stackTrace) ? null : new JavaException(stackTrace);
+ // Set JavaException as inner only if there is no InnerException.
+ if (innerException == null && !string.IsNullOrEmpty(stackTrace))
+ innerException = new JavaException(stackTrace);
ExceptionFactoryDelegate ctor;
@@ -158,7 +162,7 @@ namespace Apache.Ignite.Core.Impl
/// <param name="reader">Reader.</param>
/// <returns>CachePartialUpdateException.</returns>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
- private static Exception ProcessCachePartialUpdateException(IIgnite ignite, string msg, string stackTrace,
+ private static Exception ProcessCachePartialUpdateException(IIgnite ignite, string msg, string stackTrace,
BinaryReader reader)
{
if (reader == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
index f757cbc..ac613c6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
@@ -43,6 +43,9 @@ namespace Apache.Ignite.Core.Impl
protected const int True = 1;
/** */
+ protected const int Error = -1;
+
+ /** */
private const int OpMeta = -1;
/** */
@@ -470,7 +473,82 @@ namespace Apache.Ignite.Core.Impl
}
}
}
-
+
+ /// <summary>
+ /// Perform out-in operation with a single stream; throws if the native result is <see cref="Error"/>.
+ /// </summary>
+ /// <typeparam name="TR">The type of the result.</typeparam>
+ /// <param name="type">Operation type.</param>
+ /// <param name="outAction">Out action.</param>
+ /// <param name="inAction">In action (stream, native result); null for void operations.</param>
+ /// <param name="inErrorAction">The action to read an error.</param>
+ /// <returns>
+ /// Result of the in action, or default value when the in action is null.
+ /// </returns>
+ protected TR DoOutInOpX<TR>(int type, Action<BinaryWriter> outAction, Func<IBinaryStream, long, TR> inAction,
+     Func<IBinaryStream, Exception> inErrorAction)
+ {
+     Debug.Assert(inErrorAction != null);
+
+     using (var stream = IgniteManager.Memory.Allocate().GetStream())
+     {
+         var writer = _marsh.StartMarshal(stream);
+
+         outAction(writer);
+
+         FinishMarshal(writer);
+
+         var res = UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput());
+
+         if (res != Error && inAction == null)
+             return default(TR); // quick path for void operations
+
+         stream.SynchronizeInput();
+
+         stream.Seek(0, SeekOrigin.Begin);
+
+         if (res != Error)
+             return inAction(stream, res); // non-null here: the null case returned above
+
+         throw inErrorAction(stream);
+     }
+ }
+
+ /// <summary>
+ /// Executes an out-in operation over one stream and interprets the native result as a boolean.
+ /// </summary>
+ /// <param name="type">Type of the operation.</param>
+ /// <param name="outAction">Action that writes the request.</param>
+ /// <param name="inErrorAction">Action that reads the error from the stream.</param>
+ /// <returns>
+ /// True when the native result equals the True constant; otherwise false.
+ /// </returns>
+ protected bool DoOutInOpX(int type, Action<BinaryWriter> outAction,
+     Func<IBinaryStream, Exception> inErrorAction)
+ {
+     Debug.Assert(inErrorAction != null);
+
+     using (var memStream = IgniteManager.Memory.Allocate().GetStream())
+     {
+         var marshalWriter = _marsh.StartMarshal(memStream);
+         outAction(marshalWriter);
+         FinishMarshal(marshalWriter);
+
+         var nativeRes = UU.TargetInStreamOutLong(_target, type, memStream.SynchronizeOutput());
+
+         if (nativeRes == Error)
+         {
+             // Re-read the stream from its start so the error can be decoded.
+             memStream.SynchronizeInput();
+             memStream.Seek(0, SeekOrigin.Begin);
+
+             throw inErrorAction(memStream);
+         }
+
+         return nativeRes == True;
+     }
+ }
+
/// <summary>
/// Perform out-in operation.
/// </summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index 2f70426..c9284d5 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
+ using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using Apache.Ignite.Core.Cache.Affinity;
@@ -328,6 +329,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
}, true);
}
+ [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
private int CacheStoreInvoke(void* target, long objPtr, long memPtr, void* cb)
{
@@ -342,7 +344,18 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- return t.Invoke(stream, cb0, _ignite);
+ try
+ {
+ return t.Invoke(stream, cb0, _ignite);
+ }
+ catch (Exception e)
+ {
+ stream.Seek(0, SeekOrigin.Begin);
+
+ _ignite.Marshaller.StartMarshal(stream).WriteObject(e);
+
+ return -1;
+ }
}
});
}
@@ -772,8 +785,9 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
string errCls = reader.ReadString();
string errMsg = reader.ReadString();
string stackTrace = reader.ReadString();
+ Exception inner = reader.ReadBoolean() ? reader.ReadObject<Exception>() : null;
- Exception err = ExceptionUtils.GetException(_ignite, errCls, errMsg, stackTrace, reader);
+ Exception err = ExceptionUtils.GetException(_ignite, errCls, errMsg, stackTrace, reader, inner);
ProcessFuture(futPtr, fut => { fut.OnError(err); });
}
@@ -1100,7 +1114,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
// Stream disposal intentionally omitted: IGNITE-1598
var stream = new PlatformRawMemory(errData, errDataLen).GetStream();
- throw ExceptionUtils.GetException(_ignite, errCls, errMsg, stackTrace,
+ throw ExceptionUtils.GetException(_ignite, errCls, errMsg, stackTrace,
_ignite.Marshaller.StartUnmarshal(stream));
}