You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/08/28 08:13:13 UTC
ignite git commit: IGNITE-1312: Moving continuous queries.
Repository: ignite
Updated Branches:
refs/heads/ignite-1312 [created] 07a94f1e3
IGNITE-1312: Moving continuous queries.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/07a94f1e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/07a94f1e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/07a94f1e
Branch: refs/heads/ignite-1312
Commit: 07a94f1e3ae971ba9bf7eb5851a30c000c751fa0
Parents: 3d46b62
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Aug 28 09:12:23 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Aug 28 09:12:23 2015 +0300
----------------------------------------------------------------------
.../internal/portable/PortableContext.java | 1 -
.../processors/platform/PlatformContext.java | 19 ++
.../processors/platform/PlatformTarget.java | 76 +++++++
.../cache/query/PlatformContinuousQuery.java | 58 +++++
.../processors/platform/PlatformTarget.java | 76 -------
.../query/PlatformContinuousQueryImpl.java | 222 +++++++++++++++++++
.../PlatformContinuousQueryRemoteFilter.java | 183 +++++++++++++++
7 files changed, 558 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/07a94f1e/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
index 723113e..a3cf686 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
@@ -206,7 +206,6 @@ public class PortableContext implements Externalizable {
registerPredefinedType(T2.class, 62);
registerPredefinedType(PortableObjectImpl.class, 63);
-
registerPredefinedType(PortableMetaDataImpl.class, 64);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/07a94f1e/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index 504f79e..461fb84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.platform;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.cache.query.continuous.*;
+import org.apache.ignite.internal.processors.platform.cache.query.*;
import org.apache.ignite.internal.processors.platform.callback.*;
import org.apache.ignite.internal.processors.platform.memory.*;
import org.jetbrains.annotations.*;
@@ -135,4 +137,21 @@ public interface PlatformContext {
* @param metrics Metrics.
*/
public void writeClusterMetrics(PortableRawWriterEx writer, @Nullable ClusterMetrics metrics);
+
+ /**
+ * Create continuous query.
+ * @param ptr Pointer to continuous query deployed on the platform.
+ * @param hasFilter Whether filter exists.
+ * @param filter Filter.
+ * @return Platform continuous query.
+ */
+ public PlatformContinuousQuery createContinuousQuery(long ptr, boolean hasFilter, @Nullable Object filter);
+
+ /**
+ * Create continuous query filter to be deployed on remote node.
+ *
+ * @param filter Native filter.
+ * @return Filter.
+ */
+ public CacheContinuousQueryFilterEx createContinuousQueryFilter(Object filter);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/07a94f1e/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
new file mode 100644
index 0000000..1d54b4e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * Interop target abstraction.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public interface PlatformTarget {
+ /**
+ * Synchronous IN operation.
+ *
+ * @param type Operation type.
+ * @param memPtr Memory pointer.
+ * @return Value specific for the given operation.
+ * @throws Exception If failed.
+ */
+ public int inOp(int type, long memPtr) throws Exception;
+
+ /**
+ * Synchronous IN operation which returns managed object as result.
+ *
+ * @param type Operation type.
+ * @param memPtr Memory pointer.
+ * @return Managed result.
+ * @throws Exception In case of failure.
+ */
+ public Object inOpObject(int type, long memPtr) throws Exception;
+
+ /**
+ * Synchronous OUT operation.
+ *
+ * @param type Operation type.
+ * @param memPtr Memory pointer.
+ * @throws Exception In case of failure.
+ */
+ public void outOp(int type, long memPtr) throws Exception;
+
+ /**
+ * Synchronous IN-OUT operation.
+ *
+ * @param type Operation type.
+ * @param inMemPtr Input memory pointer.
+ * @param outMemPtr Output memory pointer.
+ * @throws Exception In case of failure.
+ */
+ public void inOutOp(int type, long inMemPtr, long outMemPtr) throws Exception;
+
+ /**
+ * Synchronous IN-OUT operation with optional argument.
+ *
+ * @param type Operation type.
+ * @param inMemPtr Input memory pointer.
+ * @param outMemPtr Output memory pointer.
+ * @param arg Argument (optional).
+ * @throws Exception In case of failure.
+ */
+ public void inOutOp(int type, long inMemPtr, long outMemPtr, @Nullable Object arg) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/07a94f1e/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQuery.java
new file mode 100644
index 0000000..0b55aea
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQuery.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.query.continuous.*;
+import org.apache.ignite.internal.processors.platform.*;
+
+import javax.cache.event.*;
+
+/**
+ * Platform continuous query.
+ */
+public interface PlatformContinuousQuery extends CacheEntryUpdatedListener, CacheContinuousQueryFilterEx {
+ /**
+ * Start continuous query execution.
+ *
+ * @param cache Cache.
+ * @param loc Local flag.
+ * @param bufSize Buffer size.
+ * @param timeInterval Time interval.
+ * @param autoUnsubscribe Auto-unsubscribe flag.
+ * @param initialQry Initial query.
+ * @throws org.apache.ignite.IgniteCheckedException If failed.
+ */
+ public void start(IgniteCacheProxy cache, boolean loc, int bufSize, long timeInterval, boolean autoUnsubscribe,
+ Query initialQry) throws IgniteCheckedException;
+
+ /**
+ * Close continuous query.
+ */
+ public void close();
+
+ /**
+ * Gets initial query cursor (if any).
+ *
+ * @return Initial query cursor.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public PlatformTarget getInitialQueryCursor();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/07a94f1e/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
deleted file mode 100644
index 1d54b4e..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform;
-
-import org.jetbrains.annotations.*;
-
-/**
- * Interop target abstraction.
- */
-@SuppressWarnings("UnusedDeclaration")
-public interface PlatformTarget {
- /**
- * Synchronous IN operation.
- *
- * @param type Operation type.
- * @param memPtr Memory pointer.
- * @return Value specific for the given operation otherwise.
- * @throws Exception If failed.
- */
- public int inOp(int type, long memPtr) throws Exception;
-
- /**
- * Synchronous IN operation which returns managed object as result.
- *
- * @param type Operation type.
- * @param memPtr Memory pointer.
- * @return Managed result.
- * @throws Exception If case of failure.
- */
- public Object inOpObject(int type, long memPtr) throws Exception;
-
- /**
- * Synchronous OUT operation.
- *
- * @param type Operation type.
- * @param memPtr Memory pointer.
- * @throws Exception In case of failure.
- */
- public void outOp(int type, long memPtr) throws Exception;
-
- /**
- * Synchronous IN-OUT operation.
- *
- * @param type Operation type.
- * @param inMemPtr Input memory pointer.
- * @param outMemPtr Output memory pointer.
- * @throws Exception In case of failure.
- */
- public void inOutOp(int type, long inMemPtr, long outMemPtr) throws Exception;
-
- /**
- * Synchronous IN-OUT operation with optional argument.
- *
- * @param type Operation type.
- * @param inMemPtr Input memory pointer.
- * @param outMemPtr Output memory pointer.
- * @param arg Argument (optional).
- * @throws Exception In case of failure.
- */
- public void inOutOp(int type, long inMemPtr, long outMemPtr, @Nullable Object arg) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/07a94f1e/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
new file mode 100644
index 0000000..b2fa1e3
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
@@ -0,0 +1,222 @@
+/*
+ * Copyright (C) GridGain Systems. All Rights Reserved.
+ * _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.processors.query.*;
+
+import javax.cache.*;
+import javax.cache.event.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Interop continuous query handle.
+ */
+public class PlatformContinuousQueryImpl implements PlatformContinuousQuery {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Context. */
+ protected final PlatformContext platformCtx;
+
+ /** Whether filter exists. */
+ private final boolean hasFilter;
+
+ /** Native filter in serialized form. If null, then filter is either not set, or this is a local query. */
+ protected final Object filter;
+
+ /** Pointer to native counterpart; zero if closed. */
+ private long ptr;
+
+ /** Cursor to handle filter close. */
+ private QueryCursor cursor;
+
+ /** Lock for concurrency control. */
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /** Wrapped initial qry cursor. */
+ private PlatformQueryCursor initialQryCur;
+
+ /**
+ * Constructor.
+ *
+ * @param platformCtx Context.
+ * @param ptr Pointer to native counterpart.
+ * @param hasFilter Whether filter exists.
+ * @param filter Filter.
+ */
+ public PlatformContinuousQueryImpl(PlatformContext platformCtx, long ptr, boolean hasFilter, Object filter) {
+ assert ptr != 0L;
+
+ this.platformCtx = platformCtx;
+ this.ptr = ptr;
+ this.hasFilter = hasFilter;
+ this.filter = filter;
+ }
+
+ /**
+ * Start query execution.
+ *
+ * @param cache Cache.
+ * @param loc Local flag.
+ * @param bufSize Buffer size.
+ * @param timeInterval Time interval.
+ * @param autoUnsubscribe Auto-unsubscribe flag.
+ * @param initialQry Initial query.
+ */
+ @SuppressWarnings("unchecked")
+ public void start(IgniteCacheProxy cache, boolean loc, int bufSize, long timeInterval, boolean autoUnsubscribe,
+ Query initialQry) throws IgniteCheckedException {
+ assert !loc || filter == null;
+
+ lock.writeLock().lock();
+
+ try {
+ try {
+ ContinuousQuery qry = new ContinuousQuery();
+
+ qry.setLocalListener(this);
+ qry.setRemoteFilter(this); // Filter must be set always for correct resource release.
+ qry.setPageSize(bufSize);
+ qry.setTimeInterval(timeInterval);
+ qry.setAutoUnsubscribe(autoUnsubscribe);
+ qry.setInitialQuery(initialQry);
+
+ cursor = cache.query(qry.setLocal(loc));
+
+ if (initialQry != null)
+ initialQryCur = new PlatformQueryCursor(platformCtx, new QueryCursorEx<Cache.Entry>() {
+ @Override public Iterator<Cache.Entry> iterator() {
+ return cursor.iterator();
+ }
+
+ @Override public List<Cache.Entry> getAll() {
+ return cursor.getAll();
+ }
+
+ @Override public void close() {
+ // No-op: do not close whole continuous query when initial query cursor closes.
+ }
+
+ @Override public void getAll(Consumer<Cache.Entry> clo) throws IgniteCheckedException {
+ for (Cache.Entry t : this)
+ clo.consume(t);
+ }
+
+ @Override public List<GridQueryFieldMetadata> fieldsMeta() {
+ return null;
+ }
+ }, initialQry.getPageSize() > 0 ? initialQry.getPageSize() : Query.DFLT_PAGE_SIZE);
+ }
+ catch (Exception e) {
+ try
+ {
+ close0();
+ }
+ catch (Exception ignored)
+ {
+ // Ignore
+ }
+
+ throw PlatformUtils.unwrapQueryException(e);
+ }
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void onUpdated(Iterable evts) throws CacheEntryListenerException {
+ lock.readLock().lock();
+
+ try {
+ if (ptr == 0)
+ throw new CacheEntryListenerException("Failed to notify listener because it has been closed.");
+
+ PlatformUtils.applyContinuousQueryEvents(platformCtx, ptr, evts);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException {
+ lock.readLock().lock();
+
+ try {
+ if (ptr == 0)
+ throw new CacheEntryListenerException("Failed to evaluate the filter because it has been closed.");
+
+ return !hasFilter || PlatformUtils.evaluateContinuousQueryEvent(platformCtx, ptr, evt);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onQueryUnregister() {
+ close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ lock.writeLock().lock();
+
+ try {
+ close0();
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"UnusedDeclaration", "unchecked"})
+ @Override public PlatformTarget getInitialQueryCursor() {
+ return initialQryCur;
+ }
+
+    /** Internal close routine, called under write lock; native filter is released even if cursor close fails. */
+    private void close0() {
+        if (ptr != 0) {
+            long ptr0 = ptr;
+            ptr = 0;
+
+            try {
+                if (cursor != null)
+                    cursor.close();
+            }
+            finally {
+                platformCtx.gateway().continuousQueryFilterRelease(ptr0);
+            }
+        }
+    }
+
+ /**
+ * Replacer for remote filter.
+ *
+ * @return Filter to be deployed on remote node.
+ * @throws ObjectStreamException If failed.
+ */
+ Object writeReplace() throws ObjectStreamException {
+ return filter == null ? null : platformCtx.createContinuousQueryFilter(filter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/07a94f1e/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java
new file mode 100644
index 0000000..0f19218
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.cache.query.continuous.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.resources.*;
+
+import javax.cache.event.*;
+import java.io.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Continuous query filter deployed on remote nodes.
+ */
+public class PlatformContinuousQueryRemoteFilter implements CacheContinuousQueryFilterEx, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Lock for concurrency control. */
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /** Native filter in serialized form. */
+ private Object filter;
+
+ /** Grid hosting the filter. */
+ @IgniteInstanceResource
+ private transient Ignite grid;
+
+ /** Native platform pointer. */
+ private transient volatile long ptr;
+
+ /** Close flag. Once set, no requests to the native platform are possible. */
+ private transient boolean closed;
+
+ /**
+ * {@link java.io.Externalizable} support.
+ */
+ public PlatformContinuousQueryRemoteFilter() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param filter Serialized native filter.
+ */
+ public PlatformContinuousQueryRemoteFilter(Object filter) {
+ assert filter != null;
+
+ this.filter = filter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException {
+ long ptr0 = ptr;
+
+ if (ptr0 == 0)
+ deploy();
+
+ lock.readLock().lock();
+
+ try {
+ if (closed)
+ throw new CacheEntryListenerException("Failed to evaluate the filter because it has been closed.");
+
+ PlatformContext platformCtx = PlatformUtils.platformContext(grid);
+
+ return PlatformUtils.evaluateContinuousQueryEvent(platformCtx, ptr, evt);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+    /**
+     * Deploy filter to native platform unless it is already deployed; fails if the filter has been closed.
+     */
+    private void deploy() {
+        lock.writeLock().lock();
+
+        try {
+            // 1. Do not deploy if the filter has been closed concurrently.
+            if (closed)
+                throw new CacheEntryListenerException("Failed to deploy the filter because it has been closed.");
+
+            // 2. A racing evaluate() may have deployed already; redeploying would leak the native filter.
+            if (ptr != 0)
+                return;
+
+            // 3. Deploy.
+            PlatformContext ctx = PlatformUtils.platformContext(grid);
+
+            try (PlatformMemory mem = ctx.memory().allocate()) {
+                PlatformOutputStream out = mem.output();
+                ctx.writer(out).writeObject(filter);
+                out.synchronize();
+
+                ptr = ctx.gateway().continuousQueryFilterCreate(mem.pointer());
+            }
+            catch (Exception e) {
+                // 4. Close in case of failure.
+                close();
+
+                throw new CacheEntryListenerException("Failed to deploy the filter.", e);
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+ /** {@inheritDoc} */
+ @Override public void onQueryUnregister() {
+ lock.writeLock().lock();
+
+ try {
+ close();
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Close the filter.
+ */
+ private void close() {
+ if (!closed) {
+ try {
+ if (ptr != 0) {
+ try {
+ PlatformUtils.platformContext(grid).gateway().continuousQueryFilterRelease(ptr);
+ }
+ finally {
+ // Nullify the pointer in any case.
+ ptr = 0;
+ }
+ }
+ }
+ finally {
+ closed = true;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(filter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ filter = in.readObject();
+
+ assert filter != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PlatformContinuousQueryRemoteFilter.class, this);
+ }
+}