Posted to user@ignite.apache.org by Ankit Singhai <an...@gmail.com> on 2016/12/28 12:33:07 UTC

IgniteDataStreamer with Continuous Query

Hi All,
Can we have a DataStreamer pushing data through a StreamTransformer into a
cache, with a ContinuousQuery running on that cache to raise an event when a
condition is met?

A sample would be a great help.

Thanks
Ankit Singhai



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/IgniteDataStreamer-with-Continuous-Query-tp9779.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: IgniteDataStreamer with Continuous Query

Posted by Denis Magda <dm...@gridgain.com>.
Hi,

You need to move the continuous query execution statement out of the
try-with-resources block. So the code has to look like this:

                QueryCursor<Cache.Entry<String, Integer>> cur = stmCache.query(continuousQuery);

                for(Cache.Entry<String,Integer> c : cur) {
                    System.out.println(c);
                }

Presently, your continuous query is closed automatically right after the
initial result is received, because the try-with-resources block closes the
cursor as soon as the block exits.
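
The effect can be reproduced without Ignite. The sketch below is a plain-Java
stand-in, not Ignite code: EventSource and Subscription are hypothetical names
that play the roles of the cache and the query cursor. It only illustrates that
a listener registered through a resource opened in try-with-resources stops
receiving updates once the block exits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class CursorLifetimeSketch {
    /** Hypothetical stand-in for the query cursor: closing it stops notifications. */
    interface Subscription extends AutoCloseable {
        @Override
        void close(); // narrowed so that close() throws no checked exception
    }

    /** Hypothetical stand-in for a cache that notifies listeners on every update. */
    static class EventSource {
        private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

        /** Registers a listener; closing the returned handle unregisters it. */
        Subscription subscribe(Consumer<String> listener) {
            listeners.add(listener);
            return () -> listeners.remove(listener);
        }

        void publish(String event) {
            for (Consumer<String> l : listeners)
                l.accept(event);
        }
    }

    /** The handle is closed at the end of the block, so the later update is missed. */
    public static List<String> closedTooEarly() {
        EventSource src = new EventSource();
        List<String> seen = new ArrayList<>();
        try (Subscription s = src.subscribe(seen::add)) {
            src.publish("first burst");
        }
        src.publish("second burst"); // the listener is already unregistered
        return seen;
    }

    /** The handle stays open for as long as updates are expected. */
    public static List<String> keptOpen() {
        EventSource src = new EventSource();
        List<String> seen = new ArrayList<>();
        Subscription s = src.subscribe(seen::add);
        src.publish("first burst");
        src.publish("second burst");
        s.close(); // close only when notifications are no longer needed
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(closedTooEarly()); // [first burst]
        System.out.println(keptOpen());       // [first burst, second burst]
    }
}
```

In the real application the same idea applies to the QueryCursor: keep a
reference to it and close it explicitly when the listener is no longer needed.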

--
Denis




Re: IgniteDataStreamer with Continuous Query

Posted by ANKIT SINGHAI <an...@gmail.com>.
Can someone please have a look at the query below?

Thanks


Re: IgniteDataStreamer with Continuous Query

Posted by Ankit Singhai <an...@gmail.com>.
Hi,
In my test scenario I am pumping data into the cache via a data streamer. The
cache has a CreatedExpiryPolicy of 60 seconds, to give a sliding window of 60
seconds. After the initial burst I make my thread sleep and then pump data
again, but for the second burst I do not get any events (local or remote); the
only data I receive comes from the initial query. Can somebody point out what
I am doing wrong here?
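
As background on the 60-second window: in JCache, a CreatedExpiryPolicy fixes
an entry's expiry time when the entry is created, and later updates do not
extend it. The sketch below is a plain-Java model of that behaviour, not Ignite
code. CreatedExpiryCounter and its explicit nowMillis parameter are
hypothetical, introduced only so that the timing can be followed without real
sleeps.

```java
import java.util.HashMap;
import java.util.Map;

public class CreatedExpiryCounter {
    /** One counter plus the expiry time that was fixed when it was created. */
    private static final class Slot {
        final long expiresAt;
        int count;

        Slot(long expiresAt) {
            this.expiresAt = expiresAt;
        }
    }

    private final long ttlMillis;
    private final Map<String, Slot> slots = new HashMap<>();

    public CreatedExpiryCounter(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /**
     * Increments the counter for the key. An expired entry is treated as absent
     * and recreated, which restarts the count; updates never move expiresAt.
     */
    public int increment(String key, long nowMillis) {
        Slot slot = slots.get(key);
        if (slot == null || nowMillis >= slot.expiresAt) {
            slot = new Slot(nowMillis + ttlMillis);
            slots.put(key, slot);
        }
        return ++slot.count;
    }

    public static void main(String[] args) {
        CreatedExpiryCounter counter = new CreatedExpiryCounter(60_000);
        System.out.println(counter.increment("a", 0));      // 1
        System.out.println(counter.increment("a", 30_000)); // 2
        System.out.println(counter.increment("a", 70_000)); // 1, the entry expired at 60_000
    }
}
```

Under this model, counts accumulated in the first burst are gone after a
70-second pause, so the second burst starts counting from 1 again.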

Ignite Configuration
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="clientMode" value="true"/>
        <property name="gridName" value="TestGrid"/>

        <property name="peerClassLoadingEnabled" value="true"/>

        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <value>127.0.0.1:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
                <property name="localAddress" value="127.0.0.1"/>
                <property name="heartbeatFrequency" value="2000"/>
            </bean>
        </property>

        <property name="lifecycleBeans">
            <list>
                <bean class="com.gvc.LifeCycleBeanImpl" id="lifeCycleBeanImpl"/>
            </list>
        </property>

        <property name="communicationSpi">
            <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
                <property name="slowClientQueueLimit" value="1000"/>
                <property name="localPort" value="4321"/>
            </bean>
        </property>
    </bean>

Cache Configuration
    <bean id="ipCache" class="org.apache.ignite.configuration.CacheConfiguration" scope="singleton">
        <property name="name" value="ipCache"/>
        <property name="cacheMode" value="REPLICATED"/>
        <property name="memoryMode" value="ONHEAP_TIERED"/>
        <property name="offHeapMaxMemory" value="#{256 * 1024L * 1024L}"/>
        <property name="backups" value="0"/>
        <property name="writeSynchronizationMode" value="PRIMARY_SYNC"/>
        <property name="startSize" value="#{1 * 1024 * 1024}"/>
        <property name="swapEnabled" value="false"/>

        <property name="evictionPolicy">
            <bean class="org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy">
                <property name="maxSize" value="1000"/>
            </bean>
        </property>

        <property name="rebalanceMode" value="SYNC"/>
        <property name="rebalanceBatchSize" value="#{1024 * 1024}"/>
        <property name="rebalanceThrottle" value="0"/>
        <property name="rebalanceThreadPoolSize" value="4"/>
    </bean>

Ignite Data Streamer Code via StreamTransformer

    CacheConfiguration<String, Integer> ipCacheConfiguration =
        (CacheConfiguration<String, Integer>) applicationContext.getBean("ipCache");
    ipCacheConfiguration.setIndexedTypes(String.class, Integer.class);
    ipCacheConfiguration.setExpiryPolicyFactory(
        FactoryBuilder.factoryOf(new CreatedExpiryPolicy(new Duration(SECONDS, 60))));

    IgniteCache<String, Integer> ipCache = ignite.getOrCreateCache(ipCacheConfiguration);

    Random RAND = new Random();

    try (IgniteDataStreamer<String, Integer> igniteDataStreamer = ignite.dataStreamer(ipCache.getName())) {
        igniteDataStreamer.allowOverwrite(true);

        igniteDataStreamer.receiver(new StreamTransformer<String, Integer>() {
            @Override
            public Object process(MutableEntry<String, Integer> mutableEntry, Object... objects)
                    throws EntryProcessorException {
                Integer val = mutableEntry.getValue();

                // Increment count by 1.
                mutableEntry.setValue(val == null ? 1 : val + 1);

                return null;
            }
        });

        int range = 1000;
        for (int i = 1; i <= 100000; i++) {
            igniteDataStreamer.addData("" + RAND.nextInt(range), 1);
        }

        try {
            Thread.sleep(70000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("After Sleeping");
        for (int i = 1; i <= 100000; i++) {
            igniteDataStreamer.addData("" + RAND.nextInt(range), 1);
        }
    }

Continuous Query Code (running on a different JVM)
    CacheConfiguration<String, Integer> ipCacheConfiguration =
        (CacheConfiguration<String, Integer>) applicationContext.getBean("ipCache");
    ipCacheConfiguration.setIndexedTypes(String.class, Integer.class);
    ipCacheConfiguration.setExpiryPolicyFactory(
        FactoryBuilder.factoryOf(new CreatedExpiryPolicy(new Duration(SECONDS, 60))));

    IgniteCache<String, Integer> ipCache = ignite.getOrCreateCache(ipCacheConfiguration);

    ContinuousQuery<String, Integer> continuousQuery = new ContinuousQuery<>();
    continuousQuery.setInitialQuery(new ScanQuery<String, Integer>(new IgniteBiPredicate<String, Integer>() {
        @Override
        public boolean apply(String integer, Integer integer2) {
            return integer2 > 100;
        }
    }));

    continuousQuery.setLocalListener(new CacheEntryUpdatedListener<String, Integer>() {
        @Override
        public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends Integer>> iterable)
                throws CacheEntryListenerException {
            for (CacheEntryEvent events : iterable) {
                System.out.println(" Inside local listener :: " + events);
            }
        }
    });

    continuousQuery.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<String, Integer>>() {
        @Override
        public CacheEntryEventFilter<String, Integer> create() {
            return new CacheEntryEventFilter<String, Integer>() {
                @Override
                public boolean evaluate(CacheEntryEvent<? extends String, ? extends Integer> cacheEntryEvent)
                        throws CacheEntryListenerException {
                    System.out.println("Remote Listener");
                    return cacheEntryEvent.getValue() > 100;
                }
            };
        }
    });

    try (QueryCursor<Cache.Entry<String, Integer>> cur = ipCache.query(continuousQuery)) {
        for (Cache.Entry<String, Integer> c : cur) {
            System.out.println(c);
        }
    }

Thanks




Re: IgniteDataStreamer with Continuous Query

Posted by ANKIT SINGHAI <an...@gmail.com>.
Thanks


Re: IgniteDataStreamer with Continuous Query

Posted by dkarachentsev <dk...@gridgain.com>.
Hi Ankit,

Yes, you can use ContinuousQuery with DataStreamer. You can find code samples
here [1] and [2].

[1]
https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java

[2]
https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
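
For the overall shape of the combination, the plain-Java sketch below may help
before opening the linked examples. It is only a model under stated
assumptions, not the Ignite API: StreamAndNotifySketch is a hypothetical class
in which add() does what a stream transformer would do (increment a stored
count), the predicate plays the role of a remote filter, and the consumer plays
the role of a local listener.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;

public class StreamAndNotifySketch {
    private final Map<String, Integer> counts = new HashMap<>();
    private final BiPredicate<String, Integer> filter;
    private final BiConsumer<String, Integer> listener;

    public StreamAndNotifySketch(BiPredicate<String, Integer> filter,
                                 BiConsumer<String, Integer> listener) {
        this.filter = filter;
        this.listener = listener;
    }

    /** Increments the stored count, then notifies the listener if the filter accepts it. */
    public void add(String key) {
        int updated = counts.merge(key, 1, Integer::sum);
        if (filter.test(key, updated))
            listener.accept(key, updated);
    }

    /** Streams a few keys and collects the events raised for counts above 2. */
    public static List<String> demo() {
        List<String> events = new ArrayList<>();
        StreamAndNotifySketch cache = new StreamAndNotifySketch(
            (k, v) -> v > 2,
            (k, v) -> events.add(k + "=" + v));

        for (String key : new String[] {"a", "b", "a", "a", "a"})
            cache.add(key);

        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a=3, a=4]
    }
}
```

The listener is called only for updates that pass the filter, which is the
"raise an event when the condition is met" behaviour asked about.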


