You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/08/26 11:47:15 UTC
ignite git commit: Moved platform future utils to Ignite.
Repository: ignite
Updated Branches:
refs/heads/master bcf3054b0 -> cdf82e942
Moved platform future utils to Ignite.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cdf82e94
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cdf82e94
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cdf82e94
Branch: refs/heads/master
Commit: cdf82e942d0a2577cd077faf5c60dcc3bec4886a
Parents: bcf3054
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Aug 26 12:47:55 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Aug 26 12:47:55 2015 +0300
----------------------------------------------------------------------
.../platform/PlatformExtendedException.java | 39 +++
.../platform/utils/PlatformFutureUtils.java | 326 +++++++++++++++++++
.../platform/utils/PlatformUtils.java | 54 +++
3 files changed, 419 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/cdf82e94/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformExtendedException.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformExtendedException.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformExtendedException.java
new file mode 100644
index 0000000..80b1703
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformExtendedException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform;
+
+import org.apache.ignite.internal.portable.*;
+
+/**
+ * Contract for exceptions that carry additional data which has to be written in a special manner.
+ */
+public interface PlatformExtendedException {
+    /**
+     * Provides the platform context.
+     *
+     * @return Platform context.
+     */
+    PlatformContext context();
+
+    /**
+     * Writes the additional data of this exception.
+     *
+     * @param writer Writer the data is written to.
+     */
+    void writeData(PortableRawWriterEx writer);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cdf82e94/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
new file mode 100644
index 0000000..fa986fe
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.utils;
+
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.callback.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Interop future utils.
+ */
+public class PlatformFutureUtils {
+ /** Future type: byte; a non-null result is delivered through {@code futureByteResult}. */
+ public static final int TYP_BYTE = 1;
+
+ /** Future type: boolean; a non-null result is delivered through {@code futureBoolResult} as 1 or 0. */
+ public static final int TYP_BOOL = 2;
+
+ /** Future type: short; a non-null result is delivered through {@code futureShortResult}. */
+ public static final int TYP_SHORT = 3;
+
+ /** Future type: char; a non-null result is delivered through {@code futureCharResult}. */
+ public static final int TYP_CHAR = 4;
+
+ /** Future type: int; a non-null result is delivered through {@code futureIntResult}. */
+ public static final int TYP_INT = 5;
+
+ /** Future type: float; a non-null result is delivered through {@code futureFloatResult}. */
+ public static final int TYP_FLOAT = 6;
+
+ /** Future type: long; a non-null result is delivered through {@code futureLongResult}. */
+ public static final int TYP_LONG = 7;
+
+ /** Future type: double; a non-null result is delivered through {@code futureDoubleResult}. */
+ public static final int TYP_DOUBLE = 8;
+
+ /** Future type: object; a non-null result is written to platform memory and delivered through {@code futureObjectResult}. */
+ public static final int TYP_OBJ = 9;
+
+ /**
+ * Listen future.
+ *
+ * @param ctx Interop context.
+ * @param fut Java future.
+ * @param futPtr Native future pointer.
+ * @param typ Expected return type.
+ */
+ public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ) {
+ listen(ctx, new FutureListenable(fut), futPtr, typ, null);
+ }
+
+ /**
+ * Listen future.
+ *
+ * @param ctx Interop context.
+ * @param fut Java future.
+ * @param futPtr Native future pointer.
+ * @param typ Expected return type.
+ * @param writer Writer.
+ */
+ public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ,
+ Writer writer) {
+ listen(ctx, new FutureListenable(fut), futPtr, typ, writer);
+ }
+
+ /**
+ * Listen future.
+ *
+ * @param ctx Interop context.
+ * @param fut Java future.
+ * @param futPtr Native future pointer.
+ * @param writer Writer.
+ */
+ public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, Writer writer) {
+ listen(ctx, new FutureListenable(fut), futPtr, TYP_OBJ, writer);
+ }
+
+ /**
+ * Listen future.
+ *
+ * @param ctx Interop context.
+ * @param listenable Listenable entry.
+ * @param futPtr Native future pointer.
+ * @param typ Expected return type.
+ * @param writer Optional writer.
+ */
+ @SuppressWarnings("unchecked")
+ private static void listen(final PlatformContext ctx, Listenable listenable, final long futPtr, final int typ,
+ @Nullable final Writer writer) {
+ final PlatformCallbackGateway gate = ctx.gateway();
+
+ listenable.listen(new IgniteBiInClosure<Object, Throwable>() {
+ private static final long serialVersionUID = 0L;
+
+ @Override public void apply(Object res, Throwable err) {
+ if (writer != null && writeToWriter(res, err, ctx, writer, futPtr))
+ return;
+
+ if (err != null) {
+ writeFutureError(ctx, futPtr, err);
+
+ return;
+ }
+
+ try {
+ if (typ == TYP_OBJ) {
+ if (res == null)
+ gate.futureNullResult(futPtr);
+ else {
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx outWriter = ctx.writer(out);
+
+ outWriter.writeObjectDetached(res);
+
+ out.synchronize();
+
+ gate.futureObjectResult(futPtr, mem.pointer());
+ }
+ }
+ }
+ else if (res == null)
+ gate.futureNullResult(futPtr);
+ else {
+ switch (typ) {
+ case TYP_BYTE:
+ gate.futureByteResult(futPtr, (byte) res);
+
+ break;
+
+ case TYP_BOOL:
+ gate.futureBoolResult(futPtr, (boolean) res ? 1 : 0);
+
+ break;
+
+ case TYP_SHORT:
+ gate.futureShortResult(futPtr, (short) res);
+
+ break;
+
+ case TYP_CHAR:
+ gate.futureCharResult(futPtr, (char) res);
+
+ break;
+
+ case TYP_INT:
+ gate.futureIntResult(futPtr, (int) res);
+
+ break;
+
+ case TYP_FLOAT:
+ gate.futureFloatResult(futPtr, (float) res);
+
+ break;
+
+ case TYP_LONG:
+ gate.futureLongResult(futPtr, (long) res);
+
+ break;
+
+ case TYP_DOUBLE:
+ gate.futureDoubleResult(futPtr, (double) res);
+
+ break;
+
+ default:
+ assert false : "Should not reach this: " + typ;
+ }
+ }
+ }
+ catch (Throwable t) {
+ writeFutureError(ctx, futPtr, t);
+
+ if (t instanceof Error)
+ throw t;
+ }
+ }
+ });
+ }
+
+    /**
+     * Reports a failure to the native future.
+     * <p>
+     * The payload consists of the error class name, the error message and the extended error data,
+     * written in that order.
+     *
+     * @param ctx Interop context.
+     * @param futPtr Future pointer.
+     * @param err Error.
+     */
+    private static void writeFutureError(final PlatformContext ctx, long futPtr, Throwable err) {
+        try (PlatformMemory errMem = ctx.memory().allocate()) {
+            PlatformOutputStream stream = errMem.output();
+            PortableRawWriterEx errWriter = ctx.writer(stream);
+
+            errWriter.writeString(err.getClass().getName());
+            errWriter.writeString(err.getMessage());
+
+            PlatformUtils.writeErrorData(err, errWriter);
+
+            stream.synchronize();
+
+            ctx.gateway().futureError(futPtr, errMem.pointer());
+        }
+    }
+
+    /**
+     * Delivers the outcome through a custom writer, if that writer accepts it.
+     *
+     * @param obj Object to write.
+     * @param err Error to write.
+     * @param ctx Interop context.
+     * @param writer Writer.
+     * @param futPtr Future pointer.
+     * @return {@code true} if the custom write was performed; {@code false} if the writer declined and the
+     *      default write should be used.
+     */
+    private static boolean writeToWriter(Object obj, Throwable err, PlatformContext ctx, Writer writer, long futPtr) {
+        if (!writer.canWrite(obj, err))
+            return false;
+
+        try (PlatformMemory resMem = ctx.memory().allocate()) {
+            PlatformOutputStream stream = resMem.output();
+            PortableRawWriterEx rawWriter = ctx.writer(stream);
+
+            writer.write(rawWriter, obj, err);
+
+            stream.synchronize();
+
+            ctx.gateway().futureObjectResult(futPtr, resMem.pointer());
+        }
+
+        return true;
+    }
+
+    /**
+     * Hook for custom handling of a future's outcome.
+     */
+    public interface Writer {
+        /**
+         * Writes the given outcome.
+         *
+         * @param writer Writer.
+         * @param obj Object.
+         * @param err Error.
+         */
+        void write(PortableRawWriterEx writer, Object obj, Throwable err);
+
+        /**
+         * Tells whether this writer is able to write the given outcome.
+         *
+         * @param obj Object.
+         * @param err Error.
+         * @return {@code true} if this writer can write the given data.
+         */
+        boolean canWrite(Object obj, Throwable err);
+    }
+
+    /**
+     * Something a result/error listener can be attached to.
+     */
+    private interface Listenable {
+        /**
+         * Attaches a listener.
+         *
+         * @param lsnr Listener receiving the result and the error.
+         */
+        void listen(IgniteBiInClosure<Object, Throwable> lsnr);
+    }
+
+ /**
+ * Listenable around Ignite future.
+ * <p>
+ * Adapts an {@link IgniteFuture} to {@link Listenable}: the supplied closure receives either the future's
+ * result together with a {@code null} error, or a {@code null} result together with the thrown error.
+ */
+ private static class FutureListenable implements Listenable {
+ /** Future being adapted. */
+ private final IgniteFuture fut;
+
+ /**
+ * Constructor.
+ *
+ * @param fut Future.
+ */
+ public FutureListenable(IgniteFuture fut) {
+ this.fut = fut;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The first listener call, including {@code fut0.get()}, runs inside the {@code try}. The listener is
+ * therefore invoked a second time with {@code (null, err)} not only when {@code get()} fails but also
+ * when the listener itself throws. An {@link Error} is rethrown after that notification.
+ */
+ @SuppressWarnings("unchecked")
+ @Override public void listen(final IgniteBiInClosure<Object, Throwable> lsnr) {
+ fut.listen(new IgniteInClosure<IgniteFuture>() {
+ private static final long serialVersionUID = 0L;
+
+ @Override public void apply(IgniteFuture fut0) {
+ try {
+ lsnr.apply(fut0.get(), null);
+ }
+ catch (Throwable err) {
+ lsnr.apply(null, err);
+
+ if (err instanceof Error)
+ throw err;
+ }
+ }
+ });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cdf82e94/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index a620f8e..614346a 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -523,6 +523,60 @@ public class PlatformUtils {
}
/**
+     * Write error data without logging.
+     * <p>
+     * Shortcut for the three-argument overload called with a {@code null} logger.
+     *
+     * @param err Error.
+     * @param writer Writer.
+     */
+    public static void writeErrorData(Throwable err, PortableRawWriterEx writer) {
+        final IgniteLogger noLog = null;
+
+        writeErrorData(err, writer, noLog);
+    }
+
+    /**
+     * Write error data.
+     * <p>
+     * Layout: a boolean "data exists" flag, which is {@code true} only for a {@link PlatformExtendedException}.
+     * When it is {@code true}, a second boolean tells whether the exception's own data was written successfully.
+     * It is followed either by that data or, on failure, by the class name and message of the exception raised
+     * while writing.
+     *
+     * @param err Error.
+     * @param writer Writer.
+     * @param log Optional logger.
+     */
+    public static void writeErrorData(Throwable err, PortableRawWriterEx writer, @Nullable IgniteLogger log) {
+        // Write additional data if needed.
+        if (err instanceof PlatformExtendedException) {
+            PlatformExtendedException err0 = (PlatformExtendedException)err;
+
+            writer.writeBoolean(true); // Data exists.
+
+            // Remember where the success flag starts so a partial write can be rolled back.
+            int pos = writer.out().position();
+
+            try {
+                writer.writeBoolean(true); // Optimistically assume that we will be able to write it.
+                err0.writeData(writer);
+            }
+            catch (Exception e) {
+                // Resolve the message once, under the guard, so the log line and the payload use the same text
+                // and a throwing getMessage() cannot escape from the log-string concatenation.
+                String innerMsg;
+
+                try {
+                    innerMsg = e.getMessage();
+                }
+                catch (Exception innerErr) {
+                    innerMsg = "Exception message is not available.";
+                }
+
+                if (log != null)
+                    U.warn(log, "Failed to write interop exception data: " + innerMsg, e);
+
+                // Discard whatever was written after the "data exists" flag.
+                writer.out().position(pos);
+
+                writer.writeBoolean(false); // Error occurred.
+                writer.writeString(e.getClass().getName());
+                writer.writeString(innerMsg);
+            }
+        }
+        else
+            writer.writeBoolean(false);
+    }
+
+ /**
* Private constructor.
*/
private PlatformUtils() {