You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/08/28 08:53:29 UTC
ignite git commit: IGNITE-1314: Moved messaging to Ignite.
Repository: ignite
Updated Branches:
refs/heads/master f4c7107ce -> 27a59cf8a
IGNITE-1314: Moved messaging to Ignite.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/27a59cf8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/27a59cf8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/27a59cf8
Branch: refs/heads/master
Commit: 27a59cf8a18a9f12e42fd0dc54890f6e44d91515
Parents: f4c7107
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Aug 28 09:54:10 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Aug 28 09:54:10 2015 +0300
----------------------------------------------------------------------
.../processors/platform/PlatformContext.java | 10 ++
.../messaging/PlatformMessageFilter.java | 109 +++++++++++++
.../messaging/PlatformMessageLocalFilter.java | 102 ++++++++++++
.../platform/messaging/PlatformMessaging.java | 162 +++++++++++++++++++
4 files changed, 383 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/27a59cf8/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index 461fb84..68e0e35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.platform;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.portable.*;
import org.apache.ignite.internal.processors.cache.query.continuous.*;
import org.apache.ignite.internal.processors.platform.cache.query.*;
@@ -154,4 +155,13 @@ public interface PlatformContext {
* @return Filter.
*/
public CacheContinuousQueryFilterEx createContinuousQueryFilter(Object filter);
+
+ /**
+ * Create remote message filter.
+ *
+ * @param filter Native filter.
+ * @param ptr Pointer of deployed native filter.
+ * @return Filter.
+ */
+ public GridLifecycleAwareMessageFilter<UUID, Object> createRemoteMessageFilter(Object filter, long ptr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/27a59cf8/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java
new file mode 100644
index 0000000..8a433ac
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.messaging;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+
+import java.util.*;
+
+/**
+ * Interop message filter. Delegates apply to the native platform through the platform gateway.
+ */
+public class PlatformMessageFilter extends PlatformAbstractPredicate
+ implements GridLifecycleAwareMessageFilter<UUID, Object> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * No-arg constructor.
+ */
+ public PlatformMessageFilter()
+ {
+ super();
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param pred .Net portable predicate.
+ * @param ptr Pointer to predicate in the native platform.
+ * @param ctx Platform context.
+ */
+ protected PlatformMessageFilter(Object pred, long ptr, PlatformContext ctx) {
+ super(pred, ptr, ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, Object m) {
+ if (ptr == 0)
+ return false; // Destroyed or not initialized yet: there is no native filter to call.
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ writer.writeObject(uuid);
+ writer.writeObject(m);
+
+ out.synchronize();
+
+ return ctx.gateway().messagingFilterApply(ptr, mem.pointer()) != 0; // Non-zero result means "accepted".
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initialize(GridKernalContext kernalCtx) {
+ if (ptr != 0)
+ return; // Native pointer is already set, nothing to create.
+
+ ctx = PlatformUtils.platformContext(kernalCtx.grid());
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ writer.writeObject(pred);
+
+ out.synchronize();
+
+ ptr = ctx.gateway().messagingFilterCreate(mem.pointer());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ if (ptr == 0) // Already destroyed or not initialized yet.
+ return;
+
+ try {
+ assert ctx != null;
+
+ ctx.gateway().messagingFilterDestroy(ptr);
+ }
+ finally {
+ ptr = 0; // Reset even if the gateway call throws, so apply() and close() see the filter as destroyed.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/27a59cf8/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
new file mode 100644
index 0000000..71bb918
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.messaging;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+
+import java.util.*;
+
+/**
+ * Interop local filter. Delegates apply to native platform, uses handle to identify native target.
+ */
+public class PlatformMessageLocalFilter implements GridLifecycleAwareMessageFilter<UUID, Object> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Handle in the native platform. */
+ protected final long hnd;
+
+ /** Platform context. */
+ protected final PlatformContext platformCtx;
+
+ /**
+ * Constructor.
+ *
+ * @param hnd Handle in the native platform (must be non-zero).
+ * @param ctx Platform context (must not be {@code null}).
+ */
+ public PlatformMessageLocalFilter(long hnd, PlatformContext ctx) {
+ assert ctx != null;
+ assert hnd != 0;
+
+ this.hnd = hnd;
+ this.platformCtx = ctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, Object m) {
+ try (PlatformMemory mem = platformCtx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = platformCtx.writer(out);
+
+ writer.writeObject(uuid);
+ writer.writeObject(m);
+
+ out.synchronize();
+
+ int res = platformCtx.gateway().messagingFilterApply(hnd, mem.pointer());
+
+ return res != 0; // Non-zero result means "accepted".
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ platformCtx.gateway().messagingFilterDestroy(hnd);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initialize(GridKernalContext ctx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ PlatformMessageLocalFilter filter = (PlatformMessageLocalFilter)o;
+
+ return hnd == filter.hnd; // Equality is defined by the native handle only.
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return (int)(hnd ^ (hnd >>> 32)); // Same folding as Long.hashCode(long); consistent with equals().
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/27a59cf8/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
new file mode 100644
index 0000000..ffc2ab3
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.messaging;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+
+/**
+ * Interop messaging. Dispatches operation codes to the wrapped {@link IgniteMessaging}.
+ */
+public class PlatformMessaging extends PlatformAbstractTarget {
+ /** Operation code: register a local listener. */
+ public static final int OP_LOC_LISTEN = 1;
+
+ /** Operation code: register a remote listener. */
+ public static final int OP_REMOTE_LISTEN = 2;
+
+ /** Operation code: send a single message. */
+ public static final int OP_SEND = 3;
+
+ /** Operation code: send a collection of messages. */
+ public static final int OP_SEND_MULTI = 4;
+
+ /** Operation code: send a single ordered message. */
+ public static final int OP_SEND_ORDERED = 5;
+
+ /** Operation code: unregister a local listener. */
+ public static final int OP_STOP_LOC_LISTEN = 6;
+
+ /** Operation code: unregister a remote listener. */
+ public static final int OP_STOP_REMOTE_LISTEN = 7;
+
+ /** Ignite messaging. */
+ private final IgniteMessaging messaging;
+
+ /**
+ * Ctor.
+ *
+ * @param platformCtx Context.
+ * @param messaging Ignite messaging.
+ */
+ public PlatformMessaging(PlatformContext platformCtx, IgniteMessaging messaging) {
+ super(platformCtx);
+
+ assert messaging != null;
+
+ this.messaging = messaging;
+ }
+
+ /**
+ * Gets messaging with asynchronous mode enabled.
+ *
+ * @return This instance if already asynchronous, otherwise a new instance wrapping the asynchronous messaging.
+ */
+ public PlatformMessaging withAsync() {
+ if (messaging.isAsync())
+ return this;
+
+ return new PlatformMessaging (platformCtx, messaging.withAsync());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+ switch (type) {
+ case OP_SEND:
+ messaging.send(reader.readObjectDetached(), reader.readObjectDetached());
+
+ return TRUE;
+
+ case OP_SEND_MULTI:
+ messaging.send(reader.readObjectDetached(), PlatformUtils.readCollection(reader));
+
+ return TRUE;
+
+ case OP_SEND_ORDERED:
+ messaging.sendOrdered(reader.readObjectDetached(), reader.readObjectDetached(), reader.readLong());
+
+ return TRUE;
+
+ case OP_LOC_LISTEN: {
+ PlatformMessageLocalFilter filter = new PlatformMessageLocalFilter(reader.readLong(), platformCtx);
+
+ Object topic = reader.readObjectDetached();
+
+ messaging.localListen(topic, filter);
+
+ return TRUE;
+ }
+
+ case OP_STOP_LOC_LISTEN: {
+ PlatformMessageLocalFilter filter = new PlatformMessageLocalFilter(reader.readLong(), platformCtx);
+
+ Object topic = reader.readObjectDetached();
+
+ messaging.stopLocalListen(topic, filter);
+
+ return TRUE;
+ }
+
+ case OP_STOP_REMOTE_LISTEN: {
+ messaging.stopRemoteListen(reader.readUuid());
+
+ return TRUE;
+ }
+
+ default:
+ throw new IgniteCheckedException("Unsupported operation type: " + type);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"IfMayBeConditional", "ConstantConditions", "unchecked"})
+ @Override protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer,
+ Object arg) throws IgniteCheckedException {
+ switch (type) {
+ case OP_REMOTE_LISTEN:{
+ Object nativeFilter = reader.readObjectDetached();
+
+ long ptr = reader.readLong(); // Interop pointer, passed to createRemoteMessageFilter below.
+
+ Object topic = reader.readObjectDetached();
+
+ GridLifecycleAwareMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr);
+
+ UUID listenId = messaging.remoteListen(topic, filter);
+
+ writer.writeUuid(listenId);
+
+ break;
+ }
+
+ default:
+ throw new IgniteCheckedException("Unsupported operation type: " + type);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
+ return messaging.future();
+ }
+}
\ No newline at end of file