You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@geode.apache.org by fdaniel7 <gi...@git.apache.org> on 2016/09/26 07:45:08 UTC

[GitHub] incubator-geode pull request #245: native-client-software-grant - ClientMeta...

GitHub user fdaniel7 opened a pull request:

    https://github.com/apache/incubator-geode/pull/245

    native-client-software-grant - ClientMetadata::getServerLocation is r\u2026

    \u2026eturning a primary BucketServerLocationPtr instead of a secondary
    
    A pointer to a primary BucketServerLocationPtr can be returned when
    asking for a secondary. The returned BucketServerLocationPtr is a
    randomization between [0,
    m_bucketServerLocationsList[bucketId].size()-1] where the result can be
    a primary copy.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fdaniel7/incubator-geode native-client-software-grant

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-geode/pull/245.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #245
    
----
commit 3a03704ae259288e768a64443af12f37fefa19e4
Author: fdaniel7 <fd...@amdocs.com>
Date:   2016-09-26T07:37:32Z

    native-client-software-grant - ClientMetadata::getServerLocation is returning a primary BucketServerLocationPtr instead of a secondary
    
    A pointer to a primary BucketServerLocationPtr can be returned when
    asking for a secondary. The returned BucketServerLocationPtr is a
    randomization between [0,
    m_bucketServerLocationsList[bucketId].size()-1] where the result can be
    a primary copy.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-geode pull request #245: native-client-software-grant - ClientMeta...

Posted by fdaniel7 <gi...@git.apache.org>.
Github user fdaniel7 commented on a diff in the pull request:

    https://github.com/apache/incubator-geode/pull/245#discussion_r84839367
  
    --- Diff: geode-client-native/src/cppcache/impl/ClientMetadataService.cpp ---
    @@ -1,925 +1 @@
    -/*=========================================================================
    - * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
    - * This product is protected by U.S. and international copyright
    - * and intellectual property laws. Pivotal products are covered by
    - * one or more patents listed at http://www.pivotal.io/patents.
    - *=========================================================================
    - */
    -#include "TcrMessage.hpp"
    -#include "ClientMetadataService.hpp"
    -#include "ThinClientPoolDM.hpp"
    -#include "../FixedPartitionResolver.hpp"
    -#include <iterator>
    -#include <cstdlib>
    -#include <climits>
    -
    -namespace gemfire {
    -  const char * ClientMetadataService::NC_CMDSvcThread = "NC CMDSvcThread";
    -  ClientMetadataService::~ClientMetadataService ()
    -  {
    -    delete m_regionQueue;
    -    if(m_bucketWaitTimeout > 0)
    -    {
    -      try
    -      {
    -        std::map<std::string, PRbuckets*>::iterator  bi;
    -        for (bi = m_bucketStatus.begin(); bi != m_bucketStatus.end(); ++bi)
    -        {
    -          delete bi->second;
    -        }
    -
    -      }catch(...)
    -      {
    -        LOGINFO("Exception in ClientMetadataService destructor");
    -      }
    -    }
    -  }
    -
    -  ClientMetadataService::ClientMetadataService(PoolPtr pool)
    -    /* adongre
    -    * CID 28928: Uninitialized scalar field (UNINIT_CTOR)
    -    */
    -    : m_run(false)
    -
    -  {
    -    m_regionQueue = new Queue<std::string>(false);
    -    m_pool = pool;
    -    m_bucketWaitTimeout = DistributedSystem::getSystemProperties()->bucketWaitTimeout();
    -  }
    -
    -  int ClientMetadataService::svc() {
    -    DistributedSystemImpl::setThreadName(NC_CMDSvcThread);
    -    LOGINFO("ClientMetadataService started for pool %s", m_pool->getName()); 
    -    while (m_run) {
    -      m_regionQueueSema.acquire( );
    -      ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) ); 
    -      CacheImpl * cache = tcrdm->getConnectionManager().getCacheImpl();
    -      while(true)
    -      {
    -        std::string * regionFullPath = m_regionQueue->get();
    -
    -        if(regionFullPath != NULL && regionFullPath->c_str() != NULL )
    -        {
    -          while(true) 
    -          {
    -            if(m_regionQueue->size() > 0) 
    -            {
    -              std::string * nextRegionFullPath = m_regionQueue->get();
    -              if(nextRegionFullPath != NULL && nextRegionFullPath->c_str() != NULL && regionFullPath->compare(nextRegionFullPath->c_str()) == 0)
    -              {
    -                delete nextRegionFullPath;//we are going for same
    -              }
    -              else
    -              {
    -                //different region; put it back
    -                m_regionQueue->put(nextRegionFullPath);
    -                break;
    -              }
    -            } else
    -              break;
    -          }
    -        }
    -
    -        if (!cache->isCacheDestroyPending() && regionFullPath != NULL && regionFullPath->c_str() != NULL) {
    -          getClientPRMetadata(regionFullPath->c_str());
    -          delete regionFullPath;
    -          regionFullPath = NULL;
    -        }
    -        else
    -        {
    -          delete regionFullPath;
    -          regionFullPath = NULL;
    -          break;
    -        }
    -      }
    -      //while(m_regionQueueSema.tryacquire( ) != -1); // release all
    -    }
    -    LOGINFO("ClientMetadataService stopped for pool %s", m_pool->getName());
    -    return 0;
    -  }
    -
    -  void  ClientMetadataService::getClientPRMetadata(const char * regionFullPath) {
    -    if (regionFullPath == NULL) return;
    -    ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );
    -    if(tcrdm == NULL)
    -    {
    -      throw IllegalArgumentException("ClientMetaData: pool cast to ThinClientPoolDM failed");
    -    }
    -    // That means metadata for the region not found, So only for the first time for a particular region use GetClientPartitionAttributesOp
    -    // TcrMessage to fetch the metadata and put it into map for later use.send this message to server and get metadata from server.    
    -    TcrMessage reply;
    -    std::string path(regionFullPath);
    -    ClientMetadataPtr cptr = NULLPTR;
    -	{
    -	  ReadGuard guard( m_regionMetadataLock );
    -    RegionMetadataMapType::iterator itr = m_regionMetaDataMap.find(path);
    -    if (itr != m_regionMetaDataMap.end()) {
    -      cptr = itr->second;
    -    }
    -	  //cptr = m_regionMetaDataMap[path];
    -	}
    -    ClientMetadataPtr newCptr = NULLPTR;
    -
    -    {
    -      //ACE_Guard< ACE_Recursive_Thread_Mutex > guard( m_regionMetadataLock );  
    -
    -
    -      if (cptr == NULLPTR) {
    -        TcrMessage request(TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES, regionFullPath);
    -        GfErrType err = tcrdm->sendSyncRequest(request, reply);        
    -        if (err == GF_NOERR && reply.getMessageType() == TcrMessage::RESPONSE_CLIENT_PARTITION_ATTRIBUTES) {
    -          cptr = new ClientMetadata(reply.getNumBuckets(), reply.getColocatedWith(), tcrdm, reply.getFpaSet());
    -          if(m_bucketWaitTimeout > 0 && reply.getNumBuckets() > 0)
    -          {
    -            WriteGuard guard( m_PRbucketStatusLock );
    -            m_bucketStatus[regionFullPath] = new PRbuckets(reply.getNumBuckets());
    -          }
    -          LOGDEBUG("ClientMetadata buckets %d ", reply.getNumBuckets());
    -          if (cptr != NULLPTR) {
    -            //m_regionMetaDataMap[regionFullPath] = cptr;    
    -          }
    -        }        
    -      }
    -    }
    -    if (cptr == NULLPTR) {
    -      return;
    -    }
    -    CacheableStringPtr colocatedWith;
    -    if (cptr != NULLPTR) {
    -      colocatedWith = cptr->getColocatedWith();
    -    }
    -    if (colocatedWith == NULLPTR) {     
    -      newCptr = SendClientPRMetadata(regionFullPath,cptr);
    -      //now we will get new instance so assign it again
    -      if(newCptr != NULLPTR) 
    -      {
    -        cptr->setPreviousone(NULLPTR);
    -        newCptr->setPreviousone(cptr);
    -        WriteGuard guard( m_regionMetadataLock );
    -        m_regionMetaDataMap[path] = newCptr;
    -		LOGINFO("Updated client meta data");
    -      }
    -    }
    -    else {
    -      {
    -        //ACE_Guard< ACE_Recursive_Thread_Mutex > guard( m_regionMetadataLock );
    -        //m_regionMetaDataMap[colocatedWith->asChar()] = cptr;
    -      }      
    -      newCptr = SendClientPRMetadata(colocatedWith->asChar(),cptr);         
    -
    -      if(newCptr != NULLPTR) 
    -      {
    -        cptr->setPreviousone(NULLPTR);
    -        newCptr->setPreviousone(cptr);
    -        //now we will get new instance so assign it again
    -        WriteGuard guard( m_regionMetadataLock );
    -        m_regionMetaDataMap[colocatedWith->asChar()] = newCptr;
    -        m_regionMetaDataMap[path] = newCptr;
    -		LOGINFO("Updated client meta data");
    -      }
    -    }    
    -  }
    -
    -  ClientMetadataPtr ClientMetadataService::SendClientPRMetadata(const char* regionPath, ClientMetadataPtr cptr) {
    -    ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );
    -    if(tcrdm == NULL)
    -    {
    -      throw IllegalArgumentException("ClientMetaData: pool cast to ThinClientPoolDM failed");
    -    }
    -    TcrMessage request(TcrMessage::GET_CLIENT_PR_METADATA, regionPath);
    -    TcrMessage reply;
    -    // send this message to server and get metadata from server.
    -    LOGFINE("Now sending GET_CLIENT_PR_METADATA for getting from server: %s",regionPath);
    -    RegionPtr region = NULLPTR;    
    -    GfErrType err = tcrdm->sendSyncRequest(request, reply);    
    -    if (err == GF_NOERR && reply.getMessageType() == TcrMessage::RESPONSE_CLIENT_PR_METADATA) {
    -      tcrdm->getConnectionManager().getCacheImpl()->getRegion(regionPath, region);      
    -      if (region != NULLPTR) {       
    -        LocalRegion* lregion = dynamic_cast<LocalRegion*> (region.ptr());          
    -        lregion->getRegionStats()->incMetaDataRefreshCount();        
    -      }      
    -      std::vector<BucketServerLocationsType>* metadata = reply.getMetadata();
    -      if(metadata == NULL)
    -        return NULLPTR;
    -      if ( metadata->empty()) {
    -        delete metadata;
    -        return NULLPTR;
    -      }
    -      ClientMetadata* newCptr = new ClientMetadata(*(cptr.ptr()));
    -      for (std::vector<BucketServerLocationsType>::iterator iter = metadata->begin();
    -        iter != metadata->end(); ++iter) {      
    -          if (!(*iter).empty()) {
    -            newCptr->updateBucketServerLocations((*iter).at(0)->getBucketId(), (*iter));                   
    -          }
    -      }
    -      delete metadata;
    -      ClientMetadataPtr newCMDPtr(newCptr);
    -      return newCMDPtr;
    -    }
    -    return NULLPTR;
    -  }
    -
    -  void ClientMetadataService::getBucketServerLocation(const RegionPtr& region, const CacheableKeyPtr& key,
    -    const CacheablePtr& value,const UserDataPtr& aCallbackArgument, bool isPrimary, BucketServerLocationPtr& serverLocation,  int8_t& version)
    -  {
    -    //ACE_Guard< ACE_Recursive_Thread_Mutex > guard( m_regionMetadataLock );    
    -    if (region != NULLPTR) {
    -	  ReadGuard guard( m_regionMetadataLock );
    -	  LOGDEBUG("ClientMetadataService::getBucketServerLocation m_regionMetaDataMap size is %d",m_regionMetaDataMap.size());	  
    -    std::string path(region->getFullPath());
    -    ClientMetadataPtr cptr = NULLPTR;
    -    RegionMetadataMapType::iterator itr = m_regionMetaDataMap.find(path);
    -    if (itr != m_regionMetaDataMap.end()) {
    -      cptr = itr->second;
    -    }
    -      //ClientMetadataPtr cptr = m_regionMetaDataMap[path];
    -      if (cptr == NULLPTR) {
    -        //serverLocation = BucketServerLocation();
    -        return;
    -      }
    -      CacheableKeyPtr resolvekey;
    -      const PartitionResolverPtr& resolver = region->getAttributes()->getPartitionResolver();
    -      
    -      EntryEvent event(region, key, value, NULLPTR, aCallbackArgument, false);
    -      int bucketId = 0;
    -      if (resolver == NULLPTR) {
    -        resolvekey = key;
    -      }
    -      else {
    -        resolvekey = resolver->getRoutingObject(event);
    -        if (resolvekey == NULLPTR) {
    -          throw IllegalStateException("The RoutingObject returned by PartitionResolver is null.");
    -        }
    -      }
    -      CacheableHashSetPtr& partNames = cptr->getFixedPartitionNames();
    -      if (partNames != NULLPTR && partNames->size() > 0) {        
    -        FixedPartitionResolverPtr fpResolver( dynamic_cast<FixedPartitionResolver*> (resolver.ptr() ));
    -        if(fpResolver != NULLPTR) {       
    -          const char* partition = fpResolver->getPartitionName(event, partNames);        
    -          if ( partition == NULL ) {
    -            throw IllegalStateException("partition name returned by Partition resolver is null.");
    -          }
    -          else {
    -            bucketId = cptr->assignFixedBucketId(partition, resolvekey);
    -            if ( bucketId == -1) {
    -              return;
    -            }          
    -          }
    -        }
    -      }      
    -      else {
    -        if (cptr->getTotalNumBuckets() > 0 ) {          
    -          bucketId = std::abs((int)resolvekey->hashcode() % cptr->getTotalNumBuckets());          
    -        }
    -      }
    -      cptr->getServerLocation(bucketId, isPrimary, serverLocation, version);
    -    }    
    -  }
    -
    -  void ClientMetadataService::removeBucketServerLocation(BucketServerLocation serverLocation)
    -  {
    -    ReadGuard guard( m_regionMetadataLock );
    -    for(RegionMetadataMapType::iterator regionMetadataIter= m_regionMetaDataMap.begin();
    -      regionMetadataIter!=m_regionMetaDataMap.end();
    -      regionMetadataIter++) {
    -        ClientMetadataPtr cptr = (*regionMetadataIter).second;
    -        if (cptr != NULLPTR) {
    -          //Yogesh has commented out this as it was causing a SIGV
    -          //cptr->removeBucketServerLocation(serverLocation);
    -        }
    -    }
    -  }
    -
    -  ClientMetadataPtr ClientMetadataService::getClientMetadata(const char* regionFullPath)
    -  {
    -    ReadGuard guard( m_regionMetadataLock );
    -    RegionMetadataMapType::iterator regionMetadataIter = m_regionMetaDataMap.find(regionFullPath);
    -    if (regionMetadataIter != m_regionMetaDataMap.end()) {
    -      return (*regionMetadataIter).second;
    -    }
    -    return NULLPTR;
    -  }
    -
    - /*const  PartitionResolverPtr& ClientMetadataService::getResolver(const RegionPtr& region, const CacheableKeyPtr& key,
    -    const UserDataPtr& aCallbackArgument){
    -      //const char * regionFullPath = region->getFullPath();
    -      //if (regionFullPath != NULL) {
    -        //const RegionAttributesPtr& rAttrsPtr = region->getAttributes();
    -        return region->getAttributes()->getPartitionResolver();
    -      //}
    -   }*/   
    -
    -  /*BucketServerLocation ClientMetadataService::getServerLocation(ClientMetadataPtr cptr, int bucketId, bool tryPrimary)
    -  {
    -  LOGFINE("Inside getServerLocation");
    -  if (cptr == NULLPTR) {
    -  LOGDEBUG("MetaData does not exist");
    -  return BucketServerLocation();
    -  }
    -  LOGFINE("Ending getServerLocation");
    -  return cptr->getServerLocation(bucketId, tryPrimary);
    -  }*/
    -
    -  void ClientMetadataService::populateDummyServers(const char * regionName, ClientMetadataPtr cptr)
    -  {
    -    WriteGuard guard( m_regionMetadataLock );
    -    m_regionMetaDataMap[regionName] = cptr;
    -  }
    -
    -  void ClientMetadataService::enqueueForMetadataRefresh(
    -    const char* regionFullPath, int8 serverGroupFlag) {
    -      ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );
    -      if(tcrdm == NULL)
    -      {
    -        throw IllegalArgumentException("ClientMetaData: pool cast to ThinClientPoolDM failed");
    -      }
    -      RegionPtr region;
    -      tcrdm->getConnectionManager().getCacheImpl()->getRegion(regionFullPath, region);      
    -      LocalRegion* lregion = dynamic_cast<LocalRegion*> (region.ptr());          
    -      lregion->getRegionStats()->incNonSingleHopCount();//we are here means nonSinglehop
    -
    -      std::string serverGroup = tcrdm->getServerGroup();    
    -      if (serverGroup.length() != 0 ) {    
    -        CacheImpl::setServerGroupFlag(serverGroupFlag);
    -        if (serverGroupFlag == 2 ) {
    -          LOGFINER("Network hop but, from within same server-group, so no metadata fetch from the server");
    -          return;
    -        }
    -      }
    -
    -      if (region != NULLPTR) {       
    -        ThinClientRegion* tcrRegion = dynamic_cast<ThinClientRegion*> (region.ptr());
    -        {
    -          TryWriteGuard guardRegionMetaDataRefresh( tcrRegion->getMataDataMutex(), tcrRegion->getMetaDataRefreshed());
    -          if ( tcrRegion->getMetaDataRefreshed() ) {
    -            return;
    -          }
    -          LOGFINE("Network hop so fetching single hop metadata from the server");
    -          CacheImpl::setNetworkHopFlag(true);
    -          tcrRegion->setMetaDataRefreshed(true);            
    -          std::string* tempRegionPath = new std::string(regionFullPath);
    -          m_regionQueue->put(tempRegionPath);
    -          m_regionQueueSema.release( );
    -        }
    -      }   
    -  }
    -
    -  HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>* ClientMetadataService::getServerToFilterMap(
    -    const VectorOfCacheableKey* keys, const RegionPtr& region, bool isPrimary) {
    -      //const char* regionFullPath = region->getFullPath();
    -      ClientMetadataPtr cptr = NULLPTR;
    -      {   
    -        ReadGuard guard( m_regionMetadataLock );
    -        RegionMetadataMapType::iterator cptrIter = m_regionMetaDataMap.find(region->getFullPath());
    -
    -        if(cptrIter != m_regionMetaDataMap.end())
    -        {
    -          cptr = cptrIter->second;
    -        }
    -
    -        if (cptr == NULLPTR || keys == NULL) {
    -          // enqueueForMetadataRefresh(region->getFullPath());
    -          return NULL;
    -          //		//serverLocation = BucketServerLocation();
    -          //		return;
    -        }
    -      }
    -      //int totalNumberOfBuckets = cptr->getTotalNumBuckets();
    -      HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>* result = new HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>();
    -      VectorOfCacheableKeyPtr keysWhichLeft(new VectorOfCacheableKey());
    -
    -      std::map<int, BucketServerLocationPtr> buckets;
    -
    -      for (VectorOfCacheableKey::Iterator iter = keys->begin(); iter != keys->end(); iter++) {
    -        CacheableKeyPtr key = *iter;
    -        LOGDEBUG("cmds = %s", key->toString()->toString());
    -        PartitionResolverPtr resolver = region->getAttributes()->getPartitionResolver();
    -        CacheableKeyPtr resolveKey;
    -
    -        if (resolver == NULLPTR) {
    -          // client has not registered PartitionResolver
    -          // Assuming even PR at server side is not using PartitionResolver
    -          resolveKey = key;
    -        } else {
    -          EntryEvent event(region, key, NULLPTR, NULLPTR, NULLPTR, false);
    -          resolveKey = resolver->getRoutingObject(event);
    -        }
    -
    -        int bucketId = std::abs((int)resolveKey->hashcode() % cptr->getTotalNumBuckets());
    -        VectorOfCacheableKeyPtr keyList = NULLPTR;
    -        std::map<int, BucketServerLocationPtr>::iterator bucketsIter =
    -          buckets.find(bucketId);
    -
    -        if (bucketsIter == buckets.end()) {
    -          int8 version = -1;
    -          //BucketServerLocationPtr serverLocation(new BucketServerLocation());
    -          BucketServerLocationPtr serverLocation = NULLPTR;
    -          cptr->getServerLocation(bucketId, isPrimary, serverLocation,version);
    -          if(serverLocation == NULLPTR)
    -          {//HItesh:if server not returns all buckets, need to confiem with PR team about this why??
    -            keysWhichLeft->push_back(key);
    -            continue;
    -          }
    -          else if(!serverLocation->isValid())
    -          {
    -            keysWhichLeft->push_back(key);
    -            continue;
    -          }
    -          //if(serverLocation == NULLPTR)
    -          //continue;//hitesh need to fix
    -          buckets[bucketId] = serverLocation;
    -          HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>::Iterator itrRes = result->find(serverLocation);
    -          //keyList = (*result)[serverLocation];
    -
    -          if(itrRes == result->end())
    -          {
    -            keyList = new VectorOfCacheableKey();
    -            result->insert(serverLocation, keyList);
    -          }
    -          else
    -            keyList = itrRes.second();
    -          LOGDEBUG("new keylist buckets =%d res = %d", buckets.size(), result->size() );
    -        } else {
    -          keyList = (*result)[bucketsIter->second];
    -        }
    -
    -        keyList->push_back(key);
    -      }
    -
    -      if(keysWhichLeft->size() > 0 && result->size() > 0)
    -      {//add left keys in result
    -        int keyLefts = keysWhichLeft->size();
    -        int totalServers = result->size();
    -        int perServer = keyLefts/totalServers + 1;
    -
    -        int keyIdx = 0;
    -        for(HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>::Iterator locationIter = result->begin();
    -          locationIter != result->end(); locationIter++)
    -        {
    -          VectorOfCacheableKeyPtr keys = locationIter.second();
    -          for(int i = 0; i < perServer; i++)
    -          {
    -            if(keyIdx < keyLefts)
    -            {
    -              keys->push_back(keysWhichLeft->at(keyIdx++));
    -            }
    -            else
    -              break;
    -          }
    -          if(keyIdx >= keyLefts)
    -            break;//done
    -        }
    -      }
    -      else if(result->size() == 0)
    -      {//not be able to map any key
    -        return NULL;//it will force all keys to send to one server
    -      }
    -
    -      return result;
    -  }
    -
    -
    -  void ClientMetadataService::markPrimaryBucketForTimeout(const RegionPtr& region, const CacheableKeyPtr& key,
    -    const CacheablePtr& value,const UserDataPtr& aCallbackArgument, bool isPrimary, BucketServerLocationPtr& serverLocation,  int8_t& version)
    -  {
    -    if(m_bucketWaitTimeout == 0)
    -      return;
    -
    -    WriteGuard guard( m_PRbucketStatusLock ); 
    -
    -    getBucketServerLocation(region, key, value, aCallbackArgument, false/*look for secondary host*/, serverLocation, version);
    -
    -    if(serverLocation != NULLPTR && serverLocation->isValid())
    -    {
    -      LOGDEBUG("Server host and port are %s:%d", serverLocation->getServerName().c_str(), serverLocation->getPort());
    -      int32_t bId = serverLocation->getBucketId();
    -
    -      std::map<std::string, PRbuckets*>::iterator  bs = m_bucketStatus.find(region->getFullPath());
    -
    -      if(bs != m_bucketStatus.end())
    -      {
    -        bs->second->setBucketTimeout(bId);
    -        LOGDEBUG("marking bucket %d as timeout ", bId); 
    -      }
    -    }  
    -  }
    -
    -  HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* ClientMetadataService::groupByBucketOnClientSide(
    -    const RegionPtr& region, CacheableVectorPtr* keySet, ClientMetadataPtr& metadata) {
    -
    -      HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* bucketToKeysMap = new HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>();
    -      for (CacheableVector::Iterator itr = (*keySet)->begin(); itr != (*keySet)->end(); ++itr) {
    -        CacheableKeyPtr key = dynCast<CacheableKeyPtr> (*itr);
    -        PartitionResolverPtr resolver = region->getAttributes()->getPartitionResolver();
    -        CacheableKeyPtr resolvekey;
    -        EntryEvent event(region, key, NULLPTR, NULLPTR, NULLPTR, false);
    -        int bucketId = -1;
    -        if (resolver == NULLPTR) {
    -          resolvekey = key;
    -        }
    -        else {
    -          resolvekey = resolver->getRoutingObject(event);
    -          if (resolvekey == NULLPTR) {
    -            throw IllegalStateException("The RoutingObject returned by PartitionResolver is null.");
    -          }
    -        }
    -        CacheableHashSetPtr partitionNames = metadata->getFixedPartitionNames();
    -        if (partitionNames != NULLPTR && partitionNames->size() > 0) {
    -          FixedPartitionResolverPtr fpResolver( dynamic_cast<FixedPartitionResolver*> (resolver.ptr() ));
    -          if(fpResolver != NULLPTR) {
    -            const char* partition = fpResolver->getPartitionName(event, partitionNames);
    -            if ( partition == NULL ) {
    -              throw IllegalStateException("partition name returned by Partition resolver is null.");
    -            }
    -            else {
    -              bucketId = metadata->assignFixedBucketId(partition, resolvekey);
    -              if ( bucketId == -1) {
    -                this->enqueueForMetadataRefresh(region->getFullPath(), 0);
    -              }
    -            }
    -          }
    -        }
    -        else {
    -          if (metadata->getTotalNumBuckets() > 0 ) {
    -            bucketId = std::abs((int)resolvekey->hashcode() % metadata->getTotalNumBuckets());
    -          }
    -        }
    -        HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>::Iterator iter =  bucketToKeysMap->find(CacheableInt32::create(bucketId));
    -        CacheableHashSetPtr bucketKeys;
    -        if (iter == bucketToKeysMap->end()) {
    -          bucketKeys = CacheableHashSet::create();
    -          bucketToKeysMap->insert(CacheableInt32::create(bucketId), bucketKeys);
    -        }
    -        else {
    -          bucketKeys = iter.second();
    -        }
    -        bucketKeys->insert(key);
    -      }
    -      return bucketToKeysMap;
    -  }
    -
    -HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* ClientMetadataService::getServerToFilterMapFESHOP(
    -  CacheableVectorPtr* routingKeys, const RegionPtr& region, bool isPrimary) {
    -
    -    ClientMetadataPtr cptr = getClientMetadata(region->getFullPath());
    -
    -    if (cptr == NULLPTR /*|| cptr->adviseRandomServerLocation() == NULLPTR*/) {
    -      enqueueForMetadataRefresh(region->getFullPath(), 0);
    -      return NULL;
    -    }
    -
    -    if (routingKeys == NULL) {
    -      return NULL;
    -    }
    -
    -    HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* bucketToKeysMap = groupByBucketOnClientSide(region, routingKeys, cptr);
    -    CacheableHashSetPtr bucketSet = CacheableHashSet::create();
    -    for (HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>::Iterator iter = bucketToKeysMap->begin(); iter != bucketToKeysMap->end(); ++iter) {
    -      bucketSet->insert(iter.first());
    -    }
    -    LOGDEBUG("ClientMetadataService::getServerToFilterMapFESHOP: bucketSet size = %d ", bucketSet->size());
    -
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* serverToBuckets = groupByServerToBuckets(cptr, bucketSet, isPrimary);
    -
    -    if (serverToBuckets == NULL) {
    -      delete bucketToKeysMap;
    -      return NULL;
    -    }
    -
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* serverToKeysMap = new HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>();
    -
    -    for (HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes = serverToBuckets->begin(); itrRes != serverToBuckets->end(); ++itrRes) {
    -      BucketServerLocationPtr serverLocation = itrRes.first();
    -      CacheableHashSetPtr buckets = itrRes.second();
    -      for(CacheableHashSet::Iterator bucket = buckets->begin(); bucket != buckets->end(); ++bucket) {
    -        HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator iter = serverToKeysMap->find(serverLocation);
    -        CacheableHashSetPtr keys;
    -        if (iter == serverToKeysMap->end()) {
    -          keys = CacheableHashSet::create();
    -        }
    -        else {
    -          keys = iter.second();
    -        }
    -        HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>::Iterator bucketToKeysiter = bucketToKeysMap->find(*bucket);
    -        if (bucketToKeysiter !=  bucketToKeysMap->end()) {
    -          CacheableHashSetPtr bkeys = bucketToKeysiter.second();
    -          for (CacheableHashSet::Iterator itr = bkeys->begin(); itr != bkeys->end(); ++itr) {
    -            keys->insert(*itr);
    -          }
    -        }
    -        serverToKeysMap->insert(serverLocation, keys);
    -      }
    -    }
    -    delete bucketToKeysMap;
    -    delete serverToBuckets;
    -    return serverToKeysMap;
    -}
    -
    -BucketServerLocationPtr ClientMetadataService::findNextServer
    -  (HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* serverToBucketsMap, CacheableHashSetPtr& currentBucketSet) {
    -
    -    BucketServerLocationPtr serverLocation;
    -    int max = -1;
    -    std::vector<BucketServerLocationPtr> nodesOfEqualSize;
    -    for(HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itr = serverToBucketsMap->begin(); itr != serverToBucketsMap->end(); ++itr) {
    -      CacheableHashSetPtr buckets = CacheableHashSet::create();
    -      CacheableHashSetPtr sBuckets = itr.second();
    -
    -      for(CacheableHashSet::Iterator sItr = sBuckets->begin(); sItr != sBuckets->end(); ++sItr) {
    -        buckets->insert(*sItr);
    -      }
    -
    -      LOGDEBUG("ClientMetadataService::findNextServer currentBucketSet->size() = %d  bucketSet->size() = %d ", currentBucketSet->size(), buckets->size());
    -
    -      for(CacheableHashSet::Iterator currentBucketSetIter = currentBucketSet->begin(); currentBucketSetIter != currentBucketSet->end(); ++currentBucketSetIter) {        
    -        buckets->erase(*currentBucketSetIter);
    -        LOGDEBUG("ClientMetadataService::findNextServer bucketSet->size() = %d ",  buckets->size());        
    -      }
    -
    -      int size = buckets->size();
    -      if (max < size) {
    -        max = size;
    -        serverLocation = itr.first();
    -        nodesOfEqualSize.clear();
    -        nodesOfEqualSize.push_back(serverLocation);
    -      }
    -      else if (max == size){
    -        nodesOfEqualSize.push_back(serverLocation);
    -      }
    -    }
    -
    -    int nodeSize = nodesOfEqualSize.size();
    -    if(nodeSize > 0) {
    -      RandGen randgen;
    -      int random = randgen(nodeSize);
    -      return nodesOfEqualSize.at(random);
    -    }
    -    return NULLPTR;
    -}
    -
    -bool ClientMetadataService::AreBucketSetsEqual(CacheableHashSetPtr& currentBucketSet, CacheableHashSetPtr& bucketSet) {
    -
    -  int32_t currentBucketSetSize = currentBucketSet->size();
    -  int32_t bucketSetSetSize = bucketSet->size();
    -
    -  LOGDEBUG("ClientMetadataService::AreBucketSetsEqual currentBucketSetSize = %d bucketSetSetSize = %d ", currentBucketSetSize, bucketSetSetSize);
    -
    -  if (currentBucketSetSize != bucketSetSetSize) {
    -    return false;
    -  }
    -
    -  bool found = false;
    -  for(CacheableHashSet::Iterator currentBucketSetIter = currentBucketSet->begin(); currentBucketSetIter != currentBucketSet->end(); ++currentBucketSetIter) {
    -    found=false;
    -    for(CacheableHashSet::Iterator bucketSetIter = bucketSet->begin(); bucketSetIter != bucketSet->end(); ++bucketSetIter) {
    -      if(*currentBucketSetIter == *bucketSetIter) {
    -        found = true;
    -        break;
    -      }
    -    }
    -    if (!found) return false;
    -  }
    -  return true;
    -}
    -
    -HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* ClientMetadataService::
    -  pruneNodes(ClientMetadataPtr& metadata, CacheableHashSetPtr&  buckets) {
    -
    -    CacheableHashSetPtr bucketSetWithoutServer = CacheableHashSet::create();
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* serverToBucketsMap
    -      = new HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>();
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* prunedServerToBucketsMap
    -      = new HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>();
    -
    -    for(CacheableHashSet::Iterator bucketId= buckets->begin(); bucketId != buckets->end(); ++bucketId) {
    -      CacheableInt32Ptr bID = *bucketId;
    -      std::vector<BucketServerLocationPtr > locations = metadata->adviseServerLocations(bID->value());
    -      if (locations.size() == 0) {
    -        LOGDEBUG("ClientMetadataService::pruneNodes Since no server location available for bucketId = %d  putting it into bucketSetWithoutServer ", bID->value() );
    -        bucketSetWithoutServer->insert(bID);
    -        continue;
    -      }
    -
    -      for (std::vector<BucketServerLocationPtr >::iterator location= locations.begin(); location != locations.end(); ++location) {
    -        HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes = serverToBucketsMap->find(*location);
    -        CacheableHashSetPtr bucketSet;
    -        if (itrRes == serverToBucketsMap->end()) {
    -          bucketSet = CacheableHashSet::create();
    -        }
    -        else {
    -          bucketSet = itrRes.second();
    -        }
    -        bucketSet->insert(bID);
    -        serverToBucketsMap->insert(*location, bucketSet);
    -      }
    -    }
    -
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes = serverToBucketsMap->begin();
    -    CacheableHashSetPtr currentBucketSet = CacheableHashSet::create();
    -    BucketServerLocationPtr randomFirstServer;
    -    if(serverToBucketsMap->empty()) {
    -      LOGDEBUG("ClientMetadataService::pruneNodes serverToBucketsMap is empty so returning NULL");      
    -      delete prunedServerToBucketsMap;
    -      delete serverToBucketsMap;
    -      return NULL;
    -    }
    -    else {
    -      size_t size = serverToBucketsMap->size();
    -      LOGDEBUG("ClientMetadataService::pruneNodes Total size of serverToBucketsMap = %d ", size);
    -      for ( size_t idx = 0; idx < (rand() % size); idx++) {
    -        itrRes++;
    -      }
    -      randomFirstServer = itrRes.first();
    -    }
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes1 = serverToBucketsMap->find(randomFirstServer);
    -    CacheableHashSetPtr bucketSet = itrRes1.second();
    -
    -    for(CacheableHashSet::Iterator bt = bucketSet->begin(); bt != bucketSet->end(); ++bt) {
    -      currentBucketSet->insert(*bt);
    -    }
    -    prunedServerToBucketsMap->insert(randomFirstServer, bucketSet);
    -    serverToBucketsMap->erase(randomFirstServer);
    -
    -    while(!AreBucketSetsEqual(currentBucketSet, buckets)) {
    -      BucketServerLocationPtr server = findNextServer(serverToBucketsMap, currentBucketSet);
    -      if (server == NULLPTR) {
    -        LOGDEBUG("ClientMetadataService::pruneNodes findNextServer returned no server");
    -        break;
    -      }
    -
    -      HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes2 = serverToBucketsMap->find(server);
    -      CacheableHashSetPtr bucketSet2 = itrRes2.second();
    -
    -      LOGDEBUG("ClientMetadataService::pruneNodes currentBucketSet->size() = %d  bucketSet2->size() = %d ", currentBucketSet->size(), bucketSet2->size());
    -
    -      for(CacheableHashSet::Iterator currentBucketSetIter = currentBucketSet->begin(); currentBucketSetIter != currentBucketSet->end(); ++currentBucketSetIter) {        
    -        bucketSet2->erase(*currentBucketSetIter);
    -        LOGDEBUG("ClientMetadataService::pruneNodes bucketSet2->size() = %d ",  bucketSet2->size());        
    -      }
    -
    -      if (bucketSet2->empty()) {
    -        LOGDEBUG("ClientMetadataService::pruneNodes bucketSet2 is empty() so removing server from serverToBucketsMap");
    -        serverToBucketsMap->erase(server);
    -        continue;
    -      }
    -
    -      for(CacheableHashSet::Iterator itr = bucketSet2->begin(); itr != bucketSet2->end(); ++itr) {
    -        currentBucketSet->insert(*itr);
    -      }
    -
    -      prunedServerToBucketsMap->insert(server, bucketSet2);
    -      serverToBucketsMap->erase(server);
    -    }
    -
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes2 = prunedServerToBucketsMap->begin();
    -    for(CacheableHashSet::Iterator itr = bucketSetWithoutServer->begin(); itr != bucketSetWithoutServer->end(); ++itr) {
    -      CacheableInt32Ptr buckstId = *itr;
    -      itrRes2.second()->insert(buckstId);
    -    }
    -
    -    delete serverToBucketsMap;
    -    return prunedServerToBucketsMap;
    -}
    -
    -HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* ClientMetadataService::
    -  groupByServerToAllBuckets(const RegionPtr& region, bool optimizeForWrite) {
    -
    -    ClientMetadataPtr cptr = getClientMetadata(region->getFullPath());
    -    if (cptr == NULLPTR ) {
    -      enqueueForMetadataRefresh(region->getFullPath(), false);
    -      return NULL;
    -    }
    -    int totalBuckets = cptr->getTotalNumBuckets();
    -    CacheableHashSetPtr bucketSet = CacheableHashSet::create();
    -    for(int i=0; i<totalBuckets; i++) {
    -      bucketSet->insert(CacheableInt32::create(i));
    -    }
    -    return groupByServerToBuckets(cptr, bucketSet, optimizeForWrite);
    -}
    -
    -HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* ClientMetadataService::
    -  groupByServerToBuckets(ClientMetadataPtr& metadata, CacheableHashSetPtr& bucketSet, bool optimizeForWrite) {
    -    if (optimizeForWrite) {
    -      HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* serverToBucketsMap
    -        = new HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>();
    -      CacheableHashSetPtr bucketsWithoutServer = CacheableHashSet::create();
    -      for(CacheableHashSet::Iterator itr = bucketSet->begin(); itr != bucketSet->end(); ++itr ) {
    -        CacheableInt32Ptr bucketId = *itr;
    -        BucketServerLocationPtr serverLocation = metadata->advisePrimaryServerLocation(bucketId->value());
    -        if (serverLocation == NULLPTR) {
    -          bucketsWithoutServer->insert(bucketId);
    -          continue;
    -        }
    -        else if(!serverLocation->isValid()) {
    -          bucketsWithoutServer->insert(bucketId);
    -          continue;
    -        }
    -        HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes = serverToBucketsMap->find(serverLocation);
    -        CacheableHashSetPtr buckets;
    -        if(itrRes == serverToBucketsMap->end()) {
    -          buckets = CacheableHashSet::create();
    -          serverToBucketsMap->insert(serverLocation, buckets);
    -        }
    -        else {
    -          buckets = itrRes.second();
    -        }
    -        buckets->insert(bucketId);
    -      }
    -
    -      if (!serverToBucketsMap->empty()) {
    -        HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes = serverToBucketsMap->begin();
    -        for(CacheableHashSet::Iterator itr = bucketsWithoutServer->begin(); itr != bucketsWithoutServer->end(); ++itr ) {
    -          itrRes.second()->insert(*itr);
    -          LOGDEBUG("ClientMetadataService::groupByServerToBuckets inserting bucketsWithoutServer");
    -        }
    -      }
    -      return serverToBucketsMap;
    -    }
    -    else {
    -      return pruneNodes(metadata, bucketSet);
    -    }
    -}
    -
    -void ClientMetadataService::markPrimaryBucketForTimeoutButLookSecondaryBucket(const RegionPtr& region, const CacheableKeyPtr& key,
    -    const CacheablePtr& value,const UserDataPtr& aCallbackArgument, bool isPrimary, BucketServerLocationPtr& serverLocation,  int8_t& version)
    -{
    -  if(m_bucketWaitTimeout == 0)
    -    return;
    -
    -  WriteGuard guard( m_PRbucketStatusLock ); 
    -
    -  std::map<std::string, PRbuckets*>::iterator  bs = m_bucketStatus.find(region->getFullPath());
    -
    -  PRbuckets* prBuckets = NULL;
    -  if(bs != m_bucketStatus.end())
    -  {
    -    prBuckets = bs->second;    
    -  }
    -
    -  if(prBuckets == NULL)
    -    return;
    -
    -  getBucketServerLocation(region, key, value, aCallbackArgument, true, serverLocation, version);
    -
    -
    -  ClientMetadataPtr cptr = NULLPTR;
    -  {   
    -   ReadGuard guard( m_regionMetadataLock );
    -   RegionMetadataMapType::iterator cptrIter = m_regionMetaDataMap.find(region->getFullPath());
    -
    -   if(cptrIter != m_regionMetaDataMap.end())
    -   {
    -     cptr = cptrIter->second;
    -   }
    -
    -   if (cptr == NULLPTR ) {
    -     return ;
    -   }
    -  }
    -
    -  LOGFINE("Setting in markPrimaryBucketForTimeoutButLookSecondaryBucket");
    -
    -  int32_t totalBuckets = cptr->getTotalNumBuckets();
    -
    -  for(int32_t i =0; i< totalBuckets; i++)
    -  {
    -    int8_t version;
    -    BucketServerLocationPtr bsl;
    -    cptr->getServerLocation(i, false, bsl, version);
    -
    -    if(bsl == serverLocation)
    -    {
    -      prBuckets->setBucketTimeout(i);
    -      LOGFINE("markPrimaryBucketForTimeoutButLookSecondaryBucket::setting bucket timeout...");
    -    }
    -  }
    -
    -}
    -
    -bool ClientMetadataService::isBucketMarkedForTimeout(const char * regionFullPath, int32_t bucketid)
    -{
    -  if(m_bucketWaitTimeout == 0)
    -    return false;
    -
    -  ReadGuard guard( m_PRbucketStatusLock );
    -  
    -  std::map<std::string, PRbuckets*>::iterator  bs = m_bucketStatus.find(regionFullPath);
    -
    -  if(bs != m_bucketStatus.end())
    -  {
    -    bool m = bs->second->isBucketTimedOut(bucketid, m_bucketWaitTimeout);
    -    if(m == true) {
    -      ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) ); 
    -      CacheImpl * cache = tcrdm->getConnectionManager().getCacheImpl();
    -      cache->setBlackListBucketTimeouts();
    -    }
    -    LOGFINE("isBucketMarkedForTimeout:: for bucket %d returning = %d", bucketid, m); 
    -    
    -    return m;
    -  }
    -
    -  return false;
    -}
    -
    -}
    -
    +/*========================================================================= * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. * This product is protected by U.S. and international copyright * and intellectual property laws. Pivotal products are covered by * one or more patents listed at http://www.pivotal.io/patents. *========================================================================= */#include "TcrMessage.hpp"#include "ClientMetadataService.hpp"#include "ThinClientPoolDM.hpp"#include "../FixedPartitionResolver.hpp"#include <iterator>#include <cstdlib>#include <climits>namespace gemfire {  const char * ClientMetadataService::NC_CMDSvcThread = "NC CMDSvcThread";  ClientMetadataService::~ClientMetadataService ()  {    delete m_regionQueue;    if(m_bucketWaitTimeout > 0)    {      try      {        std::map<std::string, PRbuckets*>::iterator  bi;        for (bi = m_bucketStatus.begin(); bi != m_bucketStatus.end(); ++
 bi)        {          delete bi->second;        }      }catch(...)      {        LOGINFO("Exception in ClientMetadataService destructor");      }    }  }  ClientMetadataService::ClientMetadataService(PoolPtr pool)    /* adongre    * CID 28928: Uninitialized scalar field (UNINIT_CTOR)    */    : m_run(false)  {    m_regionQueue = new Queue<std::string>(false);    m_pool = pool;    m_bucketWaitTimeout = DistributedSystem::getSystemProperties()->bucketWaitTimeout();  }  int ClientMetadataService::svc() {    DistributedSystemImpl::setThreadName(NC_CMDSvcThread);    LOGINFO("ClientMetadataService started for pool %s", m_pool->getName());     while (m_run) {      m_regionQueueSema.acquire( );      ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );       CacheImpl * cache = tcrdm->getConnectionManager().getCacheImpl();      while(true)      {        std::string * regionFullPath = m_regionQueue->get
 ();        if(regionFullPath != NULL && regionFullPath->c_str() != NULL )        {          while(true)           {            if(m_regionQueue->size() > 0)             {              std::string * nextRegionFullPath = m_regionQueue->get();              if(nextRegionFullPath != NULL && nextRegionFullPath->c_str() != NULL && regionFullPath->compare(nextRegionFullPath->c_str()) == 0)              {                delete nextRegionFullPath;//we are going for same              }              else              {                //different region; put it back                m_regionQueue->put(nextRegionFullPath);                break;              }            } else              break;          }        }        if (!cache->isCacheDestroyPending() && regionFullPath != NULL && regionFullPath->c_str() != NULL) {          getClientPRMetadata(regionFullPath->c_str());          delete regionFullPath;          regionFullPath = NULL;      
   }        else        {          delete regionFullPath;          regionFullPath = NULL;          break;        }      }      //while(m_regionQueueSema.tryacquire( ) != -1); // release all    }    LOGINFO("ClientMetadataService stopped for pool %s", m_pool->getName());    return 0;  }  void  ClientMetadataService::getClientPRMetadata(const char * regionFullPath) {    if (regionFullPath == NULL) return;    ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );    if(tcrdm == NULL)    {      throw IllegalArgumentException("ClientMetaData: pool cast to ThinClientPoolDM failed");    }    // That means metadata for the region not found, So only for the first time for a particular region use GetClientPartitionAttributesOp    // TcrMessage to fetch the metadata and put it into map for later use.send this message to server and get metadata from server.        TcrMessage reply;    std::string path(regionFullPath);    Clie
 ntMetadataPtr cptr = NULLPTR;	{	  ReadGuard guard( m_regionMetadataLock );    RegionMetadataMapType::iterator itr = m_regionMetaDataMap.find(path);    if (itr != m_regionMetaDataMap.end()) {      cptr = itr->second;    }	  //cptr = m_regionMetaDataMap[path];	}    ClientMetadataPtr newCptr = NULLPTR;      if (cptr == NULLPTR) {        TcrMessage request(TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES, regionFullPath);        GfErrType err = tcrdm->sendSyncRequest(request, reply);                if (err == GF_NOERR && reply.getMessageType() == TcrMessage::RESPONSE_CLIENT_PARTITION_ATTRIBUTES) {          cptr = new ClientMetadata(reply.getNumBuckets(), reply.getColocatedWith(), tcrdm, reply.getFpaSet());          if(m_bucketWaitTimeout > 0 && reply.getNumBuckets() > 0)          {            WriteGuard guard( m_PRbucketStatusLock );            m_bucketStatus[regionFullPath] = new PRbuckets(reply.getNumBuckets());          }          LOGDEBUG("ClientMetadata b
 uckets %d ", reply.getNumBuckets());        }              }    if (cptr == NULLPTR) {      return;    }    CacheableStringPtr colocatedWith;    colocatedWith = cptr->getColocatedWith();    if (colocatedWith == NULLPTR) {           newCptr = SendClientPRMetadata(regionFullPath,cptr);      //now we will get new instance so assign it again      if(newCptr != NULLPTR)       {        cptr->setPreviousone(NULLPTR);        newCptr->setPreviousone(cptr);        WriteGuard guard( m_regionMetadataLock );        m_regionMetaDataMap[path] = newCptr;		LOGFINE("Updated client meta data");      }    }    else {      newCptr = SendClientPRMetadata(colocatedWith->asChar(),cptr);               if(newCptr != NULLPTR)       {        cptr->setPreviousone(NULLPTR);        newCptr->setPreviousone(cptr);        //now we will get new instance so assign it again        WriteGuard guard( m_regionMetadataLock );        m_regionMetaDataMap[colocatedWith->
 asChar()] = newCptr;        m_regionMetaDataMap[path] = newCptr;		LOGFINE("Updated client meta data");      }    }      }  ClientMetadataPtr ClientMetadataService::SendClientPRMetadata(const char* regionPath, ClientMetadataPtr cptr) {    ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );    if(tcrdm == NULL)    {      throw IllegalArgumentException("ClientMetaData: pool cast to ThinClientPoolDM failed");    }    TcrMessage request(TcrMessage::GET_CLIENT_PR_METADATA, regionPath);    TcrMessage reply;    // send this message to server and get metadata from server.    LOGFINE("Now sending GET_CLIENT_PR_METADATA for getting from server: %s",regionPath);    RegionPtr region = NULLPTR;        GfErrType err = tcrdm->sendSyncRequest(request, reply);        if (err == GF_NOERR && reply.getMessageType() == TcrMessage::RESPONSE_CLIENT_PR_METADATA) {      tcrdm->getConnectionManager().getCacheImpl()->getRegion(regionPath, region)
 ;            if (region != NULLPTR) {               LocalRegion* lregion = dynamic_cast<LocalRegion*> (region.ptr());                  lregion->getRegionStats()->incMetaDataRefreshCount();              }            std::vector<BucketServerLocationsType>* metadata = reply.getMetadata();      if(metadata == NULL)        return NULLPTR;      if ( metadata->empty()) {        delete metadata;        return NULLPTR;      }      ClientMetadata* newCptr = new ClientMetadata(*(cptr.ptr()));      for (std::vector<BucketServerLocationsType>::iterator iter = metadata->begin();        iter != metadata->end(); ++iter) {                if (!(*iter).empty()) {            newCptr->updateBucketServerLocations((*iter).at(0)->getBucketId(), (*iter));                             }      }      delete metadata;      ClientMetadataPtr newCMDPtr(newCptr);      return newCMDPtr;    }    return NULLPTR;  }  void ClientMetadataService::getBucketServerLocation
 (const RegionPtr& region, const CacheableKeyPtr& key,    const CacheablePtr& value,const UserDataPtr& aCallbackArgument, bool isPrimary, BucketServerLocationPtr& serverLocation,  int8_t& version)  {    //ACE_Guard< ACE_Recursive_Thread_Mutex > guard( m_regionMetadataLock );        if (region != NULLPTR) {	  ReadGuard guard( m_regionMetadataLock );	  LOGDEBUG("ClientMetadataService::getBucketServerLocation m_regionMetaDataMap size is %d",m_regionMetaDataMap.size());	      std::string path(region->getFullPath());    ClientMetadataPtr cptr = NULLPTR;    RegionMetadataMapType::iterator itr = m_regionMetaDataMap.find(path);    if (itr != m_regionMetaDataMap.end()) {      cptr = itr->second;    }      //ClientMetadataPtr cptr = m_regionMetaDataMap[path];      if (cptr == NULLPTR) {        //serverLocation = BucketServerLocation();        return;      }      CacheableKeyPtr resolvekey;      const PartitionResolverPtr& resolver = region->getAttributes()->getParti
 tionResolver();            EntryEvent event(region, key, value, NULLPTR, aCallbackArgument, false);      int bucketId = 0;      if (resolver == NULLPTR) {        resolvekey = key;      }      else {        resolvekey = resolver->getRoutingObject(event);        if (resolvekey == NULLPTR) {          throw IllegalStateException("The RoutingObject returned by PartitionResolver is null.");        }      }      CacheableHashSetPtr& partNames = cptr->getFixedPartitionNames();      if (partNames != NULLPTR && partNames->size() > 0) {                FixedPartitionResolverPtr fpResolver( dynamic_cast<FixedPartitionResolver*> (resolver.ptr() ));        if(fpResolver != NULLPTR) {                 const char* partition = fpResolver->getPartitionName(event, partNames);                  if ( partition == NULL ) {            throw IllegalStateException("partition name returned by Partition resolver is null.");          }          else {            bucketI
 d = cptr->assignFixedBucketId(partition, resolvekey);            if ( bucketId == -1) {              return;            }                    }        }      }            else {        if (cptr->getTotalNumBuckets() > 0 ) {                    bucketId = std::abs((int)resolvekey->hashcode() % cptr->getTotalNumBuckets());                  }      }      cptr->getServerLocation(bucketId, isPrimary, serverLocation, version);    }      }  void ClientMetadataService::removeBucketServerLocation(BucketServerLocation serverLocation)  {    ReadGuard guard( m_regionMetadataLock );    for(RegionMetadataMapType::iterator regionMetadataIter= m_regionMetaDataMap.begin();      regionMetadataIter!=m_regionMetaDataMap.end();      regionMetadataIter++) {        ClientMetadataPtr cptr = (*regionMetadataIter).second;        if (cptr != NULLPTR) {          //Yogesh has commented out this as it was causing a SIGV          //cptr->removeBucketServerLocation(s
 erverLocation);        }    }  }  ClientMetadataPtr ClientMetadataService::getClientMetadata(const char* regionFullPath)  {    ReadGuard guard( m_regionMetadataLock );    RegionMetadataMapType::iterator regionMetadataIter = m_regionMetaDataMap.find(regionFullPath);    if (regionMetadataIter != m_regionMetaDataMap.end()) {      return (*regionMetadataIter).second;    }    return NULLPTR;  } /*const  PartitionResolverPtr& ClientMetadataService::getResolver(const RegionPtr& region, const CacheableKeyPtr& key,    const UserDataPtr& aCallbackArgument){      //const char * regionFullPath = region->getFullPath();      //if (regionFullPath != NULL) {        //const RegionAttributesPtr& rAttrsPtr = region->getAttributes();        return region->getAttributes()->getPartitionResolver();      //}   }*/     /*BucketServerLocation ClientMetadataService::getServerLocation(ClientMetadataPtr cptr, int bucketId, bool tryPrimary)  {  LOGFINE("Inside 
 getServerLocation");  if (cptr == NULLPTR) {  LOGDEBUG("MetaData does not exist");  return BucketServerLocation();  }  LOGFINE("Ending getServerLocation");  return cptr->getServerLocation(bucketId, tryPrimary);  }*/  void ClientMetadataService::populateDummyServers(const char * regionName, ClientMetadataPtr cptr)  {    WriteGuard guard( m_regionMetadataLock );    m_regionMetaDataMap[regionName] = cptr;  }  void ClientMetadataService::enqueueForMetadataRefresh(    const char* regionFullPath, int8 serverGroupFlag) {      ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );      if(tcrdm == NULL)      {        throw IllegalArgumentException("ClientMetaData: pool cast to ThinClientPoolDM failed");      }      RegionPtr region;      tcrdm->getConnectionManager().getCacheImpl()->getRegion(regionFullPath, region);            LocalRegion* lregion = dynamic_cast<LocalRegion*> (region.ptr());                lregion->ge
 tRegionStats()->incNonSingleHopCount();//we are here means nonSinglehop      std::string serverGroup = tcrdm->getServerGroup();          if (serverGroup.length() != 0 ) {            CacheImpl::setServerGroupFlag(serverGroupFlag);        if (serverGroupFlag == 2 ) {          LOGFINER("Network hop but, from within same server-group, so no metadata fetch from the server");          return;        }      }      if (region != NULLPTR) {               ThinClientRegion* tcrRegion = dynamic_cast<ThinClientRegion*> (region.ptr());        {          TryWriteGuard guardRegionMetaDataRefresh( tcrRegion->getMataDataMutex(), tcrRegion->getMetaDataRefreshed());          if ( tcrRegion->getMetaDataRefreshed() ) {            return;          }          LOGFINE("Network hop so fetching single hop metadata from the server");          CacheImpl::setNetworkHopFlag(true);          tcrRegion->setMetaDataRefreshed(true);                      std::string* tempRegion
 Path = new std::string(regionFullPath);          m_regionQueue->put(tempRegionPath);          m_regionQueueSema.release( );        }      }     }  HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>* ClientMetadataService::getServerToFilterMap(    const VectorOfCacheableKey* keys, const RegionPtr& region, bool isPrimary) {      //const char* regionFullPath = region->getFullPath();      ClientMetadataPtr cptr = NULLPTR;      {           ReadGuard guard( m_regionMetadataLock );        RegionMetadataMapType::iterator cptrIter = m_regionMetaDataMap.find(region->getFullPath());        if(cptrIter != m_regionMetaDataMap.end())        {          cptr = cptrIter->second;        }        if (cptr == NULLPTR || keys == NULL) {          // enqueueForMetadataRefresh(region->getFullPath());          return NULL;          //		//serverLocation = BucketServerLocation();          //		return;        }      }      //int totalNumberOfBuckets 
 = cptr->getTotalNumBuckets();      HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>* result = new HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>();      VectorOfCacheableKeyPtr keysWhichLeft(new VectorOfCacheableKey());      std::map<int, BucketServerLocationPtr> buckets;      for (VectorOfCacheableKey::Iterator iter = keys->begin(); iter != keys->end(); iter++) {        CacheableKeyPtr key = *iter;        LOGDEBUG("cmds = %s", key->toString()->toString());        PartitionResolverPtr resolver = region->getAttributes()->getPartitionResolver();        CacheableKeyPtr resolveKey;        if (resolver == NULLPTR) {          // client has not registered PartitionResolver          // Assuming even PR at server side is not using PartitionResolver          resolveKey = key;        } else {          EntryEvent event(region, key, NULLPTR, NULLPTR, NULLPTR, false);          resolveKey = resolver->getRoutingObject(event);        } 
        int bucketId = std::abs((int)resolveKey->hashcode() % cptr->getTotalNumBuckets());        VectorOfCacheableKeyPtr keyList = NULLPTR;        std::map<int, BucketServerLocationPtr>::iterator bucketsIter =          buckets.find(bucketId);        if (bucketsIter == buckets.end()) {          int8 version = -1;          //BucketServerLocationPtr serverLocation(new BucketServerLocation());          BucketServerLocationPtr serverLocation = NULLPTR;          cptr->getServerLocation(bucketId, isPrimary, serverLocation,version);          if(serverLocation == NULLPTR)          {//HItesh:if server not returns all buckets, need to confiem with PR team about this why??            keysWhichLeft->push_back(key);            continue;          }          else if(!serverLocation->isValid())          {            keysWhichLeft->push_back(key);            continue;          }          //if(serverLocation == NULLPTR)          //continue;//hitesh need to fix
           buckets[bucketId] = serverLocation;          HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>::Iterator itrRes = result->find(serverLocation);          //keyList = (*result)[serverLocation];          if(itrRes == result->end())          {            keyList = new VectorOfCacheableKey();            result->insert(serverLocation, keyList);          }          else            keyList = itrRes.second();          LOGDEBUG("new keylist buckets =%d res = %d", buckets.size(), result->size() );        } else {          keyList = (*result)[bucketsIter->second];        }        keyList->push_back(key);      }      if(keysWhichLeft->size() > 0 && result->size() > 0)      {//add left keys in result        int keyLefts = keysWhichLeft->size();        int totalServers = result->size();        int perServer = keyLefts/totalServers + 1;        int keyIdx = 0;        for(HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr
 >::Iterator locationIter = result->begin();          locationIter != result->end(); locationIter++)        {          VectorOfCacheableKeyPtr keys = locationIter.second();          for(int i = 0; i < perServer; i++)          {            if(keyIdx < keyLefts)            {              keys->push_back(keysWhichLeft->at(keyIdx++));            }            else              break;          }          if(keyIdx >= keyLefts)            break;//done        }      }      else if(result->size() == 0)      {//not be able to map any key        return NULL;//it will force all keys to send to one server      }      return result;  }  void ClientMetadataService::markPrimaryBucketForTimeout(const RegionPtr& region, const CacheableKeyPtr& key,    const CacheablePtr& value,const UserDataPtr& aCallbackArgument, bool isPrimary, BucketServerLocationPtr& serverLocation,  int8_t& version)  {    if(m_bucketWaitTimeout == 0)      return;    W
 riteGuard guard( m_PRbucketStatusLock );     getBucketServerLocation(region, key, value, aCallbackArgument, false/*look for secondary host*/, serverLocation, version);    if(serverLocation != NULLPTR && serverLocation->isValid())    {      LOGDEBUG("Server host and port are %s:%d", serverLocation->getServerName().c_str(), serverLocation->getPort());      int32_t bId = serverLocation->getBucketId();      std::map<std::string, PRbuckets*>::iterator  bs = m_bucketStatus.find(region->getFullPath());      if(bs != m_bucketStatus.end())      {        bs->second->setBucketTimeout(bId);        LOGDEBUG("marking bucket %d as timeout ", bId);       }    }    }  HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* ClientMetadataService::groupByBucketOnClientSide(    const RegionPtr& region, CacheableVectorPtr* keySet, ClientMetadataPtr& metadata) {      HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* bucketToKeysMap = new HashMapT<CacheableInt32Ptr, Ca
 cheableHashSetPtr>();      for (CacheableVector::Iterator itr = (*keySet)->begin(); itr != (*keySet)->end(); ++itr) {        CacheableKeyPtr key = dynCast<CacheableKeyPtr> (*itr);        PartitionResolverPtr resolver = region->getAttributes()->getPartitionResolver();        CacheableKeyPtr resolvekey;        EntryEvent event(region, key, NULLPTR, NULLPTR, NULLPTR, false);        int bucketId = -1;        if (resolver == NULLPTR) {          resolvekey = key;        }        else {          resolvekey = resolver->getRoutingObject(event);          if (resolvekey == NULLPTR) {            throw IllegalStateException("The RoutingObject returned by PartitionResolver is null.");          }        }        CacheableHashSetPtr partitionNames = metadata->getFixedPartitionNames();        if (partitionNames != NULLPTR && partitionNames->size() > 0) {          FixedPartitionResolverPtr fpResolver( dynamic_cast<FixedPartitionResolver*> (resolver.ptr() ));     
      if(fpResolver != NULLPTR) {            const char* partition = fpResolver->getPartitionName(event, partitionNames);            if ( partition == NULL ) {              throw IllegalStateException("partition name returned by Partition resolver is null.");            }            else {              bucketId = metadata->assignFixedBucketId(partition, resolvekey);              if ( bucketId == -1) {                this->enqueueForMetadataRefresh(region->getFullPath(), 0);              }            }          }        }        else {          if (metadata->getTotalNumBuckets() > 0 ) {            bucketId = std::abs((int)resolvekey->hashcode() % metadata->getTotalNumBuckets());          }        }        HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>::Iterator iter =  bucketToKeysMap->find(CacheableInt32::create(bucketId));        CacheableHashSetPtr bucketKeys;        if (iter == bucketToKeysMap->end()) {          bucketKeys = CacheableHas
 hSet::create();          bucketToKeysMap->insert(CacheableInt32::create(bucketId), bucketKeys);        }        else {          bucketKeys = iter.second();        }        bucketKeys->insert(key);      }      return bucketToKeysMap;  }HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* ClientMetadataService::getServerToFilterMapFESHOP(  CacheableVectorPtr* routingKeys, const RegionPtr& region, bool isPrimary) {    ClientMetadataPtr cptr = getClientMetadata(region->getFullPath());    if (cptr == NULLPTR /*|| cptr->adviseRandomServerLocation() == NULLPTR*/) {      enqueueForMetadataRefresh(region->getFullPath(), 0);      return NULL;    }    if (routingKeys == NULL) {      return NULL;    }    HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* bucketToKeysMap = groupByBucketOnClientSide(region, routingKeys, cptr);    CacheableHashSetPtr bucketSet = CacheableHashSet::create();    for (HashMapT<CacheableInt32Ptr, CacheableHashSetP
 tr>::Iterator iter = bucketToKeysMap->begin(); iter != bucketToKeysMap->end(); ++iter) {      bucketSet->insert(iter.first());    }    LOGDEBUG("ClientMetadataService::getServerToFilterMapFESHOP: bucketSet size = %d ", bucketSet->size());    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* serverToBuckets = groupByServerToBuckets(cptr, bucketSet, isPrimary);    if (serverToBuckets == NULL) {      delete bucketToKeysMap;      return NULL;    }    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* serverToKeysMap = new HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>();    for (HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes = serverToBuckets->begin(); itrRes != serverToBuckets->end(); ++itrRes) {      BucketServerLocationPtr serverLocation = itrRes.first();      CacheableHashSetPtr buckets = itrRes.second();      for(CacheableHashSet::Iterator bucket = buckets->begin(); bucket != buckets->end(); ++bucket) {
         HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator iter = serverToKeysMap->find(serverLocation);        CacheableHashSetPtr keys;        if (iter == serverToKeysMap->end()) {          keys = CacheableHashSet::create();        }        else {          keys = iter.second();        }        HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>::Iterator bucketToKeysiter = bucketToKeysMap->find(*bucket);        if (bucketToKeysiter !=  bucketToKeysMap->end()) {          CacheableHashSetPtr bkeys = bucketToKeysiter.second();          for (CacheableHashSet::Iterator itr = bkeys->begin(); itr != bkeys->end(); ++itr) {            keys->insert(*itr);          }        }        serverToKeysMap->insert(serverLocation, keys);      }    }    delete bucketToKeysMap;    delete serverToBuckets;    return serverToKeysMap;}BucketServerLocationPtr ClientMetadataService::findNextServer  (HashMapT<BucketServerLocationPtr, CacheableHashSetP
 tr>* serverToBucketsMap, CacheableHashSetPtr& currentBucketSet) {    BucketServerLocationPtr serverLocation;    int max = -1;    std::vector<BucketServerLocationPtr> nodesOfEqualSize;    for(HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itr = serverToBucketsMap->begin(); itr != serverToBucketsMap->end(); ++itr) {      CacheableHashSetPtr buckets = CacheableHashSet::create();      CacheableHashSetPtr sBuckets = itr.second();      for(CacheableHashSet::Iterator sItr = sBuckets->begin(); sItr != sBuckets->end(); ++sItr) {        buckets->insert(*sItr);      }      LOGDEBUG("ClientMetadataService::findNextServer currentBucketSet->size() = %d  bucketSet->size() = %d ", currentBucketSet->size(), buckets->size());      for(CacheableHashSet::Iterator currentBucketSetIter = currentBucketSet->begin(); currentBucketSetIter != currentBucketSet->end(); ++currentBucketSetIter) {                buckets->erase(*currentBucketSetIter);        LOGDE
 BUG("ClientMetadataService::findNextServer bucketSet->size() = %d ",  buckets->size());              }      int size = buckets->size();      if (max < size) {        max = size;        serverLocation = itr.first();        nodesOfEqualSize.clear();        nodesOfEqualSize.push_back(serverLocation);      }      else if (max == size){        nodesOfEqualSize.push_back(serverLocation);      }    }    int nodeSize = nodesOfEqualSize.size();    if(nodeSize > 0) {      RandGen randgen;      int random = randgen(nodeSize);      return nodesOfEqualSize.at(random);    }    return NULLPTR;}bool ClientMetadataService::AreBucketSetsEqual(CacheableHashSetPtr& currentBucketSet, CacheableHashSetPtr& bucketSet) {  int32_t currentBucketSetSize = currentBucketSet->size();  int32_t bucketSetSetSize = bucketSet->size();  LOGDEBUG("ClientMetadataService::AreBucketSetsEqual currentBucketSetSize = %d bucketSetSetSize = %d ", currentBucketSetSize, bu
 cketSetSetSize);  if (currentBucketSetSize != bucketSetS
    --- End diff --
    
    Hi, whitespaces removed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-geode pull request #245: native-client-software-grant - ClientMeta...

Posted by pivotal-jbarrett <gi...@git.apache.org>.
Github user pivotal-jbarrett commented on a diff in the pull request:

    https://github.com/apache/incubator-geode/pull/245#discussion_r84287364
  
    --- Diff: geode-client-native/src/cppcache/impl/ClientMetadataService.cpp ---
    @@ -1,925 +1 @@
    -/*=========================================================================
    - * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
    - * This product is protected by U.S. and international copyright
    - * and intellectual property laws. Pivotal products are covered by
    - * one or more patents listed at http://www.pivotal.io/patents.
    - *=========================================================================
    - */
    -#include "TcrMessage.hpp"
    -#include "ClientMetadataService.hpp"
    -#include "ThinClientPoolDM.hpp"
    -#include "../FixedPartitionResolver.hpp"
    -#include <iterator>
    -#include <cstdlib>
    -#include <climits>
    -
    -namespace gemfire {
    -  const char * ClientMetadataService::NC_CMDSvcThread = "NC CMDSvcThread";
    -  ClientMetadataService::~ClientMetadataService ()
    -  {
    -    delete m_regionQueue;
    -    if(m_bucketWaitTimeout > 0)
    -    {
    -      try
    -      {
    -        std::map<std::string, PRbuckets*>::iterator  bi;
    -        for (bi = m_bucketStatus.begin(); bi != m_bucketStatus.end(); ++bi)
    -        {
    -          delete bi->second;
    -        }
    -
    -      }catch(...)
    -      {
    -        LOGINFO("Exception in ClientMetadataService destructor");
    -      }
    -    }
    -  }
    -
    -  ClientMetadataService::ClientMetadataService(PoolPtr pool)
    -    /* adongre
    -    * CID 28928: Uninitialized scalar field (UNINIT_CTOR)
    -    */
    -    : m_run(false)
    -
    -  {
    -    m_regionQueue = new Queue<std::string>(false);
    -    m_pool = pool;
    -    m_bucketWaitTimeout = DistributedSystem::getSystemProperties()->bucketWaitTimeout();
    -  }
    -
    -  int ClientMetadataService::svc() {
    -    DistributedSystemImpl::setThreadName(NC_CMDSvcThread);
    -    LOGINFO("ClientMetadataService started for pool %s", m_pool->getName()); 
    -    while (m_run) {
    -      m_regionQueueSema.acquire( );
    -      ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) ); 
    -      CacheImpl * cache = tcrdm->getConnectionManager().getCacheImpl();
    -      while(true)
    -      {
    -        std::string * regionFullPath = m_regionQueue->get();
    -
    -        if(regionFullPath != NULL && regionFullPath->c_str() != NULL )
    -        {
    -          while(true) 
    -          {
    -            if(m_regionQueue->size() > 0) 
    -            {
    -              std::string * nextRegionFullPath = m_regionQueue->get();
    -              if(nextRegionFullPath != NULL && nextRegionFullPath->c_str() != NULL && regionFullPath->compare(nextRegionFullPath->c_str()) == 0)
    -              {
    -                delete nextRegionFullPath;//we are going for same
    -              }
    -              else
    -              {
    -                //different region; put it back
    -                m_regionQueue->put(nextRegionFullPath);
    -                break;
    -              }
    -            } else
    -              break;
    -          }
    -        }
    -
    -        if (!cache->isCacheDestroyPending() && regionFullPath != NULL && regionFullPath->c_str() != NULL) {
    -          getClientPRMetadata(regionFullPath->c_str());
    -          delete regionFullPath;
    -          regionFullPath = NULL;
    -        }
    -        else
    -        {
    -          delete regionFullPath;
    -          regionFullPath = NULL;
    -          break;
    -        }
    -      }
    -      //while(m_regionQueueSema.tryacquire( ) != -1); // release all
    -    }
    -    LOGINFO("ClientMetadataService stopped for pool %s", m_pool->getName());
    -    return 0;
    -  }
    -
    -  void  ClientMetadataService::getClientPRMetadata(const char * regionFullPath) {
    -    if (regionFullPath == NULL) return;
    -    ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );
    -    if(tcrdm == NULL)
    -    {
    -      throw IllegalArgumentException("ClientMetaData: pool cast to ThinClientPoolDM failed");
    -    }
    -    // That means metadata for the region not found, So only for the first time for a particular region use GetClientPartitionAttributesOp
    -    // TcrMessage to fetch the metadata and put it into map for later use.send this message to server and get metadata from server.    
    -    TcrMessage reply;
    -    std::string path(regionFullPath);
    -    ClientMetadataPtr cptr = NULLPTR;
    -	{
    -	  ReadGuard guard( m_regionMetadataLock );
    -    RegionMetadataMapType::iterator itr = m_regionMetaDataMap.find(path);
    -    if (itr != m_regionMetaDataMap.end()) {
    -      cptr = itr->second;
    -    }
    -	  //cptr = m_regionMetaDataMap[path];
    -	}
    -    ClientMetadataPtr newCptr = NULLPTR;
    -
    -    {
    -      //ACE_Guard< ACE_Recursive_Thread_Mutex > guard( m_regionMetadataLock );  
    -
    -
    -      if (cptr == NULLPTR) {
    -        TcrMessage request(TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES, regionFullPath);
    -        GfErrType err = tcrdm->sendSyncRequest(request, reply);        
    -        if (err == GF_NOERR && reply.getMessageType() == TcrMessage::RESPONSE_CLIENT_PARTITION_ATTRIBUTES) {
    -          cptr = new ClientMetadata(reply.getNumBuckets(), reply.getColocatedWith(), tcrdm, reply.getFpaSet());
    -          if(m_bucketWaitTimeout > 0 && reply.getNumBuckets() > 0)
    -          {
    -            WriteGuard guard( m_PRbucketStatusLock );
    -            m_bucketStatus[regionFullPath] = new PRbuckets(reply.getNumBuckets());
    -          }
    -          LOGDEBUG("ClientMetadata buckets %d ", reply.getNumBuckets());
    -          if (cptr != NULLPTR) {
    -            //m_regionMetaDataMap[regionFullPath] = cptr;    
    -          }
    -        }        
    -      }
    -    }
    -    if (cptr == NULLPTR) {
    -      return;
    -    }
    -    CacheableStringPtr colocatedWith;
    -    if (cptr != NULLPTR) {
    -      colocatedWith = cptr->getColocatedWith();
    -    }
    -    if (colocatedWith == NULLPTR) {     
    -      newCptr = SendClientPRMetadata(regionFullPath,cptr);
    -      //now we will get new instance so assign it again
    -      if(newCptr != NULLPTR) 
    -      {
    -        cptr->setPreviousone(NULLPTR);
    -        newCptr->setPreviousone(cptr);
    -        WriteGuard guard( m_regionMetadataLock );
    -        m_regionMetaDataMap[path] = newCptr;
    -		LOGINFO("Updated client meta data");
    -      }
    -    }
    -    else {
    -      {
    -        //ACE_Guard< ACE_Recursive_Thread_Mutex > guard( m_regionMetadataLock );
    -        //m_regionMetaDataMap[colocatedWith->asChar()] = cptr;
    -      }      
    -      newCptr = SendClientPRMetadata(colocatedWith->asChar(),cptr);         
    -
    -      if(newCptr != NULLPTR) 
    -      {
    -        cptr->setPreviousone(NULLPTR);
    -        newCptr->setPreviousone(cptr);
    -        //now we will get new instance so assign it again
    -        WriteGuard guard( m_regionMetadataLock );
    -        m_regionMetaDataMap[colocatedWith->asChar()] = newCptr;
    -        m_regionMetaDataMap[path] = newCptr;
    -		LOGINFO("Updated client meta data");
    -      }
    -    }    
    -  }
    -
    -  ClientMetadataPtr ClientMetadataService::SendClientPRMetadata(const char* regionPath, ClientMetadataPtr cptr) {
    -    ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );
    -    if(tcrdm == NULL)
    -    {
    -      throw IllegalArgumentException("ClientMetaData: pool cast to ThinClientPoolDM failed");
    -    }
    -    TcrMessage request(TcrMessage::GET_CLIENT_PR_METADATA, regionPath);
    -    TcrMessage reply;
    -    // send this message to server and get metadata from server.
    -    LOGFINE("Now sending GET_CLIENT_PR_METADATA for getting from server: %s",regionPath);
    -    RegionPtr region = NULLPTR;    
    -    GfErrType err = tcrdm->sendSyncRequest(request, reply);    
    -    if (err == GF_NOERR && reply.getMessageType() == TcrMessage::RESPONSE_CLIENT_PR_METADATA) {
    -      tcrdm->getConnectionManager().getCacheImpl()->getRegion(regionPath, region);      
    -      if (region != NULLPTR) {       
    -        LocalRegion* lregion = dynamic_cast<LocalRegion*> (region.ptr());          
    -        lregion->getRegionStats()->incMetaDataRefreshCount();        
    -      }      
    -      std::vector<BucketServerLocationsType>* metadata = reply.getMetadata();
    -      if(metadata == NULL)
    -        return NULLPTR;
    -      if ( metadata->empty()) {
    -        delete metadata;
    -        return NULLPTR;
    -      }
    -      ClientMetadata* newCptr = new ClientMetadata(*(cptr.ptr()));
    -      for (std::vector<BucketServerLocationsType>::iterator iter = metadata->begin();
    -        iter != metadata->end(); ++iter) {      
    -          if (!(*iter).empty()) {
    -            newCptr->updateBucketServerLocations((*iter).at(0)->getBucketId(), (*iter));                   
    -          }
    -      }
    -      delete metadata;
    -      ClientMetadataPtr newCMDPtr(newCptr);
    -      return newCMDPtr;
    -    }
    -    return NULLPTR;
    -  }
    -
    -  void ClientMetadataService::getBucketServerLocation(const RegionPtr& region, const CacheableKeyPtr& key,
    -    const CacheablePtr& value,const UserDataPtr& aCallbackArgument, bool isPrimary, BucketServerLocationPtr& serverLocation,  int8_t& version)
    -  {
    -    //ACE_Guard< ACE_Recursive_Thread_Mutex > guard( m_regionMetadataLock );    
    -    if (region != NULLPTR) {
    -	  ReadGuard guard( m_regionMetadataLock );
    -	  LOGDEBUG("ClientMetadataService::getBucketServerLocation m_regionMetaDataMap size is %d",m_regionMetaDataMap.size());	  
    -    std::string path(region->getFullPath());
    -    ClientMetadataPtr cptr = NULLPTR;
    -    RegionMetadataMapType::iterator itr = m_regionMetaDataMap.find(path);
    -    if (itr != m_regionMetaDataMap.end()) {
    -      cptr = itr->second;
    -    }
    -      //ClientMetadataPtr cptr = m_regionMetaDataMap[path];
    -      if (cptr == NULLPTR) {
    -        //serverLocation = BucketServerLocation();
    -        return;
    -      }
    -      CacheableKeyPtr resolvekey;
    -      const PartitionResolverPtr& resolver = region->getAttributes()->getPartitionResolver();
    -      
    -      EntryEvent event(region, key, value, NULLPTR, aCallbackArgument, false);
    -      int bucketId = 0;
    -      if (resolver == NULLPTR) {
    -        resolvekey = key;
    -      }
    -      else {
    -        resolvekey = resolver->getRoutingObject(event);
    -        if (resolvekey == NULLPTR) {
    -          throw IllegalStateException("The RoutingObject returned by PartitionResolver is null.");
    -        }
    -      }
    -      CacheableHashSetPtr& partNames = cptr->getFixedPartitionNames();
    -      if (partNames != NULLPTR && partNames->size() > 0) {        
    -        FixedPartitionResolverPtr fpResolver( dynamic_cast<FixedPartitionResolver*> (resolver.ptr() ));
    -        if(fpResolver != NULLPTR) {       
    -          const char* partition = fpResolver->getPartitionName(event, partNames);        
    -          if ( partition == NULL ) {
    -            throw IllegalStateException("partition name returned by Partition resolver is null.");
    -          }
    -          else {
    -            bucketId = cptr->assignFixedBucketId(partition, resolvekey);
    -            if ( bucketId == -1) {
    -              return;
    -            }          
    -          }
    -        }
    -      }      
    -      else {
    -        if (cptr->getTotalNumBuckets() > 0 ) {          
    -          bucketId = std::abs((int)resolvekey->hashcode() % cptr->getTotalNumBuckets());          
    -        }
    -      }
    -      cptr->getServerLocation(bucketId, isPrimary, serverLocation, version);
    -    }    
    -  }
    -
    -  void ClientMetadataService::removeBucketServerLocation(BucketServerLocation serverLocation)
    -  {
    -    ReadGuard guard( m_regionMetadataLock );
    -    for(RegionMetadataMapType::iterator regionMetadataIter= m_regionMetaDataMap.begin();
    -      regionMetadataIter!=m_regionMetaDataMap.end();
    -      regionMetadataIter++) {
    -        ClientMetadataPtr cptr = (*regionMetadataIter).second;
    -        if (cptr != NULLPTR) {
    -          //Yogesh has commented out this as it was causing a SIGV
    -          //cptr->removeBucketServerLocation(serverLocation);
    -        }
    -    }
    -  }
    -
    -  ClientMetadataPtr ClientMetadataService::getClientMetadata(const char* regionFullPath)
    -  {
    -    ReadGuard guard( m_regionMetadataLock );
    -    RegionMetadataMapType::iterator regionMetadataIter = m_regionMetaDataMap.find(regionFullPath);
    -    if (regionMetadataIter != m_regionMetaDataMap.end()) {
    -      return (*regionMetadataIter).second;
    -    }
    -    return NULLPTR;
    -  }
    -
    - /*const  PartitionResolverPtr& ClientMetadataService::getResolver(const RegionPtr& region, const CacheableKeyPtr& key,
    -    const UserDataPtr& aCallbackArgument){
    -      //const char * regionFullPath = region->getFullPath();
    -      //if (regionFullPath != NULL) {
    -        //const RegionAttributesPtr& rAttrsPtr = region->getAttributes();
    -        return region->getAttributes()->getPartitionResolver();
    -      //}
    -   }*/   
    -
    -  /*BucketServerLocation ClientMetadataService::getServerLocation(ClientMetadataPtr cptr, int bucketId, bool tryPrimary)
    -  {
    -  LOGFINE("Inside getServerLocation");
    -  if (cptr == NULLPTR) {
    -  LOGDEBUG("MetaData does not exist");
    -  return BucketServerLocation();
    -  }
    -  LOGFINE("Ending getServerLocation");
    -  return cptr->getServerLocation(bucketId, tryPrimary);
    -  }*/
    -
    -  void ClientMetadataService::populateDummyServers(const char * regionName, ClientMetadataPtr cptr)
    -  {
    -    WriteGuard guard( m_regionMetadataLock );
    -    m_regionMetaDataMap[regionName] = cptr;
    -  }
    -
    -  void ClientMetadataService::enqueueForMetadataRefresh(
    -    const char* regionFullPath, int8 serverGroupFlag) {
    -      ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );
    -      if(tcrdm == NULL)
    -      {
    -        throw IllegalArgumentException("ClientMetaData: pool cast to ThinClientPoolDM failed");
    -      }
    -      RegionPtr region;
    -      tcrdm->getConnectionManager().getCacheImpl()->getRegion(regionFullPath, region);      
    -      LocalRegion* lregion = dynamic_cast<LocalRegion*> (region.ptr());          
    -      lregion->getRegionStats()->incNonSingleHopCount();//we are here means nonSinglehop
    -
    -      std::string serverGroup = tcrdm->getServerGroup();    
    -      if (serverGroup.length() != 0 ) {    
    -        CacheImpl::setServerGroupFlag(serverGroupFlag);
    -        if (serverGroupFlag == 2 ) {
    -          LOGFINER("Network hop but, from within same server-group, so no metadata fetch from the server");
    -          return;
    -        }
    -      }
    -
    -      if (region != NULLPTR) {       
    -        ThinClientRegion* tcrRegion = dynamic_cast<ThinClientRegion*> (region.ptr());
    -        {
    -          TryWriteGuard guardRegionMetaDataRefresh( tcrRegion->getMataDataMutex(), tcrRegion->getMetaDataRefreshed());
    -          if ( tcrRegion->getMetaDataRefreshed() ) {
    -            return;
    -          }
    -          LOGFINE("Network hop so fetching single hop metadata from the server");
    -          CacheImpl::setNetworkHopFlag(true);
    -          tcrRegion->setMetaDataRefreshed(true);            
    -          std::string* tempRegionPath = new std::string(regionFullPath);
    -          m_regionQueue->put(tempRegionPath);
    -          m_regionQueueSema.release( );
    -        }
    -      }   
    -  }
    -
    -  HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>* ClientMetadataService::getServerToFilterMap(
    -    const VectorOfCacheableKey* keys, const RegionPtr& region, bool isPrimary) {
    -      //const char* regionFullPath = region->getFullPath();
    -      ClientMetadataPtr cptr = NULLPTR;
    -      {   
    -        ReadGuard guard( m_regionMetadataLock );
    -        RegionMetadataMapType::iterator cptrIter = m_regionMetaDataMap.find(region->getFullPath());
    -
    -        if(cptrIter != m_regionMetaDataMap.end())
    -        {
    -          cptr = cptrIter->second;
    -        }
    -
    -        if (cptr == NULLPTR || keys == NULL) {
    -          // enqueueForMetadataRefresh(region->getFullPath());
    -          return NULL;
    -          //		//serverLocation = BucketServerLocation();
    -          //		return;
    -        }
    -      }
    -      //int totalNumberOfBuckets = cptr->getTotalNumBuckets();
    -      HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>* result = new HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>();
    -      VectorOfCacheableKeyPtr keysWhichLeft(new VectorOfCacheableKey());
    -
    -      std::map<int, BucketServerLocationPtr> buckets;
    -
    -      for (VectorOfCacheableKey::Iterator iter = keys->begin(); iter != keys->end(); iter++) {
    -        CacheableKeyPtr key = *iter;
    -        LOGDEBUG("cmds = %s", key->toString()->toString());
    -        PartitionResolverPtr resolver = region->getAttributes()->getPartitionResolver();
    -        CacheableKeyPtr resolveKey;
    -
    -        if (resolver == NULLPTR) {
    -          // client has not registered PartitionResolver
    -          // Assuming even PR at server side is not using PartitionResolver
    -          resolveKey = key;
    -        } else {
    -          EntryEvent event(region, key, NULLPTR, NULLPTR, NULLPTR, false);
    -          resolveKey = resolver->getRoutingObject(event);
    -        }
    -
    -        int bucketId = std::abs((int)resolveKey->hashcode() % cptr->getTotalNumBuckets());
    -        VectorOfCacheableKeyPtr keyList = NULLPTR;
    -        std::map<int, BucketServerLocationPtr>::iterator bucketsIter =
    -          buckets.find(bucketId);
    -
    -        if (bucketsIter == buckets.end()) {
    -          int8 version = -1;
    -          //BucketServerLocationPtr serverLocation(new BucketServerLocation());
    -          BucketServerLocationPtr serverLocation = NULLPTR;
    -          cptr->getServerLocation(bucketId, isPrimary, serverLocation,version);
    -          if(serverLocation == NULLPTR)
    -          {//HItesh:if server not returns all buckets, need to confiem with PR team about this why??
    -            keysWhichLeft->push_back(key);
    -            continue;
    -          }
    -          else if(!serverLocation->isValid())
    -          {
    -            keysWhichLeft->push_back(key);
    -            continue;
    -          }
    -          //if(serverLocation == NULLPTR)
    -          //continue;//hitesh need to fix
    -          buckets[bucketId] = serverLocation;
    -          HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>::Iterator itrRes = result->find(serverLocation);
    -          //keyList = (*result)[serverLocation];
    -
    -          if(itrRes == result->end())
    -          {
    -            keyList = new VectorOfCacheableKey();
    -            result->insert(serverLocation, keyList);
    -          }
    -          else
    -            keyList = itrRes.second();
    -          LOGDEBUG("new keylist buckets =%d res = %d", buckets.size(), result->size() );
    -        } else {
    -          keyList = (*result)[bucketsIter->second];
    -        }
    -
    -        keyList->push_back(key);
    -      }
    -
    -      if(keysWhichLeft->size() > 0 && result->size() > 0)
    -      {//add left keys in result
    -        int keyLefts = keysWhichLeft->size();
    -        int totalServers = result->size();
    -        int perServer = keyLefts/totalServers + 1;
    -
    -        int keyIdx = 0;
    -        for(HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>::Iterator locationIter = result->begin();
    -          locationIter != result->end(); locationIter++)
    -        {
    -          VectorOfCacheableKeyPtr keys = locationIter.second();
    -          for(int i = 0; i < perServer; i++)
    -          {
    -            if(keyIdx < keyLefts)
    -            {
    -              keys->push_back(keysWhichLeft->at(keyIdx++));
    -            }
    -            else
    -              break;
    -          }
    -          if(keyIdx >= keyLefts)
    -            break;//done
    -        }
    -      }
    -      else if(result->size() == 0)
    -      {//not be able to map any key
    -        return NULL;//it will force all keys to send to one server
    -      }
    -
    -      return result;
    -  }
    -
    -
    -  void ClientMetadataService::markPrimaryBucketForTimeout(const RegionPtr& region, const CacheableKeyPtr& key,
    -    const CacheablePtr& value,const UserDataPtr& aCallbackArgument, bool isPrimary, BucketServerLocationPtr& serverLocation,  int8_t& version)
    -  {
    -    if(m_bucketWaitTimeout == 0)
    -      return;
    -
    -    WriteGuard guard( m_PRbucketStatusLock ); 
    -
    -    getBucketServerLocation(region, key, value, aCallbackArgument, false/*look for secondary host*/, serverLocation, version);
    -
    -    if(serverLocation != NULLPTR && serverLocation->isValid())
    -    {
    -      LOGDEBUG("Server host and port are %s:%d", serverLocation->getServerName().c_str(), serverLocation->getPort());
    -      int32_t bId = serverLocation->getBucketId();
    -
    -      std::map<std::string, PRbuckets*>::iterator  bs = m_bucketStatus.find(region->getFullPath());
    -
    -      if(bs != m_bucketStatus.end())
    -      {
    -        bs->second->setBucketTimeout(bId);
    -        LOGDEBUG("marking bucket %d as timeout ", bId); 
    -      }
    -    }  
    -  }
    -
    -  HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* ClientMetadataService::groupByBucketOnClientSide(
    -    const RegionPtr& region, CacheableVectorPtr* keySet, ClientMetadataPtr& metadata) {
    -
    -      HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* bucketToKeysMap = new HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>();
    -      for (CacheableVector::Iterator itr = (*keySet)->begin(); itr != (*keySet)->end(); ++itr) {
    -        CacheableKeyPtr key = dynCast<CacheableKeyPtr> (*itr);
    -        PartitionResolverPtr resolver = region->getAttributes()->getPartitionResolver();
    -        CacheableKeyPtr resolvekey;
    -        EntryEvent event(region, key, NULLPTR, NULLPTR, NULLPTR, false);
    -        int bucketId = -1;
    -        if (resolver == NULLPTR) {
    -          resolvekey = key;
    -        }
    -        else {
    -          resolvekey = resolver->getRoutingObject(event);
    -          if (resolvekey == NULLPTR) {
    -            throw IllegalStateException("The RoutingObject returned by PartitionResolver is null.");
    -          }
    -        }
    -        CacheableHashSetPtr partitionNames = metadata->getFixedPartitionNames();
    -        if (partitionNames != NULLPTR && partitionNames->size() > 0) {
    -          FixedPartitionResolverPtr fpResolver( dynamic_cast<FixedPartitionResolver*> (resolver.ptr() ));
    -          if(fpResolver != NULLPTR) {
    -            const char* partition = fpResolver->getPartitionName(event, partitionNames);
    -            if ( partition == NULL ) {
    -              throw IllegalStateException("partition name returned by Partition resolver is null.");
    -            }
    -            else {
    -              bucketId = metadata->assignFixedBucketId(partition, resolvekey);
    -              if ( bucketId == -1) {
    -                this->enqueueForMetadataRefresh(region->getFullPath(), 0);
    -              }
    -            }
    -          }
    -        }
    -        else {
    -          if (metadata->getTotalNumBuckets() > 0 ) {
    -            bucketId = std::abs((int)resolvekey->hashcode() % metadata->getTotalNumBuckets());
    -          }
    -        }
    -        HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>::Iterator iter =  bucketToKeysMap->find(CacheableInt32::create(bucketId));
    -        CacheableHashSetPtr bucketKeys;
    -        if (iter == bucketToKeysMap->end()) {
    -          bucketKeys = CacheableHashSet::create();
    -          bucketToKeysMap->insert(CacheableInt32::create(bucketId), bucketKeys);
    -        }
    -        else {
    -          bucketKeys = iter.second();
    -        }
    -        bucketKeys->insert(key);
    -      }
    -      return bucketToKeysMap;
    -  }
    -
    -HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* ClientMetadataService::getServerToFilterMapFESHOP(
    -  CacheableVectorPtr* routingKeys, const RegionPtr& region, bool isPrimary) {
    -
    -    ClientMetadataPtr cptr = getClientMetadata(region->getFullPath());
    -
    -    if (cptr == NULLPTR /*|| cptr->adviseRandomServerLocation() == NULLPTR*/) {
    -      enqueueForMetadataRefresh(region->getFullPath(), 0);
    -      return NULL;
    -    }
    -
    -    if (routingKeys == NULL) {
    -      return NULL;
    -    }
    -
    -    HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* bucketToKeysMap = groupByBucketOnClientSide(region, routingKeys, cptr);
    -    CacheableHashSetPtr bucketSet = CacheableHashSet::create();
    -    for (HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>::Iterator iter = bucketToKeysMap->begin(); iter != bucketToKeysMap->end(); ++iter) {
    -      bucketSet->insert(iter.first());
    -    }
    -    LOGDEBUG("ClientMetadataService::getServerToFilterMapFESHOP: bucketSet size = %d ", bucketSet->size());
    -
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* serverToBuckets = groupByServerToBuckets(cptr, bucketSet, isPrimary);
    -
    -    if (serverToBuckets == NULL) {
    -      delete bucketToKeysMap;
    -      return NULL;
    -    }
    -
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* serverToKeysMap = new HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>();
    -
    -    for (HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes = serverToBuckets->begin(); itrRes != serverToBuckets->end(); ++itrRes) {
    -      BucketServerLocationPtr serverLocation = itrRes.first();
    -      CacheableHashSetPtr buckets = itrRes.second();
    -      for(CacheableHashSet::Iterator bucket = buckets->begin(); bucket != buckets->end(); ++bucket) {
    -        HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator iter = serverToKeysMap->find(serverLocation);
    -        CacheableHashSetPtr keys;
    -        if (iter == serverToKeysMap->end()) {
    -          keys = CacheableHashSet::create();
    -        }
    -        else {
    -          keys = iter.second();
    -        }
    -        HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>::Iterator bucketToKeysiter = bucketToKeysMap->find(*bucket);
    -        if (bucketToKeysiter !=  bucketToKeysMap->end()) {
    -          CacheableHashSetPtr bkeys = bucketToKeysiter.second();
    -          for (CacheableHashSet::Iterator itr = bkeys->begin(); itr != bkeys->end(); ++itr) {
    -            keys->insert(*itr);
    -          }
    -        }
    -        serverToKeysMap->insert(serverLocation, keys);
    -      }
    -    }
    -    delete bucketToKeysMap;
    -    delete serverToBuckets;
    -    return serverToKeysMap;
    -}
    -
    -BucketServerLocationPtr ClientMetadataService::findNextServer
    -  (HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* serverToBucketsMap, CacheableHashSetPtr& currentBucketSet) {
    -
    -    BucketServerLocationPtr serverLocation;
    -    int max = -1;
    -    std::vector<BucketServerLocationPtr> nodesOfEqualSize;
    -    for(HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itr = serverToBucketsMap->begin(); itr != serverToBucketsMap->end(); ++itr) {
    -      CacheableHashSetPtr buckets = CacheableHashSet::create();
    -      CacheableHashSetPtr sBuckets = itr.second();
    -
    -      for(CacheableHashSet::Iterator sItr = sBuckets->begin(); sItr != sBuckets->end(); ++sItr) {
    -        buckets->insert(*sItr);
    -      }
    -
    -      LOGDEBUG("ClientMetadataService::findNextServer currentBucketSet->size() = %d  bucketSet->size() = %d ", currentBucketSet->size(), buckets->size());
    -
    -      for(CacheableHashSet::Iterator currentBucketSetIter = currentBucketSet->begin(); currentBucketSetIter != currentBucketSet->end(); ++currentBucketSetIter) {        
    -        buckets->erase(*currentBucketSetIter);
    -        LOGDEBUG("ClientMetadataService::findNextServer bucketSet->size() = %d ",  buckets->size());        
    -      }
    -
    -      int size = buckets->size();
    -      if (max < size) {
    -        max = size;
    -        serverLocation = itr.first();
    -        nodesOfEqualSize.clear();
    -        nodesOfEqualSize.push_back(serverLocation);
    -      }
    -      else if (max == size){
    -        nodesOfEqualSize.push_back(serverLocation);
    -      }
    -    }
    -
    -    int nodeSize = nodesOfEqualSize.size();
    -    if(nodeSize > 0) {
    -      RandGen randgen;
    -      int random = randgen(nodeSize);
    -      return nodesOfEqualSize.at(random);
    -    }
    -    return NULLPTR;
    -}
    -
    -bool ClientMetadataService::AreBucketSetsEqual(CacheableHashSetPtr& currentBucketSet, CacheableHashSetPtr& bucketSet) {
    -
    -  int32_t currentBucketSetSize = currentBucketSet->size();
    -  int32_t bucketSetSetSize = bucketSet->size();
    -
    -  LOGDEBUG("ClientMetadataService::AreBucketSetsEqual currentBucketSetSize = %d bucketSetSetSize = %d ", currentBucketSetSize, bucketSetSetSize);
    -
    -  if (currentBucketSetSize != bucketSetSetSize) {
    -    return false;
    -  }
    -
    -  bool found = false;
    -  for(CacheableHashSet::Iterator currentBucketSetIter = currentBucketSet->begin(); currentBucketSetIter != currentBucketSet->end(); ++currentBucketSetIter) {
    -    found=false;
    -    for(CacheableHashSet::Iterator bucketSetIter = bucketSet->begin(); bucketSetIter != bucketSet->end(); ++bucketSetIter) {
    -      if(*currentBucketSetIter == *bucketSetIter) {
    -        found = true;
    -        break;
    -      }
    -    }
    -    if (!found) return false;
    -  }
    -  return true;
    -}
    -
    -HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* ClientMetadataService::
    -  pruneNodes(ClientMetadataPtr& metadata, CacheableHashSetPtr&  buckets) {
    -
    -    CacheableHashSetPtr bucketSetWithoutServer = CacheableHashSet::create();
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* serverToBucketsMap
    -      = new HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>();
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* prunedServerToBucketsMap
    -      = new HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>();
    -
    -    for(CacheableHashSet::Iterator bucketId= buckets->begin(); bucketId != buckets->end(); ++bucketId) {
    -      CacheableInt32Ptr bID = *bucketId;
    -      std::vector<BucketServerLocationPtr > locations = metadata->adviseServerLocations(bID->value());
    -      if (locations.size() == 0) {
    -        LOGDEBUG("ClientMetadataService::pruneNodes Since no server location available for bucketId = %d  putting it into bucketSetWithoutServer ", bID->value() );
    -        bucketSetWithoutServer->insert(bID);
    -        continue;
    -      }
    -
    -      for (std::vector<BucketServerLocationPtr >::iterator location= locations.begin(); location != locations.end(); ++location) {
    -        HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes = serverToBucketsMap->find(*location);
    -        CacheableHashSetPtr bucketSet;
    -        if (itrRes == serverToBucketsMap->end()) {
    -          bucketSet = CacheableHashSet::create();
    -        }
    -        else {
    -          bucketSet = itrRes.second();
    -        }
    -        bucketSet->insert(bID);
    -        serverToBucketsMap->insert(*location, bucketSet);
    -      }
    -    }
    -
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes = serverToBucketsMap->begin();
    -    CacheableHashSetPtr currentBucketSet = CacheableHashSet::create();
    -    BucketServerLocationPtr randomFirstServer;
    -    if(serverToBucketsMap->empty()) {
    -      LOGDEBUG("ClientMetadataService::pruneNodes serverToBucketsMap is empty so returning NULL");      
    -      delete prunedServerToBucketsMap;
    -      delete serverToBucketsMap;
    -      return NULL;
    -    }
    -    else {
    -      size_t size = serverToBucketsMap->size();
    -      LOGDEBUG("ClientMetadataService::pruneNodes Total size of serverToBucketsMap = %d ", size);
    -      for ( size_t idx = 0; idx < (rand() % size); idx++) {
    -        itrRes++;
    -      }
    -      randomFirstServer = itrRes.first();
    -    }
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes1 = serverToBucketsMap->find(randomFirstServer);
    -    CacheableHashSetPtr bucketSet = itrRes1.second();
    -
    -    for(CacheableHashSet::Iterator bt = bucketSet->begin(); bt != bucketSet->end(); ++bt) {
    -      currentBucketSet->insert(*bt);
    -    }
    -    prunedServerToBucketsMap->insert(randomFirstServer, bucketSet);
    -    serverToBucketsMap->erase(randomFirstServer);
    -
    -    while(!AreBucketSetsEqual(currentBucketSet, buckets)) {
    -      BucketServerLocationPtr server = findNextServer(serverToBucketsMap, currentBucketSet);
    -      if (server == NULLPTR) {
    -        LOGDEBUG("ClientMetadataService::pruneNodes findNextServer returned no server");
    -        break;
    -      }
    -
    -      HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes2 = serverToBucketsMap->find(server);
    -      CacheableHashSetPtr bucketSet2 = itrRes2.second();
    -
    -      LOGDEBUG("ClientMetadataService::pruneNodes currentBucketSet->size() = %d  bucketSet2->size() = %d ", currentBucketSet->size(), bucketSet2->size());
    -
    -      for(CacheableHashSet::Iterator currentBucketSetIter = currentBucketSet->begin(); currentBucketSetIter != currentBucketSet->end(); ++currentBucketSetIter) {        
    -        bucketSet2->erase(*currentBucketSetIter);
    -        LOGDEBUG("ClientMetadataService::pruneNodes bucketSet2->size() = %d ",  bucketSet2->size());        
    -      }
    -
    -      if (bucketSet2->empty()) {
    -        LOGDEBUG("ClientMetadataService::pruneNodes bucketSet2 is empty() so removing server from serverToBucketsMap");
    -        serverToBucketsMap->erase(server);
    -        continue;
    -      }
    -
    -      for(CacheableHashSet::Iterator itr = bucketSet2->begin(); itr != bucketSet2->end(); ++itr) {
    -        currentBucketSet->insert(*itr);
    -      }
    -
    -      prunedServerToBucketsMap->insert(server, bucketSet2);
    -      serverToBucketsMap->erase(server);
    -    }
    -
    -    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes2 = prunedServerToBucketsMap->begin();
    -    for(CacheableHashSet::Iterator itr = bucketSetWithoutServer->begin(); itr != bucketSetWithoutServer->end(); ++itr) {
    -      CacheableInt32Ptr buckstId = *itr;
    -      itrRes2.second()->insert(buckstId);
    -    }
    -
    -    delete serverToBucketsMap;
    -    return prunedServerToBucketsMap;
    -}
    -
    -HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* ClientMetadataService::
    -  groupByServerToAllBuckets(const RegionPtr& region, bool optimizeForWrite) {
    -
    -    ClientMetadataPtr cptr = getClientMetadata(region->getFullPath());
    -    if (cptr == NULLPTR ) {
    -      enqueueForMetadataRefresh(region->getFullPath(), false);
    -      return NULL;
    -    }
    -    int totalBuckets = cptr->getTotalNumBuckets();
    -    CacheableHashSetPtr bucketSet = CacheableHashSet::create();
    -    for(int i=0; i<totalBuckets; i++) {
    -      bucketSet->insert(CacheableInt32::create(i));
    -    }
    -    return groupByServerToBuckets(cptr, bucketSet, optimizeForWrite);
    -}
    -
    -HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* ClientMetadataService::
    -  groupByServerToBuckets(ClientMetadataPtr& metadata, CacheableHashSetPtr& bucketSet, bool optimizeForWrite) {
    -    if (optimizeForWrite) {
    -      HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* serverToBucketsMap
    -        = new HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>();
    -      CacheableHashSetPtr bucketsWithoutServer = CacheableHashSet::create();
    -      for(CacheableHashSet::Iterator itr = bucketSet->begin(); itr != bucketSet->end(); ++itr ) {
    -        CacheableInt32Ptr bucketId = *itr;
    -        BucketServerLocationPtr serverLocation = metadata->advisePrimaryServerLocation(bucketId->value());
    -        if (serverLocation == NULLPTR) {
    -          bucketsWithoutServer->insert(bucketId);
    -          continue;
    -        }
    -        else if(!serverLocation->isValid()) {
    -          bucketsWithoutServer->insert(bucketId);
    -          continue;
    -        }
    -        HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes = serverToBucketsMap->find(serverLocation);
    -        CacheableHashSetPtr buckets;
    -        if(itrRes == serverToBucketsMap->end()) {
    -          buckets = CacheableHashSet::create();
    -          serverToBucketsMap->insert(serverLocation, buckets);
    -        }
    -        else {
    -          buckets = itrRes.second();
    -        }
    -        buckets->insert(bucketId);
    -      }
    -
    -      if (!serverToBucketsMap->empty()) {
    -        HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes = serverToBucketsMap->begin();
    -        for(CacheableHashSet::Iterator itr = bucketsWithoutServer->begin(); itr != bucketsWithoutServer->end(); ++itr ) {
    -          itrRes.second()->insert(*itr);
    -          LOGDEBUG("ClientMetadataService::groupByServerToBuckets inserting bucketsWithoutServer");
    -        }
    -      }
    -      return serverToBucketsMap;
    -    }
    -    else {
    -      return pruneNodes(metadata, bucketSet);
    -    }
    -}
    -
    -void ClientMetadataService::markPrimaryBucketForTimeoutButLookSecondaryBucket(const RegionPtr& region, const CacheableKeyPtr& key,
    -    const CacheablePtr& value,const UserDataPtr& aCallbackArgument, bool isPrimary, BucketServerLocationPtr& serverLocation,  int8_t& version)
    -{
    -  if(m_bucketWaitTimeout == 0)
    -    return;
    -
    -  WriteGuard guard( m_PRbucketStatusLock ); 
    -
    -  std::map<std::string, PRbuckets*>::iterator  bs = m_bucketStatus.find(region->getFullPath());
    -
    -  PRbuckets* prBuckets = NULL;
    -  if(bs != m_bucketStatus.end())
    -  {
    -    prBuckets = bs->second;    
    -  }
    -
    -  if(prBuckets == NULL)
    -    return;
    -
    -  getBucketServerLocation(region, key, value, aCallbackArgument, true, serverLocation, version);
    -
    -
    -  ClientMetadataPtr cptr = NULLPTR;
    -  {   
    -   ReadGuard guard( m_regionMetadataLock );
    -   RegionMetadataMapType::iterator cptrIter = m_regionMetaDataMap.find(region->getFullPath());
    -
    -   if(cptrIter != m_regionMetaDataMap.end())
    -   {
    -     cptr = cptrIter->second;
    -   }
    -
    -   if (cptr == NULLPTR ) {
    -     return ;
    -   }
    -  }
    -
    -  LOGFINE("Setting in markPrimaryBucketForTimeoutButLookSecondaryBucket");
    -
    -  int32_t totalBuckets = cptr->getTotalNumBuckets();
    -
    -  for(int32_t i =0; i< totalBuckets; i++)
    -  {
    -    int8_t version;
    -    BucketServerLocationPtr bsl;
    -    cptr->getServerLocation(i, false, bsl, version);
    -
    -    if(bsl == serverLocation)
    -    {
    -      prBuckets->setBucketTimeout(i);
    -      LOGFINE("markPrimaryBucketForTimeoutButLookSecondaryBucket::setting bucket timeout...");
    -    }
    -  }
    -
    -}
    -
    -bool ClientMetadataService::isBucketMarkedForTimeout(const char * regionFullPath, int32_t bucketid)
    -{
    -  if(m_bucketWaitTimeout == 0)
    -    return false;
    -
    -  ReadGuard guard( m_PRbucketStatusLock );
    -  
    -  std::map<std::string, PRbuckets*>::iterator  bs = m_bucketStatus.find(regionFullPath);
    -
    -  if(bs != m_bucketStatus.end())
    -  {
    -    bool m = bs->second->isBucketTimedOut(bucketid, m_bucketWaitTimeout);
    -    if(m == true) {
    -      ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) ); 
    -      CacheImpl * cache = tcrdm->getConnectionManager().getCacheImpl();
    -      cache->setBlackListBucketTimeouts();
    -    }
    -    LOGFINE("isBucketMarkedForTimeout:: for bucket %d returning = %d", bucketid, m); 
    -    
    -    return m;
    -  }
    -
    -  return false;
    -}
    -
    -}
    -
    +/*========================================================================= * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. * This product is protected by U.S. and international copyright * and intellectual property laws. Pivotal products are covered by * one or more patents listed at http://www.pivotal.io/patents. *========================================================================= */#include "TcrMessage.hpp"#include "ClientMetadataService.hpp"#include "ThinClientPoolDM.hpp"#include "../FixedPartitionResolver.hpp"#include <iterator>#include <cstdlib>#include <climits>namespace gemfire {  const char * ClientMetadataService::NC_CMDSvcThread = "NC CMDSvcThread";  ClientMetadataService::~ClientMetadataService ()  {    delete m_regionQueue;    if(m_bucketWaitTimeout > 0)    {      try      {        std::map<std::string, PRbuckets*>::iterator  bi;        for (bi = m_bucketStatus.begin(); bi != m_bucketStatus.end(); ++
 bi)        {          delete bi->second;        }      }catch(...)      {        LOGINFO("Exception in ClientMetadataService destructor");      }    }  }  ClientMetadataService::ClientMetadataService(PoolPtr pool)    /* adongre    * CID 28928: Uninitialized scalar field (UNINIT_CTOR)    */    : m_run(false)  {    m_regionQueue = new Queue<std::string>(false);    m_pool = pool;    m_bucketWaitTimeout = DistributedSystem::getSystemProperties()->bucketWaitTimeout();  }  int ClientMetadataService::svc() {    DistributedSystemImpl::setThreadName(NC_CMDSvcThread);    LOGINFO("ClientMetadataService started for pool %s", m_pool->getName());     while (m_run) {      m_regionQueueSema.acquire( );      ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );       CacheImpl * cache = tcrdm->getConnectionManager().getCacheImpl();      while(true)      {        std::string * regionFullPath = m_regionQueue->get
 ();        if(regionFullPath != NULL && regionFullPath->c_str() != NULL )        {          while(true)           {            if(m_regionQueue->size() > 0)             {              std::string * nextRegionFullPath = m_regionQueue->get();              if(nextRegionFullPath != NULL && nextRegionFullPath->c_str() != NULL && regionFullPath->compare(nextRegionFullPath->c_str()) == 0)              {                delete nextRegionFullPath;//we are going for same              }              else              {                //different region; put it back                m_regionQueue->put(nextRegionFullPath);                break;              }            } else              break;          }        }        if (!cache->isCacheDestroyPending() && regionFullPath != NULL && regionFullPath->c_str() != NULL) {          getClientPRMetadata(regionFullPath->c_str());          delete regionFullPath;          regionFullPath = NULL;      
   }        else        {          delete regionFullPath;          regionFullPath = NULL;          break;        }      }      //while(m_regionQueueSema.tryacquire( ) != -1); // release all    }    LOGINFO("ClientMetadataService stopped for pool %s", m_pool->getName());    return 0;  }  void  ClientMetadataService::getClientPRMetadata(const char * regionFullPath) {    if (regionFullPath == NULL) return;    ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );    if(tcrdm == NULL)    {      throw IllegalArgumentException("ClientMetaData: pool cast to ThinClientPoolDM failed");    }    // That means metadata for the region not found, So only for the first time for a particular region use GetClientPartitionAttributesOp    // TcrMessage to fetch the metadata and put it into map for later use.send this message to server and get metadata from server.        TcrMessage reply;    std::string path(regionFullPath);    Clie
 ntMetadataPtr cptr = NULLPTR;	{	  ReadGuard guard( m_regionMetadataLock );    RegionMetadataMapType::iterator itr = m_regionMetaDataMap.find(path);    if (itr != m_regionMetaDataMap.end()) {      cptr = itr->second;    }	  //cptr = m_regionMetaDataMap[path];	}    ClientMetadataPtr newCptr = NULLPTR;      if (cptr == NULLPTR) {        TcrMessage request(TcrMessage::GET_CLIENT_PARTITION_ATTRIBUTES, regionFullPath);        GfErrType err = tcrdm->sendSyncRequest(request, reply);                if (err == GF_NOERR && reply.getMessageType() == TcrMessage::RESPONSE_CLIENT_PARTITION_ATTRIBUTES) {          cptr = new ClientMetadata(reply.getNumBuckets(), reply.getColocatedWith(), tcrdm, reply.getFpaSet());          if(m_bucketWaitTimeout > 0 && reply.getNumBuckets() > 0)          {            WriteGuard guard( m_PRbucketStatusLock );            m_bucketStatus[regionFullPath] = new PRbuckets(reply.getNumBuckets());          }          LOGDEBUG("ClientMetadata b
 uckets %d ", reply.getNumBuckets());        }              }    if (cptr == NULLPTR) {      return;    }    CacheableStringPtr colocatedWith;    colocatedWith = cptr->getColocatedWith();    if (colocatedWith == NULLPTR) {           newCptr = SendClientPRMetadata(regionFullPath,cptr);      //now we will get new instance so assign it again      if(newCptr != NULLPTR)       {        cptr->setPreviousone(NULLPTR);        newCptr->setPreviousone(cptr);        WriteGuard guard( m_regionMetadataLock );        m_regionMetaDataMap[path] = newCptr;		LOGFINE("Updated client meta data");      }    }    else {      newCptr = SendClientPRMetadata(colocatedWith->asChar(),cptr);               if(newCptr != NULLPTR)       {        cptr->setPreviousone(NULLPTR);        newCptr->setPreviousone(cptr);        //now we will get new instance so assign it again        WriteGuard guard( m_regionMetadataLock );        m_regionMetaDataMap[colocatedWith->
 asChar()] = newCptr;        m_regionMetaDataMap[path] = newCptr;		LOGFINE("Updated client meta data");      }    }      }  ClientMetadataPtr ClientMetadataService::SendClientPRMetadata(const char* regionPath, ClientMetadataPtr cptr) {    ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );    if(tcrdm == NULL)    {      throw IllegalArgumentException("ClientMetaData: pool cast to ThinClientPoolDM failed");    }    TcrMessage request(TcrMessage::GET_CLIENT_PR_METADATA, regionPath);    TcrMessage reply;    // send this message to server and get metadata from server.    LOGFINE("Now sending GET_CLIENT_PR_METADATA for getting from server: %s",regionPath);    RegionPtr region = NULLPTR;        GfErrType err = tcrdm->sendSyncRequest(request, reply);        if (err == GF_NOERR && reply.getMessageType() == TcrMessage::RESPONSE_CLIENT_PR_METADATA) {      tcrdm->getConnectionManager().getCacheImpl()->getRegion(regionPath, region)
 ;            if (region != NULLPTR) {               LocalRegion* lregion = dynamic_cast<LocalRegion*> (region.ptr());                  lregion->getRegionStats()->incMetaDataRefreshCount();              }            std::vector<BucketServerLocationsType>* metadata = reply.getMetadata();      if(metadata == NULL)        return NULLPTR;      if ( metadata->empty()) {        delete metadata;        return NULLPTR;      }      ClientMetadata* newCptr = new ClientMetadata(*(cptr.ptr()));      for (std::vector<BucketServerLocationsType>::iterator iter = metadata->begin();        iter != metadata->end(); ++iter) {                if (!(*iter).empty()) {            newCptr->updateBucketServerLocations((*iter).at(0)->getBucketId(), (*iter));                             }      }      delete metadata;      ClientMetadataPtr newCMDPtr(newCptr);      return newCMDPtr;    }    return NULLPTR;  }  void ClientMetadataService::getBucketServerLocation
 (const RegionPtr& region, const CacheableKeyPtr& key,    const CacheablePtr& value,const UserDataPtr& aCallbackArgument, bool isPrimary, BucketServerLocationPtr& serverLocation,  int8_t& version)  {    //ACE_Guard< ACE_Recursive_Thread_Mutex > guard( m_regionMetadataLock );        if (region != NULLPTR) {	  ReadGuard guard( m_regionMetadataLock );	  LOGDEBUG("ClientMetadataService::getBucketServerLocation m_regionMetaDataMap size is %d",m_regionMetaDataMap.size());	      std::string path(region->getFullPath());    ClientMetadataPtr cptr = NULLPTR;    RegionMetadataMapType::iterator itr = m_regionMetaDataMap.find(path);    if (itr != m_regionMetaDataMap.end()) {      cptr = itr->second;    }      //ClientMetadataPtr cptr = m_regionMetaDataMap[path];      if (cptr == NULLPTR) {        //serverLocation = BucketServerLocation();        return;      }      CacheableKeyPtr resolvekey;      const PartitionResolverPtr& resolver = region->getAttributes()->getParti
 tionResolver();            EntryEvent event(region, key, value, NULLPTR, aCallbackArgument, false);      int bucketId = 0;      if (resolver == NULLPTR) {        resolvekey = key;      }      else {        resolvekey = resolver->getRoutingObject(event);        if (resolvekey == NULLPTR) {          throw IllegalStateException("The RoutingObject returned by PartitionResolver is null.");        }      }      CacheableHashSetPtr& partNames = cptr->getFixedPartitionNames();      if (partNames != NULLPTR && partNames->size() > 0) {                FixedPartitionResolverPtr fpResolver( dynamic_cast<FixedPartitionResolver*> (resolver.ptr() ));        if(fpResolver != NULLPTR) {                 const char* partition = fpResolver->getPartitionName(event, partNames);                  if ( partition == NULL ) {            throw IllegalStateException("partition name returned by Partition resolver is null.");          }          else {            bucketI
 d = cptr->assignFixedBucketId(partition, resolvekey);            if ( bucketId == -1) {              return;            }                    }        }      }            else {        if (cptr->getTotalNumBuckets() > 0 ) {                    bucketId = std::abs((int)resolvekey->hashcode() % cptr->getTotalNumBuckets());                  }      }      cptr->getServerLocation(bucketId, isPrimary, serverLocation, version);    }      }  void ClientMetadataService::removeBucketServerLocation(BucketServerLocation serverLocation)  {    ReadGuard guard( m_regionMetadataLock );    for(RegionMetadataMapType::iterator regionMetadataIter= m_regionMetaDataMap.begin();      regionMetadataIter!=m_regionMetaDataMap.end();      regionMetadataIter++) {        ClientMetadataPtr cptr = (*regionMetadataIter).second;        if (cptr != NULLPTR) {          //Yogesh has commented out this as it was causing a SIGV          //cptr->removeBucketServerLocation(s
 erverLocation);        }    }  }  ClientMetadataPtr ClientMetadataService::getClientMetadata(const char* regionFullPath)  {    ReadGuard guard( m_regionMetadataLock );    RegionMetadataMapType::iterator regionMetadataIter = m_regionMetaDataMap.find(regionFullPath);    if (regionMetadataIter != m_regionMetaDataMap.end()) {      return (*regionMetadataIter).second;    }    return NULLPTR;  } /*const  PartitionResolverPtr& ClientMetadataService::getResolver(const RegionPtr& region, const CacheableKeyPtr& key,    const UserDataPtr& aCallbackArgument){      //const char * regionFullPath = region->getFullPath();      //if (regionFullPath != NULL) {        //const RegionAttributesPtr& rAttrsPtr = region->getAttributes();        return region->getAttributes()->getPartitionResolver();      //}   }*/     /*BucketServerLocation ClientMetadataService::getServerLocation(ClientMetadataPtr cptr, int bucketId, bool tryPrimary)  {  LOGFINE("Inside 
 getServerLocation");  if (cptr == NULLPTR) {  LOGDEBUG("MetaData does not exist");  return BucketServerLocation();  }  LOGFINE("Ending getServerLocation");  return cptr->getServerLocation(bucketId, tryPrimary);  }*/  void ClientMetadataService::populateDummyServers(const char * regionName, ClientMetadataPtr cptr)  {    WriteGuard guard( m_regionMetadataLock );    m_regionMetaDataMap[regionName] = cptr;  }  void ClientMetadataService::enqueueForMetadataRefresh(    const char* regionFullPath, int8 serverGroupFlag) {      ThinClientPoolDM* tcrdm = dynamic_cast< ThinClientPoolDM* >( m_pool.ptr( ) );      if(tcrdm == NULL)      {        throw IllegalArgumentException("ClientMetaData: pool cast to ThinClientPoolDM failed");      }      RegionPtr region;      tcrdm->getConnectionManager().getCacheImpl()->getRegion(regionFullPath, region);            LocalRegion* lregion = dynamic_cast<LocalRegion*> (region.ptr());                lregion->ge
 tRegionStats()->incNonSingleHopCount();//we are here means nonSinglehop      std::string serverGroup = tcrdm->getServerGroup();          if (serverGroup.length() != 0 ) {            CacheImpl::setServerGroupFlag(serverGroupFlag);        if (serverGroupFlag == 2 ) {          LOGFINER("Network hop but, from within same server-group, so no metadata fetch from the server");          return;        }      }      if (region != NULLPTR) {               ThinClientRegion* tcrRegion = dynamic_cast<ThinClientRegion*> (region.ptr());        {          TryWriteGuard guardRegionMetaDataRefresh( tcrRegion->getMataDataMutex(), tcrRegion->getMetaDataRefreshed());          if ( tcrRegion->getMetaDataRefreshed() ) {            return;          }          LOGFINE("Network hop so fetching single hop metadata from the server");          CacheImpl::setNetworkHopFlag(true);          tcrRegion->setMetaDataRefreshed(true);                      std::string* tempRegion
 Path = new std::string(regionFullPath);          m_regionQueue->put(tempRegionPath);          m_regionQueueSema.release( );        }      }     }  HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>* ClientMetadataService::getServerToFilterMap(    const VectorOfCacheableKey* keys, const RegionPtr& region, bool isPrimary) {      //const char* regionFullPath = region->getFullPath();      ClientMetadataPtr cptr = NULLPTR;      {           ReadGuard guard( m_regionMetadataLock );        RegionMetadataMapType::iterator cptrIter = m_regionMetaDataMap.find(region->getFullPath());        if(cptrIter != m_regionMetaDataMap.end())        {          cptr = cptrIter->second;        }        if (cptr == NULLPTR || keys == NULL) {          // enqueueForMetadataRefresh(region->getFullPath());          return NULL;          //		//serverLocation = BucketServerLocation();          //		return;        }      }      //int totalNumberOfBuckets 
 = cptr->getTotalNumBuckets();      HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>* result = new HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>();      VectorOfCacheableKeyPtr keysWhichLeft(new VectorOfCacheableKey());      std::map<int, BucketServerLocationPtr> buckets;      for (VectorOfCacheableKey::Iterator iter = keys->begin(); iter != keys->end(); iter++) {        CacheableKeyPtr key = *iter;        LOGDEBUG("cmds = %s", key->toString()->toString());        PartitionResolverPtr resolver = region->getAttributes()->getPartitionResolver();        CacheableKeyPtr resolveKey;        if (resolver == NULLPTR) {          // client has not registered PartitionResolver          // Assuming even PR at server side is not using PartitionResolver          resolveKey = key;        } else {          EntryEvent event(region, key, NULLPTR, NULLPTR, NULLPTR, false);          resolveKey = resolver->getRoutingObject(event);        } 
        int bucketId = std::abs((int)resolveKey->hashcode() % cptr->getTotalNumBuckets());        VectorOfCacheableKeyPtr keyList = NULLPTR;        std::map<int, BucketServerLocationPtr>::iterator bucketsIter =          buckets.find(bucketId);        if (bucketsIter == buckets.end()) {          int8 version = -1;          //BucketServerLocationPtr serverLocation(new BucketServerLocation());          BucketServerLocationPtr serverLocation = NULLPTR;          cptr->getServerLocation(bucketId, isPrimary, serverLocation,version);          if(serverLocation == NULLPTR)          {//HItesh:if server not returns all buckets, need to confiem with PR team about this why??            keysWhichLeft->push_back(key);            continue;          }          else if(!serverLocation->isValid())          {            keysWhichLeft->push_back(key);            continue;          }          //if(serverLocation == NULLPTR)          //continue;//hitesh need to fix
           buckets[bucketId] = serverLocation;          HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>::Iterator itrRes = result->find(serverLocation);          //keyList = (*result)[serverLocation];          if(itrRes == result->end())          {            keyList = new VectorOfCacheableKey();            result->insert(serverLocation, keyList);          }          else            keyList = itrRes.second();          LOGDEBUG("new keylist buckets =%d res = %d", buckets.size(), result->size() );        } else {          keyList = (*result)[bucketsIter->second];        }        keyList->push_back(key);      }      if(keysWhichLeft->size() > 0 && result->size() > 0)      {//add left keys in result        int keyLefts = keysWhichLeft->size();        int totalServers = result->size();        int perServer = keyLefts/totalServers + 1;        int keyIdx = 0;        for(HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr
 >::Iterator locationIter = result->begin();          locationIter != result->end(); locationIter++)        {          VectorOfCacheableKeyPtr keys = locationIter.second();          for(int i = 0; i < perServer; i++)          {            if(keyIdx < keyLefts)            {              keys->push_back(keysWhichLeft->at(keyIdx++));            }            else              break;          }          if(keyIdx >= keyLefts)            break;//done        }      }      else if(result->size() == 0)      {//not be able to map any key        return NULL;//it will force all keys to send to one server      }      return result;  }  void ClientMetadataService::markPrimaryBucketForTimeout(const RegionPtr& region, const CacheableKeyPtr& key,    const CacheablePtr& value,const UserDataPtr& aCallbackArgument, bool isPrimary, BucketServerLocationPtr& serverLocation,  int8_t& version)  {    if(m_bucketWaitTimeout == 0)      return;    W
 riteGuard guard( m_PRbucketStatusLock );     getBucketServerLocation(region, key, value, aCallbackArgument, false/*look for secondary host*/, serverLocation, version);    if(serverLocation != NULLPTR && serverLocation->isValid())    {      LOGDEBUG("Server host and port are %s:%d", serverLocation->getServerName().c_str(), serverLocation->getPort());      int32_t bId = serverLocation->getBucketId();      std::map<std::string, PRbuckets*>::iterator  bs = m_bucketStatus.find(region->getFullPath());      if(bs != m_bucketStatus.end())      {        bs->second->setBucketTimeout(bId);        LOGDEBUG("marking bucket %d as timeout ", bId);       }    }    }  HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* ClientMetadataService::groupByBucketOnClientSide(    const RegionPtr& region, CacheableVectorPtr* keySet, ClientMetadataPtr& metadata) {      HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* bucketToKeysMap = new HashMapT<CacheableInt32Ptr, Ca
 cheableHashSetPtr>();      for (CacheableVector::Iterator itr = (*keySet)->begin(); itr != (*keySet)->end(); ++itr) {        CacheableKeyPtr key = dynCast<CacheableKeyPtr> (*itr);        PartitionResolverPtr resolver = region->getAttributes()->getPartitionResolver();        CacheableKeyPtr resolvekey;        EntryEvent event(region, key, NULLPTR, NULLPTR, NULLPTR, false);        int bucketId = -1;        if (resolver == NULLPTR) {          resolvekey = key;        }        else {          resolvekey = resolver->getRoutingObject(event);          if (resolvekey == NULLPTR) {            throw IllegalStateException("The RoutingObject returned by PartitionResolver is null.");          }        }        CacheableHashSetPtr partitionNames = metadata->getFixedPartitionNames();        if (partitionNames != NULLPTR && partitionNames->size() > 0) {          FixedPartitionResolverPtr fpResolver( dynamic_cast<FixedPartitionResolver*> (resolver.ptr() ));     
      if(fpResolver != NULLPTR) {            const char* partition = fpResolver->getPartitionName(event, partitionNames);            if ( partition == NULL ) {              throw IllegalStateException("partition name returned by Partition resolver is null.");            }            else {              bucketId = metadata->assignFixedBucketId(partition, resolvekey);              if ( bucketId == -1) {                this->enqueueForMetadataRefresh(region->getFullPath(), 0);              }            }          }        }        else {          if (metadata->getTotalNumBuckets() > 0 ) {            bucketId = std::abs((int)resolvekey->hashcode() % metadata->getTotalNumBuckets());          }        }        HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>::Iterator iter =  bucketToKeysMap->find(CacheableInt32::create(bucketId));        CacheableHashSetPtr bucketKeys;        if (iter == bucketToKeysMap->end()) {          bucketKeys = CacheableHas
 hSet::create();          bucketToKeysMap->insert(CacheableInt32::create(bucketId), bucketKeys);        }        else {          bucketKeys = iter.second();        }        bucketKeys->insert(key);      }      return bucketToKeysMap;  }HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* ClientMetadataService::getServerToFilterMapFESHOP(  CacheableVectorPtr* routingKeys, const RegionPtr& region, bool isPrimary) {    ClientMetadataPtr cptr = getClientMetadata(region->getFullPath());    if (cptr == NULLPTR /*|| cptr->adviseRandomServerLocation() == NULLPTR*/) {      enqueueForMetadataRefresh(region->getFullPath(), 0);      return NULL;    }    if (routingKeys == NULL) {      return NULL;    }    HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>* bucketToKeysMap = groupByBucketOnClientSide(region, routingKeys, cptr);    CacheableHashSetPtr bucketSet = CacheableHashSet::create();    for (HashMapT<CacheableInt32Ptr, CacheableHashSetP
 tr>::Iterator iter = bucketToKeysMap->begin(); iter != bucketToKeysMap->end(); ++iter) {      bucketSet->insert(iter.first());    }    LOGDEBUG("ClientMetadataService::getServerToFilterMapFESHOP: bucketSet size = %d ", bucketSet->size());    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* serverToBuckets = groupByServerToBuckets(cptr, bucketSet, isPrimary);    if (serverToBuckets == NULL) {      delete bucketToKeysMap;      return NULL;    }    HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>* serverToKeysMap = new HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>();    for (HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itrRes = serverToBuckets->begin(); itrRes != serverToBuckets->end(); ++itrRes) {      BucketServerLocationPtr serverLocation = itrRes.first();      CacheableHashSetPtr buckets = itrRes.second();      for(CacheableHashSet::Iterator bucket = buckets->begin(); bucket != buckets->end(); ++bucket) {
         HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator iter = serverToKeysMap->find(serverLocation);        CacheableHashSetPtr keys;        if (iter == serverToKeysMap->end()) {          keys = CacheableHashSet::create();        }        else {          keys = iter.second();        }        HashMapT<CacheableInt32Ptr, CacheableHashSetPtr>::Iterator bucketToKeysiter = bucketToKeysMap->find(*bucket);        if (bucketToKeysiter !=  bucketToKeysMap->end()) {          CacheableHashSetPtr bkeys = bucketToKeysiter.second();          for (CacheableHashSet::Iterator itr = bkeys->begin(); itr != bkeys->end(); ++itr) {            keys->insert(*itr);          }        }        serverToKeysMap->insert(serverLocation, keys);      }    }    delete bucketToKeysMap;    delete serverToBuckets;    return serverToKeysMap;}BucketServerLocationPtr ClientMetadataService::findNextServer  (HashMapT<BucketServerLocationPtr, CacheableHashSetP
 tr>* serverToBucketsMap, CacheableHashSetPtr& currentBucketSet) {    BucketServerLocationPtr serverLocation;    int max = -1;    std::vector<BucketServerLocationPtr> nodesOfEqualSize;    for(HashMapT<BucketServerLocationPtr, CacheableHashSetPtr>::Iterator itr = serverToBucketsMap->begin(); itr != serverToBucketsMap->end(); ++itr) {      CacheableHashSetPtr buckets = CacheableHashSet::create();      CacheableHashSetPtr sBuckets = itr.second();      for(CacheableHashSet::Iterator sItr = sBuckets->begin(); sItr != sBuckets->end(); ++sItr) {        buckets->insert(*sItr);      }      LOGDEBUG("ClientMetadataService::findNextServer currentBucketSet->size() = %d  bucketSet->size() = %d ", currentBucketSet->size(), buckets->size());      for(CacheableHashSet::Iterator currentBucketSetIter = currentBucketSet->begin(); currentBucketSetIter != currentBucketSet->end(); ++currentBucketSetIter) {                buckets->erase(*currentBucketSetIter);        LOGDE
 BUG("ClientMetadataService::findNextServer bucketSet->size() = %d ",  buckets->size());              }      int size = buckets->size();      if (max < size) {        max = size;        serverLocation = itr.first();        nodesOfEqualSize.clear();        nodesOfEqualSize.push_back(serverLocation);      }      else if (max == size){        nodesOfEqualSize.push_back(serverLocation);      }    }    int nodeSize = nodesOfEqualSize.size();    if(nodeSize > 0) {      RandGen randgen;      int random = randgen(nodeSize);      return nodesOfEqualSize.at(random);    }    return NULLPTR;}bool ClientMetadataService::AreBucketSetsEqual(CacheableHashSetPtr& currentBucketSet, CacheableHashSetPtr& bucketSet) {  int32_t currentBucketSetSize = currentBucketSet->size();  int32_t bucketSetSetSize = bucketSet->size();  LOGDEBUG("ClientMetadataService::AreBucketSetsEqual currentBucketSetSize = %d bucketSetSetSize = %d ", currentBucketSetSize, bu
 cketSetSetSize);  if (currentBucketSetSize != bucketSetS
    --- End diff --
    
    It appears you have added new whitespace to every line in this file. Please correct.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-geode issue #245: native-client-software-grant - ClientMetadata::g...

Posted by PivotalSarge <gi...@git.apache.org>.
Github user PivotalSarge commented on the issue:

    https://github.com/apache/incubator-geode/pull/245
  
    From inspection of the places where ClientMetadata::getServerLocation() gets called, I do not believe that it ought to return only secondaries when tryPrimary == false. Additionally, setting serverLocation to NULL when the list of locations is empty may break assumptions by the caller. I think there should be more information about why it is needed since ClientMetadata.hpp is not a public API and more analysis into how this change it would not break the existing calls in ClientMetaService.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-geode issue #245: native-client-software-grant - ClientMetadata::g...

Posted by fdaniel7 <gi...@git.apache.org>.
Github user fdaniel7 commented on the issue:

    https://github.com/apache/incubator-geode/pull/245
  
    It looks like when tryPrimary == false, the intention was that ClientMetadata::getServerLocation() will return a secondary bucket from m_bucketServerLocationsList[bucketId], considering that there's only one primary location and 0 or more secondaries. the range of the randomization is definitely wrong in m_bucketServerLocationsList[bucketId].at(randgen((int)m_bucketServerLocationsList[bucketId].size())). Since we don't want to assume that the primary is always located in m_bucketServerLocationsList[bucketId].at(0), a while loop is added.
    I don't see a point splitting this function to sections unless  we want specifically primary bucket or secondary.
    
    The reason why I wanted to change this code is because  ClientMetadataService::markPrimaryBucketForTimeoutButLookSecondaryBucket() doesn't work properly since prBuckets->setBucketTimeout(i) will be called only if getServerLocation randomized the primary bucket, which makes no sense.
    I'm going to add new pull request with a new function for marking buckets as blacklisted.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---