You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2018/11/13 16:45:41 UTC
[geode-native] branch develop updated: GEODE-5957: Parse
(previously) unknown server error messages (#400)
This is an automated email from the ASF dual-hosted git repository.
bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new 98d1fb4 GEODE-5957: Parse (previously) unknown server error messages (#400)
98d1fb4 is described below
commit 98d1fb419071a37a9745cd0879e6b498f8843a14
Author: Blake Bender <ek...@hotmail.com>
AuthorDate: Tue Nov 13 08:45:36 2018 -0800
GEODE-5957: Parse (previously) unknown server error messages (#400)
- All are just essentially string error messages
- Provides more detail about what happened on server
Co-authored-by: Matthew Reddington <mr...@pivotal.io>
---
cppcache/integration-test-2/CMakeLists.txt | 2 +-
.../integration-test-2/FunctionExecutionTest.cpp | 93 ++++++++++++++++++++++
cppcache/src/ExecutionImpl.cpp | 7 +-
cppcache/src/TcrMessage.cpp | 48 ++++-------
cppcache/src/TcrMessage.hpp | 2 +-
5 files changed, 118 insertions(+), 34 deletions(-)
diff --git a/cppcache/integration-test-2/CMakeLists.txt b/cppcache/integration-test-2/CMakeLists.txt
index 322550c..05fa53a 100644
--- a/cppcache/integration-test-2/CMakeLists.txt
+++ b/cppcache/integration-test-2/CMakeLists.txt
@@ -31,7 +31,7 @@ add_executable(integration-test-2
StructTest.cpp
EnableChunkHandlerThreadTest.cpp
DataSerializableTest.cpp
-)
+ FunctionExecutionTest.cpp)
target_compile_definitions(integration-test-2
PUBLIC
diff --git a/cppcache/integration-test-2/FunctionExecutionTest.cpp b/cppcache/integration-test-2/FunctionExecutionTest.cpp
new file mode 100644
index 0000000..8cdedcb
--- /dev/null
+++ b/cppcache/integration-test-2/FunctionExecutionTest.cpp
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include <geode/Cache.hpp>
+#include <geode/ExceptionTypes.hpp>
+#include <geode/FunctionService.hpp>
+#include <geode/PoolManager.hpp>
+#include <geode/RegionFactory.hpp>
+#include <geode/RegionShortcut.hpp>
+
+#include "framework/Cluster.h"
+#include "framework/Gfsh.h"
+
+using apache::geode::client::Cache;
+using apache::geode::client::Cacheable;
+using apache::geode::client::CacheableVector;
+using apache::geode::client::FunctionExecutionException;
+using apache::geode::client::FunctionService;
+using apache::geode::client::Region;
+using apache::geode::client::RegionShortcut;
+using apache::geode::client::ResultCollector;
+
+std::shared_ptr<Region> setupRegion(Cache &cache) {
+ auto region = cache.createRegionFactory(RegionShortcut::PROXY)
+ .setPoolName("default")
+ .create("region");
+
+ return region;
+}
+
+TEST(FunctionExecutionTest, UnknownFunction) {
+ Cluster cluster{LocatorCount{1}, ServerCount{1}};
+ cluster.getGfsh()
+ .create()
+ .region()
+ .withName("region")
+ .withType("REPLICATE")
+ .execute();
+
+ auto cache = cluster.createCache();
+ auto region = setupRegion(cache);
+
+ ASSERT_THROW(FunctionService::onServer(region->getRegionService())
+ .execute("I_Don_t_Exist"),
+ FunctionExecutionException);
+}
+
+class TestResultCollector : public ResultCollector {
+ virtual std::shared_ptr<CacheableVector> getResult(
+ std::chrono::milliseconds) override {
+ return std::shared_ptr<CacheableVector>();
+ }
+
+ virtual void addResult(const std::shared_ptr<Cacheable> &) override {}
+
+ virtual void endResults() override {}
+
+ virtual void clearResults() override {}
+};
+
+TEST(FunctionExecutionTest, UnknownFunctionAsync) {
+ Cluster cluster{LocatorCount{1}, ServerCount{1}};
+ cluster.getGfsh()
+ .create()
+ .region()
+ .withName("region")
+ .withType("REPLICATE")
+ .execute();
+
+ auto cache = cluster.createCache();
+ auto region = setupRegion(cache);
+
+ ASSERT_THROW(FunctionService::onServer(region->getRegionService())
+ .withCollector(std::make_shared<TestResultCollector>())
+ .execute("I_Don_t_Exist"),
+ FunctionExecutionException);
+}
diff --git a/cppcache/src/ExecutionImpl.cpp b/cppcache/src/ExecutionImpl.cpp
index 6f1f651..10de65b 100644
--- a/cppcache/src/ExecutionImpl.cpp
+++ b/cppcache/src/ExecutionImpl.cpp
@@ -131,7 +131,7 @@ std::shared_ptr<ResultCollector> ExecutionImpl::execute(
serverOptimizeForWrite = ((attr->at(2) == 1) ? true : false);
LOGDEBUG(
- "ExecutionImpl::execute got functionAttributes from srver for function = "
+ "ExecutionImpl::execute got functionAttributes from server for function = "
"%s serverHasResult = %d "
" serverIsHA = %d serverOptimizeForWrite = %d ",
func.c_str(), serverHasResult, serverIsHA, serverOptimizeForWrite);
@@ -392,10 +392,15 @@ GfErrType ExecutionImpl::getFuncAttributes(const std::string& func,
reply.getException());
break;
}
+ case TcrMessage::REQUEST_DATA_ERROR: {
+ LOGERROR("Error message from server: " + reply.getValue()->toString());
+ throw FunctionExecutionException(reply.getValue()->toString());
+ }
default: {
LOGERROR("Unknown message type %d while getting function attributes.",
reply.getMessageType());
err = GF_MSG;
+ break;
}
}
return err;
diff --git a/cppcache/src/TcrMessage.cpp b/cppcache/src/TcrMessage.cpp
index 6b7ee76..40a1b69 100644
--- a/cppcache/src/TcrMessage.cpp
+++ b/cppcache/src/TcrMessage.cpp
@@ -197,32 +197,31 @@ void TcrMessage::readLongPart(DataInput& input, uint64_t* intValue) {
*intValue = input.readInt64();
}
-void TcrMessage::readStringPart(DataInput& input, uint32_t* len, char** str) {
- char* ts;
- int32_t sl = input.readInt32();
- ts = new char[sl];
- if (input.read()) throw Exception("String is not an object");
- input.readBytesOnly(reinterpret_cast<int8_t*>(ts), sl);
- *len = sl;
- *str = ts;
+const std::string TcrMessage::readStringPart(DataInput& input) {
+ char* stringBuffer;
+ int32_t stringLength = input.readInt32();
+ stringBuffer = new char[stringLength + 1];
+ stringBuffer[stringLength] = '\0';
+ if (input.read()) {
+ throw Exception("String is not an object");
+ }
+ input.readBytesOnly(reinterpret_cast<int8_t*>(stringBuffer), stringLength);
+ std::string str = stringBuffer;
+ delete[] stringBuffer;
+ return str;
}
+
void TcrMessage::readCqsPart(DataInput& input) {
m_cqs->clear();
readIntPart(input, &m_numCqPart);
for (uint32_t cqCnt = 0; cqCnt < m_numCqPart;) {
- char* cqName;
- uint32_t len;
- readStringPart(input, &len, &cqName);
- std::string cq(cqName, len);
- delete[] cqName;
+ auto cq = readStringPart(input);
cqCnt++;
int32_t cqOp;
readIntPart(input, reinterpret_cast<uint32_t*>(&cqOp));
cqCnt++;
(*m_cqs)[cq] = cqOp;
- // LOGINFO("read cqName[%s],cqOp[%d]", cq.c_str(), cqOp);
}
- // LOGDEBUG("mapsize = %d", m_cqs.size());
}
inline void TcrMessage::readCallbackObjectPart(DataInput& input,
@@ -234,17 +233,10 @@ inline void TcrMessage::readCallbackObjectPart(DataInput& input,
input.readObject(m_callbackArgument);
} else {
if (defaultString) {
- // TODO:
- // m_callbackArgument = CacheableString::create(
- // (char*)input.currentBufferPosition( ), lenObj );
m_callbackArgument = readCacheableString(input, lenObj);
} else {
- // TODO::
- // m_callbackArgument = CacheableBytes::create(
- // input.currentBufferPosition( ), lenObj );
m_callbackArgument = readCacheableBytes(input, lenObj);
}
- // input.advanceCursor( lenObj );
}
}
}
@@ -257,15 +249,10 @@ inline void TcrMessage::readObjectPart(DataInput& input, bool defaultString) {
input.readObject(m_value);
} else {
if (defaultString) {
- // m_value = CacheableString::create(
- // (char*)input.currentBufferPosition( ), lenObj );
m_value = readCacheableString(input, lenObj);
} else {
- // m_value = CacheableBytes::create(
- // input.currentBufferPosition( ), lenObj );
m_value = readCacheableBytes(input, lenObj);
}
- // input.advanceCursor( lenObj );
}
} else if (lenObj == 0 && isObj == 2) { // EMPTY BYTE ARRAY
m_value = CacheableBytes::create();
@@ -1043,13 +1030,12 @@ void TcrMessage::handleByteArrayResponse(
case TcrMessage::UNREGISTER_INTEREST_DATA_ERROR:
case TcrMessage::PUT_DATA_ERROR:
case TcrMessage::KEY_SET_DATA_ERROR:
- case TcrMessage::REQUEST_DATA_ERROR:
case TcrMessage::DESTROY_REGION_DATA_ERROR:
case TcrMessage::CLEAR_REGION_DATA_ERROR:
case TcrMessage::CONTAINS_KEY_DATA_ERROR:
- case TcrMessage::PUT_DELTA_ERROR: {
- // do nothing. (?) TODO Do we need to process further.
- m_shouldIgnore = true;
+ case TcrMessage::PUT_DELTA_ERROR:
+ case TcrMessage::REQUEST_DATA_ERROR: {
+ m_value = std::make_shared<CacheableString>(readStringPart(input));
break;
}
diff --git a/cppcache/src/TcrMessage.hpp b/cppcache/src/TcrMessage.hpp
index 1ac99b4..5a78846 100644
--- a/cppcache/src/TcrMessage.hpp
+++ b/cppcache/src/TcrMessage.hpp
@@ -575,7 +575,7 @@ class APACHE_GEODE_EXPORT TcrMessage {
int32_t parts = 1); // skip num parts then read eventid
void skipParts(DataInput& input, int32_t numParts = 1);
- void readStringPart(DataInput& input, uint32_t* len, char** str);
+ const std::string readStringPart(DataInput& input);
void readCqsPart(DataInput& input);
void readHashMapForGCVersions(apache::geode::client::DataInput& input,
std::shared_ptr<CacheableHashMap>& value);