You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/08/31 04:01:21 UTC

[38/50] [abbrv] ignite git commit: IGNITE-1314: Moved messaging to Ignite.

IGNITE-1314: Moved messaging to Ignite.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/27a59cf8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/27a59cf8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/27a59cf8

Branch: refs/heads/ignite-843
Commit: 27a59cf8a18a9f12e42fd0dc54890f6e44d91515
Parents: f4c7107
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Aug 28 09:54:10 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Aug 28 09:54:10 2015 +0300

----------------------------------------------------------------------
 .../processors/platform/PlatformContext.java    |  10 ++
 .../messaging/PlatformMessageFilter.java        | 109 +++++++++++++
 .../messaging/PlatformMessageLocalFilter.java   | 102 ++++++++++++
 .../platform/messaging/PlatformMessaging.java   | 162 +++++++++++++++++++
 4 files changed, 383 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/27a59cf8/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index 461fb84..68e0e35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.platform;
 
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.portable.*;
 import org.apache.ignite.internal.processors.cache.query.continuous.*;
 import org.apache.ignite.internal.processors.platform.cache.query.*;
@@ -154,4 +155,13 @@ public interface PlatformContext {
      * @return Filter.
      */
     public CacheContinuousQueryFilterEx createContinuousQueryFilter(Object filter);
+
+    /**
+     * Creates a lifecycle-aware message filter for remote listening from a native platform filter.
+     *
+     * @param filter Native filter object.
+     * @param ptr Pointer of deployed native filter.
+     * @return Message filter accepting a {@code UUID} and a message object.
+     */
+    public GridLifecycleAwareMessageFilter<UUID, Object> createRemoteMessageFilter(Object filter, long ptr);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/27a59cf8/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java
new file mode 100644
index 0000000..8a433ac
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.messaging;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+
+import java.util.*;
+
+/**
+ * Interop filter. Delegates apply to native platform.
+ */
+public class PlatformMessageFilter extends PlatformAbstractPredicate
+    implements GridLifecycleAwareMessageFilter<UUID, Object> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Constructor.
+     */
+    public PlatformMessageFilter()
+    {
+        super();
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param pred .Net portable predicate.
+     * @param ptr Pointer to predicate in the native platform.
+     * @param ctx Kernal context.
+     */
+    protected PlatformMessageFilter(Object pred, long ptr, PlatformContext ctx) {
+        super(pred, ptr, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(UUID uuid, Object m) {
+        if (ptr == 0)
+            return false;  // Destroyed.
+
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = ctx.writer(out);
+
+            writer.writeObject(uuid);
+            writer.writeObject(m);
+
+            out.synchronize();
+
+            return ctx.gateway().messagingFilterApply(ptr, mem.pointer()) != 0;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize(GridKernalContext kernalCtx) {
+        if (ptr != 0)
+            return;
+
+        ctx = PlatformUtils.platformContext(kernalCtx.grid());
+
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = ctx.writer(out);
+
+            writer.writeObject(pred);
+
+            out.synchronize();
+
+            ptr = ctx.gateway().messagingFilterCreate(mem.pointer());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (ptr == 0) // Already destroyed or not initialized yet.
+            return;
+
+        try {
+            assert ctx != null;
+
+            ctx.gateway().messagingFilterDestroy(ptr);
+        }
+        finally {
+            ptr = 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/27a59cf8/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
new file mode 100644
index 0000000..71bb918
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.messaging;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+
+import java.util.*;
+
+/**
+ * Interop filter for local listeners. Forwards {@code apply} to the native platform, addressing the
+ * native target by its handle.
+ */
+public class PlatformMessageLocalFilter implements GridLifecycleAwareMessageFilter<UUID, Object> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Handle in the native platform. */
+    protected final long hnd;
+
+    /** Platform context. */
+    protected final PlatformContext platformCtx;
+
+    /**
+     * Creates filter bound to the given native handle.
+     *
+     * @param hnd Handle in the native platform, asserted to be non-zero.
+     * @param ctx Platform context, asserted to be non-null.
+     */
+    public PlatformMessageLocalFilter(long hnd, PlatformContext ctx) {
+        assert ctx != null;
+        assert hnd != 0;
+
+        platformCtx = ctx;
+        this.hnd = hnd;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(UUID uuid, Object m) {
+        try (PlatformMemory mem = platformCtx.memory().allocate()) {
+            PlatformOutputStream stream = mem.output();
+
+            PortableRawWriterEx w = platformCtx.writer(stream);
+
+            w.writeObject(uuid);
+            w.writeObject(m);
+
+            stream.synchronize();
+
+            // The gateway reports the outcome as an int; non-zero maps to true.
+            return platformCtx.gateway().messagingFilterApply(hnd, mem.pointer()) != 0;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        platformCtx.gateway().messagingFilterDestroy(hnd);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize(GridKernalContext ctx) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        // Only instances of exactly the same class are compared, and only by handle.
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        return hnd == ((PlatformMessageLocalFilter)o).hnd;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        long folded = hnd ^ (hnd >>> 32); // Fold the upper 32 bits into the lower ones.
+
+        return (int)folded;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/27a59cf8/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
new file mode 100644
index 0000000..ffc2ab3
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.messaging;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+
+/**
+ * Interop messaging. Dispatches platform operation codes to the wrapped {@link IgniteMessaging}.
+ */
+public class PlatformMessaging extends PlatformAbstractTarget {
+    /** Operation code: local listen. */
+    public static final int OP_LOC_LISTEN = 1;
+
+    /** Operation code: remote listen. */
+    public static final int OP_REMOTE_LISTEN = 2;
+
+    /** Operation code: send a single message. */
+    public static final int OP_SEND = 3;
+
+    /** Operation code: send a collection of messages. */
+    public static final int OP_SEND_MULTI = 4;
+
+    /** Operation code: ordered send. */
+    public static final int OP_SEND_ORDERED = 5;
+
+    /** Operation code: stop local listen. */
+    public static final int OP_STOP_LOC_LISTEN = 6;
+
+    /** Operation code: stop remote listen. */
+    public static final int OP_STOP_REMOTE_LISTEN = 7;
+
+    /** Ignite messaging all operations are delegated to. */
+    private final IgniteMessaging messaging;
+
+    /**
+     * Ctor.
+     *
+     * @param platformCtx Context.
+     * @param messaging Ignite messaging, asserted to be non-null.
+     */
+    public PlatformMessaging(PlatformContext platformCtx, IgniteMessaging messaging) {
+        super(platformCtx);
+
+        assert messaging != null;
+
+        this.messaging = messaging;
+    }
+
+    /**
+     * Gets messaging with asynchronous mode enabled.
+     *
+     * @return This instance if already asynchronous, otherwise a new instance wrapping the async messaging.
+     */
+    public PlatformMessaging withAsync() {
+        if (messaging.isAsync())
+            return this;
+
+        return new PlatformMessaging(platformCtx, messaging.withAsync());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case OP_SEND: // Arguments are evaluated left to right: topic is read first, then the message.
+                messaging.send(reader.readObjectDetached(), reader.readObjectDetached());
+
+                return TRUE;
+
+            case OP_SEND_MULTI: // Read order: topic, then the collection of messages.
+                messaging.send(reader.readObjectDetached(), PlatformUtils.readCollection(reader));
+
+                return TRUE;
+
+            case OP_SEND_ORDERED: // Read order: topic, message, then the long argument of sendOrdered.
+                messaging.sendOrdered(reader.readObjectDetached(), reader.readObjectDetached(), reader.readLong());
+
+                return TRUE;
+
+            case OP_LOC_LISTEN: { // Read order: native filter handle, then topic.
+                PlatformMessageLocalFilter filter = new PlatformMessageLocalFilter(reader.readLong(), platformCtx);
+
+                Object topic = reader.readObjectDetached();
+
+                messaging.localListen(topic, filter);
+
+                return TRUE;
+            }
+
+            case OP_STOP_LOC_LISTEN: {
+                // A new filter with the same handle is equal to the registered one (equality is by handle).
+                PlatformMessageLocalFilter filter = new PlatformMessageLocalFilter(reader.readLong(), platformCtx);
+
+                Object topic = reader.readObjectDetached();
+
+                messaging.stopLocalListen(topic, filter);
+
+                return TRUE;
+            }
+
+            case OP_STOP_REMOTE_LISTEN:
+                messaging.stopRemoteListen(reader.readUuid());
+
+                return TRUE;
+
+            default:
+                throw new IgniteCheckedException("Unsupported operation type: " + type);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"})
+    @Override protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer,
+        Object arg) throws IgniteCheckedException {
+        switch (type) {
+            case OP_REMOTE_LISTEN: {
+                Object nativeFilter = reader.readObjectDetached();
+
+                long ptr = reader.readLong();  // interop pointer
+
+                Object topic = reader.readObjectDetached();
+
+                GridLifecycleAwareMessageFilter<UUID, Object> filter =
+                    platformCtx.createRemoteMessageFilter(nativeFilter, ptr);
+
+                // Write the ID returned by remoteListen to the output.
+                writer.writeUuid(messaging.remoteListen(topic, filter));
+
+                break;
+            }
+
+            default:
+                throw new IgniteCheckedException("Unsupported operation type: " + type);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
+        return messaging.future();
+    }
+}
\ No newline at end of file