You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/04/01 08:26:10 UTC

[GitHub] [ignite] alex-plekhanov opened a new pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

alex-plekhanov opened a new pull request #8960:
URL: https://github.com/apache/ignite/pull/8960


   Thank you for submitting the pull request to the Apache Ignite.
   
   In order to streamline the review of the contribution 
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes changes that have been made. 
   The description explains _WHAT_ and _WHY_ was made instead of _HOW_.
   - [ ] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-XXXX Change summary` where `XXXX` - number of JIRA issue.
   - [ ] A reviewer has been mentioned through the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask anу advice on http://asf.slack.com _#ignite_ channel.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r606993408



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+
+/**
+ * Per-cache cache entry listeners registry. Listeners can't be stored inside ClientCache instance, since there can be
+ * several such instances per one cache.
+ */
+public class ClientCacheEntryListenersRegistry {

Review comment:
       >  it should be done by another ticket
   
   Ok, let's do this first, and then get back to the current ticket?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r607183406



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+
+/**
+ * Per-cache cache entry listeners registry. Listeners can't be stored inside ClientCache instance, since there can be
+ * several such instances per one cache.
+ */
+public class ClientCacheEntryListenersRegistry {

Review comment:
       > If we are going to introduce predefined ids (like 0 or -1) for general-purpose events - that will work fine as well.
   
   There is still a case with resource IDs initiated from the server, but ok, we don't have it now, if we will need it we can change implementation.
   I've tried to change notification listeners and listen to resource ID, but unfortunately, there is one critical problem: we can process notification before the notification listener is registered. For compute it's a blocker since notification can be lost and the future will never complete. This happens because the earliest place when we can register listener - it's payload reader, payload reader executed in user's thread or in `asyncContinuationExecutor` thread. Concurrently the new message (notification) from the channel can be read by NIO thread and executed by `asyncContinuationExecutor` without notifying listeners.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r609315903



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
##########
@@ -747,6 +775,92 @@ else if (qry instanceof SqlFieldsQuery)
         ));
     }
 
+    /** {@inheritDoc} */
+    @Override public <R> QueryCursor<R> query(ContinuousQuery<K, V> qry, ClientDisconnectListener disconnectLsnr) {
+        A.ensure(!(qry.getInitialQuery() instanceof ContinuousQuery), "Initial query for continuous query " +
+            "can't be an instance of another continuous query");
+        A.notNull(qry.getLocalListener(), "Local listener");
+        A.ensure(!qry.isLocal(), "Unsupported Local flag value");

Review comment:
       What about a message like `Local query is not supported by thin client`?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
##########
@@ -747,6 +775,92 @@ else if (qry instanceof SqlFieldsQuery)
         ));
     }
 
+    /** {@inheritDoc} */
+    @Override public <R> QueryCursor<R> query(ContinuousQuery<K, V> qry, ClientDisconnectListener disconnectLsnr) {
+        A.ensure(!(qry.getInitialQuery() instanceof ContinuousQuery), "Initial query for continuous query " +
+            "can't be an instance of another continuous query");
+        A.notNull(qry.getLocalListener(), "Local listener");
+        A.ensure(!qry.isLocal(), "Unsupported Local flag value");
+        A.ensure(qry.isAutoUnsubscribe(), "Unsupported AutoUnsubscribe flag value");

Review comment:
       Similarly, `AutoUnsubscribe flag is not supported by thin client`.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
##########
@@ -747,6 +775,92 @@ else if (qry instanceof SqlFieldsQuery)
         ));
     }
 
+    /** {@inheritDoc} */
+    @Override public <R> QueryCursor<R> query(ContinuousQuery<K, V> qry, ClientDisconnectListener disconnectLsnr) {
+        A.ensure(!(qry.getInitialQuery() instanceof ContinuousQuery), "Initial query for continuous query " +
+            "can't be an instance of another continuous query");
+        A.notNull(qry.getLocalListener(), "Local listener");
+        A.ensure(!qry.isLocal(), "Unsupported Local flag value");
+        A.ensure(qry.isAutoUnsubscribe(), "Unsupported AutoUnsubscribe flag value");
+        A.ensure(qry.getRemoteFilterFactory() == null || qry.getRemoteFilter() == null,
+            "Should be used either RemoterFilter or RemoteFilterFactory.");

Review comment:
       `RemoteFilter and RemoteFilterFactory can't be used together`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
##########
@@ -947,4 +1061,73 @@ private void writeEntries(Map<? extends K, ? extends V> map, PayloadOutputChanne
                     serDes.writeObject(out, e.getValue());
                 });
     }
+
+    /**
+     * Adapter to convert CQ listener calls to JCache listener calls.
+     */
+    private static class JCacheEntryListenerAdapter<K, V> implements CacheEntryUpdatedListener<K, V> {

Review comment:
       The file is too big already, can we move those classes to separate files?

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheEntryListenersTest.java
##########
@@ -0,0 +1,708 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.cache.Cache;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryRemovedListener;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientDisconnectListener;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.Person;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Thin client cache entry listeners test.
+ */
+public class CacheEntryListenersTest extends AbstractThinClientTest {

Review comment:
       Let's add a test that combines one or more continuous queries with compute calls: since we use a shared notification mechanism, it would be good to check this scenario.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r607007843



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+
+/**
+ * Per-cache cache entry listeners registry. Listeners can't be stored inside ClientCache instance, since there can be
+ * several such instances per one cache.
+ */
+public class ClientCacheEntryListenersRegistry {

Review comment:
       > As I understand, this limitation already exists - all messages have an ID in any case.
   There is no requirement for the uniqueness of this ID. We can use 0 for some events (for example, we have topology change event implemented as special flag of every response, but it can be implemented with notification and no additional resource ID required here). Also, perhaps in the future events initiated by server-side for some resource will be needed, in this case, client should be able to listen some type of notification with any resource ID. 
   > why have Collection<NotificationListener>, then separate maps in Compute and Continuous Query
   We can replace the first collection with `EnumMap` (Listener per ClientOperation), it is very lightweight and fast, and there will be no iterations. There will be at most one listener per one operation type, and each listener can implement its own strategy for resource ID mapping. This change does not require to refactor compute a lot, so, I think, it can be implemented in the current ticket. 
   Or we can even provide two API on client channel level: listener for client operation and any resource id (refactor compute to use this API) and listener for client operation and exactly one resource Id. WDYT?  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r606944816



##########
File path: modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
##########
@@ -718,4 +723,27 @@
      * @return Cursor.
      */
     public FieldsQueryCursor<List<?>> query(SqlFieldsQuery qry);
+
+    /**
+     * Registers a {@link CacheEntryListener}. The supplied {@link CacheEntryListenerConfiguration} is used to
+     * instantiate a listener and apply it to those events specified in the configuration.
+     * <p>
+     * NOTE: There is no failover in case of client channel failure, this event should be handled on the user's side.
+     * Listeners will be notified about client disconnect event if they implement {@link ClientChannelDisconnectListener}
+     * interface.
+     *
+     * @param cacheEntryListenerConfiguration a factory and related configuration for creating the listener.
+     * @throws IllegalArgumentException is the same CacheEntryListenerConfiguration is used more than once or
+     *          if some unsupported by thin client properties are set.
+     * @see CacheEntryListener
+     */
+    public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration);

Review comment:
       But why not provide this ability to the user if it's relatively low cost and no changes needed on server-side? The user can use these methods in thick client, and having these methods in thin client makes migration to thin client easier.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r607007843



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+
+/**
+ * Per-cache cache entry listeners registry. Listeners can't be stored inside ClientCache instance, since there can be
+ * several such instances per one cache.
+ */
+public class ClientCacheEntryListenersRegistry {

Review comment:
       > As I understand, this limitation already exists - all messages have an ID in any case.
   
   There is no requirement for the uniqueness of this ID. We can use 0 for some events (for example, we have topology change event implemented as special flag of every response, but it can be implemented with notification and no additional resource ID required here). Also, perhaps in the future events initiated by server-side for some resource will be needed, in this case, client should be able to listen some type of notification with any resource ID. 
   
   > why have Collection<NotificationListener>, then separate maps in Compute and Continuous Query
   
   We can replace the first collection with `EnumMap` (Listener per ClientOperation), it is very lightweight and fast, and there will be no iterations. There will be at most one listener per one operation type, and each listener can implement its own strategy for resource ID mapping. This change does not require to refactor compute a lot, so, I think, it can be implemented in the current ticket. 
   Or we can even provide two API on client channel level: listener for client operation and any resource id (refactor compute to use this API) and listener for client operation and exactly one resource Id. WDYT?  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r606993048



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+
+/**
+ * Per-cache cache entry listeners registry. Listeners can't be stored inside ClientCache instance, since there can be
+ * several such instances per one cache.
+ */
+public class ClientCacheEntryListenersRegistry {

Review comment:
       > new limitations: we can't use notifications without resource id in the future
   
   As I understand, this limitation already exists - all messages have an ID in any case.
   
   > find handles in this listener by resource id using some map
   
   We could do this, but again, why have `Collection<NotificationListener>`, then separate maps in Compute and Continuous Query (and, possibly, more places in future), when a single map will:
   * Simplify the implementation
   * Make it more efficient (single lookup, single collection) 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] asfgit closed pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #8960:
URL: https://github.com/apache/ignite/pull/8960


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r607275997



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+
+/**
+ * Per-cache cache entry listeners registry. Listeners can't be stored inside ClientCache instance, since there can be
+ * several such instances per one cache.
+ */
+public class ClientCacheEntryListenersRegistry {

Review comment:
       The same is true for Continuous Query - you can get a notification before you know the query ID and register the listener (and you can have multiple active queries).
   This is another piece of logic to be reused - that's why we need a centralized notification handling mechanism.
   
   We can queue "unknown" notifications per resourceId and handle them as soon as listener is registered - we'll have to do it one way or another. For Compute the queue will have at most 1 item, but for CQ it can have any number of events.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r606946561



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import org.apache.ignite.client.ClientChannelDisconnectListener;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class ClientCacheEntryListenerHandler<K, V> implements NotificationListener, AutoCloseable {
+    /** "Keep binary" flag mask. */
+    private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
+
+    /** */
+    private final Cache<K, V> jCacheAdapter;
+
+    /** */
+    private final ReliableChannel ch;
+
+    /** */
+    private final boolean keepBinary;
+
+    /** */
+    private final ClientUtils utils;
+
+    /** */
+    private final Consumer<ClientChannel> chCloseLsnr = this::onChannelClosed;
+
+    /** */
+    private volatile CacheEntryUpdatedListener<K, V> locLsnr;
+
+    /** */
+    private volatile ClientChannel clientCh;
+
+    /** */
+    private volatile long rsrcId;
+
+    /** */
+    ClientCacheEntryListenerHandler(
+        Cache<K, V> jCacheAdapter,
+        ReliableChannel ch,
+        ClientBinaryMarshaller marsh,
+        boolean keepBinary
+    ) {
+        this.jCacheAdapter = jCacheAdapter;
+        this.ch = ch;
+        this.keepBinary = keepBinary;
+        utils = new ClientUtils(marsh);
+    }
+
+    /**
+     * Send request to the server and start
+     */
+    public synchronized void startListen(
+        CacheEntryUpdatedListener<K, V> locLsnr,
+        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> rmtFilterFactory,
+        int pageSize,
+        long timeInterval,
+        boolean includeExpired
+    ) {
+        if (clientCh != null)
+            throw new IllegalStateException("Listener was already started");
+
+        this.locLsnr = locLsnr;
+
+        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+            BinaryOutputStream out = payloadCh.out();
+
+            out.writeInt(ClientUtils.cacheId(jCacheAdapter.getName()));
+            out.writeByte(keepBinary ? KEEP_BINARY_FLAG_MASK : 0);
+            out.writeInt(pageSize);
+            out.writeLong(timeInterval);
+            out.writeBoolean(includeExpired);
+
+            if (rmtFilterFactory == null)
+                out.writeByte(GridBinaryMarshaller.NULL);
+            else {
+                utils.writeObject(out, rmtFilterFactory);
+                out.writeByte((byte)1); // Java platform

Review comment:
       `ClientPlatform' it's a server-side class, perhaps it's better to introduce some client-side constant and refactor `scanQuery` as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r606959517



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import org.apache.ignite.client.ClientChannelDisconnectListener;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class ClientCacheEntryListenerHandler<K, V> implements NotificationListener, AutoCloseable {
+    /** "Keep binary" flag mask. */
+    private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
+
+    /** */
+    private final Cache<K, V> jCacheAdapter;
+
+    /** */
+    private final ReliableChannel ch;
+
+    /** */
+    private final boolean keepBinary;
+
+    /** */
+    private final ClientUtils utils;
+
+    /** */
+    private final Consumer<ClientChannel> chCloseLsnr = this::onChannelClosed;
+
+    /** */
+    private volatile CacheEntryUpdatedListener<K, V> locLsnr;
+
+    /** */
+    private volatile ClientChannel clientCh;
+
+    /** */
+    private volatile long rsrcId;
+
+    /** */
+    ClientCacheEntryListenerHandler(
+        Cache<K, V> jCacheAdapter,
+        ReliableChannel ch,
+        ClientBinaryMarshaller marsh,
+        boolean keepBinary
+    ) {
+        this.jCacheAdapter = jCacheAdapter;
+        this.ch = ch;
+        this.keepBinary = keepBinary;
+        utils = new ClientUtils(marsh);
+    }
+
+    /**
+     * Send request to the server and start
+     */
+    public synchronized void startListen(
+        CacheEntryUpdatedListener<K, V> locLsnr,
+        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> rmtFilterFactory,
+        int pageSize,
+        long timeInterval,
+        boolean includeExpired
+    ) {
+        if (clientCh != null)
+            throw new IllegalStateException("Listener was already started");
+
+        this.locLsnr = locLsnr;
+
+        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+            BinaryOutputStream out = payloadCh.out();
+
+            out.writeInt(ClientUtils.cacheId(jCacheAdapter.getName()));
+            out.writeByte(keepBinary ? KEEP_BINARY_FLAG_MASK : 0);
+            out.writeInt(pageSize);
+            out.writeLong(timeInterval);
+            out.writeBoolean(includeExpired);
+
+            if (rmtFilterFactory == null)
+                out.writeByte(GridBinaryMarshaller.NULL);
+            else {
+                utils.writeObject(out, rmtFilterFactory);
+                out.writeByte((byte)1); // Java platform

Review comment:
       > introduce some client-side constant
   
   Sounds good to me. Let's avoid magic numbers.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#issuecomment-815464933


   @alex-plekhanov looks good to me in general, please see a few comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r607895820



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+
+/**
+ * Per-cache cache entry listeners registry. Listeners can't be stored inside ClientCache instance, since there can be
+ * several such instances per one cache.
+ */
+public class ClientCacheEntryListenersRegistry {

Review comment:
       @ptupitsyn I've refactored notification listeners and partially move changes to the ticket 
   https://issues.apache.org/jira/browse/IGNITE-14492. When this ticket will be in the master branch I will rebase the current ticket.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r606946561



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import org.apache.ignite.client.ClientChannelDisconnectListener;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class ClientCacheEntryListenerHandler<K, V> implements NotificationListener, AutoCloseable {
+    /** "Keep binary" flag mask. */
+    private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
+
+    /** */
+    private final Cache<K, V> jCacheAdapter;
+
+    /** */
+    private final ReliableChannel ch;
+
+    /** */
+    private final boolean keepBinary;
+
+    /** */
+    private final ClientUtils utils;
+
+    /** */
+    private final Consumer<ClientChannel> chCloseLsnr = this::onChannelClosed;
+
+    /** */
+    private volatile CacheEntryUpdatedListener<K, V> locLsnr;
+
+    /** */
+    private volatile ClientChannel clientCh;
+
+    /** */
+    private volatile long rsrcId;
+
+    /** */
+    ClientCacheEntryListenerHandler(
+        Cache<K, V> jCacheAdapter,
+        ReliableChannel ch,
+        ClientBinaryMarshaller marsh,
+        boolean keepBinary
+    ) {
+        this.jCacheAdapter = jCacheAdapter;
+        this.ch = ch;
+        this.keepBinary = keepBinary;
+        utils = new ClientUtils(marsh);
+    }
+
+    /**
+     * Send request to the server and start
+     */
+    public synchronized void startListen(
+        CacheEntryUpdatedListener<K, V> locLsnr,
+        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> rmtFilterFactory,
+        int pageSize,
+        long timeInterval,
+        boolean includeExpired
+    ) {
+        if (clientCh != null)
+            throw new IllegalStateException("Listener was already started");
+
+        this.locLsnr = locLsnr;
+
+        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+            BinaryOutputStream out = payloadCh.out();
+
+            out.writeInt(ClientUtils.cacheId(jCacheAdapter.getName()));
+            out.writeByte(keepBinary ? KEEP_BINARY_FLAG_MASK : 0);
+            out.writeInt(pageSize);
+            out.writeLong(timeInterval);
+            out.writeBoolean(includeExpired);
+
+            if (rmtFilterFactory == null)
+                out.writeByte(GridBinaryMarshaller.NULL);
+            else {
+                utils.writeObject(out, rmtFilterFactory);
+                out.writeByte((byte)1); // Java platform

Review comment:
       `ClientPlatform` it's a server-side class, perhaps it's better to introduce some client-side constant and refactor `scanQuery` as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r606958859



##########
File path: modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
##########
@@ -718,4 +723,27 @@
      * @return Cursor.
      */
     public FieldsQueryCursor<List<?>> query(SqlFieldsQuery qry);
+
+    /**
+     * Registers a {@link CacheEntryListener}. The supplied {@link CacheEntryListenerConfiguration} is used to
+     * instantiate a listener and apply it to those events specified in the configuration.
+     * <p>
+     * NOTE: There is no failover in case of client channel failure, this event should be handled on the user's side.
+     * Listeners will be notified about client disconnect event if they implement {@link ClientChannelDisconnectListener}
+     * interface.
+     *
+     * @param cacheEntryListenerConfiguration a factory and related configuration for creating the listener.
+     * @throws IllegalArgumentException is the same CacheEntryListenerConfiguration is used more than once or
+     *          if some unsupported by thin client properties are set.
+     * @see CacheEntryListener
+     */
+    public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration);

Review comment:
       Ok, no objections.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r607278091



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+
+/**
+ * Per-cache cache entry listeners registry. Listeners can't be stored inside ClientCache instance, since there can be
+ * several such instances per one cache.
+ */
+public class ClientCacheEntryListenersRegistry {

Review comment:
       Please have a look at [C# implementation](https://github.com/apache/ignite/blob/master/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientNotificationHandler.cs) - hopefully I did not miss anything there.
   
   The only potential problem is that `requestId` is assumed to be unique across Compute, CQ and other future notification types - we should use a combined key instead (notificationType + requestId).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r606947207



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import org.apache.ignite.client.ClientChannelDisconnectListener;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class ClientCacheEntryListenerHandler<K, V> implements NotificationListener, AutoCloseable {
+    /** "Keep binary" flag mask. */
+    private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
+
+    /** */
+    private final Cache<K, V> jCacheAdapter;
+
+    /** */
+    private final ReliableChannel ch;
+
+    /** */
+    private final boolean keepBinary;
+
+    /** */
+    private final ClientUtils utils;
+
+    /** */
+    private final Consumer<ClientChannel> chCloseLsnr = this::onChannelClosed;
+
+    /** */
+    private volatile CacheEntryUpdatedListener<K, V> locLsnr;
+
+    /** */
+    private volatile ClientChannel clientCh;
+
+    /** */
+    private volatile long rsrcId;
+
+    /** */
+    ClientCacheEntryListenerHandler(
+        Cache<K, V> jCacheAdapter,
+        ReliableChannel ch,
+        ClientBinaryMarshaller marsh,
+        boolean keepBinary
+    ) {
+        this.jCacheAdapter = jCacheAdapter;
+        this.ch = ch;
+        this.keepBinary = keepBinary;
+        utils = new ClientUtils(marsh);
+    }
+
+    /**
+     * Send request to the server and start
+     */
+    public synchronized void startListen(
+        CacheEntryUpdatedListener<K, V> locLsnr,
+        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> rmtFilterFactory,
+        int pageSize,
+        long timeInterval,
+        boolean includeExpired
+    ) {
+        if (clientCh != null)
+            throw new IllegalStateException("Listener was already started");
+
+        this.locLsnr = locLsnr;
+
+        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+            BinaryOutputStream out = payloadCh.out();
+
+            out.writeInt(ClientUtils.cacheId(jCacheAdapter.getName()));
+            out.writeByte(keepBinary ? KEEP_BINARY_FLAG_MASK : 0);
+            out.writeInt(pageSize);
+            out.writeLong(timeInterval);
+            out.writeBoolean(includeExpired);
+
+            if (rmtFilterFactory == null)
+                out.writeByte(GridBinaryMarshaller.NULL);
+            else {
+                utils.writeObject(out, rmtFilterFactory);
+                out.writeByte((byte)1); // Java platform
+            }
+        };
+
+        ch.addChannelCloseListener(chCloseLsnr);
+
+        try {
+            T2<ClientChannel, Long> params = ch.service(
+                ClientOperation.QUERY_CONTINUOUS,
+                qryWriter,
+                res -> new T2<>(res.clientChannel(), res.in().readLong())
+            );
+
+            clientCh = params.get1();
+            rsrcId = params.get2();
+        }
+        catch (ClientError e) {
+            ch.removeChannelCloseListener(chCloseLsnr);
+
+            throw new ClientException(e);
+        }
+
+        clientCh.addNotificationListener(this);
+    }
+
+    /**
+     * @param ch Channel.
+     */
+    private void onChannelClosed(ClientChannel ch) {
+        if (ch == clientCh) {
+            if (locLsnr instanceof ClientChannelDisconnectListener)
+                ((ClientChannelDisconnectListener)locLsnr).onDisconnected();
+
+            U.closeQuiet(this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acceptNotification(
+            ClientChannel ch,
+            ClientOperation op,
+            long rsrcId,
+            ByteBuffer payload,
+            Exception err
+    ) {
+        if (op == ClientOperation.QUERY_CONTINUOUS_EVENT_NOTIFICATION && rsrcId == this.rsrcId) {
+            if (err == null && payload != null) {
+                BinaryInputStream in = BinaryByteBufferInputStream.create(payload);
+
+                int cnt = in.readInt();
+
+                List<CacheEntryEvent<? extends K, ? extends V>> evts = new ArrayList<>(cnt);
+
+                for (int i = 0; i < cnt; i++) {
+                    K key = utils.readObject(in, keepBinary);
+                    V oldVal = utils.readObject(in, keepBinary);
+                    V val = utils.readObject(in, keepBinary);
+                    byte evtTypeByte = in.readByte();
+
+                    EventType evtType = eventType(evtTypeByte);
+
+                    if (evtType != null) // Skip unknown event types.
+                        evts.add(new CacheEntryEventImpl<>(jCacheAdapter, evtType, key, oldVal, val));
+                }
+
+                locLsnr.onUpdated(evts);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void close() {
+        ch.removeChannelCloseListener(chCloseLsnr);
+
+        if (clientCh != null) {
+            clientCh.removeNotificationListener(this);
+
+            if (!clientCh.closed())
+                clientCh.service(ClientOperation.RESOURCE_CLOSE, ch -> ch.out().writeLong(rsrcId), null);
+        }
+    }
+
+    /**
+     * Client channel.
+     */
+    public ClientChannel clientChannel() {
+        return clientCh;
+    }
+
+    /** */
+    private EventType eventType(byte evtTypeByte) {
+        switch (evtTypeByte) {
+            case 0: return EventType.CREATED;
+            case 1: return EventType.UPDATED;
+            case 2: return EventType.REMOVED;
+            case 3: return EventType.EXPIRED;
+            default: return null;

Review comment:
       We don't have logger on the client-side




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r606982991



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import org.apache.ignite.client.ClientChannelDisconnectListener;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class ClientCacheEntryListenerHandler<K, V> implements NotificationListener, AutoCloseable {
+    /** "Keep binary" flag mask. */
+    private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
+
+    /** */
+    private final Cache<K, V> jCacheAdapter;
+
+    /** */
+    private final ReliableChannel ch;
+
+    /** */
+    private final boolean keepBinary;
+
+    /** */
+    private final ClientUtils utils;
+
+    /** */
+    private final Consumer<ClientChannel> chCloseLsnr = this::onChannelClosed;
+
+    /** */
+    private volatile CacheEntryUpdatedListener<K, V> locLsnr;
+
+    /** */
+    private volatile ClientChannel clientCh;
+
+    /** */
+    private volatile long rsrcId;
+
+    /** */
+    ClientCacheEntryListenerHandler(
+        Cache<K, V> jCacheAdapter,
+        ReliableChannel ch,
+        ClientBinaryMarshaller marsh,
+        boolean keepBinary
+    ) {
+        this.jCacheAdapter = jCacheAdapter;
+        this.ch = ch;
+        this.keepBinary = keepBinary;
+        utils = new ClientUtils(marsh);
+    }
+
+    /**
+     * Send request to the server and start
+     */
+    public synchronized void startListen(
+        CacheEntryUpdatedListener<K, V> locLsnr,
+        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> rmtFilterFactory,
+        int pageSize,
+        long timeInterval,
+        boolean includeExpired
+    ) {
+        if (clientCh != null)
+            throw new IllegalStateException("Listener was already started");
+
+        this.locLsnr = locLsnr;
+
+        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+            BinaryOutputStream out = payloadCh.out();
+
+            out.writeInt(ClientUtils.cacheId(jCacheAdapter.getName()));
+            out.writeByte(keepBinary ? KEEP_BINARY_FLAG_MASK : 0);
+            out.writeInt(pageSize);
+            out.writeLong(timeInterval);
+            out.writeBoolean(includeExpired);
+
+            if (rmtFilterFactory == null)
+                out.writeByte(GridBinaryMarshaller.NULL);
+            else {
+                utils.writeObject(out, rmtFilterFactory);
+                out.writeByte((byte)1); // Java platform
+            }
+        };
+
+        ch.addChannelCloseListener(chCloseLsnr);
+
+        try {
+            T2<ClientChannel, Long> params = ch.service(
+                ClientOperation.QUERY_CONTINUOUS,
+                qryWriter,
+                res -> new T2<>(res.clientChannel(), res.in().readLong())
+            );
+
+            clientCh = params.get1();
+            rsrcId = params.get2();
+        }
+        catch (ClientError e) {
+            ch.removeChannelCloseListener(chCloseLsnr);
+
+            throw new ClientException(e);
+        }
+
+        clientCh.addNotificationListener(this);
+    }
+
+    /**
+     * @param ch Channel.
+     */
+    private void onChannelClosed(ClientChannel ch) {
+        if (ch == clientCh) {
+            if (locLsnr instanceof ClientChannelDisconnectListener)
+                ((ClientChannelDisconnectListener)locLsnr).onDisconnected();
+
+            U.closeQuiet(this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acceptNotification(
+            ClientChannel ch,
+            ClientOperation op,
+            long rsrcId,
+            ByteBuffer payload,
+            Exception err
+    ) {
+        if (op == ClientOperation.QUERY_CONTINUOUS_EVENT_NOTIFICATION && rsrcId == this.rsrcId) {
+            if (err == null && payload != null) {
+                BinaryInputStream in = BinaryByteBufferInputStream.create(payload);
+
+                int cnt = in.readInt();
+
+                List<CacheEntryEvent<? extends K, ? extends V>> evts = new ArrayList<>(cnt);
+
+                for (int i = 0; i < cnt; i++) {
+                    K key = utils.readObject(in, keepBinary);
+                    V oldVal = utils.readObject(in, keepBinary);
+                    V val = utils.readObject(in, keepBinary);
+                    byte evtTypeByte = in.readByte();
+
+                    EventType evtType = eventType(evtTypeByte);
+
+                    if (evtType != null) // Skip unknown event types.
+                        evts.add(new CacheEntryEventImpl<>(jCacheAdapter, evtType, key, oldVal, val));
+                }
+
+                locLsnr.onUpdated(evts);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void close() {
+        ch.removeChannelCloseListener(chCloseLsnr);
+
+        if (clientCh != null) {
+            clientCh.removeNotificationListener(this);
+
+            if (!clientCh.closed())
+                clientCh.service(ClientOperation.RESOURCE_CLOSE, ch -> ch.out().writeLong(rsrcId), null);
+        }
+    }
+
+    /**
+     * Client channel.
+     */
+    public ClientChannel clientChannel() {
+        return clientCh;
+    }
+
+    /** */
+    private EventType eventType(byte evtTypeByte) {
+        switch (evtTypeByte) {
+            case 0: return EventType.CREATED;
+            case 1: return EventType.UPDATED;
+            case 2: return EventType.REMOVED;
+            case 3: return EventType.EXPIRED;
+            default: return null;

Review comment:
       Then we should close the channel with a `Throwable cause` that explains the problem.
   Invalid event type means some kind of protocol breakdown, we should not silently continue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r606299495



##########
File path: modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
##########
@@ -703,11 +705,14 @@
     public <K1, V1> ClientCache<K1, V1> withExpirePolicy(ExpiryPolicy expirePlc);
 
     /**
-     * Queries cache. Supports {@link ScanQuery} and {@link SqlFieldsQuery}.
+     * Queries cache. Supports {@link ScanQuery}, {@link SqlFieldsQuery} and {@link ContinuousQuery}.
+     * <p>
+     * NOTE: For continuous query listeners there is no failover in case of client channel failure, this event should
+     * be handled on the user's side. Listeners will be notified about client disconnect event if they implement
+     * {@link ClientChannelDisconnectListener} interface.

Review comment:
       As noted on the [dev list](http://apache-ignite-developers.2346864.n4.nabble.com/DISCUSSION-Java-thin-client-Continuous-Queries-public-API-td52114.html), maybe we can provide an overload instead?
    ```java
   query(ContinuousQuery, ClientChannelDisconnectListener)
   ```

##########
File path: modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
##########
@@ -718,4 +723,27 @@
      * @return Cursor.
      */
     public FieldsQueryCursor<List<?>> query(SqlFieldsQuery qry);
+
+    /**
+     * Registers a {@link CacheEntryListener}. The supplied {@link CacheEntryListenerConfiguration} is used to
+     * instantiate a listener and apply it to those events specified in the configuration.
+     * <p>
+     * NOTE: There is no failover in case of client channel failure, this event should be handled on the user's side.
+     * Listeners will be notified about client disconnect event if they implement {@link ClientChannelDisconnectListener}
+     * interface.
+     *
+     * @param cacheEntryListenerConfiguration a factory and related configuration for creating the listener.
+     * @throws IllegalArgumentException is the same CacheEntryListenerConfiguration is used more than once or
+     *          if some unsupported by thin client properties are set.
+     * @see CacheEntryListener
+     */
+    public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration);

Review comment:
       Those 2 methods seem to be redundant - listener can be configured through `ContinuousQuery` class. In Thick API those are dictated by JCache, which we don't have here.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import org.apache.ignite.client.ClientChannelDisconnectListener;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *

Review comment:
       Missing javadoc

##########
File path: modules/core/src/main/java/org/apache/ignite/client/ClientChannelDisconnectListener.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import org.apache.ignite.cache.query.ContinuousQuery;
+
+/**
+ * This interface can be implemented by {@link ContinuousQuery} local listeners or by cache entry listeners
+ * registered via {@link ClientCache#registerCacheEntryListener(CacheEntryListenerConfiguration)} to get notification
+ * about client channel disconnected event.
+ */
+public interface ClientChannelDisconnectListener {

Review comment:
       `ClientDisconnectListener` seems to be a better name - `channel` sounds like an implementation detail.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import org.apache.ignite.client.ClientChannelDisconnectListener;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class ClientCacheEntryListenerHandler<K, V> implements NotificationListener, AutoCloseable {
+    /** "Keep binary" flag mask. */
+    private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
+
+    /** */
+    private final Cache<K, V> jCacheAdapter;
+
+    /** */
+    private final ReliableChannel ch;
+
+    /** */
+    private final boolean keepBinary;
+
+    /** */
+    private final ClientUtils utils;
+
+    /** */
+    private final Consumer<ClientChannel> chCloseLsnr = this::onChannelClosed;
+
+    /** */
+    private volatile CacheEntryUpdatedListener<K, V> locLsnr;
+
+    /** */
+    private volatile ClientChannel clientCh;
+
+    /** */
+    private volatile long rsrcId;
+
+    /** */
+    ClientCacheEntryListenerHandler(
+        Cache<K, V> jCacheAdapter,
+        ReliableChannel ch,
+        ClientBinaryMarshaller marsh,
+        boolean keepBinary
+    ) {
+        this.jCacheAdapter = jCacheAdapter;
+        this.ch = ch;
+        this.keepBinary = keepBinary;
+        utils = new ClientUtils(marsh);
+    }
+
+    /**
+     * Send request to the server and start
+     */
+    public synchronized void startListen(
+        CacheEntryUpdatedListener<K, V> locLsnr,
+        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> rmtFilterFactory,
+        int pageSize,
+        long timeInterval,
+        boolean includeExpired
+    ) {
+        if (clientCh != null)
+            throw new IllegalStateException("Listener was already started");
+
+        this.locLsnr = locLsnr;
+
+        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+            BinaryOutputStream out = payloadCh.out();
+
+            out.writeInt(ClientUtils.cacheId(jCacheAdapter.getName()));
+            out.writeByte(keepBinary ? KEEP_BINARY_FLAG_MASK : 0);
+            out.writeInt(pageSize);
+            out.writeLong(timeInterval);
+            out.writeBoolean(includeExpired);
+
+            if (rmtFilterFactory == null)
+                out.writeByte(GridBinaryMarshaller.NULL);
+            else {
+                utils.writeObject(out, rmtFilterFactory);
+                out.writeByte((byte)1); // Java platform
+            }
+        };
+
+        ch.addChannelCloseListener(chCloseLsnr);
+
+        try {
+            T2<ClientChannel, Long> params = ch.service(
+                ClientOperation.QUERY_CONTINUOUS,
+                qryWriter,
+                res -> new T2<>(res.clientChannel(), res.in().readLong())
+            );
+
+            clientCh = params.get1();
+            rsrcId = params.get2();
+        }
+        catch (ClientError e) {
+            ch.removeChannelCloseListener(chCloseLsnr);
+
+            throw new ClientException(e);
+        }
+
+        clientCh.addNotificationListener(this);
+    }
+
+    /**
+     * @param ch Channel.
+     */
+    private void onChannelClosed(ClientChannel ch) {
+        if (ch == clientCh) {
+            if (locLsnr instanceof ClientChannelDisconnectListener)
+                ((ClientChannelDisconnectListener)locLsnr).onDisconnected();
+
+            U.closeQuiet(this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acceptNotification(
+            ClientChannel ch,
+            ClientOperation op,
+            long rsrcId,
+            ByteBuffer payload,
+            Exception err
+    ) {
+        if (op == ClientOperation.QUERY_CONTINUOUS_EVENT_NOTIFICATION && rsrcId == this.rsrcId) {

Review comment:
       nit: Invert condition to reduce nesting

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import org.apache.ignite.client.ClientChannelDisconnectListener;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class ClientCacheEntryListenerHandler<K, V> implements NotificationListener, AutoCloseable {
+    /** "Keep binary" flag mask. */
+    private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
+
+    /** */
+    private final Cache<K, V> jCacheAdapter;
+
+    /** */
+    private final ReliableChannel ch;
+
+    /** */
+    private final boolean keepBinary;
+
+    /** */
+    private final ClientUtils utils;
+
+    /** */
+    private final Consumer<ClientChannel> chCloseLsnr = this::onChannelClosed;
+
+    /** */
+    private volatile CacheEntryUpdatedListener<K, V> locLsnr;
+
+    /** */
+    private volatile ClientChannel clientCh;
+
+    /** */
+    private volatile long rsrcId;
+
+    /** */
+    ClientCacheEntryListenerHandler(
+        Cache<K, V> jCacheAdapter,
+        ReliableChannel ch,
+        ClientBinaryMarshaller marsh,
+        boolean keepBinary
+    ) {
+        this.jCacheAdapter = jCacheAdapter;
+        this.ch = ch;
+        this.keepBinary = keepBinary;
+        utils = new ClientUtils(marsh);
+    }
+
+    /**
+     * Send request to the server and start
+     */
+    public synchronized void startListen(
+        CacheEntryUpdatedListener<K, V> locLsnr,
+        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> rmtFilterFactory,
+        int pageSize,
+        long timeInterval,
+        boolean includeExpired
+    ) {
+        if (clientCh != null)
+            throw new IllegalStateException("Listener was already started");
+
+        this.locLsnr = locLsnr;
+
+        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+            BinaryOutputStream out = payloadCh.out();
+
+            out.writeInt(ClientUtils.cacheId(jCacheAdapter.getName()));
+            out.writeByte(keepBinary ? KEEP_BINARY_FLAG_MASK : 0);
+            out.writeInt(pageSize);
+            out.writeLong(timeInterval);
+            out.writeBoolean(includeExpired);
+
+            if (rmtFilterFactory == null)
+                out.writeByte(GridBinaryMarshaller.NULL);
+            else {
+                utils.writeObject(out, rmtFilterFactory);
+                out.writeByte((byte)1); // Java platform

Review comment:
       `ClientPlatform.JAVA`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import org.apache.ignite.client.ClientChannelDisconnectListener;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class ClientCacheEntryListenerHandler<K, V> implements NotificationListener, AutoCloseable {
+    /** "Keep binary" flag mask. */
+    private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
+
+    /** */
+    private final Cache<K, V> jCacheAdapter;
+
+    /** */
+    private final ReliableChannel ch;
+
+    /** */
+    private final boolean keepBinary;
+
+    /** */
+    private final ClientUtils utils;
+
+    /** */
+    private final Consumer<ClientChannel> chCloseLsnr = this::onChannelClosed;
+
+    /** */
+    private volatile CacheEntryUpdatedListener<K, V> locLsnr;
+
+    /** */
+    private volatile ClientChannel clientCh;
+
+    /** */
+    private volatile long rsrcId;
+
+    /** */
+    ClientCacheEntryListenerHandler(
+        Cache<K, V> jCacheAdapter,
+        ReliableChannel ch,
+        ClientBinaryMarshaller marsh,
+        boolean keepBinary
+    ) {
+        this.jCacheAdapter = jCacheAdapter;
+        this.ch = ch;
+        this.keepBinary = keepBinary;
+        utils = new ClientUtils(marsh);
+    }
+
+    /**
+     * Send request to the server and start
+     */
+    public synchronized void startListen(
+        CacheEntryUpdatedListener<K, V> locLsnr,
+        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> rmtFilterFactory,
+        int pageSize,
+        long timeInterval,
+        boolean includeExpired
+    ) {
+        if (clientCh != null)
+            throw new IllegalStateException("Listener was already started");
+
+        this.locLsnr = locLsnr;
+
+        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+            BinaryOutputStream out = payloadCh.out();
+
+            out.writeInt(ClientUtils.cacheId(jCacheAdapter.getName()));
+            out.writeByte(keepBinary ? KEEP_BINARY_FLAG_MASK : 0);
+            out.writeInt(pageSize);
+            out.writeLong(timeInterval);
+            out.writeBoolean(includeExpired);
+
+            if (rmtFilterFactory == null)
+                out.writeByte(GridBinaryMarshaller.NULL);
+            else {
+                utils.writeObject(out, rmtFilterFactory);
+                out.writeByte((byte)1); // Java platform
+            }
+        };
+
+        ch.addChannelCloseListener(chCloseLsnr);
+
+        try {
+            T2<ClientChannel, Long> params = ch.service(
+                ClientOperation.QUERY_CONTINUOUS,
+                qryWriter,
+                res -> new T2<>(res.clientChannel(), res.in().readLong())
+            );
+
+            clientCh = params.get1();
+            rsrcId = params.get2();
+        }
+        catch (ClientError e) {
+            ch.removeChannelCloseListener(chCloseLsnr);
+
+            throw new ClientException(e);
+        }
+
+        clientCh.addNotificationListener(this);
+    }
+
+    /**
+     * @param ch Channel.
+     */
+    private void onChannelClosed(ClientChannel ch) {
+        if (ch == clientCh) {
+            if (locLsnr instanceof ClientChannelDisconnectListener)
+                ((ClientChannelDisconnectListener)locLsnr).onDisconnected();
+
+            U.closeQuiet(this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acceptNotification(
+            ClientChannel ch,
+            ClientOperation op,
+            long rsrcId,
+            ByteBuffer payload,
+            Exception err
+    ) {
+        if (op == ClientOperation.QUERY_CONTINUOUS_EVENT_NOTIFICATION && rsrcId == this.rsrcId) {
+            if (err == null && payload != null) {

Review comment:
       Combine with the previous condition?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import org.apache.ignite.client.ClientChannelDisconnectListener;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class ClientCacheEntryListenerHandler<K, V> implements NotificationListener, AutoCloseable {
+    /** "Keep binary" flag mask. */
+    private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
+
+    /** */
+    private final Cache<K, V> jCacheAdapter;
+
+    /** */
+    private final ReliableChannel ch;
+
+    /** */
+    private final boolean keepBinary;
+
+    /** */
+    private final ClientUtils utils;
+
+    /** */
+    private final Consumer<ClientChannel> chCloseLsnr = this::onChannelClosed;
+
+    /** */
+    private volatile CacheEntryUpdatedListener<K, V> locLsnr;
+
+    /** */
+    private volatile ClientChannel clientCh;
+
+    /** */
+    private volatile long rsrcId;
+
+    /** */
+    ClientCacheEntryListenerHandler(
+        Cache<K, V> jCacheAdapter,
+        ReliableChannel ch,
+        ClientBinaryMarshaller marsh,
+        boolean keepBinary
+    ) {
+        this.jCacheAdapter = jCacheAdapter;
+        this.ch = ch;
+        this.keepBinary = keepBinary;
+        utils = new ClientUtils(marsh);
+    }
+
+    /**
+     * Send request to the server and start
+     */
+    public synchronized void startListen(
+        CacheEntryUpdatedListener<K, V> locLsnr,
+        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> rmtFilterFactory,
+        int pageSize,
+        long timeInterval,
+        boolean includeExpired
+    ) {
+        if (clientCh != null)
+            throw new IllegalStateException("Listener was already started");
+
+        this.locLsnr = locLsnr;
+
+        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+            BinaryOutputStream out = payloadCh.out();
+
+            out.writeInt(ClientUtils.cacheId(jCacheAdapter.getName()));
+            out.writeByte(keepBinary ? KEEP_BINARY_FLAG_MASK : 0);
+            out.writeInt(pageSize);
+            out.writeLong(timeInterval);
+            out.writeBoolean(includeExpired);
+
+            if (rmtFilterFactory == null)
+                out.writeByte(GridBinaryMarshaller.NULL);
+            else {
+                utils.writeObject(out, rmtFilterFactory);
+                out.writeByte((byte)1); // Java platform
+            }
+        };
+
+        ch.addChannelCloseListener(chCloseLsnr);
+
+        try {
+            T2<ClientChannel, Long> params = ch.service(
+                ClientOperation.QUERY_CONTINUOUS,
+                qryWriter,
+                res -> new T2<>(res.clientChannel(), res.in().readLong())
+            );
+
+            clientCh = params.get1();
+            rsrcId = params.get2();
+        }
+        catch (ClientError e) {
+            ch.removeChannelCloseListener(chCloseLsnr);
+
+            throw new ClientException(e);
+        }
+
+        clientCh.addNotificationListener(this);
+    }
+
+    /**
+     * @param ch Channel.
+     */
+    private void onChannelClosed(ClientChannel ch) {
+        if (ch == clientCh) {
+            if (locLsnr instanceof ClientChannelDisconnectListener)
+                ((ClientChannelDisconnectListener)locLsnr).onDisconnected();
+
+            U.closeQuiet(this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acceptNotification(
+            ClientChannel ch,
+            ClientOperation op,
+            long rsrcId,
+            ByteBuffer payload,
+            Exception err
+    ) {
+        if (op == ClientOperation.QUERY_CONTINUOUS_EVENT_NOTIFICATION && rsrcId == this.rsrcId) {
+            if (err == null && payload != null) {
+                BinaryInputStream in = BinaryByteBufferInputStream.create(payload);
+
+                int cnt = in.readInt();
+
+                List<CacheEntryEvent<? extends K, ? extends V>> evts = new ArrayList<>(cnt);
+
+                for (int i = 0; i < cnt; i++) {
+                    K key = utils.readObject(in, keepBinary);
+                    V oldVal = utils.readObject(in, keepBinary);
+                    V val = utils.readObject(in, keepBinary);
+                    byte evtTypeByte = in.readByte();
+
+                    EventType evtType = eventType(evtTypeByte);
+
+                    if (evtType != null) // Skip unknown event types.
+                        evts.add(new CacheEntryEventImpl<>(jCacheAdapter, evtType, key, oldVal, val));
+                }
+
+                locLsnr.onUpdated(evts);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void close() {
+        ch.removeChannelCloseListener(chCloseLsnr);
+
+        if (clientCh != null) {
+            clientCh.removeNotificationListener(this);

Review comment:
       Even though we never set `clientCh` from non-null to null, and this code seems to be safe right now,
   the field is still marked `volatile`, so this use-after-nullcheck looks suspicious.
   
   To reduce cognitive load, copy `clientCh` to a temp variable, and then check for null.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+
+/**
+ * Per-cache cache entry listeners registry. Listeners can't be stored inside ClientCache instance, since there can be
+ * several such instances per one cache.
+ */
+public class ClientCacheEntryListenersRegistry {

Review comment:
       I think we should do the following:
   * Refactor existing `TcpClientChannel.notificationLsnrs` to a `Map<Long, NotificationListener>`
   * Instead of iterating over all `notificationLsnrs` and checking the id in every listener, get the listener by id from map and invoke it without inner checks
   * Use this mechanism for continuous query notifications, avoid creating a new registry
   
   This approach seems to work fine in .NET thin client.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import org.apache.ignite.client.ClientChannelDisconnectListener;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class ClientCacheEntryListenerHandler<K, V> implements NotificationListener, AutoCloseable {
+    /** "Keep binary" flag mask. */
+    private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
+
+    /** */
+    private final Cache<K, V> jCacheAdapter;
+
+    /** */
+    private final ReliableChannel ch;
+
+    /** */
+    private final boolean keepBinary;
+
+    /** */
+    private final ClientUtils utils;
+
+    /** */
+    private final Consumer<ClientChannel> chCloseLsnr = this::onChannelClosed;
+
+    /** */
+    private volatile CacheEntryUpdatedListener<K, V> locLsnr;
+
+    /** */
+    private volatile ClientChannel clientCh;
+
+    /** */
+    private volatile long rsrcId;
+
+    /** */
+    ClientCacheEntryListenerHandler(
+        Cache<K, V> jCacheAdapter,
+        ReliableChannel ch,
+        ClientBinaryMarshaller marsh,
+        boolean keepBinary
+    ) {
+        this.jCacheAdapter = jCacheAdapter;
+        this.ch = ch;
+        this.keepBinary = keepBinary;
+        utils = new ClientUtils(marsh);
+    }
+
+    /**
+     * Send request to the server and start
+     */
+    public synchronized void startListen(
+        CacheEntryUpdatedListener<K, V> locLsnr,
+        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> rmtFilterFactory,
+        int pageSize,
+        long timeInterval,
+        boolean includeExpired
+    ) {
+        if (clientCh != null)
+            throw new IllegalStateException("Listener was already started");
+
+        this.locLsnr = locLsnr;
+
+        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+            BinaryOutputStream out = payloadCh.out();
+
+            out.writeInt(ClientUtils.cacheId(jCacheAdapter.getName()));
+            out.writeByte(keepBinary ? KEEP_BINARY_FLAG_MASK : 0);
+            out.writeInt(pageSize);
+            out.writeLong(timeInterval);
+            out.writeBoolean(includeExpired);
+
+            if (rmtFilterFactory == null)
+                out.writeByte(GridBinaryMarshaller.NULL);
+            else {
+                utils.writeObject(out, rmtFilterFactory);
+                out.writeByte((byte)1); // Java platform
+            }
+        };
+
+        ch.addChannelCloseListener(chCloseLsnr);
+
+        try {
+            T2<ClientChannel, Long> params = ch.service(
+                ClientOperation.QUERY_CONTINUOUS,
+                qryWriter,
+                res -> new T2<>(res.clientChannel(), res.in().readLong())
+            );
+
+            clientCh = params.get1();
+            rsrcId = params.get2();
+        }
+        catch (ClientError e) {
+            ch.removeChannelCloseListener(chCloseLsnr);
+
+            throw new ClientException(e);
+        }
+
+        clientCh.addNotificationListener(this);
+    }
+
+    /**
+     * @param ch Channel.
+     */
+    private void onChannelClosed(ClientChannel ch) {
+        if (ch == clientCh) {
+            if (locLsnr instanceof ClientChannelDisconnectListener)
+                ((ClientChannelDisconnectListener)locLsnr).onDisconnected();
+
+            U.closeQuiet(this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acceptNotification(
+            ClientChannel ch,
+            ClientOperation op,
+            long rsrcId,
+            ByteBuffer payload,
+            Exception err
+    ) {
+        if (op == ClientOperation.QUERY_CONTINUOUS_EVENT_NOTIFICATION && rsrcId == this.rsrcId) {
+            if (err == null && payload != null) {
+                BinaryInputStream in = BinaryByteBufferInputStream.create(payload);
+
+                int cnt = in.readInt();
+
+                List<CacheEntryEvent<? extends K, ? extends V>> evts = new ArrayList<>(cnt);
+
+                for (int i = 0; i < cnt; i++) {
+                    K key = utils.readObject(in, keepBinary);
+                    V oldVal = utils.readObject(in, keepBinary);
+                    V val = utils.readObject(in, keepBinary);
+                    byte evtTypeByte = in.readByte();
+
+                    EventType evtType = eventType(evtTypeByte);
+
+                    if (evtType != null) // Skip unknown event types.
+                        evts.add(new CacheEntryEventImpl<>(jCacheAdapter, evtType, key, oldVal, val));
+                }
+
+                locLsnr.onUpdated(evts);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void close() {
+        ch.removeChannelCloseListener(chCloseLsnr);
+
+        if (clientCh != null) {
+            clientCh.removeNotificationListener(this);
+
+            if (!clientCh.closed())
+                clientCh.service(ClientOperation.RESOURCE_CLOSE, ch -> ch.out().writeLong(rsrcId), null);
+        }
+    }
+
+    /**
+     * Client channel.
+     */
+    public ClientChannel clientChannel() {
+        return clientCh;
+    }
+
+    /** */
+    private EventType eventType(byte evtTypeByte) {
+        switch (evtTypeByte) {
+            case 0: return EventType.CREATED;
+            case 1: return EventType.UPDATED;
+            case 2: return EventType.REMOVED;
+            case 3: return EventType.EXPIRED;
+            default: return null;

Review comment:
       Should we log an error instead? Other event types are not valid.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r606944121



##########
File path: modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
##########
@@ -703,11 +705,14 @@
     public <K1, V1> ClientCache<K1, V1> withExpirePolicy(ExpiryPolicy expirePlc);
 
     /**
-     * Queries cache. Supports {@link ScanQuery} and {@link SqlFieldsQuery}.
+     * Queries cache. Supports {@link ScanQuery}, {@link SqlFieldsQuery} and {@link ContinuousQuery}.
+     * <p>
+     * NOTE: For continuous query listeners there is no failover in case of client channel failure, this event should
+     * be handled on the user's side. Listeners will be notified about client disconnect event if they implement
+     * {@link ClientChannelDisconnectListener} interface.

Review comment:
       Ok




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r609470663



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
##########
@@ -947,4 +1061,73 @@ private void writeEntries(Map<? extends K, ? extends V> map, PayloadOutputChanne
                     serDes.writeObject(out, e.getValue());
                 });
     }
+
+    /**
+     * Adapter to convert CQ listener calls to JCache listener calls.
+     */
+    private static class JCacheEntryListenerAdapter<K, V> implements CacheEntryUpdatedListener<K, V> {

Review comment:
       I've moved `JCacheEntryListenerAdapter` to the separate file, but `JCacheDisconnectListenerAdapter` is not static and relies on `TcpClientCache` fields, so I've inlined it as an anonymous class. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r607567991



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+
+/**
+ * Per-cache cache entry listeners registry. Listeners can't be stored inside ClientCache instance, since there can be
+ * several such instances per one cache.
+ */
+public class ClientCacheEntryListenersRegistry {

Review comment:
       > The same is true for Continuous Query - you can get a notification before you know the query ID and register the listener (and you can have multiple active queries).
   
   For CQ we don't need such strong guarantees. We can skip some events from the start and CQ still will be valid (it's like if we start listening a little bit later).
   
   > This is another piece of logic to be reused.
   > We can queue "unknown" notifications per resourceId and handle them as soon as listener is registered
   
   The difference between CQ and compute: compute can't skip notifications, compute has exactly one notification, taking into account this logic compute was implemented in such way, that unregistering listener performed when receiving notification. For CQ there can be many notifications per listener and listener unregistering should be performed when we close CQ. And if we will use some queue for pending notifications there will be notifications leaks after CQ closing, some notifications can stay in queue forever (as far as I understand C# implementation has the same problem). We can use some flag for notification type (use pending notification queue for compute and don't use it for CQ) to solve this problem, but I'm not sure that it's better than solution with different logic for different components.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r607024859



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+
+/**
+ * Per-cache cache entry listeners registry. Listeners can't be stored inside ClientCache instance, since there can be
+ * several such instances per one cache.
+ */
+public class ClientCacheEntryListenersRegistry {

Review comment:
       > There is no requirement for the uniqueness of this ID
   
   Right now this ID is required to match either a compute task ID, or a continuous query ID.
   A combination of `resId`+`notificationCode` is required to be "unique". We can use `IgniteBiTuple` as a map key.
   If we are going to introduce predefined ids (like 0 or -1) for general-purpose events - that will work fine as well.
   
   > each listener can implement its own strategy for resource ID mapping
   
   This looks like code duplication to me.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r606975129



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+
+/**
+ * Per-cache cache entry listeners registry. Listeners can't be stored inside ClientCache instance, since there can be
+ * several such instances per one cache.
+ */
+public class ClientCacheEntryListenersRegistry {

Review comment:
       This will require creating new listener for each compute task and refactoring of existing compute functionality (I think if we agreed to do this, it should be done by another ticket). Also, this change will add new limitations: we can't use notifications without resource id in the future.
   To reduce count of listeners, I think for continuous queries we can act the same way as for compute: register only one listener for QUERY_CONTINUOUS_EVENT_NOTIFICATION and find handles in this listener by resource id using some map. WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r608149299



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+
+/**
+ * Per-cache cache entry listeners registry. Listeners can't be stored inside ClientCache instance, since there can be
+ * several such instances per one cache.
+ */
+public class ClientCacheEntryListenersRegistry {

Review comment:
       @ptupitsyn, all comments fixed, ticket rebased, can you please have a look again?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8960: IGNITE-14402 Java thin client: Continuous queries support

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8960:
URL: https://github.com/apache/ignite/pull/8960#discussion_r606944935



##########
File path: modules/core/src/main/java/org/apache/ignite/client/ClientChannelDisconnectListener.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import org.apache.ignite.cache.query.ContinuousQuery;
+
+/**
+ * This interface can be implemented by {@link ContinuousQuery} local listeners or by cache entry listeners
+ * registered via {@link ClientCache#registerCacheEntryListener(CacheEntryListenerConfiguration)} to get notification
+ * about client channel disconnected event.
+ */
+public interface ClientChannelDisconnectListener {

Review comment:
       Ok




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org