Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2022/02/05 17:02:18 UTC

[GitHub] [geode-native] gaussianrecurrence opened a new pull request #920: GEODE-10012: Avoid non-HA function retries

gaussianrecurrence opened a new pull request #920:
URL: https://github.com/apache/geode-native/pull/920


    - Currently, in the native client, if you configure retryAttempts to be >0 for your pool, non-HA function executions are retried.
      This happens because retries are coded in the network layer.
    - Geode documentation states that retries should only happen for functions
      with isHA=true, and that they should be handled in the function execution
      logic. This is how it works in the Java client API.
    - If a function is retried at the network layer, it could happen that 
    - This commit therefore changes the behavior so that non-HA functions are
      not retried.
    - Also, a FunctionAttributes class was introduced to improve code
      readability.
    - Moved the function execution classes into their own file to improve
      readability and modularity.
    - Fixed a bug that prevented two functions with the same name, defined
      on different clusters with different function attributes, from
      coexisting.
    - Implemented several new integration tests (ITs) to verify that non-HA
      function executions are not retried by the API.
    - These are the cases being tested:
       * Function execution with PR enabled:
         - A timeout is forced from the server-side, so the server closes
           the connection towards the client.
         - A timeout is forced in the client.
       * Function execution with PR disabled.
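      The retry rule described above can be sketched as follows. This is a minimal
      illustration under stated assumptions, not the geode-native implementation:
      `FunctionFlags`, `effectiveRetryAttempts` and `executeWithRetries` are
      hypothetical names, the network call is replaced by a caller-supplied callback,
      and the special meaning of a negative retry count is ignored. The core rule
      mirrors the `attrs.isHA() ? m_pool->getRetryAttempts() : 0` expression in the
      ExecutionImpl.cpp hunk quoted further down.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>

// Hypothetical stand-in for the attributes the server reports for a function.
struct FunctionFlags {
  bool isHA;
  bool hasResult;
  bool optimizeForWrite;
};

// Only highly available (isHA) functions may be re-executed; for non-HA
// functions the pool's retryAttempts setting is ignored.
int32_t effectiveRetryAttempts(const FunctionFlags& flags,
                               int32_t poolRetryAttempts) {
  return flags.isHA ? poolRetryAttempts : 0;
}

// Calls `attempt` once, then retries it up to effectiveRetryAttempts() more
// times while it keeps failing. Returns the total number of calls made.
int32_t executeWithRetries(const FunctionFlags& flags,
                           int32_t poolRetryAttempts,
                           const std::function<bool()>& attempt) {
  const int32_t maxRetries = effectiveRetryAttempts(flags, poolRetryAttempts);
  int32_t calls = 0;
  for (int32_t i = 0; i <= maxRetries; ++i) {
    ++calls;
    if (attempt()) break;  // success: stop retrying
  }
  return calls;
}
```

      With a pool configured for 2 retry attempts and an execution that always fails,
      an HA function is attempted 3 times while a non-HA function is attempted once.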
   
   
   Co-authored-by: Jakov Varenina <ja...@est.tech>
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode-native] pdxcodemonkey commented on a change in pull request #920: GEODE-10012: Avoid non-HA function retries

Posted by GitBox <gi...@apache.org>.
pdxcodemonkey commented on a change in pull request #920:
URL: https://github.com/apache/geode-native/pull/920#discussion_r803908544



##########
File path: cppcache/integration/test/FunctionExecutionTest.cpp
##########
@@ -74,6 +99,9 @@ class TestResultCollector : public ResultCollector {
   }
 
   virtual void addResult(const std::shared_ptr<Cacheable> &result) override {
+    LOGINFO("Before mutex lock!");
+    std::lock_guard<decltype(mutex_)> lock{mutex_};
+    LOGINFO("Adding a new result!");

Review comment:
       Do we really wish to log this at INFO level?  Looks more like a DEBUG-level message to me...

##########
File path: cppcache/src/ExecutionImpl.hpp
##########
@@ -100,29 +114,18 @@ class ExecutionImpl {
         m_allServer(allServer),
         m_pool(pool),
         m_authenticatedView(authenticatedView) {}
+ protected:
+
   std::shared_ptr<CacheableVector> m_routingObj;

Review comment:
       You've essentially rewritten the class already; it would be a shame not to go ahead and properly rename the member variables.

##########
File path: cppcache/src/TcrMessage.cpp
##########
@@ -1184,12 +1184,11 @@ void TcrMessage::handleByteArrayResponse(
         input.readInt32();
         input.advanceCursor(1);  // ignore byte
 
-        if (!m_functionAttributes) {
-          m_functionAttributes = std::make_shared<std::vector<int8_t>>();
-        }
-        m_functionAttributes->push_back(input.read());
-        m_functionAttributes->push_back(input.read());
-        m_functionAttributes->push_back(input.read());
+        bool hasResult = input.read() != 0;
+        bool isHA = input.read() != 0;
+        bool optimizeForWrite = input.read() != 0;
+        m_functionAttributes =
+            FunctionAttributes{isHA, hasResult, optimizeForWrite};

Review comment:
       Just pausing for a minute to think about how hateful it was to have this encoded as a vector<int8_t> with no indication of meaning whatsoever.  This is a huge improvement in readability, thanks!
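       To illustrate the readability gain, here is a sketch of decoding the three attribute bytes into a small value type instead of an anonymous `std::vector<int8_t>`. The names `FunctionAttributesSketch` and `decodeAttributeBytes` are hypothetical; the byte order (hasResult, isHA, optimizeForWrite) and the constructor order (isHA, hasResult, optimizeForWrite) are taken from the hunk above.

```cpp
#include <array>
#include <cassert>
#include <cstdint>

// Hypothetical value type using the constructor order from the diff:
// FunctionAttributes{isHA, hasResult, optimizeForWrite}.
class FunctionAttributesSketch {
 public:
  FunctionAttributesSketch(bool ha, bool result, bool optimize)
      : isHA_{ha}, hasResult_{result}, optimizeForWrite_{optimize} {}

  bool isHA() const { return isHA_; }
  bool hasResult() const { return hasResult_; }
  bool optimizeForWrite() const { return optimizeForWrite_; }

 private:
  bool isHA_;
  bool hasResult_;
  bool optimizeForWrite_;
};

// The reply carries one byte per attribute, read in the order
// hasResult, isHA, optimizeForWrite (as in handleByteArrayResponse).
FunctionAttributesSketch decodeAttributeBytes(
    const std::array<int8_t, 3>& bytes) {
  const bool hasResult = bytes[0] != 0;
  const bool isHA = bytes[1] != 0;
  const bool optimizeForWrite = bytes[2] != 0;
  return FunctionAttributesSketch{isHA, hasResult, optimizeForWrite};
}
```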

##########
File path: cppcache/integration/test/FunctionExecutionTest.cpp
##########
@@ -74,6 +99,9 @@ class TestResultCollector : public ResultCollector {
   }
 
   virtual void addResult(const std::shared_ptr<Cacheable> &result) override {
+    LOGINFO("Before mutex lock!");
+    std::lock_guard<decltype(mutex_)> lock{mutex_};
+    LOGINFO("Adding a new result!");

Review comment:
       Actually no, we shouldn't be using any of the LOG* macros in integration test code at all.  Please remove.

##########
File path: cppcache/src/FunctionAttributes.hpp
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#ifndef GEODE_FUNCTIONATTRIBUTES_H_
+#define GEODE_FUNCTIONATTRIBUTES_H_
+
+#include <cstdint>
+
+namespace apache {
+namespace geode {
+namespace client {
+
+class FunctionAttributes {
+ public:
+  enum : uint8_t {

Review comment:
       Does this enum need to be public?  You have accessors for the individual values already, it seems like a thing you'd want to hide.
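       One way to hide it, sketched under assumptions: the flag enumeration becomes a private `enum class` (which would also address the `enum class` request later in this thread), and callers only see the named accessors. The bit positions (HA = bit 0, hasResult = bit 1, optimizeForWrite = bit 2) are assumed here, chosen to match the `& 1` and `& 2` tests in the removed code; this is not the PR's actual class body.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical variant of FunctionAttributes with a private flag enum.
class FunctionAttributes {
 public:
  FunctionAttributes() = default;
  FunctionAttributes(bool ha, bool result, bool optimize)
      : flags_{static_cast<uint8_t>(
            (ha ? bit(Flag::kHA) : 0u) |
            (result ? bit(Flag::kHasResult) : 0u) |
            (optimize ? bit(Flag::kOptimizeForWrite) : 0u))} {}

  bool isHA() const { return test(Flag::kHA); }
  bool hasResult() const { return test(Flag::kHasResult); }
  bool optimizeForWrite() const { return test(Flag::kOptimizeForWrite); }

 private:
  // Bit positions are an assumption for illustration.
  enum class Flag : uint8_t { kHA = 0, kHasResult = 1, kOptimizeForWrite = 2 };

  static constexpr unsigned bit(Flag f) {
    return 1u << static_cast<unsigned>(f);
  }
  bool test(Flag f) const { return (flags_ & bit(f)) != 0; }

  uint8_t flags_ = 0;
};
```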

##########
File path: cppcache/src/ExecutionImpl.cpp
##########
@@ -434,31 +384,15 @@ void ExecutionImpl::executeOnAllServers(const std::string& func,
   }
 }
 std::shared_ptr<CacheableVector> ExecutionImpl::executeOnPool(
-    const std::string& func, uint8_t getResult, int32_t retryAttempts,
-    std::chrono::milliseconds timeout) {
+    const std::string& func, FunctionAttributes funcAttrs,
+    int32_t retryAttempts, std::chrono::milliseconds timeout) {
   ThinClientPoolDM* tcrdm = dynamic_cast<ThinClientPoolDM*>(m_pool.get());
   if (tcrdm == nullptr) {
     throw IllegalArgumentException(
         "Execute: pool cast to ThinClientPoolDM failed");
   }
   int32_t attempt = 0;
 
-  // auto csArray = tcrdm->getServers();
-
-  // if (csArray != nullptr && csArray->length() != 0) {
-  //  for (int i = 0; i < csArray->length(); i++)
-  //  {
-  //   auto cs = csArray[i];
-  //    TcrEndpoint *ep = nullptr;
-  //    /*
-  //    std::string endpointStr =
-  //    Utils::convertHostToCanonicalForm(cs->value().c_str()
-  //    );
-  //    */
-  //    ep = tcrdm->addEP(cs->value().c_str());
-  //  }
-  //}
-
   // if pools retry attempts are not set then retry once on all available

Review comment:
       Thanks for deleting commented code!

##########
File path: cppcache/src/TcrEndpoint.cpp
##########
@@ -754,9 +754,9 @@ GfErrType TcrEndpoint::sendRequestConn(const TcrMessage& request,
   // TcrMessage * req = const_cast<TcrMessage *>(&request);
   LOGDEBUG("TcrEndpoint::sendRequestConn  = %p", m_baseDM);
   if (m_baseDM != nullptr) m_baseDM->beforeSendingRequest(request, conn);
-  if (((type == TcrMessage::EXECUTE_FUNCTION ||
-        type == TcrMessage::EXECUTE_REGION_FUNCTION) &&
-       (request.hasResult() & 2))) {
+  if ((type == TcrMessage::EXECUTE_FUNCTION ||
+       type == TcrMessage::EXECUTE_REGION_FUNCTION) &&
+      request.hasResult()) {

Review comment:
       lolwut?  I poked around briefly to attempt to understand the '& 2' that was here previously, and haven't found a clue.  Was this some magic flag to indicate HA or something?  

##########
File path: cppcache/src/FunctionExecution.cpp
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FunctionExecution.hpp"
+
+#include <geode/ResultCollector.hpp>
+
+#include "CacheImpl.hpp"
+#include "TcrConnectionManager.hpp"
+#include "TcrMessage.hpp"
+#include "ThinClientPoolDM.hpp"
+#include "ThinClientRegion.hpp"
+
+namespace apache {
+namespace geode {
+namespace client {
+
+void FunctionExecution::setParameters(
+    const std::string& funcName, FunctionAttributes funcAttrs,
+    std::chrono::milliseconds timeout, std::shared_ptr<Cacheable> args,
+    TcrEndpoint* ep, ThinClientPoolDM* poolDM,
+    std::shared_ptr<std::recursive_mutex> resultCollectorMutex,
+    std::shared_ptr<ResultCollector> resultCollector,
+    std::shared_ptr<UserAttributes> userAttr) {
+  exceptionMsg_.clear();
+  resultCollectorMutex_ = std::move(resultCollectorMutex);
+  resultCollector_ = resultCollector;
+  error_ = GF_NOTCON;
+  funcName_ = funcName;
+  funcAttrs_ = funcAttrs;
+  timeout_ = timeout;
+  args_ = args;
+  endpoint_ = ep;
+  pool_ = poolDM;
+  userAttrs_ = userAttr;
+}
+
+GfErrType FunctionExecution::execute() {
+  GuardUserAttributes gua;
+
+  if (userAttrs_) {
+    gua.setAuthenticatedView(userAttrs_->getAuthenticatedView());
+  }
+
+  TcrMessageExecuteFunction request(
+      new DataOutput(
+          pool_->getConnectionManager().getCacheImpl()->createDataOutput()),
+      funcName_, args_, funcAttrs_, pool_, timeout_);
+  TcrMessageReply reply(true, pool_);
+
+  auto resultProcessor = std::unique_ptr<ChunkedFunctionExecutionResponse>(
+      new ChunkedFunctionExecutionResponse(reply, funcAttrs_.hasResult(),
+                                           resultCollector_,
+                                           resultCollectorMutex_));
+
+  reply.setChunkedResultHandler(resultProcessor.get());
+  reply.setTimeout(timeout_);
+  reply.setDM(pool_);
+
+  LOGDEBUG(
+      "ThinClientPoolDM::sendRequestToAllServer sendRequest on endpoint[%s]!",
+      endpoint_->name().c_str());
+
+  error_ = pool_->sendRequestToEP(request, reply, endpoint_);
+  error_ = pool_->handleEPError(endpoint_, reply, error_);
+  if (error_ != GF_NOERR) {
+    if (error_ == GF_NOTCON) {
+      return GF_NOERR;  // if server is unavailable its not an error for
+      // functionexec OnServers() case
+    }
+    LOGDEBUG("FunctionExecution::execute failed on endpoint[%s]!. Error = %d ",
+             endpoint_->name().c_str(), error_);
+    if (reply.getMessageType() == TcrMessage::EXCEPTION) {
+      exceptionMsg_ = reply.getException();
+    }
+
+    return error_;
+  } else if (reply.getMessageType() == TcrMessage::EXCEPTION ||
+             reply.getMessageType() == TcrMessage::EXECUTE_FUNCTION_ERROR) {
+    error_ = ThinClientRegion::handleServerException("Execute",
+                                                     reply.getException());
+    exceptionMsg_ = reply.getException();
+  }
+
+  return error_;
+}
+
+OnRegionFunctionExecution::OnRegionFunctionExecution(
+    std::string funcName, const Region* region, std::shared_ptr<Cacheable> args,
+    std::shared_ptr<CacheableHashSet> routingObj, FunctionAttributes funcAttrs,
+    std::chrono::milliseconds timeout, ThinClientPoolDM* poolDM,
+    const std::shared_ptr<std::recursive_mutex>& collectorMutex,
+    std::shared_ptr<ResultCollector> collector,
+    std::shared_ptr<UserAttributes> userAttrs, bool isBGThread,
+    const std::shared_ptr<BucketServerLocation>& serverLocation,
+    bool allBuckets)
+    : serverLocation_{serverLocation},
+      backgroundThread_{isBGThread},
+      pool_{poolDM},
+      funcName_{funcName},
+      funcAttrs_{funcAttrs},
+      timeout_{timeout},
+      args_{args},
+      routingObj_{routingObj},
+      resultCollector_{std::move(collector)},
+      resultCollectorMutex_{collectorMutex},
+      userAttrs_{userAttrs},
+      region_{region},
+      allBuckets_{allBuckets} {
+  request_ = new TcrMessageExecuteRegionFunctionSingleHop(

Review comment:
       Eek!  new/delete?  How about std::unique_ptr instead?
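       A sketch of the suggested change, using a hypothetical `RequestSketch` in place of `TcrMessageExecuteRegionFunctionSingleHop` so the example stands alone: the execution object owns its request through `std::unique_ptr`, so no hand-written `delete` is needed. It uses `new` inside the `unique_ptr` constructor to stay C++11-compatible; with C++14, `std::make_unique` would be the usual choice.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for TcrMessageExecuteRegionFunctionSingleHop.
struct RequestSketch {
  explicit RequestSketch(std::string id) : functionId{std::move(id)} {}
  std::string functionId;
};

// Owning the request through std::unique_ptr releases it automatically when
// the execution object is destroyed, so no destructor with `delete` is needed.
class OnRegionFunctionExecutionSketch {
 public:
  explicit OnRegionFunctionExecutionSketch(const std::string& functionId)
      : request_{new RequestSketch(functionId)} {}

  const RequestSketch& request() const { return *request_; }

 private:
  std::unique_ptr<RequestSketch> request_;
};
```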

##########
File path: cppcache/src/TcrEndpoint.cpp
##########
@@ -754,9 +754,9 @@ GfErrType TcrEndpoint::sendRequestConn(const TcrMessage& request,
   // TcrMessage * req = const_cast<TcrMessage *>(&request);
   LOGDEBUG("TcrEndpoint::sendRequestConn  = %p", m_baseDM);
   if (m_baseDM != nullptr) m_baseDM->beforeSendingRequest(request, conn);
-  if (((type == TcrMessage::EXECUTE_FUNCTION ||
-        type == TcrMessage::EXECUTE_REGION_FUNCTION) &&
-       (request.hasResult() & 2))) {
+  if ((type == TcrMessage::EXECUTE_FUNCTION ||
+       type == TcrMessage::EXECUTE_REGION_FUNCTION) &&
+      request.hasResult()) {

Review comment:
       Okay, I see the type changed from int8_t to bool, that explains why the test against 2 had to go away.  Is this some byte in a protocol message, where 2 is a bitflag indicating a result?  IIRC, the chunk flag in chunked messages does something very similar to indicate "this is the last chunk".  Such a strange thing...
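       The removed code hints at the layout: `(isHAHasResultOptimizeForWrite & 1)` gated retries (so bit 0 looks like HA) and `(getResult & 2) == 2` was passed as hasResult (so bit 1 looks like hasResult). A sketch of decoding and re-encoding such a packed state byte follows. Bit 2 for optimizeForWrite is an assumption inferred from the variable name, and the helper names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

// Bit layout inferred from the removed code; kOptimizeForWriteBit is assumed.
constexpr uint8_t kHABit = 0x01;
constexpr uint8_t kHasResultBit = 0x02;
constexpr uint8_t kOptimizeForWriteBit = 0x04;

struct DecodedFunctionState {
  bool isHA;
  bool hasResult;
  bool optimizeForWrite;
};

// Split a packed state byte into named flags.
DecodedFunctionState decodeFunctionState(uint8_t state) {
  return DecodedFunctionState{(state & kHABit) != 0,
                              (state & kHasResultBit) != 0,
                              (state & kOptimizeForWriteBit) != 0};
}

// Pack named flags back into a single state byte.
uint8_t encodeFunctionState(const DecodedFunctionState& s) {
  return static_cast<uint8_t>((s.isHA ? kHABit : 0) |
                              (s.hasResult ? kHasResultBit : 0) |
                              (s.optimizeForWrite ? kOptimizeForWriteBit : 0));
}
```

       Under this layout, a value of 2 means "has a result, not HA", which is why the old `& 2` test stood for hasResult.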

##########
File path: cppcache/src/ThinClientRegion.cpp
##########
@@ -2852,22 +2853,25 @@ void ThinClientRegion::executeFunction(
     if (reExecuteForServ) {
       msg = new TcrMessageExecuteRegionFunction(
           new DataOutput(m_cacheImpl->createDataOutput()), func, this, args,
-          routingObj, getResult, failedNodes, timeout, m_tcrdm.get(),
+          routingObj, funcAttrs, failedNodes, timeout, m_tcrdm.get(),
           static_cast<int8_t>(1));
     } else {
       msg = new TcrMessageExecuteRegionFunction(
           new DataOutput(m_cacheImpl->createDataOutput()), func, this, args,
-          routingObj, getResult, failedNodes, timeout, m_tcrdm.get(),
+          routingObj, funcAttrs, failedNodes, timeout, m_tcrdm.get(),
           static_cast<int8_t>(0));
     }
     TcrMessageReply reply(true, m_tcrdm.get());
     // need to check
     ChunkedFunctionExecutionResponse* resultCollector(
-        new ChunkedFunctionExecutionResponse(reply, (getResult & 2) == 2, rc));
+        new ChunkedFunctionExecutionResponse(reply, funcAttrs.hasResult(), rc));
     reply.setChunkedResultHandler(resultCollector);
     reply.setTimeout(timeout);
     GfErrType err = GF_NOERR;
-    err = m_tcrdm->sendSyncRequest(*msg, reply, !(getResult & 1));
+
+    // Function failover logic is not handled in the network layer. That's why
+    // attemptFailover should always be false when calling sendSyncRequest
+    err = m_tcrdm->sendSyncRequest(*msg, reply, false);

Review comment:
       If the comment here is true, why do we still need the attemptFailover parameter?

##########
File path: cppcache/src/TcrMessage.cpp
##########
@@ -1184,12 +1184,11 @@ void TcrMessage::handleByteArrayResponse(
         input.readInt32();
         input.advanceCursor(1);  // ignore byte
 
-        if (!m_functionAttributes) {
-          m_functionAttributes = std::make_shared<std::vector<int8_t>>();
-        }
-        m_functionAttributes->push_back(input.read());
-        m_functionAttributes->push_back(input.read());
-        m_functionAttributes->push_back(input.read());
+        bool hasResult = input.read() != 0;
+        bool isHA = input.read() != 0;
+        bool optimizeForWrite = input.read() != 0;
+        m_functionAttributes =
+            FunctionAttributes{isHA, hasResult, optimizeForWrite};

Review comment:
       hasResult is now set to whether or not the byte read was 0, which is not the same as "(byte & 2) != 0".  Is this the correct test?
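       The two readings only coincide for some byte values, so the answer depends on whether this reply carries one boolean byte per attribute (then `!= 0` is right) or a packed state byte (then a mask is needed). A small sketch comparing them, with hypothetical helper names. It also shows a precedence pitfall: written without parentheses, `b & 2 != 0` parses as `b & (2 != 0)`, that is `b & 1`, which tests bit 0 instead of bit 1.

```cpp
#include <cassert>
#include <cstdint>

// Reading the byte as a boolean flag in its own right.
bool asBoolean(int8_t b) { return b != 0; }

// Reading the byte as a packed state with hasResult in bit 1.
bool hasResultBit(int8_t b) { return (b & 2) != 0; }

// What `b & 2 != 0` actually evaluates to: `!=` binds tighter than `&`,
// so this is `b & (2 != 0)`, i.e. `b & 1` (bit 0, not bit 1).
bool unparenthesized(int8_t b) { return static_cast<bool>(b & (2 != 0)); }
```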

##########
File path: cppcache/integration/test/FunctionExecutionTest.cpp
##########
@@ -276,31 +318,164 @@ TEST(FunctionExecutionTest, FunctionExecutionWithIncompleteBucketLocations) {
   cache.close();
 }
 
-std::shared_ptr<CacheableVector> populateRegionReturnFilter(
-    const std::shared_ptr<Region> &region, const int numberOfPuts) {
-  auto routingObj = CacheableVector::create();
-  for (int i = 0; i < numberOfPuts; i++) {
-    region->put("KEY--" + std::to_string(i), "VALUE--" + std::to_string(i));
-    routingObj->push_back(CacheableKey::create("KEY--" + std::to_string(i)));
-  }
-  return routingObj;
+TEST(FunctionExecutionTest, shNonHAFunctionExecServerTimeout) {
+  Cluster cluster{LocatorCount{1}, ServerCount{3}};
+
+  cluster.start([&]() {
+    cluster.getGfsh()
+        .deploy()
+        .jar(getFrameworkString(FrameworkVariable::JavaObjectJarPath))
+        .execute();
+  });
+
+  cluster.getGfsh()
+      .create()
+      .region()
+      .withName("partition_region")
+      .withType("PARTITION")
+      .withRedundantCopies("1")
+      .execute();
+
+  auto cache = CacheFactory().set("log-level", "none").create();
+  auto poolFactory = cache.getPoolManager().createFactory();
+
+  cluster.applyLocators(poolFactory);
+
+  auto pool =
+      poolFactory.setPRSingleHopEnabled(true).setRetryAttempts(2).create(
+          "pool");
+
+  auto region = cache.createRegionFactory(RegionShortcut::PROXY)
+                    .setPoolName("pool")
+                    .create("partition_region");
+
+  // Populate region in a way that not all buckets are created.
+  // Servers in this case will create 88 of possible 113 buckets.
+  populateRegionAllBuckets(region);
+
+  // Check that PR metadata is updated. This is done to be sure
+  // that client will execute function in a non single hop manner
+  // because metadata doesn't contain all bucket locations.
+  // After metadata is refreshed, it will contain at least one
+  // bucket location.
+  CacheImpl *cacheImpl = CacheRegionHelper::getCacheImpl(&cache);
+  waitUntilPRMetadataIsRefreshed(cacheImpl);
+
+  auto functionService = FunctionService::onRegion(region);
+  auto resultCollector = std::make_shared<TestResultCollector>();
+
+  resultCollector->lock();
+
+  auto runner =
+      std::async(std::launch::async, [&functionService, resultCollector]() {
+        functionService.withCollector(resultCollector)
+            .execute("MultiGetAllFunctionTimeoutNonHA");
+      });
+
+  std::this_thread::sleep_for(std::chrono::seconds{25});

Review comment:
       Please wait on a synchronization object, rather than introduce sleep_for calls into test code.
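       A minimal sketch of the suggested pattern using only the standard library: the background task fulfils a `std::promise` once the state the test cares about has been reached, and the test thread waits on the matching `std::future` with an upper bound instead of sleeping for a fixed 25 seconds. The function name and the point where the real function execution would start are hypothetical.

```cpp
#include <cassert>
#include <chrono>
#include <future>

// Returns true if the background task signalled within `limit`.
bool runAndWaitForSignal(std::chrono::milliseconds limit) {
  std::promise<void> reached;
  std::future<void> reachedFuture = reached.get_future();

  auto runner = std::async(std::launch::async, [&reached] {
    // The real test would start the function execution here (hypothetical)
    // and signal once the server-side timeout condition is in place.
    reached.set_value();
  });

  // Wait for the signal, bounded so a broken test cannot hang forever.
  const bool signalled =
      reachedFuture.wait_for(limit) == std::future_status::ready;
  runner.wait();  // keep `reached` alive until the task has finished
  return signalled;
}
```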







[GitHub] [geode-native] pivotal-jbarrett commented on a change in pull request #920: GEODE-10012: Avoid non-HA function retries

Posted by GitBox <gi...@apache.org>.
pivotal-jbarrett commented on a change in pull request #920:
URL: https://github.com/apache/geode-native/pull/920#discussion_r817929905



##########
File path: cppcache/src/ExecutionImpl.hpp
##########
@@ -78,6 +78,20 @@ class ExecutionImpl {
   static void addResults(std::shared_ptr<ResultCollector>& collector,
                          const std::shared_ptr<CacheableVector>& results);
 
+ protected:
+  std::shared_ptr<CacheableVector> executeOnPool(
+      const std::string& func, FunctionAttributes funcAttrs, int32_t retryAttempts,
+      std::chrono::milliseconds timeout = DEFAULT_QUERY_RESPONSE_TIMEOUT);
+
+  void executeOnAllServers(
+      const std::string& func, FunctionAttributes funcAttrs,
+      std::chrono::milliseconds timeout = DEFAULT_QUERY_RESPONSE_TIMEOUT);
+
+  FunctionAttributes getFunctionAttributes(
+      const std::string& func);

Review comment:
       Please use full names, like `functionId` here.

##########
File path: cppcache/src/ExecutionImpl.hpp
##########
@@ -78,6 +78,20 @@ class ExecutionImpl {
   static void addResults(std::shared_ptr<ResultCollector>& collector,
                          const std::shared_ptr<CacheableVector>& results);
 
+ protected:
+  std::shared_ptr<CacheableVector> executeOnPool(
+      const std::string& func, FunctionAttributes funcAttrs, int32_t retryAttempts,
+      std::chrono::milliseconds timeout = DEFAULT_QUERY_RESPONSE_TIMEOUT);
+
+  void executeOnAllServers(
+      const std::string& func, FunctionAttributes funcAttrs,
+      std::chrono::milliseconds timeout = DEFAULT_QUERY_RESPONSE_TIMEOUT);
+
+  FunctionAttributes getFunctionAttributes(
+      const std::string& func);
+  FunctionAttributes updateFunctionAttributes(const std::string &funcName);
+  GfErrType getFuncAttributes(const std::string& func, FunctionAttributes& attr);

Review comment:
       `functionId` and `functionAttributes` please.

##########
File path: cppcache/src/ExecutionImpl.cpp
##########
@@ -329,26 +281,23 @@ std::shared_ptr<ResultCollector> ExecutionImpl::execute(
           "Execution::execute: Transaction function execution on pool is not "
           "supported");
     }
-    if (m_allServer == false) {
-      executeOnPool(
-          func, isHAHasResultOptimizeForWrite,
-          (isHAHasResultOptimizeForWrite & 1) ? m_pool->getRetryAttempts() : 0,
-          timeout);
-      if (serverHasResult == true) {
-        // ExecutionImpl::addResults(m_rc, rs);
+    if (!m_allServer) {
+      executeOnPool(func, attrs, attrs.isHA() ? m_pool->getRetryAttempts() : 0,
+                    timeout);
+      if (attrs.hasResult()) {
         m_rc->endResults();
       }
       return m_rc;
     }
-    executeOnAllServers(func, isHAHasResultOptimizeForWrite, timeout);
+    executeOnAllServers(func, attrs, timeout);
   } else {
     throw IllegalStateException("Execution::execute: should not be here");
   }
   return m_rc;
 }
 
-GfErrType ExecutionImpl::getFuncAttributes(
-    const std::string& func, std::shared_ptr<std::vector<int8_t>>* attr) {
+GfErrType ExecutionImpl::getFuncAttributes(const std::string& func,
+                                           FunctionAttributes& attr) {

Review comment:
       Does it make more sense to have the function return `FunctionAttributes` and `throw` exceptions?
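       A sketch of that shape, under assumptions: a `std::map` stands in for the server round trip, and `std::invalid_argument` stands in for whichever Geode exception type fits. The point is only the signature, which returns the attributes directly instead of filling an out-parameter and returning `GfErrType`.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>

// Simplified stand-in for FunctionAttributes.
struct FunctionAttributes {
  bool isHA;
  bool hasResult;
  bool optimizeForWrite;
};

// Hypothetical lookup: return the attributes, throw when they are unknown.
FunctionAttributes getFunctionAttributes(
    const std::map<std::string, FunctionAttributes>& known,
    const std::string& functionId) {
  auto it = known.find(functionId);
  if (it == known.end()) {
    throw std::invalid_argument("Unknown function: " + functionId);
  }
  return it->second;
}
```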

##########
File path: cppcache/src/ExecutionImpl.cpp
##########
@@ -406,24 +355,25 @@ void ExecutionImpl::addResults(
 }
 
 void ExecutionImpl::executeOnAllServers(const std::string& func,
-                                        uint8_t getResult,
+                                        FunctionAttributes funcAttrs,

Review comment:
       Is copying here cheaper, or otherwise necessary, compared with taking a `const &`?
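       For a type this small, either choice is usually fine. A sketch that makes the assumption behind pass-by-value explicit, assuming `FunctionAttributes` stays a handful of bools: `static_assert`s check that it is trivially copyable and no larger than a pointer, so copying it should typically cost no more than passing a reference.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical layout: three bools, as in the diff's constructor.
class FunctionAttributes {
 public:
  FunctionAttributes(bool ha, bool result, bool optimize)
      : isHA_{ha}, hasResult_{result}, optimizeForWrite_{optimize} {}
  bool isHA() const { return isHA_; }
  bool hasResult() const { return hasResult_; }
  bool optimizeForWrite() const { return optimizeForWrite_; }

 private:
  bool isHA_;
  bool hasResult_;
  bool optimizeForWrite_;
};

// Guard the assumptions that make pass-by-value reasonable.
static_assert(std::is_trivially_copyable<FunctionAttributes>::value,
              "FunctionAttributes should stay trivially copyable");
static_assert(sizeof(FunctionAttributes) <= sizeof(void*),
              "FunctionAttributes should be no larger than a pointer");

bool byValue(FunctionAttributes attrs) { return attrs.isHA(); }
bool byConstRef(const FunctionAttributes& attrs) { return attrs.isHA(); }
```

       If the class later grows non-trivial members, the assertions fail at compile time and signal that `const &` is the better default.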

##########
File path: cppcache/src/ThinClientRegion.cpp
##########
@@ -2907,18 +2911,20 @@ void ThinClientRegion::executeFunction(
         rc->clearResults();
         failedNodes->clear();
       } else if (err == GF_TIMEOUT) {
-        LOGINFO("function timeout. Name: %s, timeout: %s, params: %" PRIu8
-                ", "
-                "retryAttempts: %d ",
-                func.c_str(), to_string(timeout).c_str(), getResult,
-                retryAttempts);
+        LOGINFO(
+            "function timeout. Name: %s, timeout: %s, FunctionState: %" PRIu8
+            ", "
+            "retryAttempts: %d ",

Review comment:
       Clean up the formatting here.

##########
File path: cppcache/src/FunctionExecution.cpp
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FunctionExecution.hpp"
+
+#include <geode/ResultCollector.hpp>
+
+#include "CacheImpl.hpp"
+#include "TcrConnectionManager.hpp"
+#include "TcrMessage.hpp"
+#include "ThinClientPoolDM.hpp"
+#include "ThinClientRegion.hpp"
+
+namespace apache {
+namespace geode {
+namespace client {
+
+void FunctionExecution::setParameters(
+    const std::string& funcName, FunctionAttributes funcAttrs,
+    std::chrono::milliseconds timeout, std::shared_ptr<Cacheable> args,
+    TcrEndpoint* ep, ThinClientPoolDM* poolDM,
+    std::shared_ptr<std::recursive_mutex> resultCollectorMutex,
+    std::shared_ptr<ResultCollector> resultCollector,
+    std::shared_ptr<UserAttributes> userAttr) {
+  exceptionMsg_.clear();
+  resultCollectorMutex_ = std::move(resultCollectorMutex);
+  resultCollector_ = resultCollector;
+  error_ = GF_NOTCON;
+  funcName_ = funcName;

Review comment:
       Again, full names here.

##########
File path: cppcache/src/ExecutionImpl.hpp
##########
@@ -100,29 +114,18 @@ class ExecutionImpl {
         m_allServer(allServer),
         m_pool(pool),
         m_authenticatedView(authenticatedView) {}
+ protected:
+
   std::shared_ptr<CacheableVector> m_routingObj;
   std::shared_ptr<Cacheable> m_args;
   std::shared_ptr<ResultCollector> m_rc;
   std::shared_ptr<Region> m_region;
   bool m_allServer;
   std::shared_ptr<Pool> m_pool;
   AuthenticatedView* m_authenticatedView;
-  static std::recursive_mutex m_func_attrs_lock;
-  static FunctionToFunctionAttributes m_func_attrs;
-  //  std::vector<int8_t> m_attributes;
-
-  std::shared_ptr<CacheableVector> executeOnPool(
-      const std::string& func, uint8_t getResult, int32_t retryAttempts,
-      std::chrono::milliseconds timeout = DEFAULT_QUERY_RESPONSE_TIMEOUT);
 
-  void executeOnAllServers(
-      const std::string& func, uint8_t getResult,
-      std::chrono::milliseconds timeout = DEFAULT_QUERY_RESPONSE_TIMEOUT);
-
-  std::shared_ptr<std::vector<int8_t>> getFunctionAttributes(
-      const std::string& func);
-  GfErrType getFuncAttributes(const std::string& func,
-                              std::shared_ptr<std::vector<int8_t>>* attr);
+  std::map<std::string, FunctionAttributes> funcAttrs_;

Review comment:
       Yes! Thanks for removing another global!
   Please use full names, not abbreviated names.
   `functionAttributes_`
   `functionAttributesMutex_`
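       A sketch of the per-instance replacement for the former static map and lock, using the member names suggested above. The class name and its `find`/`store` methods are hypothetical. Because the map is no longer static, entries recorded by one instance cannot leak into another.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>

// Simplified stand-in for FunctionAttributes.
struct FunctionAttributes {
  bool isHA;
  bool hasResult;
  bool optimizeForWrite;
};

// Hypothetical per-instance cache guarded by its own mutex.
class FunctionAttributesCache {
 public:
  // Returns true and fills `out` if attributes for functionId are cached.
  bool find(const std::string& functionId, FunctionAttributes& out) const {
    std::lock_guard<std::mutex> lock{functionAttributesMutex_};
    auto it = functionAttributes_.find(functionId);
    if (it == functionAttributes_.end()) return false;
    out = it->second;
    return true;
  }

  void store(const std::string& functionId, const FunctionAttributes& attrs) {
    std::lock_guard<std::mutex> lock{functionAttributesMutex_};
    functionAttributes_[functionId] = attrs;
  }

 private:
  mutable std::mutex functionAttributesMutex_;
  std::map<std::string, FunctionAttributes> functionAttributes_;
};
```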

##########
File path: cppcache/src/FunctionAttributes.hpp
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#ifndef GEODE_FUNCTIONATTRIBUTES_H_
+#define GEODE_FUNCTIONATTRIBUTES_H_
+
+#include <cstdint>
+
+namespace apache {
+namespace geode {
+namespace client {
+
+class FunctionAttributes {
+ public:
+  enum : uint8_t {

Review comment:
       Please use `enum class`.

##########
File path: cppcache/integration/test/FunctionExecutionTest.cpp
##########
@@ -276,31 +318,164 @@ TEST(FunctionExecutionTest, FunctionExecutionWithIncompleteBucketLocations) {
   cache.close();
 }
 
-std::shared_ptr<CacheableVector> populateRegionReturnFilter(
-    const std::shared_ptr<Region> &region, const int numberOfPuts) {
-  auto routingObj = CacheableVector::create();
-  for (int i = 0; i < numberOfPuts; i++) {
-    region->put("KEY--" + std::to_string(i), "VALUE--" + std::to_string(i));
-    routingObj->push_back(CacheableKey::create("KEY--" + std::to_string(i)));
-  }
-  return routingObj;
+TEST(FunctionExecutionTest, shNonHAFunctionExecServerTimeout) {
+  Cluster cluster{LocatorCount{1}, ServerCount{3}};
+
+  cluster.start([&]() {
+    cluster.getGfsh()
+        .deploy()
+        .jar(getFrameworkString(FrameworkVariable::JavaObjectJarPath))
+        .execute();
+  });
+
+  cluster.getGfsh()
+      .create()
+      .region()
+      .withName("partition_region")
+      .withType("PARTITION")
+      .withRedundantCopies("1")
+      .execute();
+
+  auto cache = CacheFactory().set("log-level", "none").create();
+  auto poolFactory = cache.getPoolManager().createFactory();
+
+  cluster.applyLocators(poolFactory);
+
+  auto pool =
+      poolFactory.setPRSingleHopEnabled(true).setRetryAttempts(2).create(
+          "pool");
+
+  auto region = cache.createRegionFactory(RegionShortcut::PROXY)
+                    .setPoolName("pool")
+                    .create("partition_region");
+
+  // Populate region in a way that not all buckets are created.
+  // Servers in this case will create 88 of possible 113 buckets.
+  populateRegionAllBuckets(region);
+
+  // Check that PR metadata is updated. This is done to be sure
+  // that client will execute function in a non single hop manner
+  // because metadata doesn't contain all bucket locations.
+  // After metadata is refreshed, it will contain at least one
+  // bucket location.
+  CacheImpl *cacheImpl = CacheRegionHelper::getCacheImpl(&cache);
+  waitUntilPRMetadataIsRefreshed(cacheImpl);
+
+  auto functionService = FunctionService::onRegion(region);
+  auto resultCollector = std::make_shared<TestResultCollector>();
+
+  resultCollector->lock();
+
+  auto runner =
+      std::async(std::launch::async, [&functionService, resultCollector]() {
+        functionService.withCollector(resultCollector)
+            .execute("MultiGetAllFunctionTimeoutNonHA");
+      });
+
+  std::this_thread::sleep_for(std::chrono::seconds{25});

Review comment:
       We have tests with examples of using different synchronization primitives in place of sleeping.

##########
File path: cppcache/integration/test/FunctionExecutionTest.cpp
##########
@@ -74,6 +99,9 @@ class TestResultCollector : public ResultCollector {
   }
 
   virtual void addResult(const std::shared_ptr<Cacheable> &result) override {
+    LOGINFO("Before mutex lock!");
+    std::lock_guard<decltype(mutex_)> lock{mutex_};
+    LOGINFO("Adding a new result!");

Review comment:
       Tests shouldn't be logging, period. If the behavior is worth logging, then it should just be asserted.
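       A Geode-free sketch of that idea, assuming results can be represented as strings: instead of logging inside `addResult`, the collector records each result under its mutex, and the test asserts on what arrived. `RecordingCollector` is a hypothetical name, not the test's actual `TestResultCollector`.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <string>
#include <vector>

// Records results under a mutex so a test can assert on them afterwards.
class RecordingCollector {
 public:
  void addResult(const std::string& result) {
    std::lock_guard<std::mutex> lock{mutex_};
    results_.push_back(result);
  }

  std::size_t count() const {
    std::lock_guard<std::mutex> lock{mutex_};
    return results_.size();
  }

 private:
  mutable std::mutex mutex_;
  std::vector<std::string> results_;
};
```

       In a gtest-based test, the check would then be something like `EXPECT_EQ(collector.count(), expected)` rather than a log line.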

##########
File path: cppcache/src/FunctionExecution.cpp
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FunctionExecution.hpp"
+
+#include <geode/ResultCollector.hpp>
+
+#include "CacheImpl.hpp"
+#include "TcrConnectionManager.hpp"
+#include "TcrMessage.hpp"
+#include "ThinClientPoolDM.hpp"
+#include "ThinClientRegion.hpp"
+
+namespace apache {
+namespace geode {
+namespace client {
+
+void FunctionExecution::setParameters(
+    const std::string& funcName, FunctionAttributes funcAttrs,
+    std::chrono::milliseconds timeout, std::shared_ptr<Cacheable> args,
+    TcrEndpoint* ep, ThinClientPoolDM* poolDM,
+    std::shared_ptr<std::recursive_mutex> resultCollectorMutex,
+    std::shared_ptr<ResultCollector> resultCollector,
+    std::shared_ptr<UserAttributes> userAttr) {
+  exceptionMsg_.clear();
+  resultCollectorMutex_ = std::move(resultCollectorMutex);
+  resultCollector_ = resultCollector;
+  error_ = GF_NOTCON;
+  funcName_ = funcName;
+  funcAttrs_ = funcAttrs;
+  timeout_ = timeout;
+  args_ = args;
+  endpoint_ = ep;
+  pool_ = poolDM;
+  userAttrs_ = userAttr;
+}
+
+GfErrType FunctionExecution::execute() {
+  GuardUserAttributes gua;
+
+  if (userAttrs_) {
+    gua.setAuthenticatedView(userAttrs_->getAuthenticatedView());
+  }
+
+  TcrMessageExecuteFunction request(
+      new DataOutput(
+          pool_->getConnectionManager().getCacheImpl()->createDataOutput()),
+      funcName_, args_, funcAttrs_, pool_, timeout_);
+  TcrMessageReply reply(true, pool_);
+
+  auto resultProcessor = std::unique_ptr<ChunkedFunctionExecutionResponse>(
+      new ChunkedFunctionExecutionResponse(reply, funcAttrs_.hasResult(),
+                                           resultCollector_,
+                                           resultCollectorMutex_));
+
+  reply.setChunkedResultHandler(resultProcessor.get());
+  reply.setTimeout(timeout_);
+  reply.setDM(pool_);
+
+  LOGDEBUG(
+      "ThinClientPoolDM::sendRequestToAllServer sendRequest on endpoint[%s]!",
+      endpoint_->name().c_str());
+
+  error_ = pool_->sendRequestToEP(request, reply, endpoint_);
+  error_ = pool_->handleEPError(endpoint_, reply, error_);
+  if (error_ != GF_NOERR) {
+    if (error_ == GF_NOTCON) {
+      return GF_NOERR;  // if server is unavailable it's not an error for
+      // functionexec OnServers() case
+    }
+    LOGDEBUG("FunctionExecution::execute failed on endpoint[%s]!. Error = %d ",
+             endpoint_->name().c_str(), error_);
+    if (reply.getMessageType() == TcrMessage::EXCEPTION) {
+      exceptionMsg_ = reply.getException();
+    }
+
+    return error_;
+  } else if (reply.getMessageType() == TcrMessage::EXCEPTION ||
+             reply.getMessageType() == TcrMessage::EXECUTE_FUNCTION_ERROR) {
+    error_ = ThinClientRegion::handleServerException("Execute",
+                                                     reply.getException());
+    exceptionMsg_ = reply.getException();
+  }
+
+  return error_;
+}
+
+OnRegionFunctionExecution::OnRegionFunctionExecution(
+    std::string funcName, const Region* region, std::shared_ptr<Cacheable> args,
+    std::shared_ptr<CacheableHashSet> routingObj, FunctionAttributes funcAttrs,
+    std::chrono::milliseconds timeout, ThinClientPoolDM* poolDM,
+    const std::shared_ptr<std::recursive_mutex>& collectorMutex,
+    std::shared_ptr<ResultCollector> collector,
+    std::shared_ptr<UserAttributes> userAttrs, bool isBGThread,
+    const std::shared_ptr<BucketServerLocation>& serverLocation,
+    bool allBuckets)
+    : serverLocation_{serverLocation},
+      backgroundThread_{isBGThread},
+      pool_{poolDM},
+      funcName_{funcName},

Review comment:
       Probably `std::move` both the `functionId` and `functionAttributes`.
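A sketch of the sink-parameter idiom the comment refers to. Arguments taken by value are moved into the members in the constructor's initializer list, so a caller passing an rvalue pays for a move instead of a copy.

`Attributes` and `OnRegionExecutionSketch` are hypothetical, simplified stand-ins, not the PR's actual `FunctionAttributes` or `OnRegionFunctionExecution`. For a small, trivially copyable struct, `std::move` is equivalent to a copy, so the gain for the attributes depends on what `FunctionAttributes` actually contains. For a `std::string` it avoids a heap copy.

```cpp
#include <string>
#include <utility>

// Hypothetical, simplified stand-in for FunctionAttributes.
struct Attributes {
  bool hasResult = true;
  bool isHA = false;
  bool optimizeForWrite = false;
};

// Parameters taken by value ("sink" arguments) are moved into the members.
class OnRegionExecutionSketch {
 public:
  OnRegionExecutionSketch(std::string funcName, Attributes funcAttrs)
      : funcName_{std::move(funcName)}, funcAttrs_{std::move(funcAttrs)} {}

  const std::string& funcName() const { return funcName_; }
  const Attributes& funcAttrs() const { return funcAttrs_; }

 private:
  std::string funcName_;
  Attributes funcAttrs_;
};
```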



