You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/09/02 04:00:44 UTC

[16/29] ignite git commit: Improving platform interfaces.

Improving platform interfaces.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/32579d45
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/32579d45
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/32579d45

Branch: refs/heads/ignite-843
Commit: 32579d450377f177fc6078c4d9686de88dd21220
Parents: 0e25f55
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Sep 1 12:49:16 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Sep 1 12:49:16 2015 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |  15 +-
 .../managers/communication/GridIoManager.java   |  50 +++---
 .../GridLifecycleAwareMessageFilter.java        |  38 -----
 .../eventstorage/GridEventStorageManager.java   |  16 +-
 .../platform/PlatformAwareEventFilter.java      |  39 -----
 .../processors/platform/PlatformContext.java    |  15 +-
 .../platform/PlatformEventFilterListener.java   |  39 +++++
 .../platform/PlatformLocalEventListener.java    |  28 ----
 .../platform/message/PlatformMessageFilter.java |  40 +++++
 .../platform/events/PlatformEventFilter.java    | 164 -------------------
 .../events/PlatformEventFilterListenerImpl.java | 163 ++++++++++++++++++
 .../platform/events/PlatformEvents.java         |  15 +-
 .../messaging/PlatformMessageFilter.java        | 110 -------------
 .../messaging/PlatformMessageFilterImpl.java    | 110 +++++++++++++
 .../messaging/PlatformMessageLocalFilter.java   |   9 +-
 .../platform/messaging/PlatformMessaging.java   |   7 +-
 16 files changed, 416 insertions(+), 442 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 93e01e5..599d301 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -38,8 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
-import org.apache.ignite.internal.processors.platform.PlatformAwareEventFilter;
-import org.apache.ignite.internal.processors.platform.PlatformLocalEventListener;
+import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P2;
 import org.apache.ignite.internal.util.typedef.T3;
@@ -139,8 +138,8 @@ class GridEventConsumeHandler implements GridContinuousHandler {
         if (filter != null)
             ctx.resource().injectGeneric(filter);
 
-        if (filter instanceof PlatformAwareEventFilter)
-            ((PlatformAwareEventFilter)filter).initialize(ctx);
+        if (filter instanceof PlatformEventFilterListener)
+            ((PlatformEventFilterListener)filter).initialize(ctx);
 
         final boolean loc = nodeId.equals(ctx.localNodeId());
 
@@ -260,16 +259,16 @@ class GridEventConsumeHandler implements GridContinuousHandler {
         RuntimeException err = null;
 
         try {
-            if (filter instanceof PlatformAwareEventFilter)
-                ((PlatformAwareEventFilter)filter).close();
+            if (filter instanceof PlatformEventFilterListener)
+                ((PlatformEventFilterListener)filter).onClose();
         }
         catch(RuntimeException ex) {
             err = ex;
         }
 
         try {
-            if (cb instanceof PlatformLocalEventListener)
-                ((PlatformLocalEventListener)cb).close();
+            if (cb instanceof PlatformEventFilterListener)
+                ((PlatformEventFilterListener)cb).onClose();
         }
         catch (RuntimeException ex) {
             if (err == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index aa73296..b8af8da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -17,26 +17,6 @@
 
 package org.apache.ignite.internal.managers.communication;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -54,6 +34,7 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
@@ -83,6 +64,27 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -1457,8 +1459,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID, ?> p) {
         if (p != null) {
             try {
-                if (p instanceof GridLifecycleAwareMessageFilter)
-                    ((GridLifecycleAwareMessageFilter)p).initialize(ctx);
+                if (p instanceof PlatformMessageFilter)
+                    ((PlatformMessageFilter)p).initialize(ctx);
                 else
                     ctx.resource().injectGeneric(p);
 
@@ -1795,8 +1797,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (lsnr instanceof GridUserMessageListener) {
             GridUserMessageListener userLsnr = (GridUserMessageListener)lsnr;
 
-            if (userLsnr.predLsnr instanceof GridLifecycleAwareMessageFilter)
-                ((GridLifecycleAwareMessageFilter)userLsnr.predLsnr).close();
+            if (userLsnr.predLsnr instanceof PlatformMessageFilter)
+                ((PlatformMessageFilter)userLsnr.predLsnr).onClose();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java
deleted file mode 100644
index 2d33a65..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridLifecycleAwareMessageFilter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.managers.communication;
-
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.lang.IgniteBiPredicate;
-
-/**
- * Special version of bi-predicate for messaging with initialize/close callbacks.
- */
-public interface GridLifecycleAwareMessageFilter<K, V> extends IgniteBiPredicate<K, V> {
-    /**
-     * Initializes the filter.
-     *
-     * @param ctx Kernal context.
-     */
-    public void initialize(GridKernalContext ctx);
-
-    /**
-     * Closes the filter.
-     */
-    public void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index 7b8c759..ea01e52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -47,8 +47,7 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
-import org.apache.ignite.internal.processors.platform.PlatformAwareEventFilter;
-import org.apache.ignite.internal.processors.platform.PlatformLocalEventListener;
+import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
 import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
@@ -681,8 +680,8 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
         {
             IgnitePredicate p = ((UserListenerWrapper)lsnr).listener();
 
-            if (p instanceof PlatformLocalEventListener)
-                ((PlatformLocalEventListener)p).close();
+            if (p instanceof PlatformEventFilterListener)
+                ((PlatformEventFilterListener)p).onClose();
         }
 
         return found;
@@ -784,19 +783,20 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
      * @param p Grid event predicate.
      * @return Collection of grid events.
      */
+    @SuppressWarnings("unchecked")
     public <T extends Event> Collection<T> localEvents(IgnitePredicate<T> p) {
         assert p != null;
 
-        if (p instanceof PlatformAwareEventFilter) {
-            PlatformAwareEventFilter p0 = (PlatformAwareEventFilter)p;
+        if (p instanceof PlatformEventFilterListener) {
+            PlatformEventFilterListener p0 = (PlatformEventFilterListener)p;
 
             p0.initialize(ctx);
 
             try {
-                return getSpi().localEvents(p0);
+                return (Collection<T>)getSpi().localEvents(p0);
             }
             finally {
-                p0.close();
+                p0.onClose();
             }
         }
         else

http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java
deleted file mode 100644
index a423578..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAwareEventFilter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform;
-
-import java.util.UUID;
-import org.apache.ignite.events.Event;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.lang.IgnitePredicate;
-
-/**
- * Special version of predicate for events with initialize/close callbacks.
- */
-public interface PlatformAwareEventFilter<E extends Event> extends IgnitePredicate<E>, IgniteBiPredicate<UUID, E> {
-    /**
-     * Initializes the filter.
-     */
-    public void initialize(GridKernalContext ctx);
-
-    /**
-     * Closes the filter.
-     */
-    public void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index cea8326..1febf07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -18,13 +18,10 @@
 package org.apache.ignite.internal.processors.platform;
 
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.Event;
-import org.apache.ignite.events.EventAdapter;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.communication.GridLifecycleAwareMessageFilter;
 import org.apache.ignite.internal.portable.PortableRawReaderEx;
 import org.apache.ignite.internal.portable.PortableRawWriterEx;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFilterEx;
@@ -39,10 +36,10 @@ import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager;
 import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
 import org.jetbrains.annotations.Nullable;
 
 import java.util.Collection;
-import java.util.UUID;
 
 /**
  * Platform context. Acts as an entry point for platform operations.
@@ -178,7 +175,7 @@ public interface PlatformContext {
      * @param ptr Pointer of deployed native filter.
      * @return Filter.
      */
-    public GridLifecycleAwareMessageFilter<UUID, Object> createRemoteMessageFilter(Object filter, long ptr);
+    public PlatformMessageFilter createRemoteMessageFilter(Object filter, long ptr);
 
     /**
      * Check whether the given event type is supported.
@@ -192,9 +189,9 @@ public interface PlatformContext {
      * Write event.
      *
      * @param writer Writer.
-     * @param event Event.
+     * @param evt Event.
      */
-    public void writeEvent(PortableRawWriterEx writer, EventAdapter event);
+    public void writeEvent(PortableRawWriterEx writer, Event evt);
 
     /**
      * Create local event filter.
@@ -202,7 +199,7 @@ public interface PlatformContext {
      * @param hnd Native handle.
      * @return Filter.
      */
-    public <E extends Event> PlatformAwareEventFilter<E> createLocalEventFilter(long hnd);
+    public PlatformEventFilterListener createLocalEventFilter(long hnd);
 
     /**
      * Create remote event filter.
@@ -211,7 +208,7 @@ public interface PlatformContext {
      * @param types Event types.
      * @return Filter.
      */
-    public <E extends Event> PlatformAwareEventFilter<E> createRemoteEventFilter(Object pred, final int... types);
+    public PlatformEventFilterListener createRemoteEventFilter(Object pred, final int... types);
 
     /**
      * Create native exception.

http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformEventFilterListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformEventFilterListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformEventFilterListener.java
new file mode 100644
index 0000000..77f0ac8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformEventFilterListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform;
+
+import java.util.UUID;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/**
+ * Platform event filter and listener.
+ */
+public interface PlatformEventFilterListener extends IgnitePredicate<Event>, IgniteBiPredicate<UUID, Event> {
+    /**
+     * Initializes the filter.
+     */
+    public void initialize(GridKernalContext ctx);
+
+    /**
+     * Callback invoked when filter is closed.
+     */
+    public void onClose();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformLocalEventListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformLocalEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformLocalEventListener.java
deleted file mode 100644
index f38d8e0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformLocalEventListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform;
-
-/**
- * Special version of listener for events with close callbacks.
- */
-public interface PlatformLocalEventListener {
-    /**
-     * Closes the listener.
-     */
-    public void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/message/PlatformMessageFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/message/PlatformMessageFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/message/PlatformMessageFilter.java
new file mode 100644
index 0000000..e5cec0a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/message/PlatformMessageFilter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.message;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+import java.util.UUID;
+
+/**
+ * Platform message filter.
+ */
+public interface PlatformMessageFilter extends IgniteBiPredicate<UUID, Object> {
+    /**
+     * Initializes the filter.
+     *
+     * @param ctx Kernal context.
+     */
+    public void initialize(GridKernalContext ctx);
+
+    /**
+     * Closes the filter.
+     */
+    public void onClose();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java
deleted file mode 100644
index 32daa1c..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilter.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.events;
-
-import java.util.UUID;
-import org.apache.ignite.events.Event;
-import org.apache.ignite.events.EventAdapter;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformAwareEventFilter;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.PlatformLocalEventListener;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-
-/**
- * Platform event filter. Delegates apply to native platform.
- */
-public class PlatformEventFilter<E extends Event> implements PlatformAwareEventFilter<E>, PlatformLocalEventListener
-{
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private final Object pred;
-
-    /** Event types. */
-    private final int[] types;
-
-    /** */
-    protected transient long hnd;
-
-    /** */
-    private transient PlatformContext ctx;
-
-    /**
-     * Constructor.
-     *
-     * @param hnd Handle in the native platform.
-     * @param ctx Context.
-     */
-    public PlatformEventFilter(long hnd, PlatformContext ctx) {
-        assert ctx != null;
-        assert hnd != 0;
-
-        this.hnd = hnd;
-        this.ctx = ctx;
-
-        pred = null;
-        types = null;
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param pred .Net portable predicate.
-     */
-    public PlatformEventFilter(Object pred, final int... types) {
-        assert pred != null;
-
-        this.pred = pred;
-        this.types = types;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean apply(E evt) {
-        return apply0(null, evt);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean apply(UUID uuid, E evt) {
-        return apply0(uuid, evt);
-    }
-
-    /**
-     * Apply impl.
-     * @param uuid Node if.
-     * @param evt Event.
-     * @return Result.
-     */
-    private boolean apply0(final UUID uuid, final E evt) {
-        if (!ctx.isEventTypeSupported(evt.type()))
-            return false;
-
-        if (types != null) {
-            boolean match = false;
-
-            for (int type : types) {
-                if (type == evt.type()) {
-                    match = true;
-                    break;
-                }
-            }
-
-            if (!match)
-                return false;
-        }
-
-        try (PlatformMemory mem = ctx.memory().allocate()) {
-            PlatformOutputStream out = mem.output();
-
-            PortableRawWriterEx writer = ctx.writer(out);
-
-            ctx.writeEvent(writer, (EventAdapter)evt);
-
-            writer.writeUuid(uuid);
-
-            out.synchronize();
-
-            int res = ctx.gateway().eventFilterApply(hnd, mem.pointer());
-
-            return res != 0;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() {
-        ctx.gateway().eventFilterDestroy(hnd);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void initialize(GridKernalContext gridCtx) {
-        ctx = PlatformUtils.platformContext(gridCtx.grid());
-
-        try (PlatformMemory mem = ctx.memory().allocate()) {
-            PlatformOutputStream out = mem.output();
-
-            PortableRawWriterEx writer = ctx.writer(out);
-
-            writer.writeObjectDetached(pred);
-
-            out.synchronize();
-
-            hnd = ctx.gateway().eventFilterCreate(mem.pointer());
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        return this == o || o != null && o instanceof PlatformEventFilter &&
-            hnd == ((PlatformEventFilter)o).hnd;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return (int)(hnd ^ (hnd >>> 32));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilterListenerImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilterListenerImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilterListenerImpl.java
new file mode 100644
index 0000000..b2dfd1c
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEventFilterListenerImpl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.events;
+
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+
+import java.util.UUID;
+
+/**
+ * Platform event filter. Delegates apply to native platform.
+ */
+public class PlatformEventFilterListenerImpl implements PlatformEventFilterListener
+{
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final Object pred;
+
+    /** Event types. */
+    private final int[] types;
+
+    /** */
+    protected transient long hnd;
+
+    /** */
+    private transient PlatformContext ctx;
+
+    /**
+     * Constructor.
+     *
+     * @param hnd Handle in the native platform.
+     * @param ctx Context.
+     */
+    public PlatformEventFilterListenerImpl(long hnd, PlatformContext ctx) {
+        assert ctx != null;
+        assert hnd != 0;
+
+        this.hnd = hnd;
+        this.ctx = ctx;
+
+        pred = null;
+        types = null;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param pred .Net portable predicate.
+     */
+    public PlatformEventFilterListenerImpl(Object pred, final int... types) {
+        assert pred != null;
+
+        this.pred = pred;
+        this.types = types;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(Event evt) {
+        return apply0(null, evt);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(UUID uuid, Event evt) {
+        return apply0(uuid, evt);
+    }
+
+    /**
+     * Apply impl.
+     * @param uuid Node if.
+     * @param evt Event.
+     * @return Result.
+     */
+    private boolean apply0(final UUID uuid, final Event evt) {
+        if (!ctx.isEventTypeSupported(evt.type()))
+            return false;
+
+        if (types != null) {
+            boolean match = false;
+
+            for (int type : types) {
+                if (type == evt.type()) {
+                    match = true;
+                    break;
+                }
+            }
+
+            if (!match)
+                return false;
+        }
+
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = ctx.writer(out);
+
+            ctx.writeEvent(writer, evt);
+
+            writer.writeUuid(uuid);
+
+            out.synchronize();
+
+            int res = ctx.gateway().eventFilterApply(hnd, mem.pointer());
+
+            return res != 0;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onClose() {
+        ctx.gateway().eventFilterDestroy(hnd);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize(GridKernalContext gridCtx) {
+        ctx = PlatformUtils.platformContext(gridCtx.grid());
+
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = ctx.writer(out);
+
+            writer.writeObjectDetached(pred);
+
+            out.synchronize();
+
+            hnd = ctx.gateway().eventFilterCreate(mem.pointer());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        return this == o || o != null && o instanceof PlatformEventFilterListenerImpl &&
+            hnd == ((PlatformEventFilterListenerImpl)o).hnd;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return (int)(hnd ^ (hnd >>> 32));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
index fde6be5..997c019 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
@@ -22,11 +22,12 @@ import java.util.Collection;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventAdapter;
 import org.apache.ignite.internal.portable.PortableRawReaderEx;
 import org.apache.ignite.internal.portable.PortableRawWriterEx;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
-import org.apache.ignite.internal.processors.platform.PlatformAwareEventFilter;
+import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
 import org.apache.ignite.internal.util.typedef.F;
@@ -205,14 +206,14 @@ public class PlatformEvents extends PlatformAbstractTarget {
 
                 boolean hasLocFilter = reader.readBoolean();
 
-                PlatformAwareEventFilter locFilter = hasLocFilter ? localFilter(reader.readLong()) : null;
+                PlatformEventFilterListener locFilter = hasLocFilter ? localFilter(reader.readLong()) : null;
 
                 boolean hasRmtFilter = reader.readBoolean();
 
                 UUID listenId;
 
                 if (hasRmtFilter) {
-                    PlatformAwareEventFilter rmtFilter = platformCtx.createRemoteEventFilter(
+                    PlatformEventFilterListener rmtFilter = platformCtx.createRemoteEventFilter(
                         reader.readObjectDetached(), readEventTypes(reader));
 
                     listenId = events.remoteListen(bufSize, interval, autoUnsubscribe, locFilter, rmtFilter);
@@ -233,16 +234,16 @@ public class PlatformEvents extends PlatformAbstractTarget {
 
                 int[] types = readEventTypes(reader);
 
-                PlatformAwareEventFilter filter = platformCtx.createRemoteEventFilter(pred, types);
+                PlatformEventFilterListener filter = platformCtx.createRemoteEventFilter(pred, types);
 
-                Collection<EventAdapter> result = events.remoteQuery(filter, timeout);
+                Collection<Event> result = events.remoteQuery(filter, timeout);
 
                 if (result == null)
                     writer.writeInt(-1);
                 else {
                     writer.writeInt(result.size());
 
-                    for (EventAdapter e : result)
+                    for (Event e : result)
                         platformCtx.writeEvent(writer, e);
                 }
 
@@ -325,7 +326,7 @@ public class PlatformEvents extends PlatformAbstractTarget {
      * @param hnd Handle.
      * @return Interop filter.
      */
-    private PlatformAwareEventFilter localFilter(long hnd) {
+    private PlatformEventFilterListener localFilter(long hnd) {
         return platformCtx.createLocalEventFilter(hnd);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java
deleted file mode 100644
index 4237665..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilter.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.messaging;
-
-import java.util.UUID;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.communication.GridLifecycleAwareMessageFilter;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-
-/**
- * Interop filter. Delegates apply to native platform.
- */
-public class PlatformMessageFilter extends PlatformAbstractPredicate
-    implements GridLifecycleAwareMessageFilter<UUID, Object> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /**
-     * Constructor.
-     */
-    public PlatformMessageFilter()
-    {
-        super();
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param pred .Net portable predicate.
-     * @param ptr Pointer to predicate in the native platform.
-     * @param ctx Kernal context.
-     */
-    protected PlatformMessageFilter(Object pred, long ptr, PlatformContext ctx) {
-        super(pred, ptr, ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean apply(UUID uuid, Object m) {
-        if (ptr == 0)
-            return false;  // Destroyed.
-
-        try (PlatformMemory mem = ctx.memory().allocate()) {
-            PlatformOutputStream out = mem.output();
-
-            PortableRawWriterEx writer = ctx.writer(out);
-
-            writer.writeObject(uuid);
-            writer.writeObject(m);
-
-            out.synchronize();
-
-            return ctx.gateway().messagingFilterApply(ptr, mem.pointer()) != 0;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void initialize(GridKernalContext kernalCtx) {
-        if (ptr != 0)
-            return;
-
-        ctx = PlatformUtils.platformContext(kernalCtx.grid());
-
-        try (PlatformMemory mem = ctx.memory().allocate()) {
-            PlatformOutputStream out = mem.output();
-
-            PortableRawWriterEx writer = ctx.writer(out);
-
-            writer.writeObject(pred);
-
-            out.synchronize();
-
-            ptr = ctx.gateway().messagingFilterCreate(mem.pointer());
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() {
-        if (ptr == 0) // Already destroyed or not initialized yet.
-            return;
-
-        try {
-            assert ctx != null;
-
-            ctx.gateway().messagingFilterDestroy(ptr);
-        }
-        finally {
-            ptr = 0;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilterImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilterImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilterImpl.java
new file mode 100644
index 0000000..1e42914
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilterImpl.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.messaging;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+
+import java.util.UUID;
+
+/**
+ * Platform message filter. Delegates apply to native platform.
+ */
+public class PlatformMessageFilterImpl extends PlatformAbstractPredicate implements PlatformMessageFilter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Constructor.
+     */
+    public PlatformMessageFilterImpl()
+    {
+        super();
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param pred .Net portable predicate.
+     * @param ptr Pointer to predicate in the native platform.
+     * @param ctx Kernal context.
+     */
+    protected PlatformMessageFilterImpl(Object pred, long ptr, PlatformContext ctx) {
+        super(pred, ptr, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(UUID uuid, Object m) {
+        if (ptr == 0)
+            return false;  // Destroyed.
+
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = ctx.writer(out);
+
+            writer.writeObject(uuid);
+            writer.writeObject(m);
+
+            out.synchronize();
+
+            return ctx.gateway().messagingFilterApply(ptr, mem.pointer()) != 0;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize(GridKernalContext kernalCtx) {
+        if (ptr != 0)
+            return;
+
+        ctx = PlatformUtils.platformContext(kernalCtx.grid());
+
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = ctx.writer(out);
+
+            writer.writeObject(pred);
+
+            out.synchronize();
+
+            ptr = ctx.gateway().messagingFilterCreate(mem.pointer());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onClose() {
+        if (ptr == 0) // Already destroyed or not initialized yet.
+            return;
+
+        try {
+            assert ctx != null;
+
+            ctx.gateway().messagingFilterDestroy(ptr);
+        }
+        finally {
+            ptr = 0;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
index 8a27508..50643e1 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
@@ -17,18 +17,19 @@
 
 package org.apache.ignite.internal.processors.platform.messaging;
 
-import java.util.UUID;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.communication.GridLifecycleAwareMessageFilter;
 import org.apache.ignite.internal.portable.PortableRawWriterEx;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
 import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
+
+import java.util.UUID;
 
 /**
  * Interop local filter. Delegates apply to native platform, uses id to identify native target.
  */
-public class PlatformMessageLocalFilter implements GridLifecycleAwareMessageFilter<UUID, Object> {
+public class PlatformMessageLocalFilter implements PlatformMessageFilter {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -71,7 +72,7 @@ public class PlatformMessageLocalFilter implements GridLifecycleAwareMessageFilt
     }
 
     /** {@inheritDoc} */
-    @Override public void close() {
+    @Override public void onClose() {
         platformCtx.gateway().messagingFilterDestroy(hnd);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/32579d45/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
index 968edd5..6dfd570 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
@@ -17,17 +17,18 @@
 
 package org.apache.ignite.internal.processors.platform.messaging;
 
-import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteMessaging;
-import org.apache.ignite.internal.managers.communication.GridLifecycleAwareMessageFilter;
 import org.apache.ignite.internal.portable.PortableRawReaderEx;
 import org.apache.ignite.internal.portable.PortableRawWriterEx;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
 import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
 import org.apache.ignite.lang.IgniteFuture;
 
+import java.util.UUID;
+
 /**
  * Interop messaging.
  */
@@ -144,7 +145,7 @@ public class PlatformMessaging extends PlatformAbstractTarget {
 
                 Object topic = reader.readObjectDetached();
 
-                GridLifecycleAwareMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr);
+                PlatformMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr);
 
                 UUID listenId = messaging.remoteListen(topic, filter);