Posted to user@geode.apache.org by Barry Oglesby <bo...@pivotal.io> on 2018/11/01 17:40:30 UTC

Re: Creating Regions Dynamically using Functions

Marcus,

You might also want to co-locate all the requests for a single session in
the same partitioned region bucket. That allows you to use a function to
query all the requests for a single session id on one server rather than
having the query spread among the buckets on all the servers.

To do that you'll need a PartitionResolver:
http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/PartitionResolver.html

If you define your key like:

public class SessionRequestKey implements DataSerializable {
  private String sessionId;
  private String requestId;

  public String getSessionId() {
    return this.sessionId;
  }
  ...

Then, the PartitionResolver.getRoutingObject method would look like:

public Object getRoutingObject(EntryOperation<SessionRequestKey, SessionRequest> operation) {
  return operation.getKey().getSessionId();
}

That'll cause the hashCode of the sessionId to be used to route the entry,
so all SessionRequestKeys with the same sessionId will be routed to the
same bucket.

Here is an xml version of defining the PartitionResolver:

<region name="SessionRequest">
  <region-attributes refid="PARTITION_REDUNDANT">
    <partition-attributes>
      <partition-resolver>
        <class-name>SessionRequestPartitionResolver</class-name>
      </partition-resolver>
    </partition-attributes>
  </region-attributes>
</region>
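
For completeness, a minimal sketch of the resolver class that xml refers to
could look like the following (the getName and close bodies are placeholders,
and Declarable is included on the assumption that the resolver is declared in
cache.xml):

public class SessionRequestPartitionResolver
    implements PartitionResolver<SessionRequestKey, SessionRequest>, Declarable {

  public Object getRoutingObject(EntryOperation<SessionRequestKey, SessionRequest> operation) {
    // Route on the sessionId so all requests for a session land in the same bucket
    return operation.getKey().getSessionId();
  }

  public String getName() {
    return getClass().getSimpleName();
  }

  public void close() {
    // nothing to release
  }
}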

Then, you can define a function with an execute method like below to
execute the query in a specific context (bucket):

public void execute(FunctionContext context) {
  RegionFunctionContext rfc = (RegionFunctionContext) context;
  SessionRequestKey key = (SessionRequestKey) rfc.getFilter().iterator().next();
  try {
    // this.cache is a Cache field initialized when the function is created
    Query query = this.cache.getQueryService().newQuery(
        "select * from /SessionRequest.keySet key where key.sessionId = $1");
    SelectResults results = (SelectResults) query.execute(rfc, new String[] {key.getSessionId()});
    context.getResultSender().lastResult(results);
  } catch (Exception e) {
    context.getResultSender().sendException(e);
  }
}

The function can be called like:

Object result = FunctionService
  .onRegion(region)
  .withFilter(Collections.singleton(new SessionRequestKey("session_10")))
  .execute("SessionRequestQueryFunction")
  .getResult();

The SessionRequestKey with the empty requestId will be routed to the server
that contains the bucket for that sessionId.

I'm not sure how you're processing your data, but you could also just
process it right in the function instead of returning it to the client. If
you're going to process the data in that function, make sure to have
optimizeForWrite return true instead of the default false. That will cause
the function to be executed on the primary bucket.
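
For example, in the function class:

@Override
public boolean optimizeForWrite() {
  // Run on the primary copy of the bucket instead of a redundant copy
  return true;
}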

Thanks,
Barry Oglesby



On Wed, Oct 31, 2018 at 11:53 AM Marcus Dushshantha Chandradasa <
dushshantha.chandaradasa@gmail.com> wrote:

> Hi Udo,
>
> This is perfect. This is exactly what I was looking for. Thank you very
> much.
>
> Thanks
> Marcus
>
> On Tue, Oct 30, 2018 at 11:54 PM Udo Kohlmeyer <ud...@apache.org> wrote:
>
>> Hi there Marcus,
>>
>> It is actually a blessing that Mike asked the question of what you are
>> trying to achieve.
>>
>> So, basically, creating a region for every sessionId does seem like the
>> logical answer, but imo it is most likely the lesser of the two options. I
>> could go into details, but with a higher number of sessions you will create
>> more and more regions, which is not something I would recommend.
>>
>> Stick with option 1. A lesser known feature is that the key can
>> be composite. It does not need to be limited to primitive types. You
>> could for example have a key like the following:
>>
>> class CompositeKey implements DataSerializable {
>>   String sessionId;
>>   int requestID;
>> }
>>
>> So two things here... DataSerializable is a fast and efficient
>> serialization format provided by Geode (see
>> https://geode.apache.org/docs/guide/17/developing/data_serialization/gemfire_data_serialization.html
>> )
>>
>> So now you store the sessionId and the requestID. Of course you would
>> have to override the hashCode and equals methods, but that would be simple.
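>>
>> A fuller sketch of that key might look like the following (the exact
>> serialization and hashing choices here are just an illustration):
>>
>> public class CompositeKey implements DataSerializable {
>>   private String sessionId;
>>   private int requestID;
>>
>>   public CompositeKey() {} // no-arg constructor required for deserialization
>>
>>   public CompositeKey(String sessionId, int requestID) {
>>     this.sessionId = sessionId;
>>     this.requestID = requestID;
>>   }
>>
>>   public void toData(DataOutput out) throws IOException {
>>     out.writeUTF(sessionId);
>>     out.writeInt(requestID);
>>   }
>>
>>   public void fromData(DataInput in) throws IOException {
>>     sessionId = in.readUTF();
>>     requestID = in.readInt();
>>   }
>>
>>   public boolean equals(Object o) {
>>     if (!(o instanceof CompositeKey)) return false;
>>     CompositeKey other = (CompositeKey) o;
>>     return requestID == other.requestID && sessionId.equals(other.sessionId);
>>   }
>>
>>   public int hashCode() {
>>     return 31 * sessionId.hashCode() + requestID;
>>   }
>> }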
>>
>> What you gain now is: quick gets for the sessionId+requestID combo.
>> Queries can also gain an advantage, as your query could be "select
>> e.value from /SessionRegion.entrySet e where e.key.sessionId = '12356'
>> and e.key.requestID = 12"
>>
>> You could also create indexes on the composite key's fields, to improve
>> lookup performance:
>>
>> createIndex("sessionId", "e.key.sessionId", "/SessionRegion.entrySet e")
>> createIndex("requestId", "e.key.requestID", "/SessionRegion.entrySet e")
>>
>> Maybe you can try this approach.
>>
>> --Udo
>>
>> On 10/30/18 17:17, Marcus Dushshantha Chandradasa wrote:
>>
>> Hi Mike,
>>
>> Thanks for asking that question. I am working on a project where I need
>> to persist a series of work coming to a given Docker-based worker node. The
>> type of work is a sequential list with a unique id for each item in the
>> list, and these work requests come in separately as REST calls with a
>> Session Id to map them to the worker. If the node that is processing a
>> given series of work dies midway, I need to get the series and send it to a
>> new worker node. So, the design I had in mind is to save the work based on
>> the Session Id, which will represent a unique host (say an IP address), as
>> a Map keyed on Request Id. Once a given series of work is completed, I
>> remove the entry for that particular Session. If it broke midway, I
>> retrieve the Map from Geode, append the new work to the Map, forward it to
>> a brand new worker, and the process starts over as a brand new job. A given
>> record will look like below.
>>
>> { Session_Id : 23123132,      <------ Key
>>      [ { Request_Id : 1,     <----- Another embedded key
>>          Work : {some payload here} },  <---- Value
>>        { Request_Id : 2,
>>          Work : {some payload here} },
>>       { Request_Id : 3,
>>          Work : {some payload here} }
>>      ]
>> }
>>
>> Originally I thought I would use just one region with Session_Id as the
>> key, and save a serialized object to represent the Map. Every time new
>> work comes in for a session, I retrieve the object from the region,
>> append the new work to the map and re-save it to Geode. This approach
>> requires a read and a write every time new work comes in for a session.
>> In order to reduce the reads, I am exploring the alternatives below.
>>
>> Alternative 1.
>>
>> Use Session_Id + Request_Id as the key and Work as the value. That will
>> look like this.
>>
>> [ {ID : 23123132_1,      <------ Key
>>     Work : {some payload here} },  <---- Value
>>    {ID : 23123132_2     <------ Key
>>     Work : {some payload here} },  <---- Value
>>    {ID : 23123132_3,      <------ Key
>>     Work : {some payload here} } <---- Value
>> ]
>>
>> The disadvantage of this approach is that when I read to retrieve the list
>> of work for a given session, I have to do a "Like Session_Id%" style query,
>> which will force Geode to search through a large number of records to get
>> my data set.
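>>
>> (For illustration, that lookup in OQL would be something like the
>> following, with /WorkRegion just a placeholder region name:
>>
>> select e.value from /WorkRegion.entrySet e where e.key like '23123132_%'
>> )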
>>
>> Alternative 2:
>>
>> Create a Region for every Session_Id and save Work in the respective
>> Region using the Request_Id as the key and Work as the Value. This way,
>> When I read, I just get all the data from the Region. When I write, I just
>> blindly insert to the respective Region. Once work is done, I just drop the
>> Region. Data will look like below
>>
>> Region 23123132:
>>      [ {Request_Id : 1,      <------ Key
>>         Work : {some payload here} },  <---- Value
>>       {Request_Id : 2     <------ Key
>>        Work : {some payload here} },  <---- Value
>>       {Request_Id : 3,      <------ Key
>>        Work : {some payload here} } <---- Value
>>     ]
>>
>> Region 12312312:
>>      [ {Request_Id : 1,      <------ Key
>>         Work : {some payload here} },  <---- Value
>>       {Request_Id : 2     <------ Key
>>        Work : {some payload here} },  <---- Value
>>       {Request_Id : 3,      <------ Key
>>        Work : {some payload here} } <---- Value
>>     ]
>>
>> Alternative 2 seems to be the most efficient approach in terms of
>> read/write. The only overhead is to dynamically create a Region for every
>> session.
>>
>> This is where I am right now. I am trying to see how easy/efficient it is
>> to create Regions dynamically.
>>
>> Hope this helps
>>
>> Marcus
>>
>> On Tue, Oct 30, 2018, 6:56 PM Michael Stolz <ms...@pivotal.io> wrote:
>>
>>> While everyone is helping you get to your goal of creating regions
>>> dynamically, I'd like to learn why dynamic region creation is important.
>>> Could you please explain that?
>>>
>>> --
>>> Mike Stolz
>>> Principal Engineer - Gemfire Product Manager
>>> Mobile: 631-835-4771
>>>
>>> On Oct 30, 2018 6:12 PM, "Udo Kohlmeyer" <ud...@apache.org> wrote:
>>>
>>>> Hi there Marcus,
>>>>
>>>> It seems the default pool is only created once a region operation is
>>>> performed.
>>>>
>>>> In order to get around this, you can just do the following:
>>>>
>>>> ClientCache cache = new ClientCacheFactory().set("log-level", "WARN").create();
>>>> Pool pool = PoolManager.createFactory()
>>>>     .addLocator("localhost", 10334)
>>>>     .create("MyCustomPool");
>>>>
>>>> And then you can replace the FunctionService call's poolname with
>>>> "MyCustomPool".
>>>>
>>>> The Spring Data Geode example could have been just as simple. If you
>>>> want, I can send you an example for that as well.
>>>>
>>>> --Udo
>>>>
>>>> On 10/30/18 14:58, Marcus Dushshantha Chandradasa wrote:
>>>>
>>>> Thanks for the responses.
>>>>
>>>> I checked the versions. I was playing around with spring-data-geode as
>>>> well; maybe that had a different version. Now I have created a separate
>>>> project with just the geode-core 1.7 dependency, and the version problem
>>>> seems to have gone away.
>>>>
>>>> Now, when I use a Cache like earlier, I connect to the server but get
>>>> the following error which is aligned with Udo's comment.
>>>>
>>>> Exception in thread "main"
>>>> org.apache.geode.cache.execute.FunctionException: The cache was not a
>>>> client cache
>>>>
>>>> So I changed the Cache to a ClientCache and this time I get the
>>>> following error.
>>>>
>>>> Code:
>>>>
>>>> ClientCache cache = new ClientCacheFactory()
>>>>                 .addPoolLocator("localhost", 10334)
>>>>                 .set("log-level", "WARN")
>>>>                 .create();
>>>>
>>>> Error :
>>>> Exception in thread "main" java.lang.UnsupportedOperationException:
>>>> operation is not supported on a client cache
>>>> at
>>>> org.apache.geode.internal.cache.GemFireCacheImpl.throwIfClient(GemFireCacheImpl.java:5354)
>>>> at
>>>> org.apache.geode.internal.cache.GemFireCacheImpl.createRegionFactory(GemFireCacheImpl.java:4629)
>>>> at
>>>> com.mmodal.Geode.CreateRegionFunction.createRegionAttributesMetadataRegion(CreateRegionFunction.java:74)
>>>> at
>>>> com.mmodal.Geode.CreateRegionFunction.<init>(CreateRegionFunction.java:24)
>>>>
>>>> Also, when I tried using PoolManager.find("DEFAULT") for the
>>>> onServers(), I get the below error.
>>>> Exception in thread "main"
>>>> org.apache.geode.cache.execute.FunctionException: Pool instance  passed is
>>>> null
>>>> at
>>>> org.apache.geode.cache.execute.internal.FunctionServiceManager.onServers(FunctionServiceManager.java:167)
>>>> at
>>>> org.apache.geode.cache.execute.FunctionService.onServers(FunctionService.java:95)
>>>> at Main.main(Main.java:31)
>>>>
>>>> Marcus
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Oct 30, 2018 at 4:45 PM Udo Kohlmeyer <ud...@apache.org> wrote:
>>>>
>>>>> From the exception, it seems that you are not using the same version
>>>>> of Geode. Could you look into that...
>>>>>
>>>>> Also, when creating a client, you use "ClientCacheFactory" and define
>>>>> a pool pointing at the locators.
>>>>> https://geode.apache.org/docs/guide/16/topologies_and_comm/cs_configuration/client_server_example_configurations.html
>>>>>
>>>>> ClientCache c = new ClientCacheFactory().addPoolLocator(host, port).create();
>>>>>
>>>>> FunctionService.onServers(PoolManager.find("DEFAULT")).execute("FunctionName").getResult() //I think the default pool created is called "DEFAULT".
>>>>>
>>>>> That should hopefully connect to your servers, where the functions
>>>>> have been deployed, invoke the function and return the result back to the
>>>>> client.
>>>>>
>>>>> --Udo
>>>>>
>>>>> On 10/30/18 13:13, Marcus Dushshantha Chandradasa wrote:
>>>>>
>>>>> Hi All,
>>>>>
>>>>> I am trying to figure out how to programmatically create Regions on a
>>>>> Geode Cluster. I followed below links but without any success.
>>>>>
>>>>>
>>>>> https://geode.apache.org/docs/guide/16/developing/region_options/dynamic_region_creation.html
>>>>>
>>>>> https://stackoverflow.com/questions/50833166/cannot-create-region-dynamically-from-client-in-geode/50850584
>>>>>
>>>>> So far, I have the CreateRegionFunction and CreateRegionCacheListener copied
>>>>> and JARed together. I am referencing them in my client as well as adding them
>>>>> to the classpath for the Geode cluster. Below is my client code. I am
>>>>> receiving the below error when I try to execute. Any help would be really
>>>>> appreciated.
>>>>>
>>>>> Error :
>>>>>
>>>>> SEVERE: Servlet.service() for servlet [dispatcherServlet] in context
>>>>> with path [] threw exception [Request processing failed; nested exception
>>>>> is org.apache.geode.SystemConnectException: Rejecting the attempt of a
>>>>> member using an older version of the product to join the distributed
>>>>> system] with root cause
>>>>> org.apache.geode.SystemConnectException: Rejecting the attempt of a
>>>>> member using an older version of the product to join the distributed system
>>>>> at
>>>>> org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave.attemptToJoin(GMSJoinLeave.java:433)
>>>>> at
>>>>> org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave.join(GMSJoinLeave.java:329)
>>>>> at
>>>>> org.apache.geode.distributed.internal.membership.gms.mgr.GMSMembershipManager.join(GMSMembershipManager.java:664)
>>>>>
>>>>>
>>>>>
>>>>> Cache cache = new CacheFactory()
>>>>>     .set(ConfigurationProperties.LOCATORS, "localhost[10334],localhost[10335]")
>>>>>     .create();
>>>>> Execution execution = FunctionService.onServers(cache);
>>>>> ArrayList argList = new ArrayList();
>>>>> argList.add("region_new");
>>>>> RegionAttributes attr = new AttributesFactory().create();
>>>>> argList.add(attr);
>>>>> Function function = new CreateRegionFunction();
>>>>> //FunctionService.registerFunction(function);
>>>>> Object result = execution.setArguments(argList).execute(function).getResult();
>>>>>
>>>>>
>>>>>
>>>>
>>