You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@ignite.apache.org by Charlin S <Ch...@hotelhub.com> on 2022/05/26 14:23:35 UTC

C# GetDataStreamer with WithExpiryPolicy not working.

We have a requirement to set data to expire after some time.
I set the WithExpiryPolicy for cache instance, but the data added by
GetDataStreamer does not expire, due to it returning a new instance with
default policies.
So I am trying to use IStreamReceiver but not able to build the solution.

IStreamReceiver  Code:
 public  class MyStreamReceiver : IStreamReceiver<string, object>
    {
        public void Receive(ICache<string, object> cache,
ICollection<ICacheEntry<string, object>> entries)
        {
            foreach (var entry in entries)
            {
                cache.WithExpiryPolicy(new
ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key,
entry.Value);
            }
        }
    }

Datastreamer code error
[image: image.png]

How to implement IStreamReceiver. Please help me on this.
Regards,
Charlin

Re: C# GetDataStreamer with WithExpiryPolicy not working.

Posted by Pavel Tupitsyn <pt...@apache.org>.
You can use multiple nodes, this is the whole point of Ignite.

1. I suggest testing something on a single node so that you can use the
debugger and hit breakpoints. When it works, move to multi-node setup.
2. When running multiple nodes, ensure that the stream receiver is deployed
on all nodes (see
https://ignite.apache.org/docs/latest/net-specific/net-standalone-nodes#load-user-assemblies
)
3. When something fails, check the logs on all nodes for exceptions.

On Wed, Jun 1, 2022 at 10:34 AM Charlin S <Ch...@hotelhub.com> wrote:

> Hi,
> I am using 3(2 server and 1 client) node cluster, why can we not run this
> with multiple nodes?.
> The First option is also not working now. It was working and now it's
> failing.
>
> Regards,
> Charlin
>
>
>
>
> On Wed, 1 Jun 2022 at 11:28, Pavel Tupitsyn <pt...@apache.org> wrote:
>
>> You certainly have other nodes running somewhere, and they get picked up
>> when you run examples, yielding incorrect results.
>> Try adding this right after Ignition.Start:
>>
>> var nodeCount = ignite.GetCluster().GetNodes().Count;
>> if (nodeCount > 1)
>> {
>>     throw new Exception("Unexpected node count: " + nodeCount);
>> }
>>
>>
>> On Tue, May 31, 2022 at 5:30 PM Charlin S <Ch...@hotelhub.com> wrote:
>>
>>> Hi,
>>> Thanks for the reply,
>>> I have tried the same code and got an error
>>> message org.apache.ignite.IgniteCheckedException: Failed to finish
>>> operation (too many remaps): 32
>>> tried in Ignite 2.10.0 and 2.13.0 both are having same problem.
>>>
>>> Regards,
>>> Charlin
>>>
>>>
>>>
>>> On Tue, 31 May 2022 at 16:27, Pavel Tupitsyn <pt...@apache.org>
>>> wrote:
>>>
>>>> If you run multiple nodes in the cluster, the receiver may be invoked
>>>> on another node, so the breakpoint is not reached.
>>>> I've simplified yor code a bit and it works as expected:
>>>> https://gist.github.com/ptupitsyn/67c984e8ea44da6e2a42efdfc38df53c
>>>>
>>>> On Mon, May 30, 2022 at 11:22 AM Charlin S <Ch...@hotelhub.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> Thanks for the reply,
>>>>> First option working for me by creating a cache instance with expiry
>>>>> policy just before datastreamer.
>>>>> My curiosity with datastreamer and receiver also.
>>>>>
>>>>> no build error with new changes, but application not working as
>>>>> expected. added breakpoint in MyStreamReceiver but not reached
>>>>>
>>>>> using (var cacheDataStreamer =
>>>>> DynamicIgniteInstance.Instance.InstanceObject.GetDataStreamer<string,
>>>>> T>(cacheName))
>>>>>                 {
>>>>>                     cacheDataStreamer.AllowOverwrite = true;
>>>>>                     cacheDataStreamer.Receiver = new
>>>>> MyStreamReceiver<T>();
>>>>>                     foreach (var item in data)
>>>>>                     {
>>>>>                         string cacheKey = item.Key;
>>>>>                         int index = cacheKey.IndexOf("Model:");
>>>>>                         if (index > 0)
>>>>>                             cacheKey = cacheKey.Insert(index +
>>>>> "Model:".Length, CacheKeyDefault);
>>>>>                         else
>>>>>                             cacheKey = CacheKeyDefault + cacheKey;
>>>>>                          cacheDataStreamer.AddData(cacheName + ":" +
>>>>> cacheKey, item.Value);
>>>>>
>>>>>
>>>>>                     }
>>>>>                     cacheDataStreamer.Flush();
>>>>>                 }
>>>>>
>>>>> public  class MyStreamReceiver<T> : IStreamReceiver<string, T>
>>>>>     {
>>>>>         public void Receive(ICache<string, T> cache,
>>>>> ICollection<ICacheEntry<string, T>> entries)
>>>>>         {
>>>>>             foreach (var entry in entries)
>>>>>             {
>>>>>                 cache.WithExpiryPolicy(new
>>>>> ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key,
>>>>> entry.Value);
>>>>>             }
>>>>>         }
>>>>>     }
>>>>>
>>>>> Regards,
>>>>> Charlin
>>>>>
>>>>>
>>>>> On Thu, 26 May 2022 at 20:17, Pavel Tupitsyn <pt...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> 1. You can set expiry policy in CacheConfiguration so that entries
>>>>>> inserted with DataStreamer are also affected,
>>>>>> see
>>>>>> https://stackoverflow.com/questions/63463142/apache-ignite-net-getdatastreamer-withexpirypolicy
>>>>>>
>>>>>> 2. Compiler error says it all. Generic arguments don't match.
>>>>>> Try changing
>>>>>> MyStreamReceiver : IStreamReceiver<string, object>
>>>>>> to
>>>>>> MyStreamReceiver<T> : IStreamReceiver<string, T>
>>>>>>
>>>>>> On Thu, May 26, 2022 at 5:24 PM Charlin S <Ch...@hotelhub.com>
>>>>>> wrote:
>>>>>>
>>>>>>> We have a requirement to set data to expire after some time.
>>>>>>> I set the WithExpiryPolicy for cache instance, but the data added by
>>>>>>> GetDataStreamer does not expire, due to it returning a new instance with
>>>>>>> default policies.
>>>>>>> So I am trying to use IStreamReceiver but not able to build the
>>>>>>> solution.
>>>>>>>
>>>>>>> IStreamReceiver  Code:
>>>>>>>  public  class MyStreamReceiver : IStreamReceiver<string, object>
>>>>>>>     {
>>>>>>>         public void Receive(ICache<string, object> cache,
>>>>>>> ICollection<ICacheEntry<string, object>> entries)
>>>>>>>         {
>>>>>>>             foreach (var entry in entries)
>>>>>>>             {
>>>>>>>                 cache.WithExpiryPolicy(new
>>>>>>> ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key,
>>>>>>> entry.Value);
>>>>>>>             }
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>> Datastreamer code error
>>>>>>> [image: image.png]
>>>>>>>
>>>>>>> How to implement IStreamReceiver. Please help me on this.
>>>>>>> Regards,
>>>>>>> Charlin
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>

Re: C# GetDataStreamer with WithExpiryPolicy not working.

Posted by Charlin S <Ch...@hotelhub.com>.
Hi,
I am using 3(2 server and 1 client) node cluster, why can we not run this
with multiple nodes?.
The First option is also not working now. It was working and now it's
failing.

Regards,
Charlin




On Wed, 1 Jun 2022 at 11:28, Pavel Tupitsyn <pt...@apache.org> wrote:

> You certainly have other nodes running somewhere, and they get picked up
> when you run examples, yielding incorrect results.
> Try adding this right after Ignition.Start:
>
> var nodeCount = ignite.GetCluster().GetNodes().Count;
> if (nodeCount > 1)
> {
>     throw new Exception("Unexpected node count: " + nodeCount);
> }
>
>
> On Tue, May 31, 2022 at 5:30 PM Charlin S <Ch...@hotelhub.com> wrote:
>
>> Hi,
>> Thanks for the reply,
>> I have tried the same code and got an error
>> message org.apache.ignite.IgniteCheckedException: Failed to finish
>> operation (too many remaps): 32
>> tried in Ignite 2.10.0 and 2.13.0 both are having same problem.
>>
>> Regards,
>> Charlin
>>
>>
>>
>> On Tue, 31 May 2022 at 16:27, Pavel Tupitsyn <pt...@apache.org>
>> wrote:
>>
>>> If you run multiple nodes in the cluster, the receiver may be invoked on
>>> another node, so the breakpoint is not reached.
>>> I've simplified yor code a bit and it works as expected:
>>> https://gist.github.com/ptupitsyn/67c984e8ea44da6e2a42efdfc38df53c
>>>
>>> On Mon, May 30, 2022 at 11:22 AM Charlin S <Ch...@hotelhub.com>
>>> wrote:
>>>
>>>> Hi,
>>>> Thanks for the reply,
>>>> First option working for me by creating a cache instance with expiry
>>>> policy just before datastreamer.
>>>> My curiosity with datastreamer and receiver also.
>>>>
>>>> no build error with new changes, but application not working as
>>>> expected. added breakpoint in MyStreamReceiver but not reached
>>>>
>>>> using (var cacheDataStreamer =
>>>> DynamicIgniteInstance.Instance.InstanceObject.GetDataStreamer<string,
>>>> T>(cacheName))
>>>>                 {
>>>>                     cacheDataStreamer.AllowOverwrite = true;
>>>>                     cacheDataStreamer.Receiver = new
>>>> MyStreamReceiver<T>();
>>>>                     foreach (var item in data)
>>>>                     {
>>>>                         string cacheKey = item.Key;
>>>>                         int index = cacheKey.IndexOf("Model:");
>>>>                         if (index > 0)
>>>>                             cacheKey = cacheKey.Insert(index +
>>>> "Model:".Length, CacheKeyDefault);
>>>>                         else
>>>>                             cacheKey = CacheKeyDefault + cacheKey;
>>>>                          cacheDataStreamer.AddData(cacheName + ":" +
>>>> cacheKey, item.Value);
>>>>
>>>>
>>>>                     }
>>>>                     cacheDataStreamer.Flush();
>>>>                 }
>>>>
>>>> public  class MyStreamReceiver<T> : IStreamReceiver<string, T>
>>>>     {
>>>>         public void Receive(ICache<string, T> cache,
>>>> ICollection<ICacheEntry<string, T>> entries)
>>>>         {
>>>>             foreach (var entry in entries)
>>>>             {
>>>>                 cache.WithExpiryPolicy(new
>>>> ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key,
>>>> entry.Value);
>>>>             }
>>>>         }
>>>>     }
>>>>
>>>> Regards,
>>>> Charlin
>>>>
>>>>
>>>> On Thu, 26 May 2022 at 20:17, Pavel Tupitsyn <pt...@apache.org>
>>>> wrote:
>>>>
>>>>> 1. You can set expiry policy in CacheConfiguration so that entries
>>>>> inserted with DataStreamer are also affected,
>>>>> see
>>>>> https://stackoverflow.com/questions/63463142/apache-ignite-net-getdatastreamer-withexpirypolicy
>>>>>
>>>>> 2. Compiler error says it all. Generic arguments don't match.
>>>>> Try changing
>>>>> MyStreamReceiver : IStreamReceiver<string, object>
>>>>> to
>>>>> MyStreamReceiver<T> : IStreamReceiver<string, T>
>>>>>
>>>>> On Thu, May 26, 2022 at 5:24 PM Charlin S <Ch...@hotelhub.com>
>>>>> wrote:
>>>>>
>>>>>> We have a requirement to set data to expire after some time.
>>>>>> I set the WithExpiryPolicy for cache instance, but the data added by
>>>>>> GetDataStreamer does not expire, due to it returning a new instance with
>>>>>> default policies.
>>>>>> So I am trying to use IStreamReceiver but not able to build the
>>>>>> solution.
>>>>>>
>>>>>> IStreamReceiver  Code:
>>>>>>  public  class MyStreamReceiver : IStreamReceiver<string, object>
>>>>>>     {
>>>>>>         public void Receive(ICache<string, object> cache,
>>>>>> ICollection<ICacheEntry<string, object>> entries)
>>>>>>         {
>>>>>>             foreach (var entry in entries)
>>>>>>             {
>>>>>>                 cache.WithExpiryPolicy(new
>>>>>> ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key,
>>>>>> entry.Value);
>>>>>>             }
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>> Datastreamer code error
>>>>>> [image: image.png]
>>>>>>
>>>>>> How to implement IStreamReceiver. Please help me on this.
>>>>>> Regards,
>>>>>> Charlin
>>>>>>
>>>>>>
>>>>>>
>>>>>>

Re: C# GetDataStreamer with WithExpiryPolicy not working.

Posted by Pavel Tupitsyn <pt...@apache.org>.
You certainly have other nodes running somewhere, and they get picked up
when you run examples, yielding incorrect results.
Try adding this right after Ignition.Start:

var nodeCount = ignite.GetCluster().GetNodes().Count;
if (nodeCount > 1)
{
    throw new Exception("Unexpected node count: " + nodeCount);
}


On Tue, May 31, 2022 at 5:30 PM Charlin S <Ch...@hotelhub.com> wrote:

> Hi,
> Thanks for the reply,
> I have tried the same code and got an error
> message org.apache.ignite.IgniteCheckedException: Failed to finish
> operation (too many remaps): 32
> tried in Ignite 2.10.0 and 2.13.0 both are having same problem.
>
> Regards,
> Charlin
>
>
>
> On Tue, 31 May 2022 at 16:27, Pavel Tupitsyn <pt...@apache.org> wrote:
>
>> If you run multiple nodes in the cluster, the receiver may be invoked on
>> another node, so the breakpoint is not reached.
>> I've simplified yor code a bit and it works as expected:
>> https://gist.github.com/ptupitsyn/67c984e8ea44da6e2a42efdfc38df53c
>>
>> On Mon, May 30, 2022 at 11:22 AM Charlin S <Ch...@hotelhub.com>
>> wrote:
>>
>>> Hi,
>>> Thanks for the reply,
>>> First option working for me by creating a cache instance with expiry
>>> policy just before datastreamer.
>>> My curiosity with datastreamer and receiver also.
>>>
>>> no build error with new changes, but application not working as
>>> expected. added breakpoint in MyStreamReceiver but not reached
>>>
>>> using (var cacheDataStreamer =
>>> DynamicIgniteInstance.Instance.InstanceObject.GetDataStreamer<string,
>>> T>(cacheName))
>>>                 {
>>>                     cacheDataStreamer.AllowOverwrite = true;
>>>                     cacheDataStreamer.Receiver = new
>>> MyStreamReceiver<T>();
>>>                     foreach (var item in data)
>>>                     {
>>>                         string cacheKey = item.Key;
>>>                         int index = cacheKey.IndexOf("Model:");
>>>                         if (index > 0)
>>>                             cacheKey = cacheKey.Insert(index +
>>> "Model:".Length, CacheKeyDefault);
>>>                         else
>>>                             cacheKey = CacheKeyDefault + cacheKey;
>>>                          cacheDataStreamer.AddData(cacheName + ":" +
>>> cacheKey, item.Value);
>>>
>>>
>>>                     }
>>>                     cacheDataStreamer.Flush();
>>>                 }
>>>
>>> public  class MyStreamReceiver<T> : IStreamReceiver<string, T>
>>>     {
>>>         public void Receive(ICache<string, T> cache,
>>> ICollection<ICacheEntry<string, T>> entries)
>>>         {
>>>             foreach (var entry in entries)
>>>             {
>>>                 cache.WithExpiryPolicy(new
>>> ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key,
>>> entry.Value);
>>>             }
>>>         }
>>>     }
>>>
>>> Regards,
>>> Charlin
>>>
>>>
>>> On Thu, 26 May 2022 at 20:17, Pavel Tupitsyn <pt...@apache.org>
>>> wrote:
>>>
>>>> 1. You can set expiry policy in CacheConfiguration so that entries
>>>> inserted with DataStreamer are also affected,
>>>> see
>>>> https://stackoverflow.com/questions/63463142/apache-ignite-net-getdatastreamer-withexpirypolicy
>>>>
>>>> 2. Compiler error says it all. Generic arguments don't match.
>>>> Try changing
>>>> MyStreamReceiver : IStreamReceiver<string, object>
>>>> to
>>>> MyStreamReceiver<T> : IStreamReceiver<string, T>
>>>>
>>>> On Thu, May 26, 2022 at 5:24 PM Charlin S <Ch...@hotelhub.com>
>>>> wrote:
>>>>
>>>>> We have a requirement to set data to expire after some time.
>>>>> I set the WithExpiryPolicy for cache instance, but the data added by
>>>>> GetDataStreamer does not expire, due to it returning a new instance with
>>>>> default policies.
>>>>> So I am trying to use IStreamReceiver but not able to build the
>>>>> solution.
>>>>>
>>>>> IStreamReceiver  Code:
>>>>>  public  class MyStreamReceiver : IStreamReceiver<string, object>
>>>>>     {
>>>>>         public void Receive(ICache<string, object> cache,
>>>>> ICollection<ICacheEntry<string, object>> entries)
>>>>>         {
>>>>>             foreach (var entry in entries)
>>>>>             {
>>>>>                 cache.WithExpiryPolicy(new
>>>>> ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key,
>>>>> entry.Value);
>>>>>             }
>>>>>         }
>>>>>     }
>>>>>
>>>>> Datastreamer code error
>>>>> [image: image.png]
>>>>>
>>>>> How to implement IStreamReceiver. Please help me on this.
>>>>> Regards,
>>>>> Charlin
>>>>>
>>>>>
>>>>>
>>>>>

Re: C# GetDataStreamer with WithExpiryPolicy not working.

Posted by Charlin S <Ch...@hotelhub.com>.
Hi,
Thanks for the reply,
I have tried the same code and got an error
message org.apache.ignite.IgniteCheckedException: Failed to finish
operation (too many remaps): 32
tried in Ignite 2.10.0 and 2.13.0 both are having same problem.

Regards,
Charlin



On Tue, 31 May 2022 at 16:27, Pavel Tupitsyn <pt...@apache.org> wrote:

> If you run multiple nodes in the cluster, the receiver may be invoked on
> another node, so the breakpoint is not reached.
> I've simplified yor code a bit and it works as expected:
> https://gist.github.com/ptupitsyn/67c984e8ea44da6e2a42efdfc38df53c
>
> On Mon, May 30, 2022 at 11:22 AM Charlin S <Ch...@hotelhub.com> wrote:
>
>> Hi,
>> Thanks for the reply,
>> First option working for me by creating a cache instance with expiry
>> policy just before datastreamer.
>> My curiosity with datastreamer and receiver also.
>>
>> no build error with new changes, but application not working as expected.
>> added breakpoint in MyStreamReceiver but not reached
>>
>> using (var cacheDataStreamer =
>> DynamicIgniteInstance.Instance.InstanceObject.GetDataStreamer<string,
>> T>(cacheName))
>>                 {
>>                     cacheDataStreamer.AllowOverwrite = true;
>>                     cacheDataStreamer.Receiver = new
>> MyStreamReceiver<T>();
>>                     foreach (var item in data)
>>                     {
>>                         string cacheKey = item.Key;
>>                         int index = cacheKey.IndexOf("Model:");
>>                         if (index > 0)
>>                             cacheKey = cacheKey.Insert(index +
>> "Model:".Length, CacheKeyDefault);
>>                         else
>>                             cacheKey = CacheKeyDefault + cacheKey;
>>                          cacheDataStreamer.AddData(cacheName + ":" +
>> cacheKey, item.Value);
>>
>>
>>                     }
>>                     cacheDataStreamer.Flush();
>>                 }
>>
>> public  class MyStreamReceiver<T> : IStreamReceiver<string, T>
>>     {
>>         public void Receive(ICache<string, T> cache,
>> ICollection<ICacheEntry<string, T>> entries)
>>         {
>>             foreach (var entry in entries)
>>             {
>>                 cache.WithExpiryPolicy(new
>> ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key,
>> entry.Value);
>>             }
>>         }
>>     }
>>
>> Regards,
>> Charlin
>>
>>
>> On Thu, 26 May 2022 at 20:17, Pavel Tupitsyn <pt...@apache.org>
>> wrote:
>>
>>> 1. You can set expiry policy in CacheConfiguration so that entries
>>> inserted with DataStreamer are also affected,
>>> see
>>> https://stackoverflow.com/questions/63463142/apache-ignite-net-getdatastreamer-withexpirypolicy
>>>
>>> 2. Compiler error says it all. Generic arguments don't match.
>>> Try changing
>>> MyStreamReceiver : IStreamReceiver<string, object>
>>> to
>>> MyStreamReceiver<T> : IStreamReceiver<string, T>
>>>
>>> On Thu, May 26, 2022 at 5:24 PM Charlin S <Ch...@hotelhub.com>
>>> wrote:
>>>
>>>> We have a requirement to set data to expire after some time.
>>>> I set the WithExpiryPolicy for cache instance, but the data added by
>>>> GetDataStreamer does not expire, due to it returning a new instance with
>>>> default policies.
>>>> So I am trying to use IStreamReceiver but not able to build the
>>>> solution.
>>>>
>>>> IStreamReceiver  Code:
>>>>  public  class MyStreamReceiver : IStreamReceiver<string, object>
>>>>     {
>>>>         public void Receive(ICache<string, object> cache,
>>>> ICollection<ICacheEntry<string, object>> entries)
>>>>         {
>>>>             foreach (var entry in entries)
>>>>             {
>>>>                 cache.WithExpiryPolicy(new
>>>> ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key,
>>>> entry.Value);
>>>>             }
>>>>         }
>>>>     }
>>>>
>>>> Datastreamer code error
>>>> [image: image.png]
>>>>
>>>> How to implement IStreamReceiver. Please help me on this.
>>>> Regards,
>>>> Charlin
>>>>
>>>>
>>>>
>>>>

Re: C# GetDataStreamer with WithExpiryPolicy not working.

Posted by Pavel Tupitsyn <pt...@apache.org>.
If you run multiple nodes in the cluster, the receiver may be invoked on
another node, so the breakpoint is not reached.
I've simplified yor code a bit and it works as expected:
https://gist.github.com/ptupitsyn/67c984e8ea44da6e2a42efdfc38df53c

On Mon, May 30, 2022 at 11:22 AM Charlin S <Ch...@hotelhub.com> wrote:

> Hi,
> Thanks for the reply,
> First option working for me by creating a cache instance with expiry
> policy just before datastreamer.
> My curiosity with datastreamer and receiver also.
>
> no build error with new changes, but application not working as expected.
> added breakpoint in MyStreamReceiver but not reached
>
> using (var cacheDataStreamer =
> DynamicIgniteInstance.Instance.InstanceObject.GetDataStreamer<string,
> T>(cacheName))
>                 {
>                     cacheDataStreamer.AllowOverwrite = true;
>                     cacheDataStreamer.Receiver = new
> MyStreamReceiver<T>();
>                     foreach (var item in data)
>                     {
>                         string cacheKey = item.Key;
>                         int index = cacheKey.IndexOf("Model:");
>                         if (index > 0)
>                             cacheKey = cacheKey.Insert(index +
> "Model:".Length, CacheKeyDefault);
>                         else
>                             cacheKey = CacheKeyDefault + cacheKey;
>                          cacheDataStreamer.AddData(cacheName + ":" +
> cacheKey, item.Value);
>
>
>                     }
>                     cacheDataStreamer.Flush();
>                 }
>
> public  class MyStreamReceiver<T> : IStreamReceiver<string, T>
>     {
>         public void Receive(ICache<string, T> cache,
> ICollection<ICacheEntry<string, T>> entries)
>         {
>             foreach (var entry in entries)
>             {
>                 cache.WithExpiryPolicy(new
> ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key,
> entry.Value);
>             }
>         }
>     }
>
> Regards,
> Charlin
>
>
> On Thu, 26 May 2022 at 20:17, Pavel Tupitsyn <pt...@apache.org> wrote:
>
>> 1. You can set expiry policy in CacheConfiguration so that entries
>> inserted with DataStreamer are also affected,
>> see
>> https://stackoverflow.com/questions/63463142/apache-ignite-net-getdatastreamer-withexpirypolicy
>>
>> 2. Compiler error says it all. Generic arguments don't match.
>> Try changing
>> MyStreamReceiver : IStreamReceiver<string, object>
>> to
>> MyStreamReceiver<T> : IStreamReceiver<string, T>
>>
>> On Thu, May 26, 2022 at 5:24 PM Charlin S <Ch...@hotelhub.com> wrote:
>>
>>> We have a requirement to set data to expire after some time.
>>> I set the WithExpiryPolicy for cache instance, but the data added by
>>> GetDataStreamer does not expire, due to it returning a new instance with
>>> default policies.
>>> So I am trying to use IStreamReceiver but not able to build the solution.
>>>
>>> IStreamReceiver  Code:
>>>  public  class MyStreamReceiver : IStreamReceiver<string, object>
>>>     {
>>>         public void Receive(ICache<string, object> cache,
>>> ICollection<ICacheEntry<string, object>> entries)
>>>         {
>>>             foreach (var entry in entries)
>>>             {
>>>                 cache.WithExpiryPolicy(new
>>> ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key,
>>> entry.Value);
>>>             }
>>>         }
>>>     }
>>>
>>> Datastreamer code error
>>> [image: image.png]
>>>
>>> How to implement IStreamReceiver. Please help me on this.
>>> Regards,
>>> Charlin
>>>
>>>
>>>
>>>

Re: C# GetDataStreamer with WithExpiryPolicy not working.

Posted by Charlin S <Ch...@hotelhub.com>.
Hi,
Thanks for the reply,
First option working for me by creating a cache instance with expiry policy
just before datastreamer.
My curiosity with datastreamer and receiver also.

no build error with new changes, but application not working as expected.
added breakpoint in MyStreamReceiver but not reached

using (var cacheDataStreamer =
DynamicIgniteInstance.Instance.InstanceObject.GetDataStreamer<string,
T>(cacheName))
                {
                    cacheDataStreamer.AllowOverwrite = true;
                    cacheDataStreamer.Receiver = new MyStreamReceiver<T>();
                    foreach (var item in data)
                    {
                        string cacheKey = item.Key;
                        int index = cacheKey.IndexOf("Model:");
                        if (index > 0)
                            cacheKey = cacheKey.Insert(index +
"Model:".Length, CacheKeyDefault);
                        else
                            cacheKey = CacheKeyDefault + cacheKey;
                         cacheDataStreamer.AddData(cacheName + ":" +
cacheKey, item.Value);


                    }
                    cacheDataStreamer.Flush();
                }

public  class MyStreamReceiver<T> : IStreamReceiver<string, T>
    {
        public void Receive(ICache<string, T> cache,
ICollection<ICacheEntry<string, T>> entries)
        {
            foreach (var entry in entries)
            {
                cache.WithExpiryPolicy(new
ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key,
entry.Value);
            }
        }
    }

Regards,
Charlin


On Thu, 26 May 2022 at 20:17, Pavel Tupitsyn <pt...@apache.org> wrote:

> 1. You can set expiry policy in CacheConfiguration so that entries
> inserted with DataStreamer are also affected,
> see
> https://stackoverflow.com/questions/63463142/apache-ignite-net-getdatastreamer-withexpirypolicy
>
> 2. Compiler error says it all. Generic arguments don't match.
> Try changing
> MyStreamReceiver : IStreamReceiver<string, object>
> to
> MyStreamReceiver<T> : IStreamReceiver<string, T>
>
> On Thu, May 26, 2022 at 5:24 PM Charlin S <Ch...@hotelhub.com> wrote:
>
>> We have a requirement to set data to expire after some time.
>> I set the WithExpiryPolicy for cache instance, but the data added by
>> GetDataStreamer does not expire, due to it returning a new instance with
>> default policies.
>> So I am trying to use IStreamReceiver but not able to build the solution.
>>
>> IStreamReceiver  Code:
>>  public  class MyStreamReceiver : IStreamReceiver<string, object>
>>     {
>>         public void Receive(ICache<string, object> cache,
>> ICollection<ICacheEntry<string, object>> entries)
>>         {
>>             foreach (var entry in entries)
>>             {
>>                 cache.WithExpiryPolicy(new
>> ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key,
>> entry.Value);
>>             }
>>         }
>>     }
>>
>> Datastreamer code error
>> [image: image.png]
>>
>> How to implement IStreamReceiver. Please help me on this.
>> Regards,
>> Charlin
>>
>>
>>
>>

Re: C# GetDataStreamer with WithExpiryPolicy not working.

Posted by Pavel Tupitsyn <pt...@apache.org>.
1. You can set expiry policy in CacheConfiguration so that entries inserted
with DataStreamer are also affected,
see
https://stackoverflow.com/questions/63463142/apache-ignite-net-getdatastreamer-withexpirypolicy

2. Compiler error says it all. Generic arguments don't match.
Try changing
MyStreamReceiver : IStreamReceiver<string, object>
to
MyStreamReceiver<T> : IStreamReceiver<string, T>

On Thu, May 26, 2022 at 5:24 PM Charlin S <Ch...@hotelhub.com> wrote:

> We have a requirement to set data to expire after some time.
> I set the WithExpiryPolicy for cache instance, but the data added by
> GetDataStreamer does not expire, due to it returning a new instance with
> default policies.
> So I am trying to use IStreamReceiver but not able to build the solution.
>
> IStreamReceiver  Code:
>  public  class MyStreamReceiver : IStreamReceiver<string, object>
>     {
>         public void Receive(ICache<string, object> cache,
> ICollection<ICacheEntry<string, object>> entries)
>         {
>             foreach (var entry in entries)
>             {
>                 cache.WithExpiryPolicy(new
> ExpiryPolicy(TimeSpan.FromSeconds(600), null, null)).Put(entry.Key,
> entry.Value);
>             }
>         }
>     }
>
> Datastreamer code error
> [image: image.png]
>
> How to implement IStreamReceiver. Please help me on this.
> Regards,
> Charlin
>
>
>
>