You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/08/28 10:00:06 UTC

ignite git commit: IGNITE-1315: Moved events to Ignite.

Repository: ignite
Updated Branches:
  refs/heads/master 27a59cf8a -> a0eeea6fb


IGNITE-1315: Moved events to Ignite.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a0eeea6f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a0eeea6f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a0eeea6f

Branch: refs/heads/master
Commit: a0eeea6fb61c203f5a3ec7b7e394839223c27eb3
Parents: 27a59cf
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Aug 28 11:00:44 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Aug 28 11:00:44 2015 +0300

----------------------------------------------------------------------
 .../platform/PlatformAwareEventFilter.java      |   4 +-
 .../processors/platform/PlatformContext.java    |  34 ++
 .../platform/events/PlatformEventFilter.java    | 161 ++++++++
 .../platform/events/PlatformEvents.java         | 388 +++++++++++++++++++
 4 files changed, 586 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a0eeea6f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java
index f056bbf..b09d889 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java
@@ -21,10 +21,12 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.lang.*;
 
+import java.util.*;
+
 /**
  * Special version of predicate for events with initialize/close callbacks.
  */
-public interface PlatformAwareEventFilter<E extends Event> extends IgnitePredicate<E> {
+public interface PlatformAwareEventFilter<E extends Event> extends IgnitePredicate<E>, IgniteBiPredicate<UUID, E> {
     /**
      * Initializes the filter.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a0eeea6f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index 68e0e35..82a42d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.platform;
 
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.portable.*;
@@ -164,4 +165,37 @@ public interface PlatformContext {
      * @return Filter.
      */
     public GridLifecycleAwareMessageFilter<UUID, Object> createRemoteMessageFilter(Object filter, long ptr);
+
+    /**
+     * Check whether the given event type is supported.
+     *
+     * @param evtTyp Event type.
+     * @return {@code True} if supported.
+     */
+    public boolean isEventTypeSupported(int evtTyp);
+
+    /**
+     * Write event.
+     *
+     * @param writer Writer.
+     * @param event Event.
+     */
+    public void writeEvent(PortableRawWriterEx writer, EventAdapter event);
+
+    /**
+     * Create local event filter.
+     *
+     * @param hnd Native handle.
+     * @return Filter.
+     */
+    public <E extends Event> PlatformAwareEventFilter<E> createLocalEventFilter(long hnd);
+
+    /**
+     * Create remote event filter.
+     *
+     * @param pred Native predicate.
+     * @param types Event types.
+     * @return Filter.
+     */
+    public <E extends Event> PlatformAwareEventFilter<E> createRemoteEventFilter(Object pred, final int... types);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a0eeea6f/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java
new file mode 100644
index 0000000..7255dbb
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.events;
+
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+
+import java.util.*;
+
+/**
+ * Platform event filter. Delegates apply to native platform.
+ */
+public class PlatformEventFilter<E extends Event> implements PlatformAwareEventFilter<E>, PlatformLocalEventListener
+{
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final Object pred;
+
+    /** Event types. */
+    private final int[] types;
+
+    /** */
+    protected transient long hnd;
+
+    /** */
+    private transient PlatformContext ctx;
+
+    /**
+     * Constructor.
+     *
+     * @param hnd Handle in the native platform.
+     * @param ctx Context.
+     */
+    public PlatformEventFilter(long hnd, PlatformContext ctx) {
+        assert ctx != null;
+        assert hnd != 0;
+
+        this.hnd = hnd;
+        this.ctx = ctx;
+
+        pred = null;
+        types = null;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param pred .Net portable predicate.
+     */
+    public PlatformEventFilter(Object pred, final int... types) {
+        assert pred != null;
+
+        this.pred = pred;
+        this.types = types;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(E evt) {
+        return apply0(null, evt);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(UUID uuid, E evt) {
+        return apply0(uuid, evt);
+    }
+
+    /**
+     * Apply impl.
+     * @param uuid Node if.
+     * @param evt Event.
+     * @return Result.
+     */
+    private boolean apply0(final UUID uuid, final E evt) {
+        if (!ctx.isEventTypeSupported(evt.type()))
+            return false;
+
+        if (types != null) {
+            boolean match = false;
+
+            for (int type : types) {
+                if (type == evt.type()) {
+                    match = true;
+                    break;
+                }
+            }
+
+            if (!match)
+                return false;
+        }
+
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = ctx.writer(out);
+
+            ctx.writeEvent(writer, (EventAdapter)evt);
+
+            writer.writeUuid(uuid);
+
+            out.synchronize();
+
+            int res = ctx.gateway().eventFilterApply(hnd, mem.pointer());
+
+            return res != 0;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        ctx.gateway().eventFilterDestroy(hnd);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize(GridKernalContext gridCtx) {
+        ctx = PlatformUtils.platformContext(gridCtx.grid());
+
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = ctx.writer(out);
+
+            writer.writeObjectDetached(pred);
+
+            out.synchronize();
+
+            hnd = ctx.gateway().eventFilterCreate(mem.pointer());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        return this == o || o != null && o instanceof PlatformEventFilter &&
+            hnd == ((PlatformEventFilter)o).hnd;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return (int)(hnd ^ (hnd >>> 32));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a0eeea6f/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
new file mode 100644
index 0000000..befc3bd
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.events;
+
+import org.apache.ignite.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Interop events.
+ */
+public class PlatformEvents extends PlatformAbstractTarget {
+    /** */
+    private static final int OP_REMOTE_QUERY = 1;
+
+    /** */
+    private static final int OP_REMOTE_LISTEN = 2;
+
+    /** */
+    private static final int OP_STOP_REMOTE_LISTEN = 3;
+
+    /** */
+    private static final int OP_WAIT_FOR_LOCAL = 4;
+
+    /** */
+    private static final int OP_LOCAL_QUERY = 5;
+
+    /** */
+    private static final int OP_RECORD_LOCAL = 6;
+
+    /** */
+    private static final int OP_ENABLE_LOCAL = 8;
+
+    /** */
+    private static final int OP_DISABLE_LOCAL = 9;
+
+    /** */
+    private static final int OP_GET_ENABLED_EVENTS = 10;
+
+    /** */
+    private final IgniteEvents events;
+
+    /** */
+    private final EventResultWriter eventResWriter;
+
+    /** */
+    private final EventCollectionResultWriter eventColResWriter;
+
+    /**
+     * Ctor.
+     *
+     * @param platformCtx Context.
+     * @param events Ignite events.
+     */
+    public PlatformEvents(PlatformContext platformCtx, IgniteEvents events) {
+        super(platformCtx);
+
+        assert events != null;
+
+        this.events = events;
+
+        eventResWriter = new EventResultWriter(platformCtx);
+        eventColResWriter = new EventCollectionResultWriter(platformCtx);
+    }
+
+    /**
+     * Gets events with asynchronous mode enabled.
+     *
+     * @return Events with asynchronous mode enabled.
+     */
+    public PlatformEvents withAsync() {
+        if (events.isAsync())
+            return this;
+
+        return new PlatformEvents(platformCtx, events.withAsync());
+    }
+
+    /**
+     * Adds an event listener for local events.
+     *
+     * @param hnd Interop listener handle.
+     * @param type Event type.
+     */
+    @SuppressWarnings({"unchecked"})
+    public void localListen(long hnd, int type) {
+        events.localListen(localFilter(hnd), type);
+    }
+
+    /**
+     * Removes an event listener for local events.
+     *
+     * @param hnd Interop listener handle.
+     */
+    @SuppressWarnings({"UnusedDeclaration", "unchecked"})
+    public boolean stopLocalListen(long hnd) {
+        return events.stopLocalListen(localFilter(hnd));
+    }
+
+    /**
+     * Check if event is enabled.
+     *
+     * @param type Event type.
+     * @return {@code True} if event of passed in type is enabled.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public boolean isEnabled(int type) {
+        return events.isEnabled(type);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case OP_RECORD_LOCAL:
+                // TODO: GG-10244
+                break;
+
+            case OP_ENABLE_LOCAL:
+
+                events.enableLocal(readEventTypes(reader));
+
+                return TRUE;
+
+            case OP_DISABLE_LOCAL:
+
+                events.disableLocal(readEventTypes(reader));
+
+                return TRUE;
+
+            case OP_STOP_REMOTE_LISTEN:
+                events.stopRemoteListen(reader.readUuid());
+
+                return TRUE;
+        }
+
+        throw new IgniteCheckedException("Unsupported operation type: " + type);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"IfMayBeConditional", "ConstantConditions", "unchecked"})
+    @Override protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer,
+        Object arg) throws IgniteCheckedException {
+        switch (type) {
+            case OP_LOCAL_QUERY: {
+                Collection<EventAdapter> result =
+                    events.localQuery(F.<EventAdapter>alwaysTrue(), readEventTypes(reader));
+
+                writer.writeInt(result.size());
+
+                for (EventAdapter e : result)
+                    platformCtx.writeEvent(writer, e);
+
+                break;
+            }
+
+            case OP_WAIT_FOR_LOCAL: {
+                boolean hasFilter = reader.readBoolean();
+
+                IgnitePredicate pred = hasFilter ? localFilter(reader.readLong()) : null;
+
+                int[] eventTypes = readEventTypes(reader);
+
+                EventAdapter result = (EventAdapter) events.waitForLocal(pred, eventTypes);
+
+                platformCtx.writeEvent(writer, result);
+
+                break;
+            }
+
+            case OP_REMOTE_LISTEN: {
+                int bufSize = reader.readInt();
+
+                long interval = reader.readLong();
+
+                boolean autoUnsubscribe = reader.readBoolean();
+
+                boolean hasLocFilter = reader.readBoolean();
+
+                PlatformAwareEventFilter locFilter = hasLocFilter ? localFilter(reader.readLong()) : null;
+
+                boolean hasRmtFilter = reader.readBoolean();
+
+                UUID listenId;
+
+                if (hasRmtFilter) {
+                    PlatformAwareEventFilter rmtFilter = platformCtx.createRemoteEventFilter(
+                        reader.readObjectDetached(), readEventTypes(reader));
+
+                    listenId = events.remoteListen(bufSize, interval, autoUnsubscribe, locFilter, rmtFilter);
+                }
+                else
+                    listenId = events.remoteListen(bufSize, interval, autoUnsubscribe, locFilter, null,
+                        readEventTypes(reader));
+
+                writer.writeUuid(listenId);
+
+                break;
+            }
+
+            case OP_REMOTE_QUERY: {
+                Object pred = reader.readObjectDetached();
+
+                long timeout = reader.readLong();
+
+                int[] types = readEventTypes(reader);
+
+                PlatformAwareEventFilter filter = platformCtx.createRemoteEventFilter(pred, types);
+
+                Collection<EventAdapter> result = events.remoteQuery(filter, timeout);
+
+                if (result == null)
+                    writer.writeInt(-1);
+                else {
+                    writer.writeInt(result.size());
+
+                    for (EventAdapter e : result)
+                        platformCtx.writeEvent(writer, e);
+                }
+
+                break;
+            }
+
+            default:
+                throw new IgniteCheckedException("Unsupported operation type: " + type);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void processOutOp(int type, PortableRawWriterEx writer) throws IgniteCheckedException {
+        switch (type) {
+            case OP_GET_ENABLED_EVENTS:
+                writeEventTypes(events.enabledEvents(), writer);
+
+                break;
+
+            default:
+                throwUnsupported(type);
+        }
+    }
+
+    /** <inheritDoc /> */
+    @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
+        return events.future();
+    }
+
+    /** <inheritDoc /> */
+    @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) {
+        switch (opId) {
+            case OP_WAIT_FOR_LOCAL:
+                return eventResWriter;
+
+            case OP_REMOTE_QUERY:
+                return eventColResWriter;
+        }
+
+        return null;
+    }
+
+    /**
+     *  Reads event types array.
+     *
+     * @param reader Reader
+     * @return Event types, or null.
+     */
+    private int[] readEventTypes(PortableRawReaderEx reader) {
+        return reader.readIntArray();
+    }
+
+    /**
+     *  Reads event types array.
+     *
+     * @param writer Writer
+     * @param types Types.
+     */
+    private void writeEventTypes(int[] types, PortableRawWriterEx writer) {
+        if (types == null) {
+            writer.writeIntArray(null);
+
+            return;
+        }
+
+        int[] resultTypes = new int[types.length];
+
+        int idx = 0;
+
+        for (int t : types)
+            if (platformCtx.isEventTypeSupported(t))
+                resultTypes[idx++] = t;
+
+        writer.writeIntArray(Arrays.copyOf(resultTypes, idx));
+    }
+
+    /**
+     * Creates an interop filter from handle.
+     *
+     * @param hnd Handle.
+     * @return Interop filter.
+     */
+    private PlatformAwareEventFilter localFilter(long hnd) {
+        return platformCtx.createLocalEventFilter(hnd);
+    }
+
+    /**
+     * Writes an EventBase.
+     */
+    private static class EventResultWriter implements PlatformFutureUtils.Writer {
+        /** */
+        private final PlatformContext platformCtx;
+
+        /**
+         * Constructor.
+         *
+         * @param platformCtx Context.
+         */
+        public EventResultWriter(PlatformContext platformCtx) {
+            assert platformCtx != null;
+
+            this.platformCtx = platformCtx;
+        }
+
+        /** <inheritDoc /> */
+        @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
+            platformCtx.writeEvent(writer, (EventAdapter)obj);
+        }
+
+        /** <inheritDoc /> */
+        @Override public boolean canWrite(Object obj, Throwable err) {
+            return obj instanceof EventAdapter && err == null;
+        }
+    }
+
+    /**
+     * Writes a collection of EventAdapter.
+     */
+    private static class EventCollectionResultWriter implements PlatformFutureUtils.Writer {
+        /** */
+        private final PlatformContext platformCtx;
+
+        /**
+         * Constructor.
+         *
+         * @param platformCtx Context.
+         */
+        public EventCollectionResultWriter(PlatformContext platformCtx) {
+            assert platformCtx != null;
+
+            this.platformCtx = platformCtx;
+        }
+
+        /** <inheritDoc /> */
+        @SuppressWarnings("unchecked")
+        @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
+            Collection<EventAdapter> events = (Collection<EventAdapter>)obj;
+
+            writer.writeInt(events.size());
+
+            for (EventAdapter e : events)
+                platformCtx.writeEvent(writer, e);
+        }
+
+        /** <inheritDoc /> */
+        @Override public boolean canWrite(Object obj, Throwable err) {
+            return obj instanceof Collection && err == null;
+        }
+    }
+}
+