You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/08/31 04:01:22 UTC
[39/50] [abbrv] ignite git commit: IGNITE-1315: Moved events to
Ignite.
IGNITE-1315: Moved events to Ignite.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a0eeea6f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a0eeea6f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a0eeea6f
Branch: refs/heads/ignite-843
Commit: a0eeea6fb61c203f5a3ec7b7e394839223c27eb3
Parents: 27a59cf
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Aug 28 11:00:44 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Aug 28 11:00:44 2015 +0300
----------------------------------------------------------------------
.../platform/PlatformAwareEventFilter.java | 4 +-
.../processors/platform/PlatformContext.java | 34 ++
.../platform/events/PlatformEventFilter.java | 161 ++++++++
.../platform/events/PlatformEvents.java | 388 +++++++++++++++++++
4 files changed, 586 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a0eeea6f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java
index f056bbf..b09d889 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java
@@ -21,10 +21,12 @@ import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.lang.*;
+import java.util.*;
+
/**
* Special version of predicate for events with initialize/close callbacks.
*/
-public interface PlatformAwareEventFilter<E extends Event> extends IgnitePredicate<E> {
+public interface PlatformAwareEventFilter<E extends Event> extends IgnitePredicate<E>, IgniteBiPredicate<UUID, E> {
/**
* Initializes the filter.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/a0eeea6f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index 68e0e35..82a42d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.platform;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.portable.*;
@@ -164,4 +165,37 @@ public interface PlatformContext {
* @return Filter.
*/
public GridLifecycleAwareMessageFilter<UUID, Object> createRemoteMessageFilter(Object filter, long ptr);
+
+ /**
+ * Check whether the given event type is supported.
+ *
+ * @param evtTyp Event type.
+ * @return {@code True} if supported.
+ */
+ public boolean isEventTypeSupported(int evtTyp);
+
+ /**
+ * Write event.
+ *
+ * @param writer Writer.
+ * @param event Event.
+ */
+ public void writeEvent(PortableRawWriterEx writer, EventAdapter event);
+
+ /**
+ * Create local event filter.
+ *
+ * @param hnd Native handle.
+ * @return Filter.
+ */
+ public <E extends Event> PlatformAwareEventFilter<E> createLocalEventFilter(long hnd);
+
+ /**
+ * Create remote event filter.
+ *
+ * @param pred Native predicate.
+ * @param types Event types.
+ * @return Filter.
+ */
+ public <E extends Event> PlatformAwareEventFilter<E> createRemoteEventFilter(Object pred, final int... types);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a0eeea6f/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java
new file mode 100644
index 0000000..7255dbb
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.events;
+
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+
+import java.util.*;
+
+/**
+ * Platform event filter. Delegates apply to native platform.
+ */
+public class PlatformEventFilter<E extends Event> implements PlatformAwareEventFilter<E>, PlatformLocalEventListener
+{
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final Object pred;
+
+ /** Event types. */
+ private final int[] types;
+
+ /** */
+ protected transient long hnd;
+
+ /** */
+ private transient PlatformContext ctx;
+
+ /**
+ * Constructor.
+ *
+ * @param hnd Handle in the native platform.
+ * @param ctx Context.
+ */
+ public PlatformEventFilter(long hnd, PlatformContext ctx) {
+ assert ctx != null;
+ assert hnd != 0;
+
+ this.hnd = hnd;
+ this.ctx = ctx;
+
+ pred = null;
+ types = null;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param pred .Net portable predicate.
+ */
+ public PlatformEventFilter(Object pred, final int... types) {
+ assert pred != null;
+
+ this.pred = pred;
+ this.types = types;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(E evt) {
+ return apply0(null, evt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, E evt) {
+ return apply0(uuid, evt);
+ }
+
+ /**
+ * Apply impl.
+ * @param uuid Node if.
+ * @param evt Event.
+ * @return Result.
+ */
+ private boolean apply0(final UUID uuid, final E evt) {
+ if (!ctx.isEventTypeSupported(evt.type()))
+ return false;
+
+ if (types != null) {
+ boolean match = false;
+
+ for (int type : types) {
+ if (type == evt.type()) {
+ match = true;
+ break;
+ }
+ }
+
+ if (!match)
+ return false;
+ }
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ ctx.writeEvent(writer, (EventAdapter)evt);
+
+ writer.writeUuid(uuid);
+
+ out.synchronize();
+
+ int res = ctx.gateway().eventFilterApply(hnd, mem.pointer());
+
+ return res != 0;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ ctx.gateway().eventFilterDestroy(hnd);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initialize(GridKernalContext gridCtx) {
+ ctx = PlatformUtils.platformContext(gridCtx.grid());
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ writer.writeObjectDetached(pred);
+
+ out.synchronize();
+
+ hnd = ctx.gateway().eventFilterCreate(mem.pointer());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ return this == o || o != null && o instanceof PlatformEventFilter &&
+ hnd == ((PlatformEventFilter)o).hnd;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return (int)(hnd ^ (hnd >>> 32));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a0eeea6f/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
new file mode 100644
index 0000000..befc3bd
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.events;
+
+import org.apache.ignite.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Interop events.
+ */
+public class PlatformEvents extends PlatformAbstractTarget {
+ /** */
+ private static final int OP_REMOTE_QUERY = 1;
+
+ /** */
+ private static final int OP_REMOTE_LISTEN = 2;
+
+ /** */
+ private static final int OP_STOP_REMOTE_LISTEN = 3;
+
+ /** */
+ private static final int OP_WAIT_FOR_LOCAL = 4;
+
+ /** */
+ private static final int OP_LOCAL_QUERY = 5;
+
+ /** */
+ private static final int OP_RECORD_LOCAL = 6;
+
+ /** */
+ private static final int OP_ENABLE_LOCAL = 8;
+
+ /** */
+ private static final int OP_DISABLE_LOCAL = 9;
+
+ /** */
+ private static final int OP_GET_ENABLED_EVENTS = 10;
+
+ /** */
+ private final IgniteEvents events;
+
+ /** */
+ private final EventResultWriter eventResWriter;
+
+ /** */
+ private final EventCollectionResultWriter eventColResWriter;
+
+ /**
+ * Ctor.
+ *
+ * @param platformCtx Context.
+ * @param events Ignite events.
+ */
+ public PlatformEvents(PlatformContext platformCtx, IgniteEvents events) {
+ super(platformCtx);
+
+ assert events != null;
+
+ this.events = events;
+
+ eventResWriter = new EventResultWriter(platformCtx);
+ eventColResWriter = new EventCollectionResultWriter(platformCtx);
+ }
+
+ /**
+ * Gets events with asynchronous mode enabled.
+ *
+ * @return Events with asynchronous mode enabled.
+ */
+ public PlatformEvents withAsync() {
+ if (events.isAsync())
+ return this;
+
+ return new PlatformEvents(platformCtx, events.withAsync());
+ }
+
+ /**
+ * Adds an event listener for local events.
+ *
+ * @param hnd Interop listener handle.
+ * @param type Event type.
+ */
+ @SuppressWarnings({"unchecked"})
+ public void localListen(long hnd, int type) {
+ events.localListen(localFilter(hnd), type);
+ }
+
+ /**
+ * Removes an event listener for local events.
+ *
+ * @param hnd Interop listener handle.
+ */
+ @SuppressWarnings({"UnusedDeclaration", "unchecked"})
+ public boolean stopLocalListen(long hnd) {
+ return events.stopLocalListen(localFilter(hnd));
+ }
+
+ /**
+ * Check if event is enabled.
+ *
+ * @param type Event type.
+ * @return {@code True} if event of passed in type is enabled.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public boolean isEnabled(int type) {
+ return events.isEnabled(type);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+ switch (type) {
+ case OP_RECORD_LOCAL:
+ // TODO: GG-10244
+ break;
+
+ case OP_ENABLE_LOCAL:
+
+ events.enableLocal(readEventTypes(reader));
+
+ return TRUE;
+
+ case OP_DISABLE_LOCAL:
+
+ events.disableLocal(readEventTypes(reader));
+
+ return TRUE;
+
+ case OP_STOP_REMOTE_LISTEN:
+ events.stopRemoteListen(reader.readUuid());
+
+ return TRUE;
+ }
+
+ throw new IgniteCheckedException("Unsupported operation type: " + type);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"IfMayBeConditional", "ConstantConditions", "unchecked"})
+ @Override protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer,
+ Object arg) throws IgniteCheckedException {
+ switch (type) {
+ case OP_LOCAL_QUERY: {
+ Collection<EventAdapter> result =
+ events.localQuery(F.<EventAdapter>alwaysTrue(), readEventTypes(reader));
+
+ writer.writeInt(result.size());
+
+ for (EventAdapter e : result)
+ platformCtx.writeEvent(writer, e);
+
+ break;
+ }
+
+ case OP_WAIT_FOR_LOCAL: {
+ boolean hasFilter = reader.readBoolean();
+
+ IgnitePredicate pred = hasFilter ? localFilter(reader.readLong()) : null;
+
+ int[] eventTypes = readEventTypes(reader);
+
+ EventAdapter result = (EventAdapter) events.waitForLocal(pred, eventTypes);
+
+ platformCtx.writeEvent(writer, result);
+
+ break;
+ }
+
+ case OP_REMOTE_LISTEN: {
+ int bufSize = reader.readInt();
+
+ long interval = reader.readLong();
+
+ boolean autoUnsubscribe = reader.readBoolean();
+
+ boolean hasLocFilter = reader.readBoolean();
+
+ PlatformAwareEventFilter locFilter = hasLocFilter ? localFilter(reader.readLong()) : null;
+
+ boolean hasRmtFilter = reader.readBoolean();
+
+ UUID listenId;
+
+ if (hasRmtFilter) {
+ PlatformAwareEventFilter rmtFilter = platformCtx.createRemoteEventFilter(
+ reader.readObjectDetached(), readEventTypes(reader));
+
+ listenId = events.remoteListen(bufSize, interval, autoUnsubscribe, locFilter, rmtFilter);
+ }
+ else
+ listenId = events.remoteListen(bufSize, interval, autoUnsubscribe, locFilter, null,
+ readEventTypes(reader));
+
+ writer.writeUuid(listenId);
+
+ break;
+ }
+
+ case OP_REMOTE_QUERY: {
+ Object pred = reader.readObjectDetached();
+
+ long timeout = reader.readLong();
+
+ int[] types = readEventTypes(reader);
+
+ PlatformAwareEventFilter filter = platformCtx.createRemoteEventFilter(pred, types);
+
+ Collection<EventAdapter> result = events.remoteQuery(filter, timeout);
+
+ if (result == null)
+ writer.writeInt(-1);
+ else {
+ writer.writeInt(result.size());
+
+ for (EventAdapter e : result)
+ platformCtx.writeEvent(writer, e);
+ }
+
+ break;
+ }
+
+ default:
+ throw new IgniteCheckedException("Unsupported operation type: " + type);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void processOutOp(int type, PortableRawWriterEx writer) throws IgniteCheckedException {
+ switch (type) {
+ case OP_GET_ENABLED_EVENTS:
+ writeEventTypes(events.enabledEvents(), writer);
+
+ break;
+
+ default:
+ throwUnsupported(type);
+ }
+ }
+
+ /** <inheritDoc /> */
+ @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
+ return events.future();
+ }
+
+ /** <inheritDoc /> */
+ @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) {
+ switch (opId) {
+ case OP_WAIT_FOR_LOCAL:
+ return eventResWriter;
+
+ case OP_REMOTE_QUERY:
+ return eventColResWriter;
+ }
+
+ return null;
+ }
+
+ /**
+ * Reads event types array.
+ *
+ * @param reader Reader
+ * @return Event types, or null.
+ */
+ private int[] readEventTypes(PortableRawReaderEx reader) {
+ return reader.readIntArray();
+ }
+
+ /**
+ * Reads event types array.
+ *
+ * @param writer Writer
+ * @param types Types.
+ */
+ private void writeEventTypes(int[] types, PortableRawWriterEx writer) {
+ if (types == null) {
+ writer.writeIntArray(null);
+
+ return;
+ }
+
+ int[] resultTypes = new int[types.length];
+
+ int idx = 0;
+
+ for (int t : types)
+ if (platformCtx.isEventTypeSupported(t))
+ resultTypes[idx++] = t;
+
+ writer.writeIntArray(Arrays.copyOf(resultTypes, idx));
+ }
+
+ /**
+ * Creates an interop filter from handle.
+ *
+ * @param hnd Handle.
+ * @return Interop filter.
+ */
+ private PlatformAwareEventFilter localFilter(long hnd) {
+ return platformCtx.createLocalEventFilter(hnd);
+ }
+
+ /**
+ * Writes an EventBase.
+ */
+ private static class EventResultWriter implements PlatformFutureUtils.Writer {
+ /** */
+ private final PlatformContext platformCtx;
+
+ /**
+ * Constructor.
+ *
+ * @param platformCtx Context.
+ */
+ public EventResultWriter(PlatformContext platformCtx) {
+ assert platformCtx != null;
+
+ this.platformCtx = platformCtx;
+ }
+
+ /** <inheritDoc /> */
+ @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
+ platformCtx.writeEvent(writer, (EventAdapter)obj);
+ }
+
+ /** <inheritDoc /> */
+ @Override public boolean canWrite(Object obj, Throwable err) {
+ return obj instanceof EventAdapter && err == null;
+ }
+ }
+
+ /**
+ * Writes a collection of EventAdapter.
+ */
+ private static class EventCollectionResultWriter implements PlatformFutureUtils.Writer {
+ /** */
+ private final PlatformContext platformCtx;
+
+ /**
+ * Constructor.
+ *
+ * @param platformCtx Context.
+ */
+ public EventCollectionResultWriter(PlatformContext platformCtx) {
+ assert platformCtx != null;
+
+ this.platformCtx = platformCtx;
+ }
+
+ /** <inheritDoc /> */
+ @SuppressWarnings("unchecked")
+ @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
+ Collection<EventAdapter> events = (Collection<EventAdapter>)obj;
+
+ writer.writeInt(events.size());
+
+ for (EventAdapter e : events)
+ platformCtx.writeEvent(writer, e);
+ }
+
+ /** <inheritDoc /> */
+ @Override public boolean canWrite(Object obj, Throwable err) {
+ return obj instanceof Collection && err == null;
+ }
+ }
+}
+