You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/09/02 04:00:44 UTC
[16/29] ignite git commit: Improving platform interfaces.
Improving platform interfaces.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/32579d45
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/32579d45
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/32579d45
Branch: refs/heads/ignite-843
Commit: 32579d450377f177fc6078c4d9686de88dd21220
Parents: 0e25f55
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Sep 1 12:49:16 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Sep 1 12:49:16 2015 +0300
----------------------------------------------------------------------
.../internal/GridEventConsumeHandler.java | 15 +-
.../managers/communication/GridIoManager.java | 50 +++---
.../GridLifecycleAwareMessageFilter.java | 38 -----
.../eventstorage/GridEventStorageManager.java | 16 +-
.../platform/PlatformAwareEventFilter.java | 39 -----
.../processors/platform/PlatformContext.java | 15 +-
.../platform/PlatformEventFilterListener.java | 39 +++++
.../platform/PlatformLocalEventListener.java | 28 ----
.../platform/message/PlatformMessageFilter.java | 40 +++++
.../platform/events/PlatformEventFilter.java | 164 -------------------
.../events/PlatformEventFilterListenerImpl.java | 163 ++++++++++++++++++
.../platform/events/PlatformEvents.java | 15 +-
.../messaging/PlatformMessageFilter.java | 110 -------------
.../messaging/PlatformMessageFilterImpl.java | 110 +++++++++++++
.../messaging/PlatformMessageLocalFilter.java | 9 +-
.../platform/messaging/PlatformMessaging.java | 7 +-
16 files changed, 416 insertions(+), 442 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 93e01e5..599d301 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -38,8 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
-import org.apache.ignite.internal.processors.platform.PlatformAwareEventFilter;
-import org.apache.ignite.internal.processors.platform.PlatformLocalEventListener;
+import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.T3;
@@ -139,8 +138,8 @@ class GridEventConsumeHandler implements GridContinuousHandler {
if (filter != null)
ctx.resource().injectGeneric(filter);
- if (filter instanceof PlatformAwareEventFilter)
- ((PlatformAwareEventFilter)filter).initialize(ctx);
+ if (filter instanceof PlatformEventFilterListener)
+ ((PlatformEventFilterListener)filter).initialize(ctx);
final boolean loc = nodeId.equals(ctx.localNodeId());
@@ -260,16 +259,16 @@ class GridEventConsumeHandler implements GridContinuousHandler {
RuntimeException err = null;
try {
- if (filter instanceof PlatformAwareEventFilter)
- ((PlatformAwareEventFilter)filter).close();
+ if (filter instanceof PlatformEventFilterListener)
+ ((PlatformEventFilterListener)filter).onClose();
}
catch(RuntimeException ex) {
err = ex;
}
try {
- if (cb instanceof PlatformLocalEventListener)
- ((PlatformLocalEventListener)cb).close();
+ if (cb instanceof PlatformEventFilterListener)
+ ((PlatformEventFilterListener)cb).onClose();
}
catch (RuntimeException ex) {
if (err == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index aa73296..b8af8da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -17,26 +17,6 @@
package org.apache.ignite.internal.managers.communication;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
@@ -54,6 +34,7 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
@@ -83,6 +64,27 @@ import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -1457,8 +1459,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID, ?> p) {
if (p != null) {
try {
- if (p instanceof GridLifecycleAwareMessageFilter)
- ((GridLifecycleAwareMessageFilter)p).initialize(ctx);
+ if (p instanceof PlatformMessageFilter)
+ ((PlatformMessageFilter)p).initialize(ctx);
else
ctx.resource().injectGeneric(p);
@@ -1795,8 +1797,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (lsnr instanceof GridUserMessageListener) {
GridUserMessageListener userLsnr = (GridUserMessageListener)lsnr;
- if (userLsnr.predLsnr instanceof GridLifecycleAwareMessageFilter)
- ((GridLifecycleAwareMessageFilter)userLsnr.predLsnr).close();
+ if (userLsnr.predLsnr instanceof PlatformMessageFilter)
+ ((PlatformMessageFilter)userLsnr.predLsnr).onClose();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java
deleted file mode 100644
index 2d33a65..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.managers.communication;
-
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.lang.IgniteBiPredicate;
-
-/**
- * Special version of bi-predicate for messaging with initialize/close callbacks.
- */
-public interface GridLifecycleAwareMessageFilter<K, V> extends IgniteBiPredicate<K, V> {
- /**
- * Initializes the filter.
- *
- * @param ctx Kernal context.
- */
- public void initialize(GridKernalContext ctx);
-
- /**
- * Closes the filter.
- */
- public void close();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index 7b8c759..ea01e52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -47,8 +47,7 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
-import org.apache.ignite.internal.processors.platform.PlatformAwareEventFilter;
-import org.apache.ignite.internal.processors.platform.PlatformLocalEventListener;
+import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
@@ -681,8 +680,8 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
{
IgnitePredicate p = ((UserListenerWrapper)lsnr).listener();
- if (p instanceof PlatformLocalEventListener)
- ((PlatformLocalEventListener)p).close();
+ if (p instanceof PlatformEventFilterListener)
+ ((PlatformEventFilterListener)p).onClose();
}
return found;
@@ -784,19 +783,20 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @param p Grid event predicate.
* @return Collection of grid events.
*/
+ @SuppressWarnings("unchecked")
public <T extends Event> Collection<T> localEvents(IgnitePredicate<T> p) {
assert p != null;
- if (p instanceof PlatformAwareEventFilter) {
- PlatformAwareEventFilter p0 = (PlatformAwareEventFilter)p;
+ if (p instanceof PlatformEventFilterListener) {
+ PlatformEventFilterListener p0 = (PlatformEventFilterListener)p;
p0.initialize(ctx);
try {
- return getSpi().localEvents(p0);
+ return (Collection<T>)getSpi().localEvents(p0);
}
finally {
- p0.close();
+ p0.onClose();
}
}
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java
deleted file mode 100644
index a423578..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform;
-
-import java.util.UUID;
-import org.apache.ignite.events.Event;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.lang.IgnitePredicate;
-
-/**
- * Special version of predicate for events with initialize/close callbacks.
- */
-public interface PlatformAwareEventFilter<E extends Event> extends IgnitePredicate<E>, IgniteBiPredicate<UUID, E> {
- /**
- * Initializes the filter.
- */
- public void initialize(GridKernalContext ctx);
-
- /**
- * Closes the filter.
- */
- public void close();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index cea8326..1febf07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -18,13 +18,10 @@
package org.apache.ignite.internal.processors.platform;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.Event;
-import org.apache.ignite.events.EventAdapter;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.communication.GridLifecycleAwareMessageFilter;
import org.apache.ignite.internal.portable.PortableRawReaderEx;
import org.apache.ignite.internal.portable.PortableRawWriterEx;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFilterEx;
@@ -39,10 +36,10 @@ import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.jetbrains.annotations.Nullable;
import java.util.Collection;
-import java.util.UUID;
/**
* Platform context. Acts as an entry point for platform operations.
@@ -178,7 +175,7 @@ public interface PlatformContext {
* @param ptr Pointer of deployed native filter.
* @return Filter.
*/
- public GridLifecycleAwareMessageFilter<UUID, Object> createRemoteMessageFilter(Object filter, long ptr);
+ public PlatformMessageFilter createRemoteMessageFilter(Object filter, long ptr);
/**
* Check whether the given event type is supported.
@@ -192,9 +189,9 @@ public interface PlatformContext {
* Write event.
*
* @param writer Writer.
- * @param event Event.
+ * @param evt Event.
*/
- public void writeEvent(PortableRawWriterEx writer, EventAdapter event);
+ public void writeEvent(PortableRawWriterEx writer, Event evt);
/**
* Create local event filter.
@@ -202,7 +199,7 @@ public interface PlatformContext {
* @param hnd Native handle.
* @return Filter.
*/
- public <E extends Event> PlatformAwareEventFilter<E> createLocalEventFilter(long hnd);
+ public PlatformEventFilterListener createLocalEventFilter(long hnd);
/**
* Create remote event filter.
@@ -211,7 +208,7 @@ public interface PlatformContext {
* @param types Event types.
* @return Filter.
*/
- public <E extends Event> PlatformAwareEventFilter<E> createRemoteEventFilter(Object pred, final int... types);
+ public PlatformEventFilterListener createRemoteEventFilter(Object pred, final int... types);
/**
* Create native exception.
http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformEventFilterListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformEventFilterListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformEventFilterListener.java
new file mode 100644
index 0000000..77f0ac8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformEventFilterListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform;
+
+import java.util.UUID;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/**
+ * Platform event filter and listener.
+ */
+public interface PlatformEventFilterListener extends IgnitePredicate<Event>, IgniteBiPredicate<UUID, Event> {
+ /**
+ * Initializes the filter.
+ */
+ public void initialize(GridKernalContext ctx);
+
+ /**
+ * Callback invoked when filter is closed.
+ */
+ public void onClose();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformLocalEventListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformLocalEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformLocalEventListener.java
deleted file mode 100644
index f38d8e0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformLocalEventListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform;
-
-/**
- * Special version of listener for events with close callbacks.
- */
-public interface PlatformLocalEventListener {
- /**
- * Closes the listener.
- */
- public void close();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/message/PlatformMessageFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/message/PlatformMessageFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/message/PlatformMessageFilter.java
new file mode 100644
index 0000000..e5cec0a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/message/PlatformMessageFilter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.message;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+import java.util.UUID;
+
+/**
+ * Platform message filter.
+ */
+public interface PlatformMessageFilter extends IgniteBiPredicate<UUID, Object> {
+ /**
+ * Initializes the filter.
+ *
+ * @param ctx Kernal context.
+ */
+ public void initialize(GridKernalContext ctx);
+
+ /**
+ * Closes the filter.
+ */
+ public void onClose();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java
deleted file mode 100644
index 32daa1c..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.events;
-
-import java.util.UUID;
-import org.apache.ignite.events.Event;
-import org.apache.ignite.events.EventAdapter;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformAwareEventFilter;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.PlatformLocalEventListener;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-
-/**
- * Platform event filter. Delegates apply to native platform.
- */
-public class PlatformEventFilter<E extends Event> implements PlatformAwareEventFilter<E>, PlatformLocalEventListener
-{
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final Object pred;
-
- /** Event types. */
- private final int[] types;
-
- /** */
- protected transient long hnd;
-
- /** */
- private transient PlatformContext ctx;
-
- /**
- * Constructor.
- *
- * @param hnd Handle in the native platform.
- * @param ctx Context.
- */
- public PlatformEventFilter(long hnd, PlatformContext ctx) {
- assert ctx != null;
- assert hnd != 0;
-
- this.hnd = hnd;
- this.ctx = ctx;
-
- pred = null;
- types = null;
- }
-
- /**
- * Constructor.
- *
- * @param pred .Net portable predicate.
- */
- public PlatformEventFilter(Object pred, final int... types) {
- assert pred != null;
-
- this.pred = pred;
- this.types = types;
- }
-
- /** {@inheritDoc} */
- @Override public boolean apply(E evt) {
- return apply0(null, evt);
- }
-
- /** {@inheritDoc} */
- @Override public boolean apply(UUID uuid, E evt) {
- return apply0(uuid, evt);
- }
-
- /**
- * Apply impl.
- * @param uuid Node if.
- * @param evt Event.
- * @return Result.
- */
- private boolean apply0(final UUID uuid, final E evt) {
- if (!ctx.isEventTypeSupported(evt.type()))
- return false;
-
- if (types != null) {
- boolean match = false;
-
- for (int type : types) {
- if (type == evt.type()) {
- match = true;
- break;
- }
- }
-
- if (!match)
- return false;
- }
-
- try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformOutputStream out = mem.output();
-
- PortableRawWriterEx writer = ctx.writer(out);
-
- ctx.writeEvent(writer, (EventAdapter)evt);
-
- writer.writeUuid(uuid);
-
- out.synchronize();
-
- int res = ctx.gateway().eventFilterApply(hnd, mem.pointer());
-
- return res != 0;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- ctx.gateway().eventFilterDestroy(hnd);
- }
-
- /** {@inheritDoc} */
- @Override public void initialize(GridKernalContext gridCtx) {
- ctx = PlatformUtils.platformContext(gridCtx.grid());
-
- try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformOutputStream out = mem.output();
-
- PortableRawWriterEx writer = ctx.writer(out);
-
- writer.writeObjectDetached(pred);
-
- out.synchronize();
-
- hnd = ctx.gateway().eventFilterCreate(mem.pointer());
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- return this == o || o != null && o instanceof PlatformEventFilter &&
- hnd == ((PlatformEventFilter)o).hnd;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return (int)(hnd ^ (hnd >>> 32));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilterListenerImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilterListenerImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilterListenerImpl.java
new file mode 100644
index 0000000..b2dfd1c
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilterListenerImpl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.events;
+
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+
+import java.util.UUID;
+
+/**
+ * Platform event filter. Delegates apply to native platform.
+ */
+public class PlatformEventFilterListenerImpl implements PlatformEventFilterListener
+{
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final Object pred;
+
+ /** Event types. */
+ private final int[] types;
+
+ /** */
+ protected transient long hnd;
+
+ /** */
+ private transient PlatformContext ctx;
+
+ /**
+ * Constructor.
+ *
+ * @param hnd Handle in the native platform.
+ * @param ctx Context.
+ */
+ public PlatformEventFilterListenerImpl(long hnd, PlatformContext ctx) {
+ assert ctx != null;
+ assert hnd != 0;
+
+ this.hnd = hnd;
+ this.ctx = ctx;
+
+ pred = null;
+ types = null;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param pred .Net portable predicate.
+ */
+ public PlatformEventFilterListenerImpl(Object pred, final int... types) {
+ assert pred != null;
+
+ this.pred = pred;
+ this.types = types;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(Event evt) {
+ return apply0(null, evt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, Event evt) {
+ return apply0(uuid, evt);
+ }
+
+ /**
+ * Apply impl.
+ * @param uuid Node if.
+ * @param evt Event.
+ * @return Result.
+ */
+ private boolean apply0(final UUID uuid, final Event evt) {
+ if (!ctx.isEventTypeSupported(evt.type()))
+ return false;
+
+ if (types != null) {
+ boolean match = false;
+
+ for (int type : types) {
+ if (type == evt.type()) {
+ match = true;
+ break;
+ }
+ }
+
+ if (!match)
+ return false;
+ }
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ ctx.writeEvent(writer, evt);
+
+ writer.writeUuid(uuid);
+
+ out.synchronize();
+
+ int res = ctx.gateway().eventFilterApply(hnd, mem.pointer());
+
+ return res != 0;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onClose() {
+ ctx.gateway().eventFilterDestroy(hnd);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initialize(GridKernalContext gridCtx) {
+ ctx = PlatformUtils.platformContext(gridCtx.grid());
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ writer.writeObjectDetached(pred);
+
+ out.synchronize();
+
+ hnd = ctx.gateway().eventFilterCreate(mem.pointer());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ return this == o || o != null && o instanceof PlatformEventFilterListenerImpl &&
+ hnd == ((PlatformEventFilterListenerImpl)o).hnd;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return (int)(hnd ^ (hnd >>> 32));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
index fde6be5..997c019 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
@@ -22,11 +22,12 @@ import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventAdapter;
import org.apache.ignite.internal.portable.PortableRawReaderEx;
import org.apache.ignite.internal.portable.PortableRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
-import org.apache.ignite.internal.processors.platform.PlatformAwareEventFilter;
+import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
import org.apache.ignite.internal.util.typedef.F;
@@ -205,14 +206,14 @@ public class PlatformEvents extends PlatformAbstractTarget {
boolean hasLocFilter = reader.readBoolean();
- PlatformAwareEventFilter locFilter = hasLocFilter ? localFilter(reader.readLong()) : null;
+ PlatformEventFilterListener locFilter = hasLocFilter ? localFilter(reader.readLong()) : null;
boolean hasRmtFilter = reader.readBoolean();
UUID listenId;
if (hasRmtFilter) {
- PlatformAwareEventFilter rmtFilter = platformCtx.createRemoteEventFilter(
+ PlatformEventFilterListener rmtFilter = platformCtx.createRemoteEventFilter(
reader.readObjectDetached(), readEventTypes(reader));
listenId = events.remoteListen(bufSize, interval, autoUnsubscribe, locFilter, rmtFilter);
@@ -233,16 +234,16 @@ public class PlatformEvents extends PlatformAbstractTarget {
int[] types = readEventTypes(reader);
- PlatformAwareEventFilter filter = platformCtx.createRemoteEventFilter(pred, types);
+ PlatformEventFilterListener filter = platformCtx.createRemoteEventFilter(pred, types);
- Collection<EventAdapter> result = events.remoteQuery(filter, timeout);
+ Collection<Event> result = events.remoteQuery(filter, timeout);
if (result == null)
writer.writeInt(-1);
else {
writer.writeInt(result.size());
- for (EventAdapter e : result)
+ for (Event e : result)
platformCtx.writeEvent(writer, e);
}
@@ -325,7 +326,7 @@ public class PlatformEvents extends PlatformAbstractTarget {
* @param hnd Handle.
* @return Interop filter.
*/
- private PlatformAwareEventFilter localFilter(long hnd) {
+ private PlatformEventFilterListener localFilter(long hnd) {
return platformCtx.createLocalEventFilter(hnd);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java
deleted file mode 100644
index 4237665..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.messaging;
-
-import java.util.UUID;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.communication.GridLifecycleAwareMessageFilter;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-
-/**
- * Interop filter. Delegates apply to native platform.
- */
-public class PlatformMessageFilter extends PlatformAbstractPredicate
- implements GridLifecycleAwareMessageFilter<UUID, Object> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * Constructor.
- */
- public PlatformMessageFilter()
- {
- super();
- }
-
- /**
- * Constructor.
- *
- * @param pred .Net portable predicate.
- * @param ptr Pointer to predicate in the native platform.
- * @param ctx Kernal context.
- */
- protected PlatformMessageFilter(Object pred, long ptr, PlatformContext ctx) {
- super(pred, ptr, ctx);
- }
-
- /** {@inheritDoc} */
- @Override public boolean apply(UUID uuid, Object m) {
- if (ptr == 0)
- return false; // Destroyed.
-
- try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformOutputStream out = mem.output();
-
- PortableRawWriterEx writer = ctx.writer(out);
-
- writer.writeObject(uuid);
- writer.writeObject(m);
-
- out.synchronize();
-
- return ctx.gateway().messagingFilterApply(ptr, mem.pointer()) != 0;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void initialize(GridKernalContext kernalCtx) {
- if (ptr != 0)
- return;
-
- ctx = PlatformUtils.platformContext(kernalCtx.grid());
-
- try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformOutputStream out = mem.output();
-
- PortableRawWriterEx writer = ctx.writer(out);
-
- writer.writeObject(pred);
-
- out.synchronize();
-
- ptr = ctx.gateway().messagingFilterCreate(mem.pointer());
- }
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- if (ptr == 0) // Already destroyed or not initialized yet.
- return;
-
- try {
- assert ctx != null;
-
- ctx.gateway().messagingFilterDestroy(ptr);
- }
- finally {
- ptr = 0;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilterImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilterImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilterImpl.java
new file mode 100644
index 0000000..1e42914
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilterImpl.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.messaging;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+
+import java.util.UUID;
+
+/**
+ * Platform message filter. Delegates apply to native platform.
+ */
+public class PlatformMessageFilterImpl extends PlatformAbstractPredicate implements PlatformMessageFilter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Constructor.
+ */
+ public PlatformMessageFilterImpl()
+ {
+ super();
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param pred .Net portable predicate.
+ * @param ptr Pointer to predicate in the native platform.
+ * @param ctx Kernal context.
+ */
+ protected PlatformMessageFilterImpl(Object pred, long ptr, PlatformContext ctx) {
+ super(pred, ptr, ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, Object m) {
+ if (ptr == 0)
+ return false; // Destroyed.
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ writer.writeObject(uuid);
+ writer.writeObject(m);
+
+ out.synchronize();
+
+ return ctx.gateway().messagingFilterApply(ptr, mem.pointer()) != 0;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initialize(GridKernalContext kernalCtx) {
+ if (ptr != 0)
+ return;
+
+ ctx = PlatformUtils.platformContext(kernalCtx.grid());
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ writer.writeObject(pred);
+
+ out.synchronize();
+
+ ptr = ctx.gateway().messagingFilterCreate(mem.pointer());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onClose() {
+ if (ptr == 0) // Already destroyed or not initialized yet.
+ return;
+
+ try {
+ assert ctx != null;
+
+ ctx.gateway().messagingFilterDestroy(ptr);
+ }
+ finally {
+ ptr = 0;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
index 8a27508..50643e1 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
@@ -17,18 +17,19 @@
package org.apache.ignite.internal.processors.platform.messaging;
-import java.util.UUID;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.communication.GridLifecycleAwareMessageFilter;
import org.apache.ignite.internal.portable.PortableRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
+
+import java.util.UUID;
/**
* Interop local filter. Delegates apply to native platform, uses id to identify native target.
*/
-public class PlatformMessageLocalFilter implements GridLifecycleAwareMessageFilter<UUID, Object> {
+public class PlatformMessageLocalFilter implements PlatformMessageFilter {
/** */
private static final long serialVersionUID = 0L;
@@ -71,7 +72,7 @@ public class PlatformMessageLocalFilter implements GridLifecycleAwareMessageFilt
}
/** {@inheritDoc} */
- @Override public void close() {
+ @Override public void onClose() {
platformCtx.gateway().messagingFilterDestroy(hnd);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
index 968edd5..6dfd570 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
@@ -17,17 +17,18 @@
package org.apache.ignite.internal.processors.platform.messaging;
-import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteMessaging;
-import org.apache.ignite.internal.managers.communication.GridLifecycleAwareMessageFilter;
import org.apache.ignite.internal.portable.PortableRawReaderEx;
import org.apache.ignite.internal.portable.PortableRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.lang.IgniteFuture;
+import java.util.UUID;
+
/**
* Interop messaging.
*/
@@ -144,7 +145,7 @@ public class PlatformMessaging extends PlatformAbstractTarget {
Object topic = reader.readObjectDetached();
- GridLifecycleAwareMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr);
+ PlatformMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr);
UUID listenId = messaging.remoteListen(topic, filter);