You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/02 11:31:06 UTC
[29/50] [abbrv] ignite git commit: IGNITE-4628 Add Java callback
support for platform plugins
IGNITE-4628 Add Java callback support for platform plugins
This closes #1543
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f2328a45
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f2328a45
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f2328a45
Branch: refs/heads/ignite-4565-ddl
Commit: f2328a4593a6e4902dd3f4c946357fa054c3f1e2
Parents: f52ba0f
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Thu Feb 16 17:46:02 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Thu Feb 16 17:46:02 2017 +0300
----------------------------------------------------------------------
.../callback/PlatformCallbackGateway.java | 23 +++++
.../platform/callback/PlatformCallbackOp.java | 3 +
.../plugin/PlatformTestPluginTarget.java | 29 ++++++-
.../Plugin/PluginTest.cs | 10 ++-
.../Plugin/TestIgnitePluginProvider.cs | 14 ++++
.../Apache.Ignite.Core.csproj | 1 +
.../Impl/Plugin/PluginContext.cs | 8 ++
.../Impl/Plugin/PluginProcessor.cs | 88 ++++++++++++++++++++
.../Impl/Unmanaged/UnmanagedCallbackOp.cs | 3 +-
.../Impl/Unmanaged/UnmanagedCallbacks.cs | 6 ++
.../Apache.Ignite.Core/Plugin/IPluginContext.cs | 7 ++
.../Apache.Ignite.Core/Plugin/PluginCallback.cs | 29 +++++++
12 files changed, 215 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
index fc311da..aee14d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.platform.callback;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.platform.PlatformTargetProxy;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.util.GridStripedSpinBusyLock;
/**
@@ -1223,6 +1224,28 @@ public class PlatformCallbackGateway {
}
/**
+ * Invoke plugin callback by id.
+ *
+ * @param callbackId Id of a callback registered in Platform.
+ * @param outMem Out memory (Java writes, platform reads).
+ * @param inMem In memory (platform writes, Java reads).
+ */
+ public void pluginCallback(long callbackId, PlatformMemory outMem, PlatformMemory inMem) {
+ enter();
+
+ try {
+ long outPtr = outMem == null ? 0 : outMem.pointer();
+ long inPtr = inMem == null ? 0 : inMem.pointer();
+
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.PluginCallbackInLongLongOutLong, callbackId, outPtr, inPtr, null);
+ }
+ finally {
+ leave();
+ }
+ }
+
+ /**
* Enter gateway.
*/
protected void enter() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java
index 500a4f3..d77d2d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java
@@ -221,4 +221,7 @@ class PlatformCallbackOp {
/** */
public static final int CachePluginIgniteStop = 67;
+
+ /** */
+ public static final int PluginCallbackInLongLongOutLong = 68;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java b/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
index 94a21a3..e80a23f 100644
--- a/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
+++ b/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
@@ -24,12 +24,15 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformTarget;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.plugin.PluginConfiguration;
import org.jetbrains.annotations.Nullable;
/**
* Test target.
*/
+@SuppressWarnings("ConstantConditions")
class PlatformTestPluginTarget extends PlatformAbstractTarget {
/** */
private final String name;
@@ -87,11 +90,35 @@ class PlatformTestPluginTarget extends PlatformAbstractTarget {
throws IgniteCheckedException {
PlatformTestPluginTarget t = (PlatformTestPluginTarget)arg;
- writer.writeString(t.name);
+ writer.writeString(invokeCallback(t.name));
return new PlatformTestPluginTarget(platformCtx, t.name + reader.readString());
}
+ /**
+ * Invokes the platform callback.
+ *
+ * @param val Value to send.
+ * @return Result.
+ */
+ private String invokeCallback(String val) {
+ PlatformMemory outMem = platformCtx.memory().allocate();
+ PlatformMemory inMem = platformCtx.memory().allocate();
+
+ PlatformOutputStream outStream = outMem.output();
+ BinaryRawWriterEx writer = platformCtx.writer(outStream);
+
+ writer.writeString(val);
+
+ outStream.synchronize();
+
+ platformCtx.gateway().pluginCallback(1, outMem, inMem);
+
+ BinaryRawReaderEx reader = platformCtx.reader(inMem);
+
+ return reader.readString();
+ }
+
/** {@inheritDoc} */
@Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
writer.writeString(name);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
index 0af7b10..efb14ff 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
@@ -73,7 +73,7 @@ namespace Apache.Ignite.Core.Tests.Plugin
var extension = plugin.Provider.Context.GetExtension(0);
Assert.IsNotNull(extension);
- CheckPluginTarget(extension, "barbaz");
+ CheckPluginTarget(extension, "barbaz", plugin.Provider);
}
Assert.AreEqual(true, plugin.Provider.Stopped);
@@ -83,7 +83,8 @@ namespace Apache.Ignite.Core.Tests.Plugin
/// <summary>
/// Checks the plugin target operations.
/// </summary>
- private static void CheckPluginTarget(IPlatformTarget target, string expectedName)
+ private static void CheckPluginTarget(IPlatformTarget target, string expectedName,
+ TestIgnitePluginProvider provider)
{
// Returns name.
Assert.AreEqual(expectedName, target.OutStream(1, r => r.ReadString()));
@@ -104,12 +105,13 @@ namespace Apache.Ignite.Core.Tests.Plugin
var newTarget = target.InStreamOutObject(1, w => w.WriteString("name1"));
Assert.AreEqual("name1", newTarget.OutStream(1, r => r.ReadString()));
- // Returns target with specified name appended.
+ // Invokes callback to modify name, returns target with specified name appended.
var res = target.InObjectStreamOutObjectStream(1, newTarget, w => w.WriteString("_abc"),
(reader, t) => Tuple.Create(reader.ReadString(), t));
- Assert.AreEqual("name1", res.Item1); // Old name
+ Assert.AreEqual("NAME1", res.Item1); // Old name converted by callback.
Assert.AreEqual("name1_abc", res.Item2.OutStream(1, r => r.ReadString()));
+ Assert.AreEqual("name1", provider.CallbackResult); // Old name.
// Returns a copy with same name.
var resCopy = res.Item2.OutObject(1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/TestIgnitePluginProvider.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/TestIgnitePluginProvider.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/TestIgnitePluginProvider.cs
index d86750f..161d797 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/TestIgnitePluginProvider.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/TestIgnitePluginProvider.cs
@@ -18,6 +18,7 @@
namespace Apache.Ignite.Core.Tests.Plugin
{
using System;
+ using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Plugin;
using NUnit.Framework;
@@ -69,6 +70,17 @@ namespace Apache.Ignite.Core.Tests.Plugin
(className, message, inner, ignite) =>
new TestIgnitePluginException(className, message, ignite, inner));
+ context.RegisterCallback(1, (input, output) =>
+ {
+ CallbackResult = input.ReadString();
+ output.WriteString(CallbackResult.ToUpper());
+
+ return CallbackResult.Length;
+ });
+
+ var ex = Assert.Throws<IgniteException>(() => context.RegisterCallback(1, (input, output) => 0));
+ Assert.AreEqual("Plugin callback with id 1 is already registered", ex.Message);
+
Context = context;
EnsureIgniteWorks();
@@ -113,6 +125,8 @@ namespace Apache.Ignite.Core.Tests.Plugin
/// </summary>
public bool? IgniteStopped { get; set; }
+ public string CallbackResult { get; private set; }
+
/// <summary>
/// Gets the context.
/// </summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 7c86429..58002db 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -492,6 +492,7 @@
<Compile Include="Plugin\IPluginConfiguration.cs" />
<Compile Include="Plugin\IPluginContext.cs" />
<Compile Include="Plugin\IPluginProvider.cs" />
+ <Compile Include="Plugin\PluginCallback.cs" />
<Compile Include="Plugin\PluginNotFoundException.cs" />
<Compile Include="Plugin\PluginProviderTypeAttribute.cs" />
<Compile Include="Plugin\Cache\ICachePluginConfiguration.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginContext.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginContext.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginContext.cs
index 0e01244..fd7033c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginContext.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginContext.cs
@@ -79,5 +79,13 @@ namespace Apache.Ignite.Core.Impl.Plugin
_pluginProcessor.RegisterExceptionMapping(className, factory);
}
+
+ /** <inheritdoc /> */
+ public void RegisterCallback(long callbackId, PluginCallback callback)
+ {
+ IgniteArgumentCheck.NotNull(callback, "callback");
+
+ _pluginProcessor.RegisterCallback(callbackId, callback);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginProcessor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginProcessor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginProcessor.cs
index 7cafcc0..7ed7141 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginProcessor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Plugin/PluginProcessor.cs
@@ -21,8 +21,12 @@ namespace Apache.Ignite.Core.Impl.Plugin
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
+ using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Impl.Binary;
+ using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Memory;
using Apache.Ignite.Core.Log;
using Apache.Ignite.Core.Plugin;
@@ -42,6 +46,10 @@ namespace Apache.Ignite.Core.Impl.Plugin
private readonly CopyOnWriteConcurrentDictionary<string, ExceptionFactory> _exceptionMappings
= new CopyOnWriteConcurrentDictionary<string, ExceptionFactory>();
+ /** Plugin callbacks. */
+ private readonly CopyOnWriteConcurrentDictionary<long, PluginCallback> _callbacks
+ = new CopyOnWriteConcurrentDictionary<long, PluginCallback>();
+
/** */
private readonly Ignite _ignite;
@@ -144,6 +152,86 @@ namespace Apache.Ignite.Core.Impl.Plugin
}
/// <summary>
+ /// Registers the callback.
+ /// </summary>
+ /// <param name="callbackId">Calback id.</param>
+ /// <param name="callback">Callback delegate</param>
+ public void RegisterCallback(long callbackId, PluginCallback callback)
+ {
+ Debug.Assert(callback != null);
+
+ var res = _callbacks.GetOrAdd(callbackId, _ => callback);
+
+ if (res != callback)
+ {
+ throw new IgniteException(string.Format(
+ "Plugin callback with id {0} is already registered", callbackId));
+ }
+ }
+
+ /// <summary>
+ /// Invokes the callback.
+ /// </summary>
+ public long InvokeCallback(long callbackId, long inPtr, long outPtr)
+ {
+ PluginCallback callback;
+
+ if (!_callbacks.TryGetValue(callbackId, out callback))
+ {
+ throw new IgniteException(string.Format(
+ "Plugin callback with id {0} is not registered", callbackId));
+ }
+
+ using (var inStream = GetStream(inPtr))
+ using (var outStream = GetStream(outPtr))
+ {
+ var reader = GetReader(inStream);
+ var writer = GetWriter(outStream);
+
+ var res = callback(reader, writer);
+
+ if (writer != null)
+ {
+ outStream.SynchronizeOutput();
+ writer.Marshaller.FinishMarshal(writer);
+ }
+
+ return res;
+ }
+ }
+
+ /// <summary>
+ /// Gets the stream.
+ /// </summary>
+ private static PlatformMemoryStream GetStream(long ptr)
+ {
+ return ptr == 0 ? null : IgniteManager.Memory.Get(ptr).GetStream();
+ }
+
+ /// <summary>
+ /// Gets the reader.
+ /// </summary>
+ private IBinaryRawReader GetReader(IBinaryStream stream)
+ {
+ return stream == null ? null : Ignite.Marshaller.StartUnmarshal(stream).GetRawReader();
+ }
+
+ /// <summary>
+ /// Gets the writer.
+ /// </summary>
+ private BinaryWriter GetWriter(IBinaryStream stream)
+ {
+ if (stream == null)
+ return null;
+
+ var res = Ignite.Marshaller.StartMarshal(stream);
+
+ res.GetRawWriter();
+
+ return res;
+ }
+
+ /// <summary>
/// Loads the plugins.
/// </summary>
private void LoadPlugins()
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackOp.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackOp.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackOp.cs
index 27d1124..91df822 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackOp.cs
@@ -87,6 +87,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
CachePluginCreate = 64,
CachePluginDestroy = 65,
CachePluginIgniteStart = 66,
- CachePluginIgniteStop = 67
+ CachePluginIgniteStop = 67,
+ PluginCallbackInLongLongOutLong = 68
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index 1d88f76..91ffabb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -246,6 +246,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
AddHandler(UnmanagedCallbackOp.CachePluginDestroy, CachePluginDestroy);
AddHandler(UnmanagedCallbackOp.CachePluginIgniteStart, CachePluginIgniteStart);
AddHandler(UnmanagedCallbackOp.CachePluginIgniteStop, CachePluginIgniteStop);
+ AddHandler(UnmanagedCallbackOp.PluginCallbackInLongLongOutLong, PluginCallbackInLongLongOutLong);
}
/// <summary>
@@ -1285,6 +1286,11 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
return 0;
}
+ private long PluginCallbackInLongLongOutLong(long callbackId, long inPtr, long outPtr, void* arg)
+ {
+ return _ignite.PluginProcessor.InvokeCallback(callbackId, inPtr, outPtr);
+ }
+
#endregion
#region HELPERS
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/IPluginContext.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/IPluginContext.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/IPluginContext.cs
index 97b566c..03d130b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/IPluginContext.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/IPluginContext.cs
@@ -57,5 +57,12 @@ namespace Apache.Ignite.Core.Plugin
/// <param name="className">Name of the Java exception class to be mapped.</param>
/// <param name="factory">Exception factory delegate.</param>
void RegisterExceptionMapping(string className, ExceptionFactory factory);
+
+ /// <summary>
+ /// Registers Java->.NET callback.
+ /// </summary>
+ /// <param name="callbackId">Callback id.</param>
+ /// <param name="callback">Callback delegate.</param>
+ void RegisterCallback(long callbackId, PluginCallback callback);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2328a45/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/PluginCallback.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/PluginCallback.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/PluginCallback.cs
new file mode 100644
index 0000000..5ba675e
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Plugin/PluginCallback.cs
@@ -0,0 +1,29 @@
+\ufeff/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Plugin
+{
+ using Apache.Ignite.Core.Binary;
+
+ /// <summary>
+ /// Plugin callback delegate.
+ /// </summary>
+ /// <param name="input">Input reader. May be null.</param>
+ /// <param name="output">Output writer. May be null.</param>
+ /// <returns>Result code.</returns>
+ public delegate long PluginCallback(IBinaryRawReader input, IBinaryRawWriter output);
+}