You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/18 12:04:15 UTC
[11/14] ignite git commit: IGNITE-1513: Merged Java to core module.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppConfigurationClosure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppConfigurationClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppConfigurationClosure.java
new file mode 100644
index 0000000..648726b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppConfigurationClosure.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cpp;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PlatformConfiguration;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractConfigurationClosure;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManagerImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.portable.PortableMarshaller;
+import org.apache.ignite.platform.cpp.PlatformCppConfiguration;
+
+import java.util.Collections;
+
+/**
+ * Interop CPP configuration closure.
+ */
+public class PlatformCppConfigurationClosure extends PlatformAbstractConfigurationClosure {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Constructor.
+ *
+ * @param envPtr Environment pointer.
+ */
+ public PlatformCppConfigurationClosure(long envPtr) {
+ super(envPtr);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override protected void apply0(IgniteConfiguration igniteCfg) {
+ // 3. Validate and copy Interop configuration setting environment pointer along the way.
+ PlatformConfiguration interopCfg = igniteCfg.getPlatformConfiguration();
+
+ if (interopCfg != null && !(interopCfg instanceof PlatformCppConfiguration))
+ throw new IgniteException("Illegal interop configuration (must be of type " +
+ PlatformCppConfiguration.class.getName() + "): " + interopCfg.getClass().getName());
+
+ PlatformCppConfiguration cppCfg = interopCfg != null ? (PlatformCppConfiguration)interopCfg : null;
+
+ if (cppCfg == null)
+ cppCfg = new PlatformCppConfiguration();
+
+ PlatformMemoryManagerImpl memMgr = new PlatformMemoryManagerImpl(gate, 1024);
+
+ PlatformCppConfigurationEx cppCfg0 = new PlatformCppConfigurationEx(cppCfg, gate, memMgr);
+
+ igniteCfg.setPlatformConfiguration(cppCfg0);
+
+ // Check marshaller
+ Marshaller marsh = igniteCfg.getMarshaller();
+
+ if (marsh == null) {
+ igniteCfg.setMarshaller(new PortableMarshaller());
+
+ cppCfg0.warnings(Collections.singleton("Marshaller is automatically set to " +
+ PortableMarshaller.class.getName() + " (other nodes must have the same marshaller type)."));
+ }
+ else if (!(marsh instanceof PortableMarshaller))
+ throw new IgniteException("Unsupported marshaller (only " + PortableMarshaller.class.getName() +
+ " can be used when running Ignite for C++): " + marsh.getClass().getName());
+
+ // Set Ignite home so that marshaller context works.
+ String ggHome = igniteCfg.getIgniteHome();
+
+ if (ggHome == null)
+ ggHome = U.getIgniteHome();
+ else
+ // If user provided IGNITE_HOME - set it as a system property.
+ U.setIgniteHome(ggHome);
+
+ try {
+ U.setWorkDirectory(igniteCfg.getWorkDirectory(), ggHome);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppConfigurationEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppConfigurationEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppConfigurationEx.java
new file mode 100644
index 0000000..ea11ce9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppConfigurationEx.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cpp;
+
+import org.apache.ignite.internal.processors.platform.PlatformConfigurationEx;
+import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManagerImpl;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.platform.cpp.PlatformCppConfiguration;
+
+import java.util.Collection;
+
+/**
+ * Internal interop CPP configuration.
+ */
+public class PlatformCppConfigurationEx extends PlatformCppConfiguration implements PlatformConfigurationEx {
+ /** Native gateway. */
+ private final PlatformCallbackGateway gate;
+
+ /** Memory manager. */
+ private final PlatformMemoryManagerImpl memMgr;
+
+ /** Warnings */
+ private Collection<String> warns;
+
+ /**
+ * Copy constructor.
+ *
+ * @param cfg Configuration to copy.
+ * @param gate Native gateway.
+ * @param memMgr Memory manager.
+ */
+ public PlatformCppConfigurationEx(PlatformCppConfiguration cfg, PlatformCallbackGateway gate,
+ PlatformMemoryManagerImpl memMgr) {
+ super(cfg);
+
+ this.gate = gate;
+ this.memMgr = memMgr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformCallbackGateway gate() {
+ return gate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformMemoryManagerImpl memory() {
+ return memMgr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String platform() {
+ return PlatformUtils.PLATFORM_CPP;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<String> warnings() {
+ return warns;
+ }
+
+ /**
+ * @param warnings Warnings.
+ */
+ public void warnings(Collection<String> warnings) {
+ this.warns = warnings;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
new file mode 100644
index 0000000..ef64ef9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.datastreamer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.portable.PortableRawReaderEx;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+import org.apache.ignite.internal.util.lang.GridMapEntry;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+
+/**
+ * Interop data streamer wrapper.
+ */
+@SuppressWarnings({"UnusedDeclaration", "unchecked"})
+public class PlatformDataStreamer extends PlatformAbstractTarget {
+ /** Policy: continue. */
+ private static final int PLC_CONTINUE = 0;
+
+ /** Policy: close. */
+ private static final int PLC_CLOSE = 1;
+
+ /** Policy: cancel and close. */
+ private static final int PLC_CANCEL_CLOSE = 2;
+
+ /** Policy: do flush. */
+ private static final int PLC_FLUSH = 3;
+
+ /** */
+ private static final int OP_UPDATE = 1;
+
+ /** */
+ private static final int OP_RECEIVER = 2;
+
+ /** Cache name. */
+ private final String cacheName;
+
+ /** Data streamer. */
+ private final DataStreamerImpl ldr;
+
+ /** Portable flag. */
+ private final boolean keepPortable;
+
+ /** Topology update event listener. */
+ private volatile GridLocalEventListener lsnr;
+
+ /**
+ * Constructor.
+ *
+ * @param platformCtx Context.
+ * @param ldr Data streamer.
+ */
+ public PlatformDataStreamer(PlatformContext platformCtx, String cacheName, DataStreamerImpl ldr,
+ boolean keepPortable) {
+ super(platformCtx);
+
+ this.cacheName = cacheName;
+ this.ldr = ldr;
+ this.keepPortable = keepPortable;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+ switch (type) {
+ case OP_UPDATE:
+ int plc = reader.readInt();
+
+ if (plc == PLC_CANCEL_CLOSE) {
+ // Close with cancel.
+ platformCtx.kernalContext().event().removeLocalEventListener(lsnr);
+
+ ldr.close(true);
+ }
+ else {
+ final long futPtr = reader.readLong();
+
+ int valsCnt = reader.readInt();
+
+ if (valsCnt > 0) {
+ Collection<GridMapEntry> vals = new ArrayList<>(valsCnt);
+
+ for (int i = 0; i < valsCnt; i++)
+ vals.add(new GridMapEntry(reader.readObjectDetached(), reader.readObjectDetached()));
+
+ PlatformFutureUtils.listen(platformCtx, ldr.addData(vals), futPtr,
+ PlatformFutureUtils.TYP_OBJ, this);
+ }
+
+ if (plc == PLC_CLOSE) {
+ platformCtx.kernalContext().event().removeLocalEventListener(lsnr);
+
+ ldr.close(false);
+ }
+ else if (plc == PLC_FLUSH)
+ ldr.tryFlush();
+ else
+ assert plc == PLC_CONTINUE;
+ }
+
+ return TRUE;
+
+ case OP_RECEIVER:
+ long ptr = reader.readLong();
+
+ Object rec = reader.readObjectDetached();
+
+ ldr.receiver(platformCtx.createStreamReceiver(rec, ptr, keepPortable));
+
+ return TRUE;
+
+ default:
+ return super.processInStreamOutLong(type, reader);
+ }
+ }
+
+ /**
+ * Listen topology changes.
+ *
+ * @param ptr Pointer.
+ */
+ public void listenTopology(final long ptr) {
+ lsnr = new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+ long topVer = discoEvt.topologyVersion();
+ int topSize = platformCtx.kernalContext().discovery().cacheNodes(
+ cacheName, new AffinityTopologyVersion(topVer)).size();
+
+ platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer, topSize);
+ }
+ };
+
+ platformCtx.kernalContext().event().addLocalEventListener(lsnr, EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+ GridDiscoveryManager discoMgr = platformCtx.kernalContext().discovery();
+
+ long topVer = discoMgr.topologyVersion();
+ int topSize = discoMgr.cacheNodes(cacheName, new AffinityTopologyVersion(topVer)).size();
+
+ platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer, topSize);
+ }
+
+ /**
+ * @return Allow-overwrite flag.
+ */
+ public boolean allowOverwrite() {
+ return ldr.allowOverwrite();
+ }
+
+ /**
+ * @param val Allow-overwrite flag.
+ */
+ public void allowOverwrite(boolean val) {
+ ldr.allowOverwrite(val);
+ }
+
+ /**
+ * @return Skip store flag.
+ */
+ public boolean skipStore() {
+ return ldr.skipStore();
+ }
+
+ /**
+ * @param skipStore Skip store flag.
+ */
+ public void skipStore(boolean skipStore) {
+ ldr.skipStore(skipStore);
+ }
+
+ /**
+ * @return Per-node buffer size.
+ */
+ public int perNodeBufferSize() {
+ return ldr.perNodeBufferSize();
+ }
+
+ /**
+ * @param val Per-node buffer size.
+ */
+ public void perNodeBufferSize(int val) {
+ ldr.perNodeBufferSize(val);
+ }
+
+ /**
+ * @return Per-node parallel load operations.
+ */
+ public int perNodeParallelOperations() {
+ return ldr.perNodeParallelOperations();
+ }
+
+ /**
+ * @param val Per-node parallel load operations.
+ */
+ public void perNodeParallelOperations(int val) {
+ ldr.perNodeParallelOperations(val);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
new file mode 100644
index 0000000..92250c0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.datastreamer;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.cache.PlatformCache;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Interop receiver.
+ */
+public class PlatformStreamReceiverImpl extends PlatformAbstractPredicate implements PlatformStreamReceiver {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private boolean keepPortable;
+
+ /**
+ * Constructor.
+ */
+ public PlatformStreamReceiverImpl()
+ {
+ super();
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param pred .Net portable receiver.
+ * @param ptr Pointer to receiver in the native platform.
+ * @param ctx Kernal context.
+ */
+ public PlatformStreamReceiverImpl(Object pred, long ptr, boolean keepPortable, PlatformContext ctx) {
+ super(pred, ptr, ctx);
+
+ assert pred != null;
+
+ this.keepPortable = keepPortable;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void receive(IgniteCache<Object, Object> cache, Collection<Map.Entry<Object, Object>> collection)
+ throws IgniteException {
+ assert ctx != null;
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ writer.writeObject(pred);
+
+ writer.writeInt(collection.size());
+
+ for (Map.Entry<Object, Object> e : collection) {
+ writer.writeObject(e.getKey());
+ writer.writeObject(e.getValue());
+ }
+
+ out.synchronize();
+
+ ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, new PlatformCache(ctx, cache, keepPortable),
+ mem.pointer(), keepPortable);
+ }
+ }
+
+ /**
+ * @param ignite Ignite instance.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ @IgniteInstanceResource
+ public void setIgniteInstance(Ignite ignite) {
+ ctx = PlatformUtils.platformContext(ignite);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+
+ out.writeBoolean(keepPortable);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+
+ keepPortable = in.readBoolean();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetBootstrap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetBootstrap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetBootstrap.java
new file mode 100644
index 0000000..837ded9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetBootstrap.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.dotnet;
+
+import org.apache.ignite.internal.processors.platform.PlatformAbstractBootstrap;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractConfigurationClosure;
+
+/**
+ * Interop .Net bootstrap.
+ */
+public class PlatformDotNetBootstrap extends PlatformAbstractBootstrap {
+ /** {@inheritDoc} */
+ @Override protected PlatformAbstractConfigurationClosure closure(long envPtr) {
+ return new PlatformDotNetConfigurationClosure(envPtr);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetBootstrapFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetBootstrapFactory.java
new file mode 100644
index 0000000..6b2a6cd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetBootstrapFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.dotnet;
+
+import org.apache.ignite.internal.processors.platform.PlatformBootstrap;
+import org.apache.ignite.internal.processors.platform.PlatformBootstrapFactory;
+
+/**
+ * Interop .Net bootstrap factory.
+ */
+public class PlatformDotNetBootstrapFactory implements PlatformBootstrapFactory {
+ /** Bootstrap ID. */
+ public static final int ID = 1;
+
+ /** {@inheritDoc} */
+ @Override public int id() {
+ return ID;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformBootstrap create() {
+ return new PlatformDotNetBootstrap();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
new file mode 100644
index 0000000..c86de5d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.dotnet;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreSession;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.portable.PortableRawReaderEx;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.cache.store.PlatformCacheStore;
+import org.apache.ignite.internal.processors.platform.cache.store.PlatformCacheStoreCallback;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.internal.util.lang.GridMapEntry;
+import org.apache.ignite.internal.util.lang.GridTuple;
+import org.apache.ignite.internal.util.lang.IgniteInClosureX;
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.Cache;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Wrapper for .NET cache store implementations.
+ * <p>
+ * This wrapper should be used if you have an implementation of
+ * {@code GridGain.Cache.IGridCacheStore} interface in .NET and
+ * would like to configure it a persistence storage for your cache.
+ * If properly configured, this wrapper will instantiate an instance
+ * of your cache store in .NET and delegate all calls to that instance.
+ * To create an instance, assembly name and class name are passed to
+ * <a target="_blank" href="http://msdn.microsoft.com/en-us/library/d133hta4.aspx">System.Activator.CreateInstance(String, String)</a>
+ * method in .NET during node startup. Refer to its documentation for
+ * details.
+ */
+public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, PlatformCacheStore {
+ /** Load cache operation code. */
+ private static final byte OP_LOAD_CACHE = (byte)0;
+
+ /** Load operation code. */
+ private static final byte OP_LOAD = (byte)1;
+
+ /** Load all operation code. */
+ private static final byte OP_LOAD_ALL = (byte)2;
+
+ /** Put operation code. */
+ private static final byte OP_PUT = (byte)3;
+
+ /** Put all operation code. */
+ private static final byte OP_PUT_ALL = (byte)4;
+
+ /** Remove operation code. */
+ private static final byte OP_RMV = (byte)5;
+
+ /** Remove all operation code. */
+ private static final byte OP_RMV_ALL = (byte)6;
+
+ /** Tx end operation code. */
+ private static final byte OP_SES_END = (byte)7;
+
+ /** Key used to distinguish session deployment. */
+ private static final Object KEY_SES = new Object();
+
+ /** */
+ @CacheStoreSessionResource
+ private CacheStoreSession ses;
+
+ /** .Net assembly name. */
+ private String assemblyName;
+
+ /** .Net class name. */
+ private String clsName;
+
+ /** Properties. */
+ private Map<String, ?> props;
+
+ /** Interop processor. */
+ protected PlatformContext platformCtx;
+
+ /** Pointer to native store. */
+ protected long ptr;
+
+ /**
+ * Gets .NET assembly name.
+ *
+ * @return .NET assembly name.
+ */
+ public String getAssemblyName() {
+ return assemblyName;
+ }
+
+ /**
+ * Set .NET assembly name.
+ *
+ * @param assemblyName .NET assembly name.
+ */
+ public void setAssemblyName(String assemblyName) {
+ this.assemblyName = assemblyName;
+ }
+
+ /**
+ * Gets .NET class name.
+ *
+ * @return .NET class name.
+ */
+ public String getClassName() {
+ return clsName;
+ }
+
+ /**
+ * Sets .NET class name.
+ *
+ * @param clsName .NET class name.
+ */
+ public void setClassName(String clsName) {
+ this.clsName = clsName;
+ }
+
+ /**
+ * Get properties.
+ *
+ * @return Properties.
+ */
+ public Map<String, ?> getProperties() {
+ return props;
+ }
+
+ /**
+ * Set properties.
+ *
+ * @param props Properties.
+ */
+ public void setProperties(Map<String, ?> props) {
+ this.props = props;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public V load(final K key) {
+ try {
+ final GridTuple<V> val = new GridTuple<>();
+
+ doInvoke(new IgniteInClosureX<PortableRawWriterEx>() {
+ @Override public void applyx(PortableRawWriterEx writer) throws IgniteCheckedException {
+ writer.writeByte(OP_LOAD);
+ writer.writeLong(session());
+ writer.writeString(ses.cacheName());
+ writer.writeObject(key);
+ }
+ }, new LoadCallback<>(platformCtx, val));
+
+ return val.get();
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheLoaderException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, V> loadAll(final Iterable<? extends K> keys) {
+ try {
+ final Map<K, V> loaded = new HashMap<>();
+
+ doInvoke(new IgniteInClosureX<PortableRawWriterEx>() {
+ @Override public void applyx(PortableRawWriterEx writer) throws IgniteCheckedException {
+ writer.writeByte(OP_LOAD_ALL);
+ writer.writeLong(session());
+ writer.writeString(ses.cacheName());
+ writer.writeCollection((Collection) keys);
+ }
+ }, new LoadAllCallback<>(platformCtx, loaded));
+
+ return loaded;
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheLoaderException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void loadCache(final IgniteBiInClosure<K, V> clo, final @Nullable Object... args) {
+ try {
+ doInvoke(new IgniteInClosureX<PortableRawWriterEx>() {
+ @Override public void applyx(PortableRawWriterEx writer) throws IgniteCheckedException {
+ writer.writeByte(OP_LOAD_CACHE);
+ writer.writeLong(session());
+ writer.writeString(ses.cacheName());
+ writer.writeObjectArray(args);
+ }
+ }, new LoadCacheCallback<>(platformCtx, clo));
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheLoaderException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(final Cache.Entry<? extends K, ? extends V> entry) {
+ try {
+ doInvoke(new IgniteInClosureX<PortableRawWriterEx>() {
+ @Override public void applyx(PortableRawWriterEx writer) throws IgniteCheckedException {
+ writer.writeByte(OP_PUT);
+ writer.writeLong(session());
+ writer.writeString(ses.cacheName());
+ writer.writeObject(entry.getKey());
+ writer.writeObject(entry.getValue());
+ }
+ }, null);
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheWriterException(U.convertExceptionNoWrap(e));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"NullableProblems", "unchecked"})
+ @Override public void writeAll(final Collection<Cache.Entry<? extends K, ? extends V>> entries) {
+ try {
+ doInvoke(new IgniteInClosureX<PortableRawWriterEx>() {
+ @Override public void applyx(PortableRawWriterEx writer) throws IgniteCheckedException {
+ Map<K, V> map = new AbstractMap<K, V>() {
+ @Override public int size() {
+ return entries.size();
+ }
+
+ @Override public Set<Entry<K, V>> entrySet() {
+ return new AbstractSet<Entry<K, V>>() {
+ @Override public Iterator<Entry<K, V>> iterator() {
+ return F.iterator(entries, new C1<Cache.Entry<? extends K, ? extends V>, Entry<K, V>>() {
+ private static final long serialVersionUID = 0L;
+
+ @Override public Entry<K, V> apply(Cache.Entry<? extends K, ? extends V> entry) {
+ return new GridMapEntry<>(entry.getKey(), entry.getValue());
+ }
+ }, true);
+ }
+
+ @Override public int size() {
+ return entries.size();
+ }
+ };
+ }
+ };
+
+ writer.writeByte(OP_PUT_ALL);
+ writer.writeLong(session());
+ writer.writeString(ses.cacheName());
+ writer.writeMap(map);
+ }
+ }, null);
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheWriterException(U.convertExceptionNoWrap(e));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(final Object key) {
+ try {
+ doInvoke(new IgniteInClosureX<PortableRawWriterEx>() {
+ @Override public void applyx(PortableRawWriterEx writer) throws IgniteCheckedException {
+ writer.writeByte(OP_RMV);
+ writer.writeLong(session());
+ writer.writeString(ses.cacheName());
+ writer.writeObject(key);
+ }
+ }, null);
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheWriterException(U.convertExceptionNoWrap(e));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deleteAll(final Collection<?> keys) {
+ try {
+ doInvoke(new IgniteInClosureX<PortableRawWriterEx>() {
+ @Override public void applyx(PortableRawWriterEx writer) throws IgniteCheckedException {
+ writer.writeByte(OP_RMV_ALL);
+ writer.writeLong(session());
+ writer.writeString(ses.cacheName());
+ writer.writeCollection(keys);
+ }
+ }, null);
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheWriterException(U.convertExceptionNoWrap(e));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sessionEnd(final boolean commit) {
+ try {
+ doInvoke(new IgniteInClosureX<PortableRawWriterEx>() {
+ @Override public void applyx(PortableRawWriterEx writer) throws IgniteCheckedException {
+ writer.writeByte(OP_SES_END);
+ writer.writeLong(session());
+ writer.writeString(ses.cacheName());
+ writer.writeBoolean(commit);
+ }
+ }, null);
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheWriterException(U.convertExceptionNoWrap(e));
+ }
+ }
+
+ /**
+ * Initialize the store.
+ *
+ * @param ctx Context.
+ * @param convertPortable Convert portable flag.
+ * @throws org.apache.ignite.IgniteCheckedException
+ */
+ public void initialize(GridKernalContext ctx, boolean convertPortable) throws IgniteCheckedException {
+ A.notNull(assemblyName, "assemblyName");
+ A.notNull(clsName, "clsName");
+
+ platformCtx = PlatformUtils.platformContext(ctx.grid());
+
+ try (PlatformMemory mem = platformCtx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = platformCtx.writer(out);
+
+ writer.writeString(assemblyName);
+ writer.writeString(clsName);
+ writer.writeBoolean(convertPortable);
+ writer.writeMap(props);
+
+ out.synchronize();
+
+ ptr = platformCtx.gateway().cacheStoreCreate(mem.pointer());
+ }
+ }
+
+ /**
+ * Gets session pointer created in native platform.
+ *
+ * @return Session pointer.
+ * @throws org.apache.ignite.IgniteCheckedException If failed.
+ */
+ private long session() throws IgniteCheckedException {
+ Long sesPtr = (Long)ses.properties().get(KEY_SES);
+
+ if (sesPtr == null) {
+ // Session is not deployed yet, do that.
+ sesPtr = platformCtx.gateway().cacheStoreSessionCreate(ptr);
+
+ ses.properties().put(KEY_SES, sesPtr);
+ }
+
+ return sesPtr;
+ }
+
+ /**
+ * Perform actual invoke.
+ *
+ * @param task Task.
+ * @param cb Optional callback.
+ * @return Result.
+ * @throws org.apache.ignite.IgniteCheckedException If failed.
+ */
+ protected int doInvoke(IgniteInClosureX<PortableRawWriterEx> task, @Nullable PlatformCacheStoreCallback cb)
+ throws IgniteCheckedException{
+ try (PlatformMemory mem = platformCtx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = platformCtx.writer(out);
+
+ task.apply(writer);
+
+ out.synchronize();
+
+ return platformCtx.gateway().cacheStoreInvoke(ptr, mem.pointer(), cb);
+ }
+ }
+
+ /**
+ * Destroys interop-aware component.
+ *
+ * @param ctx Context.
+ */
+ public void destroy(GridKernalContext ctx) {
+ assert ctx != null;
+
+ platformCtx.gateway().cacheStoreDestroy(ptr);
+ }
+
+ /**
+ * Load callback.
+ */
+ private static class LoadCallback<V> extends PlatformCacheStoreCallback {
+ /** Value. */
+ private final GridTuple<V> val;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Context.
+ * @param val Value.
+ */
+ public LoadCallback(PlatformContext ctx, GridTuple<V> val) {
+ super(ctx);
+
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected void invoke0(PortableRawReaderEx reader) {
+ val.set((V)reader.readObjectDetached());
+ }
+ }
+
+ /**
+ * Load callback.
+ */
+ private static class LoadAllCallback<K, V> extends PlatformCacheStoreCallback {
+ /** Value. */
+ private final Map<K, V> loaded;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Context.
+ * @param loaded Map with loaded values.
+ */
+ public LoadAllCallback(PlatformContext ctx, Map<K, V> loaded) {
+ super(ctx);
+
+ this.loaded = loaded;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected void invoke0(PortableRawReaderEx reader) {
+ loaded.put((K) reader.readObjectDetached(), (V) reader.readObjectDetached());
+ }
+ }
+
+ /**
+ * Load callback.
+ */
+ private static class LoadCacheCallback<K, V> extends PlatformCacheStoreCallback {
+ /** Value. */
+ private final IgniteBiInClosure<K, V> clo;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Context.
+ * @param clo Closure.
+ */
+ public LoadCacheCallback(PlatformContext ctx, IgniteBiInClosure<K, V> clo) {
+ super(ctx);
+
+ this.clo = clo;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected void invoke0(PortableRawReaderEx reader) {
+ clo.apply((K) reader.readObjectDetached(), (V) reader.readObjectDetached());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java
new file mode 100644
index 0000000..8662375
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.dotnet;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PlatformConfiguration;
+import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.portable.GridPortableMarshaller;
+import org.apache.ignite.internal.portable.PortableContext;
+import org.apache.ignite.internal.portable.PortableMetaDataHandler;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractConfigurationClosure;
+import org.apache.ignite.internal.processors.platform.lifecycle.PlatformLifecycleBean;
+import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManagerImpl;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleBean;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.platform.dotnet.PlatformDotNetConfiguration;
+import org.apache.ignite.marshaller.portable.PortableMarshaller;
+import org.apache.ignite.platform.dotnet.PlatformDotNetLifecycleBean;
+import org.apache.ignite.portable.PortableException;
+import org.apache.ignite.portable.PortableMetadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Closure to apply dot net configuration.
+ */
+@SuppressWarnings({"UnusedDeclaration"})
+public class PlatformDotNetConfigurationClosure extends PlatformAbstractConfigurationClosure {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Configuration. */
+ private IgniteConfiguration cfg;
+
+ /** Memory manager. */
+ private PlatformMemoryManagerImpl memMgr;
+
+ /**
+ * Constructor.
+ *
+ * @param envPtr Environment pointer.
+ */
+ public PlatformDotNetConfigurationClosure(long envPtr) {
+ super(envPtr);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override protected void apply0(IgniteConfiguration igniteCfg) {
+ // 3. Validate and copy Interop configuration setting environment pointer along the way.
+ PlatformConfiguration interopCfg = igniteCfg.getPlatformConfiguration();
+
+ if (interopCfg != null && !(interopCfg instanceof PlatformDotNetConfiguration))
+ throw new IgniteException("Illegal platform configuration (must be of type " +
+ PlatformDotNetConfiguration.class.getName() + "): " + interopCfg.getClass().getName());
+
+ PlatformDotNetConfiguration dotNetCfg = interopCfg != null ? (PlatformDotNetConfiguration)interopCfg : null;
+
+ if (dotNetCfg == null)
+ dotNetCfg = new PlatformDotNetConfiguration();
+
+ memMgr = new PlatformMemoryManagerImpl(gate, 1024);
+
+ PlatformDotNetConfigurationEx dotNetCfg0 = new PlatformDotNetConfigurationEx(dotNetCfg, gate, memMgr);
+
+ igniteCfg.setPlatformConfiguration(dotNetCfg0);
+
+ // Check marshaller
+ Marshaller marsh = igniteCfg.getMarshaller();
+
+ if (marsh == null) {
+ igniteCfg.setMarshaller(new PortableMarshaller());
+
+ dotNetCfg0.warnings(Collections.singleton("Marshaller is automatically set to " +
+ PortableMarshaller.class.getName() + " (other nodes must have the same marshaller type)."));
+ }
+ else if (!(marsh instanceof PortableMarshaller))
+ throw new IgniteException("Unsupported marshaller (only " + PortableMarshaller.class.getName() +
+ " can be used when running Ignite for .Net): " + marsh.getClass().getName());
+
+ // Set Ignite home so that marshaller context works.
+ String ggHome = igniteCfg.getIgniteHome();
+
+ if (ggHome == null)
+ ggHome = U.getIgniteHome();
+ else
+ // If user provided IGNITE_HOME - set it as a system property.
+ U.setIgniteHome(ggHome);
+
+ try {
+ U.setWorkDirectory(igniteCfg.getWorkDirectory(), ggHome);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+
+ // 4. Callback to .Net.
+ prepare(igniteCfg, dotNetCfg0);
+ }
+
+ /**
+ * Prepare .Net size.
+ *
+ * @param igniteCfg Ignite configuration.
+ * @param interopCfg Interop configuration.
+ */
+ @SuppressWarnings("ConstantConditions")
+ private void prepare(IgniteConfiguration igniteCfg, PlatformDotNetConfigurationEx interopCfg) {
+ this.cfg = igniteCfg;
+
+ try (PlatformMemory outMem = memMgr.allocate()) {
+ try (PlatformMemory inMem = memMgr.allocate()) {
+ PlatformOutputStream out = outMem.output();
+
+ PortableRawWriterEx writer = marshaller().writer(out);
+
+ PlatformUtils.writeDotNetConfiguration(writer, interopCfg.unwrap());
+
+ List<PlatformDotNetLifecycleBean> beans = beans(igniteCfg);
+
+ writer.writeInt(beans.size());
+
+ for (PlatformDotNetLifecycleBean bean : beans) {
+ writer.writeString(bean.getAssemblyName());
+ writer.writeString(bean.getClassName());
+ writer.writeMap(bean.getProperties());
+ }
+
+ out.synchronize();
+
+ gate.extensionCallbackInLongLongOutLong(
+ PlatformUtils.OP_PREPARE_DOT_NET, outMem.pointer(), inMem.pointer());
+
+ processPrepareResult(inMem.input());
+ }
+ }
+ }
+
+ /**
+ * Process prepare result.
+ *
+ * @param in Input stream.
+ */
+ private void processPrepareResult(PlatformInputStream in) {
+ assert cfg != null;
+
+ List<PlatformDotNetLifecycleBean> beans = beans(cfg);
+ List<PlatformLifecycleBean> newBeans = new ArrayList<>();
+
+ int len = in.readInt();
+
+ for (int i = 0; i < len; i++) {
+ if (i < beans.size())
+ // Existing bean.
+ beans.get(i).initialize(gate, in.readLong());
+ else
+ // This bean is defined in .Net.
+ newBeans.add(new PlatformLifecycleBean(gate, in.readLong()));
+ }
+
+ if (!newBeans.isEmpty()) {
+ LifecycleBean[] newBeans0 = newBeans.toArray(new LifecycleBean[newBeans.size()]);
+
+ // New beans were added. Let's append them to the tail of the rest configured lifecycle beans.
+ LifecycleBean[] oldBeans = cfg.getLifecycleBeans();
+
+ if (oldBeans == null)
+ cfg.setLifecycleBeans(newBeans0);
+ else {
+ LifecycleBean[] mergedBeans = new LifecycleBean[oldBeans.length + newBeans.size()];
+
+ System.arraycopy(oldBeans, 0, mergedBeans, 0, oldBeans.length);
+ System.arraycopy(newBeans0, 0, mergedBeans, oldBeans.length, newBeans0.length);
+
+ cfg.setLifecycleBeans(mergedBeans);
+ }
+ }
+ }
+
+ /**
+ * Find .Net lifecycle beans in configuration.
+ *
+ * @param cfg Configuration.
+ * @return Beans.
+ */
+ private static List<PlatformDotNetLifecycleBean> beans(IgniteConfiguration cfg) {
+ List<PlatformDotNetLifecycleBean> res = new ArrayList<>();
+
+ if (cfg.getLifecycleBeans() != null) {
+ for (LifecycleBean bean : cfg.getLifecycleBeans()) {
+ if (bean instanceof PlatformDotNetLifecycleBean)
+ res.add((PlatformDotNetLifecycleBean)bean);
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * Create portable marshaller.
+ *
+ * @return Marshaller.
+ */
+ @SuppressWarnings("deprecation")
+ private static GridPortableMarshaller marshaller() {
+ try {
+ PortableContext ctx = new PortableContext(new PortableMetaDataHandler() {
+ @Override public void addMeta(int typeId, PortableMetadata meta)
+ throws PortableException {
+ // No-op.
+ }
+
+ @Override public PortableMetadata metadata(int typeId) throws PortableException {
+ return null;
+ }
+ }, null);
+
+ PortableMarshaller marsh = new PortableMarshaller();
+
+ marsh.setContext(new MarshallerContextImpl(null));
+
+ ctx.configure(marsh);
+
+ return new GridPortableMarshaller(ctx);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationEx.java
new file mode 100644
index 0000000..eaf0997
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationEx.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.dotnet;
+
+import org.apache.ignite.internal.processors.platform.PlatformConfigurationEx;
+import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManagerImpl;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.platform.dotnet.PlatformDotNetConfiguration;
+
+import java.util.Collection;
+
+/**
+ * Extended .Net configuration.
+ */
+public class PlatformDotNetConfigurationEx extends PlatformDotNetConfiguration implements PlatformConfigurationEx {
+ /** Native gateway. */
+ private final PlatformCallbackGateway gate;
+
+ /** Memory manager. */
+ private final PlatformMemoryManagerImpl memMgr;
+
+ /** Warnings */
+ private Collection<String> warnings;
+
+ /**
+ * Copy constructor.
+ *
+ * @param cfg Configuration to copy.
+ * @param gate Native gateway.
+ * @param memMgr Memory manager.
+ */
+ public PlatformDotNetConfigurationEx(PlatformDotNetConfiguration cfg, PlatformCallbackGateway gate,
+ PlatformMemoryManagerImpl memMgr) {
+ super(cfg);
+
+ this.gate = gate;
+ this.memMgr = memMgr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformCallbackGateway gate() {
+ return gate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformMemoryManagerImpl memory() {
+ return memMgr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String platform() {
+ return PlatformUtils.PLATFORM_DOTNET;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<String> warnings() {
+ return warnings;
+ }
+
+ /**
+ * @param warnings Warnings.
+ */
+ public void warnings(Collection<String> warnings) {
+ this.warnings = warnings;
+ }
+
+ /**
+ * Unwrap extended configuration.
+ *
+ * @return Original configuration.
+ */
+ public PlatformDotNetConfiguration unwrap() {
+ return new PlatformDotNetConfiguration(this);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetService.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetService.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetService.java
new file mode 100644
index 0000000..1316c83
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetService.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.dotnet;
+
+import org.apache.ignite.internal.processors.platform.services.PlatformService;
+
+/**
+ * Marker interface to denote a service implemented on .Net platform.
+ */
+public interface PlatformDotNetService extends PlatformService {
+ // No-op.
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java
new file mode 100644
index 0000000..ec241ee
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.dotnet;
+
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.services.PlatformAbstractService;
+
+/**
+ * Interop .Net service.
+ */
+public class PlatformDotNetServiceImpl extends PlatformAbstractService implements PlatformDotNetService {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Default constructor for serialization.
+ */
+ public PlatformDotNetServiceImpl() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param svc Service.
+ * @param ctx Context.
+ * @param srvKeepPortable Whether to keep objects portable on server if possible.
+ */
+ public PlatformDotNetServiceImpl(Object svc, PlatformContext ctx, boolean srvKeepPortable) {
+ super(svc, ctx, srvKeepPortable);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilterListenerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilterListenerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilterListenerImpl.java
new file mode 100644
index 0000000..b2dfd1c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilterListenerImpl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.events;
+
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+
+import java.util.UUID;
+
+/**
+ * Platform event filter. Delegates apply to native platform.
+ */
+public class PlatformEventFilterListenerImpl implements PlatformEventFilterListener
+{
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final Object pred;
+
+ /** Event types. */
+ private final int[] types;
+
+ /** */
+ protected transient long hnd;
+
+ /** */
+ private transient PlatformContext ctx;
+
+ /**
+ * Constructor.
+ *
+ * @param hnd Handle in the native platform.
+ * @param ctx Context.
+ */
+ public PlatformEventFilterListenerImpl(long hnd, PlatformContext ctx) {
+ assert ctx != null;
+ assert hnd != 0;
+
+ this.hnd = hnd;
+ this.ctx = ctx;
+
+ pred = null;
+ types = null;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param pred .Net portable predicate.
+ */
+ public PlatformEventFilterListenerImpl(Object pred, final int... types) {
+ assert pred != null;
+
+ this.pred = pred;
+ this.types = types;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(Event evt) {
+ return apply0(null, evt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, Event evt) {
+ return apply0(uuid, evt);
+ }
+
+ /**
+ * Apply impl.
+ * @param uuid Node if.
+ * @param evt Event.
+ * @return Result.
+ */
+ private boolean apply0(final UUID uuid, final Event evt) {
+ if (!ctx.isEventTypeSupported(evt.type()))
+ return false;
+
+ if (types != null) {
+ boolean match = false;
+
+ for (int type : types) {
+ if (type == evt.type()) {
+ match = true;
+ break;
+ }
+ }
+
+ if (!match)
+ return false;
+ }
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ ctx.writeEvent(writer, evt);
+
+ writer.writeUuid(uuid);
+
+ out.synchronize();
+
+ int res = ctx.gateway().eventFilterApply(hnd, mem.pointer());
+
+ return res != 0;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onClose() {
+ ctx.gateway().eventFilterDestroy(hnd);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initialize(GridKernalContext gridCtx) {
+ ctx = PlatformUtils.platformContext(gridCtx.grid());
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ writer.writeObjectDetached(pred);
+
+ out.synchronize();
+
+ hnd = ctx.gateway().eventFilterCreate(mem.pointer());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ return this == o || o != null && o instanceof PlatformEventFilterListenerImpl &&
+ hnd == ((PlatformEventFilterListenerImpl)o).hnd;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return (int)(hnd ^ (hnd >>> 32));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
new file mode 100644
index 0000000..8585526
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.events;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventAdapter;
+import org.apache.ignite.internal.portable.PortableRawReaderEx;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Interop events.
+ */
+public class PlatformEvents extends PlatformAbstractTarget {
+ /** */
+ private static final int OP_REMOTE_QUERY = 1;
+
+ /** */
+ private static final int OP_REMOTE_LISTEN = 2;
+
+ /** */
+ private static final int OP_STOP_REMOTE_LISTEN = 3;
+
+ /** */
+ private static final int OP_WAIT_FOR_LOCAL = 4;
+
+ /** */
+ private static final int OP_LOCAL_QUERY = 5;
+
+ /** */
+ private static final int OP_RECORD_LOCAL = 6;
+
+ /** */
+ private static final int OP_ENABLE_LOCAL = 8;
+
+ /** */
+ private static final int OP_DISABLE_LOCAL = 9;
+
+ /** */
+ private static final int OP_GET_ENABLED_EVENTS = 10;
+
+ /** */
+ private final IgniteEvents events;
+
+ /** */
+ private final EventResultWriter eventResWriter;
+
+ /** */
+ private final EventCollectionResultWriter eventColResWriter;
+
+ /**
+ * Ctor.
+ *
+ * @param platformCtx Context.
+ * @param events Ignite events.
+ */
+ public PlatformEvents(PlatformContext platformCtx, IgniteEvents events) {
+ super(platformCtx);
+
+ assert events != null;
+
+ this.events = events;
+
+ eventResWriter = new EventResultWriter(platformCtx);
+ eventColResWriter = new EventCollectionResultWriter(platformCtx);
+ }
+
+ /**
+ * Gets events with asynchronous mode enabled.
+ *
+ * @return Events with asynchronous mode enabled.
+ */
+ public PlatformEvents withAsync() {
+ if (events.isAsync())
+ return this;
+
+ return new PlatformEvents(platformCtx, events.withAsync());
+ }
+
+ /**
+ * Adds an event listener for local events.
+ *
+ * @param hnd Interop listener handle.
+ * @param type Event type.
+ */
+ @SuppressWarnings({"unchecked"})
+ public void localListen(long hnd, int type) {
+ events.localListen(localFilter(hnd), type);
+ }
+
+ /**
+ * Removes an event listener for local events.
+ *
+ * @param hnd Interop listener handle.
+ */
+ @SuppressWarnings({"UnusedDeclaration", "unchecked"})
+ public boolean stopLocalListen(long hnd) {
+ return events.stopLocalListen(localFilter(hnd));
+ }
+
+ /**
+ * Check if event is enabled.
+ *
+ * @param type Event type.
+ * @return {@code True} if event of passed in type is enabled.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public boolean isEnabled(int type) {
+ return events.isEnabled(type);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader)
+ throws IgniteCheckedException {
+ switch (type) {
+ case OP_RECORD_LOCAL:
+ // TODO: IGNITE-1410.
+ return TRUE;
+
+ case OP_ENABLE_LOCAL:
+
+ events.enableLocal(readEventTypes(reader));
+
+ return TRUE;
+
+ case OP_DISABLE_LOCAL:
+
+ events.disableLocal(readEventTypes(reader));
+
+ return TRUE;
+
+ case OP_STOP_REMOTE_LISTEN:
+ events.stopRemoteListen(reader.readUuid());
+
+ return TRUE;
+
+ default:
+ return super.processInStreamOutLong(type, reader);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"IfMayBeConditional", "ConstantConditions", "unchecked"})
+ @Override protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer)
+ throws IgniteCheckedException {
+ switch (type) {
+ case OP_LOCAL_QUERY: {
+ Collection<EventAdapter> result =
+ events.localQuery(F.<EventAdapter>alwaysTrue(), readEventTypes(reader));
+
+ writer.writeInt(result.size());
+
+ for (EventAdapter e : result)
+ platformCtx.writeEvent(writer, e);
+
+ break;
+ }
+
+ case OP_WAIT_FOR_LOCAL: {
+ boolean hasFilter = reader.readBoolean();
+
+ IgnitePredicate pred = hasFilter ? localFilter(reader.readLong()) : null;
+
+ int[] eventTypes = readEventTypes(reader);
+
+ EventAdapter result = (EventAdapter) events.waitForLocal(pred, eventTypes);
+
+ platformCtx.writeEvent(writer, result);
+
+ break;
+ }
+
+ case OP_REMOTE_LISTEN: {
+ int bufSize = reader.readInt();
+
+ long interval = reader.readLong();
+
+ boolean autoUnsubscribe = reader.readBoolean();
+
+ boolean hasLocFilter = reader.readBoolean();
+
+ PlatformEventFilterListener locFilter = hasLocFilter ? localFilter(reader.readLong()) : null;
+
+ boolean hasRmtFilter = reader.readBoolean();
+
+ UUID listenId;
+
+ if (hasRmtFilter) {
+ PlatformEventFilterListener rmtFilter = platformCtx.createRemoteEventFilter(
+ reader.readObjectDetached(), readEventTypes(reader));
+
+ listenId = events.remoteListen(bufSize, interval, autoUnsubscribe, locFilter, rmtFilter);
+ }
+ else
+ listenId = events.remoteListen(bufSize, interval, autoUnsubscribe, locFilter, null,
+ readEventTypes(reader));
+
+ writer.writeUuid(listenId);
+
+ break;
+ }
+
+ case OP_REMOTE_QUERY: {
+ Object pred = reader.readObjectDetached();
+
+ long timeout = reader.readLong();
+
+ int[] types = readEventTypes(reader);
+
+ PlatformEventFilterListener filter = platformCtx.createRemoteEventFilter(pred, types);
+
+ Collection<Event> result = events.remoteQuery(filter, timeout);
+
+ if (result == null)
+ writer.writeInt(-1);
+ else {
+ writer.writeInt(result.size());
+
+ for (Event e : result)
+ platformCtx.writeEvent(writer, e);
+ }
+
+ break;
+ }
+
+ default:
+ super.processInStreamOutStream(type, reader, writer);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void processOutStream(int type, PortableRawWriterEx writer) throws IgniteCheckedException {
+ switch (type) {
+ case OP_GET_ENABLED_EVENTS:
+ writeEventTypes(events.enabledEvents(), writer);
+
+ break;
+
+ default:
+ super.processOutStream(type, writer);
+ }
+ }
+
+ /** <inheritDoc /> */
+ @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
+ return events.future();
+ }
+
+ /** <inheritDoc /> */
+ @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) {
+ switch (opId) {
+ case OP_WAIT_FOR_LOCAL:
+ return eventResWriter;
+
+ case OP_REMOTE_QUERY:
+ return eventColResWriter;
+ }
+
+ return null;
+ }
+
+ /**
+ * Reads event types array.
+ *
+ * @param reader Reader
+ * @return Event types, or null.
+ */
+ private int[] readEventTypes(PortableRawReaderEx reader) {
+ return reader.readIntArray();
+ }
+
+ /**
+ * Reads event types array.
+ *
+ * @param writer Writer
+ * @param types Types.
+ */
+ private void writeEventTypes(int[] types, PortableRawWriterEx writer) {
+ if (types == null) {
+ writer.writeIntArray(null);
+
+ return;
+ }
+
+ int[] resultTypes = new int[types.length];
+
+ int idx = 0;
+
+ for (int t : types)
+ if (platformCtx.isEventTypeSupported(t))
+ resultTypes[idx++] = t;
+
+ writer.writeIntArray(Arrays.copyOf(resultTypes, idx));
+ }
+
+ /**
+ * Creates an interop filter from handle.
+ *
+ * @param hnd Handle.
+ * @return Interop filter.
+ */
+ private PlatformEventFilterListener localFilter(long hnd) {
+ return platformCtx.createLocalEventFilter(hnd);
+ }
+
+ /**
+ * Writes an EventBase.
+ */
+ private static class EventResultWriter implements PlatformFutureUtils.Writer {
+ /** */
+ private final PlatformContext platformCtx;
+
+ /**
+ * Constructor.
+ *
+ * @param platformCtx Context.
+ */
+ public EventResultWriter(PlatformContext platformCtx) {
+ assert platformCtx != null;
+
+ this.platformCtx = platformCtx;
+ }
+
+ /** <inheritDoc /> */
+ @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
+ platformCtx.writeEvent(writer, (EventAdapter)obj);
+ }
+
+ /** <inheritDoc /> */
+ @Override public boolean canWrite(Object obj, Throwable err) {
+ return obj instanceof EventAdapter && err == null;
+ }
+ }
+
+ /**
+ * Writes a collection of EventAdapter.
+ */
+ private static class EventCollectionResultWriter implements PlatformFutureUtils.Writer {
+ /** */
+ private final PlatformContext platformCtx;
+
+ /**
+ * Constructor.
+ *
+ * @param platformCtx Context.
+ */
+ public EventCollectionResultWriter(PlatformContext platformCtx) {
+ assert platformCtx != null;
+
+ this.platformCtx = platformCtx;
+ }
+
+ /** <inheritDoc /> */
+ @SuppressWarnings("unchecked")
+ @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
+ Collection<EventAdapter> events = (Collection<EventAdapter>)obj;
+
+ writer.writeInt(events.size());
+
+ for (EventAdapter e : events)
+ platformCtx.writeEvent(writer, e);
+ }
+
+ /** <inheritDoc /> */
+ @Override public boolean canWrite(Object obj, Throwable err) {
+ return obj instanceof Collection && err == null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/lifecycle/PlatformLifecycleBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/lifecycle/PlatformLifecycleBean.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/lifecycle/PlatformLifecycleBean.java
new file mode 100644
index 0000000..f17e824
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/lifecycle/PlatformLifecycleBean.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.lifecycle;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway;
+import org.apache.ignite.lifecycle.LifecycleBean;
+import org.apache.ignite.lifecycle.LifecycleEventType;
+
+/**
+ * Lifecycle aware bean for interop.
+ */
+public class PlatformLifecycleBean implements LifecycleBean {
+ /** Native gateway. */
+ public PlatformCallbackGateway gate;
+
+ /** Holder pointer. */
+ public long ptr;
+
+ /**
+ * Constructor.
+ */
+ protected PlatformLifecycleBean() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param gate Native gateway.
+ * @param ptr Holder pointer.
+ */
+ public PlatformLifecycleBean(PlatformCallbackGateway gate, long ptr) {
+ initialize(gate, ptr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onLifecycleEvent(LifecycleEventType evt) {
+ if (gate == null)
+ throw new IgniteException("Interop lifecycle bean can only be used in interop mode (did " +
+ "you start the node with native platform bootstrapper?");
+
+ assert ptr != 0;
+
+ // Do not send after-stop events because gate will fail due to grid being stopped.
+ if (evt != LifecycleEventType.AFTER_NODE_STOP)
+ gate.lifecycleEvent(ptr, evt.ordinal());
+ }
+
+ /**
+ * Set pointers.
+ *
+ * @param gate Native gateway.
+ * @param ptr Target pointer.
+ */
+ public void initialize(PlatformCallbackGateway gate, long ptr) {
+ this.gate = gate;
+ this.ptr = ptr;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformAbstractMemory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformAbstractMemory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformAbstractMemory.java
new file mode 100644
index 0000000..e305c71
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformAbstractMemory.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+/**
+ * Interop memory chunk abstraction.
+ */
+public abstract class PlatformAbstractMemory implements PlatformMemory {
+ /** Stream factory. */
+ private static final StreamFactory STREAM_FACTORY = PlatformMemoryUtils.LITTLE_ENDIAN ?
+ new LittleEndianStreamFactory() : new BigEndianStreamFactory();
+
+ /** Cross-platform memory pointer. */
+ protected long memPtr;
+
+ /**
+ * Constructor.
+ *
+ * @param memPtr Cross-platform memory pointer.
+ */
+ protected PlatformAbstractMemory(long memPtr) {
+ this.memPtr = memPtr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformInputStream input() {
+ return STREAM_FACTORY.createInput(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformOutputStream output() {
+ return STREAM_FACTORY.createOutput(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long pointer() {
+ return memPtr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long data() {
+ return PlatformMemoryUtils.data(memPtr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int capacity() {
+ return PlatformMemoryUtils.capacity(memPtr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int length() {
+ return PlatformMemoryUtils.length(memPtr);
+ }
+
+ /**
+ * Stream factory.
+ */
+ private static interface StreamFactory {
+ /**
+ * Create input stream.
+ *
+ * @param mem Memory.
+ * @return Input stream.
+ */
+ PlatformInputStreamImpl createInput(PlatformMemory mem);
+
+ /**
+ * Create output stream.
+ *
+ * @param mem Memory.
+ * @return Output stream.
+ */
+ PlatformOutputStreamImpl createOutput(PlatformMemory mem);
+ }
+
+ /**
+ * Stream factory for LITTLE ENDIAN architecture.
+ */
+ private static class LittleEndianStreamFactory implements StreamFactory {
+ /** {@inheritDoc} */
+ @Override public PlatformInputStreamImpl createInput(PlatformMemory mem) {
+ return new PlatformInputStreamImpl(mem);
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformOutputStreamImpl createOutput(PlatformMemory mem) {
+ return new PlatformOutputStreamImpl(mem);
+ }
+ }
+
+ /**
+ * Stream factory for BIG ENDIAN architecture.
+ */
+ private static class BigEndianStreamFactory implements StreamFactory {
+ /** {@inheritDoc} */
+ @Override public PlatformInputStreamImpl createInput(PlatformMemory mem) {
+ return new PlatformBigEndianInputStreamImpl(mem);
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformOutputStreamImpl createOutput(PlatformMemory mem) {
+ return new PlatformBigEndianOutputStreamImpl(mem);
+ }
+ }
+
+}
\ No newline at end of file