You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2018/11/28 06:09:10 UTC
[geode] branch develop updated: GEODE-1842: refactor QueryDataFunction (#2897)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new d861afc GEODE-1842: refactor QueryDataFunction (#2897)
d861afc is described below
commit d861afcde83be8ae10e81c9e1612536e99d41ea6
Author: jinmeiliao <ji...@pivotal.io>
AuthorDate: Tue Nov 27 22:09:00 2018 -0800
GEODE-1842: refactor QueryDataFunction (#2897)
* Separate the QueryDataFunction into the real function code and the caller of the function code (DataQueryEngine)
* remove the static calls to get cache and ManagementService.
* refactor the test to add assertions.
---
.../management/DataQueryEngineIntegrationTest.java | 362 +++++++++++++++++++++
.../QueryDataFunctionIntegrationTest.java | 325 ------------------
.../management/internal/beans/BeanUtilFuncs.java | 21 --
.../management/internal/beans/DataQueryEngine.java | 295 +++++++++++++++++
.../internal/beans/DistributedSystemBridge.java | 13 +-
.../internal/beans/QueryDataFunction.java | 300 +----------------
.../sanctioned-geode-core-serializables.txt | 1 -
7 files changed, 676 insertions(+), 641 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/DataQueryEngineIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/DataQueryEngineIntegrationTest.java
new file mode 100644
index 0000000..6a55a2b
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/management/DataQueryEngineIntegrationTest.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.json.JSONObject;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.query.data.Portfolio;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.management.internal.ManagementConstants;
+import org.apache.geode.management.internal.SystemManagementService;
+import org.apache.geode.management.internal.beans.DataQueryEngine;
+import org.apache.geode.management.internal.cli.json.TypedJson;
+import org.apache.geode.management.model.EmptyObject;
+import org.apache.geode.management.model.Item;
+import org.apache.geode.management.model.Order;
+import org.apache.geode.management.model.SubOrder;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
+import org.apache.geode.pdx.internal.PdxInstanceFactoryImpl;
+import org.apache.geode.test.junit.categories.GfshTest;
+import org.apache.geode.test.junit.rules.ServerStarterRule;
+
+/**
+ * Functional integration tests for {@link DataQueryEngine}.
+ */
+@Category({GfshTest.class})
+public class DataQueryEngineIntegrationTest {
+
+ private static final String REGION_NAME = "exampleRegion";
+ private static final String QUERY_1 = "SELECT * FROM /exampleRegion";
+
+ /**
+ * Number of rows queryData operation will return. By default it will be 1000
+ */
+ private static final int queryResultSetLimit = ManagementConstants.DEFAULT_QUERY_LIMIT;
+
+ /**
+ * Number of elements to be shown in queryData operation if query results contain collections like
+ * Map, List etc.
+ */
+ private static final int queryCollectionsDepth = TypedJson.DEFAULT_COLLECTION_ELEMENT_LIMIT;
+
+ @Rule
+ public ServerStarterRule server = new ServerStarterRule().withNoCacheServer()
+ .withJMXManager().withRegion(RegionShortcut.REPLICATE, REGION_NAME).withAutoStart();
+ private Region<Object, Object> region;
+ private DataQueryEngine queryEngine;
+
+ @Before
+ public void setUp() throws Exception {
+ region = server.getCache().getRegion(REGION_NAME);
+ queryEngine = new DataQueryEngine((SystemManagementService) server.getManagementService(),
+ server.getCache());
+ }
+
+
+ /**
+ * Tests a model where Objects have a circular reference with object reference. e.g. Order1--
+ * Has--> Items each Item --Has --> OrderN where (OrderN == Order1)
+ * <p>
+ *
+ * RegressionTest for TRAC #51048: Disk Read/ Write shows negative at cluster level JMX
+ */
+ @Test
+ public void testCyclicWithNestedObjectReference() throws Exception {
+ Order order = new Order();
+ order.setId("test");
+
+ for (int subOrderIndex = 1; subOrderIndex <= 5; subOrderIndex++) {
+ Item item = new Item(order, "ID_" + subOrderIndex, "Book");
+ order.addItem(item);
+ }
+
+ region.put("order1", order);
+
+ String expectedResult =
+ "{\"result\":[[\"org.apache.geode.management.model.Order\",{\"items\":[\"java.util.Collection\",{\"0\":[\"org.apache.geode.management.model.Item\",{\"itemDescription\":[\"java.lang.String\",\"Book\"],\"itemId\":[\"java.lang.String\",\"ID_1\"],\"order\":[\"org.apache.geode.management.model.Order\",\"org.apache.geode.management.model.Order\"]}],\"1\":[\"org.apache.geode.management.model.Item\",{\"itemDescription\":[\"java.lang.String\",\"Book\"],\"itemId\":[\"java.lang.String\",\"I [...]
+ Object result = queryEngine.queryForJsonResult(QUERY_1, 0, queryResultSetLimit,
+ queryCollectionsDepth);
+ String queryResult = (String) result;
+
+ assertThat(queryResult).isEqualToIgnoringWhitespace(expectedResult);
+ // If not correct JSON format this will throw a JSONException
+ new JSONObject(queryResult);
+ }
+
+ /**
+ * Tests a model where Objects have a circular reference with their class types. e.g. Order1--
+ * Has--> Items each Item --Has --> OrderN where (OrderN != Order1)
+ */
+ @Test
+ public void testCyclicWithNestedClasses() throws Exception {
+ Order order = new Order();
+ order.setId("test");
+
+ for (int subOrderIndex = 1; subOrderIndex <= 5; subOrderIndex++) {
+ Order subOrder = new Order();
+ subOrder.setId("ORDER_ID_" + subOrderIndex);
+
+ Item item = new Item(subOrder, "ID_" + subOrderIndex, "Book");
+ order.addItem(item);
+ }
+
+ region.put("order1", order);
+
+ String queryResult = queryEngine.queryForJsonResult(QUERY_1, 0, queryResultSetLimit,
+ queryCollectionsDepth);
+
+ String expectedResult =
+ "{\"result\":[[\"org.apache.geode.management.model.Order\",{\"items\":[\"java.util.Collection\",{\"0\":[\"org.apache.geode.management.model.Item\",{\"itemDescription\":[\"java.lang.String\",\"Book\"],\"itemId\":[\"java.lang.String\",\"ID_1\"],\"order\":[\"org.apache.geode.management.model.Order\",{\"items\":[\"java.util.Collection\",{}],\"id\":[\"java.lang.String\",\"ORDER_ID_1\"]}]}],\"1\":[\"org.apache.geode.management.model.Item\",{\"itemDescription\":[\"java.lang.String\",\"B [...]
+ assertThat(queryResult).isEqualToIgnoringWhitespace(expectedResult);
+
+ // If not correct JSON format this will throw a JSONException
+ new JSONObject(queryResult);
+ }
+
+ /**
+ * Tests a model where Objects have a circular reference with their class types. e.g. Order1--
+ * Has--> Items each Item --Has --> OrderN where (OrderN != Order1)
+ */
+ @Test
+ public void testCyclicWithNestedRefernce2ndLayer() throws Exception {
+ Collection<Item> items = new ArrayList<>();
+ Order order = new Order("ORDER_TEST", items);
+
+ for (int subOrderIndex = 1; subOrderIndex <= 5; subOrderIndex++) {
+ Order subOrder = new Order();
+ subOrder.setId("ORDER_ID_" + subOrderIndex);
+ subOrder.setItems(items);
+
+ Item item = new Item(subOrder, "ID_" + subOrderIndex, "Book");
+ order.addItem(item);
+ }
+
+ region.put("order1", order);
+
+ String queryResult = queryEngine.queryForJsonResult(QUERY_1, 0, queryResultSetLimit,
+ queryCollectionsDepth);
+ String expectedResult =
+ "{\"result\":[[\"org.apache.geode.management.model.Order\",{\"items\":[\"java.util.Collection\",{\"0\":[\"org.apache.geode.management.model.Item\",{\"itemDescription\":[\"java.lang.String\",\"Book\"],\"itemId\":[\"java.lang.String\",\"ID_1\"],\"order\":[\"org.apache.geode.management.model.Order\",{\"items\":[\"java.util.Collection\",\"java.util.ArrayList\"],\"id\":[\"java.lang.String\",\"ORDER_ID_1\"]}]}],\"1\":[\"org.apache.geode.management.model.Item\",{\"itemDescription\":[\"j [...]
+ assertThat(queryResult).isEqualToIgnoringWhitespace(expectedResult);
+
+ // If not correct JSON format this will throw a JSONException
+ new JSONObject(queryResult);
+ }
+
+ @Test
+ public void testCyclicWithCollection1stLayer() throws Exception {
+ Collection<Item> items = new ArrayList<>();
+ Order order = new Order("ORDER_TEST", items);
+
+ for (int subOrderIndex = 1; subOrderIndex <= 5; subOrderIndex++) {
+ Order subOrder = new Order();
+ subOrder.setId("ORDER_ID_" + subOrderIndex);
+ subOrder.setItems(items);
+
+ Item item = new Item(subOrder, "ID_" + subOrderIndex, "Book");
+ order.addItem(item);
+ }
+
+ region.put("items", items);
+
+ String queryResult = queryEngine.queryForJsonResult(QUERY_1, 0, queryResultSetLimit,
+ queryCollectionsDepth);
+ System.out.println("Query Result: " + queryResult);
+
+ // If not correct JSON format this will throw a JSONException
+ new JSONObject(queryResult);
+ }
+
+ @Test
+ public void testCyclicCollectionWithMultipleObjects() throws Exception {
+ for (int orderIndex = 1; orderIndex <= 5; orderIndex++) {
+ Collection<Item> items = new ArrayList<>();
+ Order order = new Order("ORDER_TEST_" + orderIndex, items);
+
+ for (int subOrderIndex = 1; subOrderIndex <= 5; subOrderIndex++) {
+ Order subOrder = new Order();
+ subOrder.setId("ORDER_ID_" + subOrderIndex);
+ subOrder.setItems(items);
+
+ Item item = new Item(subOrder, "ID_" + subOrderIndex, "Book");
+ order.addItem(item);
+ }
+
+ region.put("items_" + orderIndex, items);
+ }
+
+ String queryResult = queryEngine.queryForJsonResult(QUERY_1, 0, queryResultSetLimit,
+ queryCollectionsDepth);
+ System.out.println("Query Result: " + queryResult);
+
+ // If not correct JSON format this will throw a JSONException
+ new JSONObject(queryResult);
+ }
+
+ @Test
+ public void testCyclicArrayMultipleObjects() throws Exception {
+ for (int orderIndex = 1; orderIndex <= 5; orderIndex++) {
+ Collection<Item> items = new ArrayList<>();
+ Order order = new Order("ORDER_TEST_" + orderIndex, items);
+
+ for (int subOrderIndex = 1; subOrderIndex <= 5; subOrderIndex++) {
+ Order subOrder = new Order();
+ subOrder.setId("ORDER_ID_" + subOrderIndex);
+ subOrder.setItems(items);
+
+ Item item = new Item(subOrder, "ID_" + subOrderIndex, "Book");
+ order.addItem(item);
+ }
+
+ region.put("items_" + orderIndex, items);
+ }
+
+ String queryResult = queryEngine.queryForJsonResult(QUERY_1, 0, queryResultSetLimit,
+ queryCollectionsDepth);
+ System.out.println("Query Result: " + queryResult);
+
+ // If not correct JSON format this will throw a JSONException
+ new JSONObject(queryResult);
+
+ }
+
+ @Test
+ public void testCyclicArrayMultipleObjectsMemberWise() throws Exception {
+ region.put(1, new Portfolio(0));
+
+ String queryResult = queryEngine.queryForJsonResult(QUERY_1, "server", 0, queryResultSetLimit,
+ queryCollectionsDepth);
+ System.out.println(queryResult);
+
+ // If not correct JSON format this will throw a JSONException
+ new JSONObject(queryResult);
+ }
+
+ @Test
+ public void testEmptyObject() throws Exception {
+ EmptyObject emptyObject = new EmptyObject();
+ region.put("port", emptyObject);
+ String queryResult = queryEngine.queryForJsonResult(QUERY_1, 0, queryResultSetLimit,
+ queryCollectionsDepth);
+ String expectedResult = "{\"result\":[[\"org.apache.geode.management.model.EmptyObject\",{}]]}";
+ assertThat(queryResult).isEqualToIgnoringWhitespace(expectedResult);
+
+ // If not correct JSON format this will throw a JSONException
+ new JSONObject(queryResult);
+ }
+
+ @Test
+ public void testSubClassOverridingMethods() throws Exception {
+ SubOrder subOrder = new SubOrder();
+ region.put("port", subOrder);
+ String queryResult = queryEngine.queryForJsonResult(QUERY_1, 0, queryResultSetLimit,
+ queryCollectionsDepth);
+ String expectedResult =
+ "{\"result\":[[\"org.apache.geode.management.model.SubOrder\",{\"items\":[\"java.util.Collection\",{}],\"id\":[\"java.lang.String\",\"null1\"]}]]}";
+ assertThat(queryResult).isEqualToIgnoringWhitespace(expectedResult);
+
+ // If not correct JSON format this will throw a JSONException
+ new JSONObject(queryResult);
+ }
+
+ @Test
+ public void testNestedPDXObject() throws Exception {
+ PdxInstanceFactory factory = PdxInstanceFactoryImpl.newCreator("Portfolio", false,
+ (InternalCache) region.getCache());
+
+ factory.writeInt("ID", 111);
+ factory.writeString("status", "active");
+ factory.writeString("secId", "IBM");
+
+ PdxInstance pdxInstance = factory.create();
+
+ region.put("port", pdxInstance);
+
+ String queryResult = queryEngine.queryForJsonResult(QUERY_1, 0, queryResultSetLimit,
+ queryCollectionsDepth);
+ String expectedResult =
+ "{\"result\":[[\"org.apache.geode.pdx.PdxInstance\",{\"ID\":[\"java.lang.Integer\",111],\"status\":[\"java.lang.String\",\"active\"],\"secId\":[\"java.lang.String\",\"IBM\"]}]]}";
+ assertThat(queryResult).isEqualToIgnoringWhitespace(expectedResult);
+
+ // If not correct JSON format this will throw a JSONException
+ new JSONObject(queryResult);
+ }
+
+ @Test
+ public void testArrayWithNullValues() throws Exception {
+ SubOrder[] subOrderArray = new SubOrder[2];
+ subOrderArray[0] = new SubOrder();
+ subOrderArray[1] = null;
+
+ region.put("p1", subOrderArray);
+
+ String queryResult = queryEngine.queryForJsonResult(QUERY_1, 0, queryResultSetLimit,
+ queryCollectionsDepth);
+ String expectedResult =
+ "{\"result\":[[\"org.apache.geode.management.model.SubOrder[]\",[{\"items\":[\"java.util.Collection\",{}],\"id\":[\"java.lang.String\",\"null1\"]},null]]]}";
+ assertThat(queryResult).isEqualToIgnoringWhitespace(expectedResult);
+
+ // If not correct JSON format this will throw a JSONException
+ new JSONObject(queryResult);
+ }
+
+ @Test
+ public void testWithSqlDate() throws Exception {
+ SubOrder[] subOrderArray = new SubOrder[2];
+ subOrderArray[0] = new SubOrder();
+ subOrderArray[1] = null;
+
+ region.put("p1", subOrderArray);
+
+ String queryResult = queryEngine.queryForJsonResult(QUERY_1, 0, queryResultSetLimit,
+ queryCollectionsDepth);
+ String expectedResult =
+ "{\"result\":[[\"org.apache.geode.management.model.SubOrder[]\",[{\"items\":[\"java.util.Collection\",{}],\"id\":[\"java.lang.String\",\"null1\"]},null]]]}";
+ assertThat(queryResult).isEqualToIgnoringWhitespace(expectedResult);
+
+ // If not correct JSON format this will throw a JSONException
+ new JSONObject(queryResult);
+ }
+
+
+ public static class Root {
+ public Root getRoot() {
+ return this;
+ }
+
+ public Child getChild() { // getter that returns unique instances
+ return new Child();
+ }
+ }
+
+ public static class Child {
+ public Child getChild() {
+ return this;
+ }
+ }
+
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/QueryDataFunctionIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/QueryDataFunctionIntegrationTest.java
deleted file mode 100644
index 9803700..0000000
--- a/geode-core/src/integrationTest/java/org/apache/geode/management/QueryDataFunctionIntegrationTest.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.management;
-
-import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_TIME_STATISTICS;
-import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER;
-import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_HTTP_PORT;
-import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
-import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START;
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLE_RATE;
-import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Properties;
-
-import org.json.JSONObject;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionFactory;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.query.data.Portfolio;
-import org.apache.geode.cache.query.data.Position;
-import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.management.internal.ManagementConstants;
-import org.apache.geode.management.internal.beans.QueryDataFunction;
-import org.apache.geode.management.internal.cli.json.TypedJson;
-import org.apache.geode.management.model.EmptyObject;
-import org.apache.geode.management.model.Item;
-import org.apache.geode.management.model.Order;
-import org.apache.geode.management.model.SubOrder;
-import org.apache.geode.pdx.PdxInstance;
-import org.apache.geode.pdx.PdxInstanceFactory;
-import org.apache.geode.pdx.internal.PdxInstanceFactoryImpl;
-import org.apache.geode.test.junit.categories.GfshTest;
-
-/**
- * Functional integration tests for {@link QueryDataFunction}.
- *
- * TODO: this test should really have some assertions
- *
- * @since GemFire 8.1
- */
-@Category({GfshTest.class})
-public class QueryDataFunctionIntegrationTest {
-
- private static final String REPLICATED_REGION = "exampleRegion";
- private static final String QUERY_1 = "SELECT * FROM /exampleRegion";
-
- /**
- * Number of rows queryData operation will return. By default it will be 1000
- */
- private static final int queryResultSetLimit = ManagementConstants.DEFAULT_QUERY_LIMIT;
-
- /**
- * Number of elements to be shown in queryData operation if query results contain collections like
- * Map, List etc.
- */
- private static final int queryCollectionsDepth = TypedJson.DEFAULT_COLLECTION_ELEMENT_LIMIT;
-
- private InternalDistributedSystem system;
- private Region<Object, Object> replicatedRegion;
- private DistributedMember member;
- private Cache cache;
-
- @Before
- public void setUp() throws Exception {
- Properties config = new Properties();
- config.setProperty(MCAST_PORT, "0");
- config.setProperty(ENABLE_TIME_STATISTICS, "true");
- config.setProperty(STATISTIC_SAMPLING_ENABLED, "false");
- config.setProperty(STATISTIC_SAMPLE_RATE, "60000");
- config.setProperty(JMX_MANAGER, "true");
- config.setProperty(JMX_MANAGER_START, "true");
- config.setProperty(JMX_MANAGER_HTTP_PORT, "0");
- config.setProperty(JMX_MANAGER_PORT, "0");
-
- system = (InternalDistributedSystem) DistributedSystem.connect(config);
- member = system.getDistributedMember();
- Cache cache = new CacheFactory().create();
-
- RegionFactory regionFactory = cache.createRegionFactory(RegionShortcut.REPLICATE);
- replicatedRegion = regionFactory.<String, Object>create(REPLICATED_REGION);
- }
-
- @After
- public void tearDown() throws Exception {
- system.disconnect();
- }
-
- /**
- * Tests a model where Objects have a circular reference with object reference. e.g. Order1--
- * Has--> Items each Item --Has --> OrderN where (OrderN == Order1)
- * <p>
- *
- * RegressionTest for TRAC #51048: Disk Read/ Write shows negative at cluster level JMX
- */
- @Test
- public void testCyclicWithNestedObjectReference() throws Exception {
- Order order = new Order();
- order.setId("test");
-
- for (int subOrderIndex = 1; subOrderIndex <= 5; subOrderIndex++) {
- Item item = new Item(order, "ID_" + subOrderIndex, "Book");
- order.addItem(item);
- }
-
- replicatedRegion.put("order1", order);
-
- queryData(QUERY_1, "", 0);
- }
-
- /**
- * Tests a model where Objects have a circular reference with their class types. e.g. Order1--
- * Has--> Items each Item --Has --> OrderN where (OrderN != Order1)
- */
- @Test
- public void testCyclicWithNestedClasses() throws Exception {
- Order order = new Order();
- order.setId("test");
-
- for (int subOrderIndex = 1; subOrderIndex <= 5; subOrderIndex++) {
- Order subOrder = new Order();
- subOrder.setId("ORDER_ID_" + subOrderIndex);
-
- Item item = new Item(subOrder, "ID_" + subOrderIndex, "Book");
- order.addItem(item);
- }
-
- replicatedRegion.put("order1", order);
-
- queryData(QUERY_1, "", 0);
- }
-
- /**
- * Tests a model where Objects have a circular reference with their class types. e.g. Order1--
- * Has--> Items each Item --Has --> OrderN where (OrderN != Order1)
- */
- @Test
- public void testCyclicWithNestedRefernce2ndLayer() throws Exception {
- Collection<Item> items = new ArrayList<>();
- Order order = new Order("ORDER_TEST", items);
-
- for (int subOrderIndex = 1; subOrderIndex <= 5; subOrderIndex++) {
- Order subOrder = new Order();
- subOrder.setId("ORDER_ID_" + subOrderIndex);
- subOrder.setItems(items);
-
- Item item = new Item(subOrder, "ID_" + subOrderIndex, "Book");
- order.addItem(item);
- }
-
- replicatedRegion.put("order1", order);
-
- queryData(QUERY_1, "", 0);
- }
-
- @Test
- public void testCyclicWithCollection1stLayer() throws Exception {
- Collection<Item> items = new ArrayList<>();
- Order order = new Order("ORDER_TEST", items);
-
- for (int subOrderIndex = 1; subOrderIndex <= 5; subOrderIndex++) {
- Order subOrder = new Order();
- subOrder.setId("ORDER_ID_" + subOrderIndex);
- subOrder.setItems(items);
-
- Item item = new Item(subOrder, "ID_" + subOrderIndex, "Book");
- order.addItem(item);
- }
-
- replicatedRegion.put("items", items);
-
- queryData(QUERY_1, "", 0);
- }
-
- @Test
- public void testCyclicCollectionWithMultipleObjects() throws Exception {
- for (int orderIndex = 1; orderIndex <= 5; orderIndex++) {
- Collection<Item> items = new ArrayList<>();
- Order order = new Order("ORDER_TEST_" + orderIndex, items);
-
- for (int subOrderIndex = 1; subOrderIndex <= 5; subOrderIndex++) {
- Order subOrder = new Order();
- subOrder.setId("ORDER_ID_" + subOrderIndex);
- subOrder.setItems(items);
-
- Item item = new Item(subOrder, "ID_" + subOrderIndex, "Book");
- order.addItem(item);
- }
-
- replicatedRegion.put("items_" + orderIndex, items);
- }
-
- queryData(QUERY_1, "", 0);
- }
-
- @Test
- public void testCyclicArrayMultipleObjects() throws Exception {
- for (int orderIndex = 1; orderIndex <= 5; orderIndex++) {
- Collection<Item> items = new ArrayList<>();
- Order order = new Order("ORDER_TEST_" + orderIndex, items);
-
- for (int subOrderIndex = 1; subOrderIndex <= 5; subOrderIndex++) {
- Order subOrder = new Order();
- subOrder.setId("ORDER_ID_" + subOrderIndex);
- subOrder.setItems(items);
-
- Item item = new Item(subOrder, "ID_" + subOrderIndex, "Book");
- order.addItem(item);
- }
-
- replicatedRegion.put("items_" + orderIndex, items);
- }
-
- queryData(QUERY_1, "", 0);
- }
-
- @Test
- public void testCyclicArrayMultipleObjectsMemberWise() throws Exception {
- Portfolio[] portfolios = createPortfoliosAndPositions(1);
- int portfolioId = 1;
- for (Portfolio portfolio : portfolios) {
- replicatedRegion.put(portfolioId, portfolio);
- portfolioId++;
- }
-
- queryData(QUERY_1, member.getId(), 0);
- }
-
- @Test
- public void testEmptyObject() throws Exception {
- EmptyObject emptyObject = new EmptyObject();
- replicatedRegion.put("port", emptyObject);
- queryData(QUERY_1, "", 0);
- }
-
- @Test
- public void testSubClassOverridingMethods() throws Exception {
- SubOrder subOrder = new SubOrder();
- replicatedRegion.put("port", subOrder);
- queryData(QUERY_1, "", 0);
- }
-
- @Test
- public void testNestedPDXObject() throws Exception {
- PdxInstanceFactory factory = PdxInstanceFactoryImpl.newCreator("Portfolio", false,
- (InternalCache) replicatedRegion.getCache());
-
- factory.writeInt("ID", 111);
- factory.writeString("status", "active");
- factory.writeString("secId", "IBM");
-
- PdxInstance pdxInstance = factory.create();
-
- replicatedRegion.put("port", pdxInstance);
-
- queryData(QUERY_1, "", 0);
- }
-
- @Test
- public void testArrayWithNullValues() throws Exception {
- SubOrder[] subOrderArray = new SubOrder[2];
- subOrderArray[0] = new SubOrder();
- subOrderArray[1] = null;
-
- replicatedRegion.put("p1", subOrderArray);
-
- queryData(QUERY_1, "", 0);
- }
-
- @Test
- public void testWithSqlDate() throws Exception {
- SubOrder[] subOrderArray = new SubOrder[2];
- subOrderArray[0] = new SubOrder();
- subOrderArray[1] = null;
-
- replicatedRegion.put("p1", subOrderArray);
-
- queryData(QUERY_1, "", 0);
- }
-
- private void queryData(final String query, final String members, final int limit)
- throws Exception {
- Object result = QueryDataFunction.queryData(query, members, limit, false, queryResultSetLimit,
- queryCollectionsDepth);
- String queryResult = (String) result;
- System.out.println("Query Result: " + queryResult);
-
- // If not correct JSON format this will throw a JSONException
- JSONObject jsonObject = new JSONObject(queryResult);
- System.out.println("Query Result: " + jsonObject);
- }
-
- private Portfolio[] createPortfoliosAndPositions(final int count) {
- Position.cnt = 0; // reset Portfolio counter
- Portfolio[] portfolios = new Portfolio[count];
- for (int i = 0; i < count; i++) {
- portfolios[i] = new Portfolio(i);
- }
- return portfolios;
- }
-}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/BeanUtilFuncs.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/BeanUtilFuncs.java
index 14d9005..63d2ef4 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/BeanUtilFuncs.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/BeanUtilFuncs.java
@@ -22,16 +22,11 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
-import java.util.Set;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
-import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.management.GemFireProperties;
-import org.apache.geode.management.internal.ManagementAgent;
-import org.apache.geode.management.internal.cli.CliUtil;
/**
* Various Utility Functions to be used by MBeans
@@ -119,22 +114,6 @@ public class BeanUtilFuncs {
return tailSystemLog(logFile, numLines);
}
- public static DistributedMember getDistributedMemberByNameOrId(String memberNameOrId) {
- DistributedMember memberFound = null;
-
- if (memberNameOrId != null) {
- InternalCache cache = ManagementAgent.getCache();
- Set<DistributedMember> memberSet = CliUtil.getAllMembers(cache);
- for (DistributedMember member : memberSet) {
- if (memberNameOrId.equals(member.getId()) || memberNameOrId.equals(member.getName())) {
- memberFound = member;
- break;
- }
- }
- }
- return memberFound;
- }
-
public static GemFireProperties initGemfireProperties(DistributionConfig config) {
String memberGroups = "";
String configFile = null;
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/DataQueryEngine.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/DataQueryEngine.java
new file mode 100644
index 0000000..22bc140
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/DataQueryEngine.java
@@ -0,0 +1,295 @@
+package org.apache.geode.management.internal.beans;
+
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.SystemFailure;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.cache.query.QueryInvalidException;
+import org.apache.geode.cache.query.internal.CompiledValue;
+import org.apache.geode.cache.query.internal.QCompiler;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.management.DistributedRegionMXBean;
+import org.apache.geode.management.internal.ManagementConstants;
+import org.apache.geode.management.internal.SystemManagementService;
+import org.apache.geode.management.internal.cli.CliUtil;
+import org.apache.geode.management.internal.cli.json.GfJsonException;
+import org.apache.geode.management.internal.cli.json.GfJsonObject;
+
+/**
+ * This is used by the DistributedSystemBridge.queryData() call. It executes QueryDataFunction on
+ * the input members, or on the members associated with the queried regions when none are given.
+ */
+public class DataQueryEngine {
+ private static final Logger logger = LogService.getLogger();
+
+ // these numbers represent the function argument indexes
+ private static final int DISPLAY_MEMBERWISE = 0;
+ private static final int QUERY = 1;
+ private static final int REGION = 2;
+ private static final int LIMIT = 3;
+ private static final int QUERY_RESULTSET_LIMIT = 4;
+ private static final int QUERY_COLLECTIONS_DEPTH = 5;
+
+ private SystemManagementService service;
+ private InternalCache cache;
+
+ public DataQueryEngine(SystemManagementService service, InternalCache cache) {
+ this.service = service;
+ this.cache = cache;
+ }
+
+ public String queryForJsonResult(final String query, final int limit,
+ final int queryResultSetLimit, final int queryCollectionsDepth)
+ throws Exception {
+ return (String) queryData(query, null, limit, false, queryResultSetLimit,
+ queryCollectionsDepth);
+ }
+
+ public String queryForJsonResult(final String query, String members, final int limit,
+ final int queryResultSetLimit, final int queryCollectionsDepth)
+ throws Exception {
+ return (String) queryData(query, members, limit, false, queryResultSetLimit,
+ queryCollectionsDepth);
+ }
+
+ public byte[] queryForCompressedResult(final String query, final int limit,
+ final int queryResultSetLimit, final int queryCollectionsDepth)
+ throws Exception {
+ return (byte[]) queryData(query, null, limit, true, queryResultSetLimit, queryCollectionsDepth);
+ }
+
+ public Object queryData(final String query, final String members, final int limit,
+ final boolean zipResult, final int queryResultSetLimit, final int queryCollectionsDepth)
+ throws Exception {
+
+ if (query == null || query.isEmpty()) {
+ return new JsonisedErrorMessage("Query is either empty or Null")
+ .toString();
+ }
+
+ Set<DistributedMember> inputMembers = null;
+ if (StringUtils.isNotBlank(members)) {
+ inputMembers = CliUtil.findMembers(null, members.split(","), cache);
+ if (inputMembers.size() == 0) {
+ return new JsonisedErrorMessage(
+ String.format("Query is invalid due to invalid member : %s", members)).toString();
+ }
+ }
+
+ try {
+ Set<String> regionsInQuery = compileQuery(query);
+
+ // Validate region existence
+ if (regionsInQuery.size() > 0) {
+ for (String regionPath : regionsInQuery) {
+ DistributedRegionMXBean regionMBean = service.getDistributedRegionMXBean(regionPath);
+ if (regionMBean == null) {
+ return new JsonisedErrorMessage(
+ String.format("Cannot find regions %s in any of the members", regionPath))
+ .toString();
+ } else {
+ Set<DistributedMember> associatedMembers =
+ CliUtil.getRegionAssociatedMembers(regionPath, cache, true);
+
+ if (inputMembers != null && inputMembers.size() > 0) {
+ if (!associatedMembers.containsAll(inputMembers)) {
+ return new JsonisedErrorMessage(
+ String.format("Cannot find regions %s in specified members", regionPath))
+ .toString();
+ }
+ }
+ }
+ }
+ } else {
+ return new JsonisedErrorMessage(String.format("Query is invalid due to error : %s",
+ "Region mentioned in query probably missing /")).toString();
+ }
+
+ // Validate: a join involving partitioned regions requires explicit member input
+ if (regionsInQuery.size() > 1 && inputMembers == null) {
+ for (String regionPath : regionsInQuery) {
+ DistributedRegionMXBean regionMBean = service.getDistributedRegionMXBean(regionPath);
+
+ if (regionMBean.getRegionType().equals(DataPolicy.PARTITION.toString())
+ || regionMBean.getRegionType().equals(DataPolicy.PERSISTENT_PARTITION.toString())) {
+ return new JsonisedErrorMessage(
+ "Join operation can only be executed on targeted members, please give member input")
+ .toString();
+ }
+ }
+ }
+
+ String randomRegion = regionsInQuery.iterator().next();
+
+ // get the first available member
+ Set<DistributedMember> associatedMembers =
+ CliUtil.getQueryRegionsAssociatedMembers(regionsInQuery, cache, false);
+
+ if (associatedMembers != null && associatedMembers.size() > 0) {
+ Object[] functionArgs = new Object[6];
+ if (inputMembers != null && inputMembers.size() > 0) {// on input
+ // members
+
+ functionArgs[DISPLAY_MEMBERWISE] = true;
+ functionArgs[QUERY] = query;
+ functionArgs[REGION] = randomRegion;
+ functionArgs[LIMIT] = limit;
+ functionArgs[QUERY_RESULTSET_LIMIT] = queryResultSetLimit;
+ functionArgs[QUERY_COLLECTIONS_DEPTH] = queryCollectionsDepth;
+ return callFunction(functionArgs, inputMembers, zipResult);
+ } else { // Query on any random member
+ functionArgs[DISPLAY_MEMBERWISE] = false;
+ functionArgs[QUERY] = query;
+ functionArgs[REGION] = randomRegion;
+ functionArgs[LIMIT] = limit;
+ functionArgs[QUERY_RESULTSET_LIMIT] = queryResultSetLimit;
+ functionArgs[QUERY_COLLECTIONS_DEPTH] = queryCollectionsDepth;
+ return callFunction(functionArgs, associatedMembers, zipResult);
+ }
+
+ } else {
+ return new JsonisedErrorMessage(String
+ .format("Cannot find regions %s in any of the members", regionsInQuery.toString()))
+ .toString();
+ }
+
+ } catch (QueryInvalidException qe) {
+ return new JsonisedErrorMessage(
+ String.format("Query is invalid due to error : %s", qe.getMessage()))
+ .toString();
+ }
+ }
+
+ private static Object callFunction(final Object functionArgs,
+ final Set<DistributedMember> members, final boolean zipResult) throws Exception {
+
+ try {
+ if (members.size() == 1) {
+ DistributedMember member = members.iterator().next();
+ ResultCollector collector = FunctionService.onMember(member).setArguments(functionArgs)
+ .execute(ManagementConstants.QUERY_DATA_FUNCTION);
+ List list = (List) collector.getResult();
+ Object object = null;
+ if (list.size() > 0) {
+ object = list.get(0);
+ }
+
+ if (object instanceof Throwable) {
+ throw (Throwable) object;
+ }
+
+ byte[] result = (byte[]) object;
+ if (zipResult) { // The result is already compressed
+ return result;
+ } else {
+ Object[] functionArgsList = (Object[]) functionArgs;
+ boolean showMember = (Boolean) functionArgsList[DISPLAY_MEMBERWISE];
+ if (showMember) {// Added to show a single member similar to multiple
+ // member.
+ // Note: if no member is selected, a random associated member is chosen and showMember
+ // is false, so this wrapping is skipped and the decompressed result is returned as is.
+ List<String> decompressedList = new ArrayList<>();
+ decompressedList.add(BeanUtilFuncs.decompress(result));
+ return wrapResult(decompressedList.toString());
+ }
+ return BeanUtilFuncs.decompress(result);
+ }
+
+ } else { // More than 1 Member
+ ResultCollector coll = FunctionService.onMembers(members).setArguments(functionArgs)
+ .execute(ManagementConstants.QUERY_DATA_FUNCTION);
+
+ List list = (List) coll.getResult();
+ Object object = list.get(0);
+ if (object instanceof Throwable) {
+ throw (Throwable) object;
+ }
+
+ Iterator<byte[]> it = list.iterator();
+ List<String> decompressedList = new ArrayList<>();
+
+ while (it.hasNext()) {
+ String decompressedStr;
+ decompressedStr = BeanUtilFuncs.decompress(it.next());
+ decompressedList.add(decompressedStr);
+ }
+
+ if (zipResult) {
+ return BeanUtilFuncs.compress(wrapResult(decompressedList.toString()));
+ } else {
+ return wrapResult(decompressedList.toString());
+ }
+
+ }
+ } catch (FunctionException fe) {
+ throw new Exception(
+ String.format("Query could not be executed due to : %s", fe.getMessage()));
+ } catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ } catch (Throwable e) {
+ SystemFailure.checkFailure();
+ throw new Exception(String.format("Query could not be executed due to : %s", e.getMessage()));
+ }
+ }
+
+ private static String wrapResult(final String str) {
+ StringWriter w = new StringWriter();
+ synchronized (w.getBuffer()) {
+ w.write("{\"result\":");
+ w.write(str);
+ w.write("}");
+ return w.toString();
+ }
+ }
+
+ private Set<String> compileQuery(final String query)
+ throws QueryInvalidException {
+ QCompiler compiler = new QCompiler();
+ Set<String> regionsInQuery;
+ try {
+ CompiledValue compiledQuery = compiler.compileQuery(query);
+ Set<String> regions = new HashSet<>();
+ compiledQuery.getRegionsInQuery(regions, null);
+ regionsInQuery = Collections.unmodifiableSet(regions);
+ return regionsInQuery;
+ } catch (QueryInvalidException qe) {
+ logger.error("{} Failed, Error {}", query, qe.getMessage(), qe);
+ throw qe;
+ }
+ }
+
+ static class JsonisedErrorMessage {
+
+ private static String message = "message";
+
+ private GfJsonObject gFJsonObject = new GfJsonObject();
+
+ public JsonisedErrorMessage(final String errorMessage) throws Exception {
+ try {
+ gFJsonObject.put(message, errorMessage);
+ } catch (GfJsonException e) {
+ throw new Exception(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return gFJsonObject.toString();
+ }
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java
index c9a2f8a..a691f80 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java
@@ -83,6 +83,7 @@ import org.apache.geode.management.internal.beans.stats.GatewayReceiverClusterSt
import org.apache.geode.management.internal.beans.stats.GatewaySenderClusterStatsMonitor;
import org.apache.geode.management.internal.beans.stats.MemberClusterStatsMonitor;
import org.apache.geode.management.internal.beans.stats.ServerClusterStatsMonitor;
+import org.apache.geode.management.internal.cli.CliUtil;
import org.apache.geode.management.internal.cli.json.TypedJson;
/**
@@ -208,6 +209,11 @@ public class DistributedSystemBridge {
private int queryCollectionsDepth = TypedJson.DEFAULT_COLLECTION_ELEMENT_LIMIT;
/**
+ * used to issue queries
+ */
+ private DataQueryEngine dataQueryEngine;
+
+ /**
* Helper method to get a member bean reference given a member name or id
*
* @param member name or id of the member
@@ -240,6 +246,7 @@ public class DistributedSystemBridge {
this.dm = system.getDistributionManager();
this.alertLevel = ManagementConstants.DEFAULT_ALERT_LEVEL;
this.thisMemberName = MBeanJMXAdapter.getMemberMBeanName(system.getDistributedMember());
+ this.dataQueryEngine = new DataQueryEngine(service, cache);
this.distributedSystemId = this.system.getConfig().getDistributedSystemId();
@@ -1129,7 +1136,7 @@ public class DistributedSystemBridge {
public ObjectName[] listGatewaySenderObjectNames(String member) throws Exception {
validateMember(member);
- DistributedMember distributedMember = BeanUtilFuncs.getDistributedMemberByNameOrId(member);
+ DistributedMember distributedMember = CliUtil.getDistributedMemberByNameOrId(member, cache);
List<ObjectName> listName = null;
@@ -1416,14 +1423,14 @@ public class DistributedSystemBridge {
}
public String queryData(String query, String members, int limit) throws Exception {
- Object result = QueryDataFunction.queryData(query, members, limit, false, queryResultSetLimit,
+ Object result = dataQueryEngine.queryData(query, members, limit, false, queryResultSetLimit,
queryCollectionsDepth);
return (String) result;
}
public byte[] queryDataForCompressedResult(String query, String members, int limit)
throws Exception {
- Object result = QueryDataFunction.queryData(query, members, limit, true, queryResultSetLimit,
+ Object result = dataQueryEngine.queryData(query, members, limit, true, queryResultSetLimit,
queryCollectionsDepth);
return (byte[]) result;
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/QueryDataFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/QueryDataFunction.java
index e265817..997c283 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/QueryDataFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/QueryDataFunction.java
@@ -14,22 +14,14 @@
*/
package org.apache.geode.management.internal.beans;
-import java.io.Serializable;
-import java.io.StringWriter;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
import java.util.Set;
-import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
-import org.apache.geode.SystemFailure;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.execute.Function;
@@ -39,13 +31,9 @@ import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.RegionFunctionContext;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.query.Query;
-import org.apache.geode.cache.query.QueryInvalidException;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.SelectResults;
-import org.apache.geode.cache.query.internal.CompiledValue;
import org.apache.geode.cache.query.internal.DefaultQuery;
-import org.apache.geode.cache.query.internal.QCompiler;
-import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.InternalEntity;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.InternalCache;
@@ -54,19 +42,12 @@ import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.execute.InternalFunction;
import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.management.DistributedRegionMXBean;
-import org.apache.geode.management.ManagementService;
-import org.apache.geode.management.internal.ManagementAgent;
import org.apache.geode.management.internal.ManagementConstants;
-import org.apache.geode.management.internal.SystemManagementService;
-import org.apache.geode.management.internal.cli.CliUtil;
-import org.apache.geode.management.internal.cli.json.GfJsonException;
-import org.apache.geode.management.internal.cli.json.GfJsonObject;
import org.apache.geode.management.internal.cli.json.TypedJson;
/**
* This function is executed on one or multiple members based on the member input to
- * DistributedSystemMXBean.queryData()
+ * DataQueryEngine.queryData()
*/
@SuppressWarnings({"deprecation", "unchecked"})
public class QueryDataFunction implements Function, InternalEntity {
@@ -78,7 +59,6 @@ public class QueryDataFunction implements Function, InternalEntity {
private static final String MEMBER_KEY = "member";
private static final String RESULT_KEY = "result";
private static final String NO_DATA_FOUND = "No Data Found";
- private static final String QUERY_EXEC_SUCCESS = "Query Executed Successfully";
private static final int DISPLAY_MEMBERWISE = 0;
private static final int QUERY = 1;
private static final int REGION = 2;
@@ -123,11 +103,11 @@ public class QueryDataFunction implements Function, InternalEntity {
return ManagementConstants.QUERY_DATA_FUNCTION;
}
- private QueryDataFunctionResult selectWithType(final FunctionContext context, String queryString,
+ // return the compressed result data
+ private byte[] selectWithType(final FunctionContext context, String queryString,
final boolean showMember, final String regionName, final int limit,
final int queryResultSetLimit, final int queryCollectionsDepth) throws Exception {
- InternalCache cache =
- ((InternalCache) context.getCache()).getCacheForProcessingClientRequests();
+ InternalCache cache = (InternalCache) context.getCache();
Function localQueryFunc = new LocalQueryFunction("LocalQueryFunction", regionName, showMember)
.setOptimizeForWrite(true);
queryString = applyLimitClause(queryString, limit, queryResultSetLimit);
@@ -198,11 +178,10 @@ public class QueryDataFunction implements Function, InternalEntity {
}
if (noDataFound) {
- return new QueryDataFunctionResult(QUERY_EXEC_SUCCESS,
- BeanUtilFuncs.compress(new JsonisedErrorMessage(NO_DATA_FOUND).toString()));
+ return BeanUtilFuncs
+ .compress(new DataQueryEngine.JsonisedErrorMessage(NO_DATA_FOUND).toString());
}
- return new QueryDataFunctionResult(QUERY_EXEC_SUCCESS,
- BeanUtilFuncs.compress(result.toString()));
+ return BeanUtilFuncs.compress(result.toString());
} catch (Exception e) {
logger.warn(e.getMessage(), e);
throw e;
@@ -240,245 +219,7 @@ public class QueryDataFunction implements Function, InternalEntity {
return query;
}
- private static Object callFunction(final Object functionArgs,
- final Set<DistributedMember> members, final boolean zipResult) throws Exception {
- try {
- if (members.size() == 1) {
- DistributedMember member = members.iterator().next();
- ResultCollector collector = FunctionService.onMember(member).setArguments(functionArgs)
- .execute(ManagementConstants.QUERY_DATA_FUNCTION);
- List list = (List) collector.getResult();
- Object object = null;
- if (list.size() > 0) {
- object = list.get(0);
- }
-
- if (object instanceof Throwable) {
- throw (Throwable) object;
- }
-
- QueryDataFunctionResult result = (QueryDataFunctionResult) object;
- if (zipResult) { // The result is already compressed
- return result.compressedBytes;
- } else {
- Object[] functionArgsList = (Object[]) functionArgs;
- boolean showMember = (Boolean) functionArgsList[DISPLAY_MEMBERWISE];
- if (showMember) {// Added to show a single member similar to multiple
- // member.
- // Note , if no member is selected this is the code path executed. A
- // random associated member is chosen.
- List<String> decompressedList = new ArrayList<>();
- decompressedList.add(BeanUtilFuncs.decompress(result.compressedBytes));
- return wrapResult(decompressedList.toString());
- }
- return BeanUtilFuncs.decompress(result.compressedBytes);
- }
-
- } else { // More than 1 Member
- ResultCollector coll = FunctionService.onMembers(members).setArguments(functionArgs)
- .execute(ManagementConstants.QUERY_DATA_FUNCTION);
-
- List list = (List) coll.getResult();
- Object object = list.get(0);
- if (object instanceof Throwable) {
- throw (Throwable) object;
- }
-
- Iterator<QueryDataFunctionResult> it = list.iterator();
- List<String> decompressedList = new ArrayList<>();
-
- while (it.hasNext()) {
- String decompressedStr;
- decompressedStr = BeanUtilFuncs.decompress(it.next().compressedBytes);
- decompressedList.add(decompressedStr);
- }
-
- if (zipResult) {
- return BeanUtilFuncs.compress(wrapResult(decompressedList.toString()));
- } else {
- return wrapResult(decompressedList.toString());
- }
-
- }
- } catch (FunctionException fe) {
- throw new Exception(
- String.format("Query could not be executed due to : %s", fe.getMessage()));
- } catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- } catch (Throwable e) {
- SystemFailure.checkFailure();
- throw new Exception(String.format("Query could not be executed due to : %s", e.getMessage()));
- }
- }
-
- private static String wrapResult(final String str) {
- StringWriter w = new StringWriter();
- synchronized (w.getBuffer()) {
- w.write("{\"result\":");
- w.write(str);
- w.write("}");
- return w.toString();
- }
- }
-
- public static Object queryData(final String query, final String members, final int limit,
- final boolean zipResult, final int queryResultSetLimit, final int queryCollectionsDepth)
- throws Exception {
-
- if (query == null || query.isEmpty()) {
- return new JsonisedErrorMessage("Query is either empty or Null")
- .toString();
- }
-
- Set<DistributedMember> inputMembers = null;
- if (StringUtils.isNotBlank(members)) {
- inputMembers = new HashSet<>();
- StringTokenizer st = new StringTokenizer(members, ",");
- while (st.hasMoreTokens()) {
- String member = st.nextToken();
- DistributedMember distributedMember = BeanUtilFuncs.getDistributedMemberByNameOrId(member);
- inputMembers.add(distributedMember);
- if (distributedMember == null) {
- return new JsonisedErrorMessage(
- String.format("Query is invalid due to invalid member : %s", member)).toString();
- }
- }
- }
-
- InternalCache cache = ManagementAgent.getCache();
- try {
-
- SystemManagementService service =
- (SystemManagementService) ManagementService.getExistingManagementService(cache);
- Set<String> regionsInQuery = compileQuery(cache, query);
-
- // Validate region existence
- if (regionsInQuery.size() > 0) {
- for (String regionPath : regionsInQuery) {
- DistributedRegionMXBean regionMBean = service.getDistributedRegionMXBean(regionPath);
- if (regionMBean == null) {
- return new JsonisedErrorMessage(
- String.format("Cannot find regions %s in any of the members", regionPath))
- .toString();
- } else {
- Set<DistributedMember> associatedMembers =
- CliUtil.getRegionAssociatedMembers(regionPath, cache, true);
-
- if (inputMembers != null && inputMembers.size() > 0) {
- if (!associatedMembers.containsAll(inputMembers)) {
- return new JsonisedErrorMessage(
- String.format("Cannot find regions %s in specified members", regionPath))
- .toString();
- }
- }
- }
- }
- } else {
- return new JsonisedErrorMessage(String.format("Query is invalid due to error : %s",
- "Region mentioned in query probably missing /")).toString();
- }
-
- // Validate
- if (regionsInQuery.size() > 1 && inputMembers == null) {
- for (String regionPath : regionsInQuery) {
- DistributedRegionMXBean regionMBean = service.getDistributedRegionMXBean(regionPath);
-
- if (regionMBean.getRegionType().equals(DataPolicy.PARTITION.toString())
- || regionMBean.getRegionType().equals(DataPolicy.PERSISTENT_PARTITION.toString())) {
- return new JsonisedErrorMessage(
- "Join operation can only be executed on targeted members, please give member input")
- .toString();
- }
- }
- }
-
- String randomRegion = regionsInQuery.iterator().next();
-
- // get the first available member
- Set<DistributedMember> associatedMembers =
- CliUtil.getQueryRegionsAssociatedMembers(regionsInQuery, cache, false);
-
- if (associatedMembers != null && associatedMembers.size() > 0) {
- Object[] functionArgs = new Object[6];
- if (inputMembers != null && inputMembers.size() > 0) {// on input
- // members
-
- functionArgs[DISPLAY_MEMBERWISE] = true;
- functionArgs[QUERY] = query;
- functionArgs[REGION] = randomRegion;
- functionArgs[LIMIT] = limit;
- functionArgs[QUERY_RESULTSET_LIMIT] = queryResultSetLimit;
- functionArgs[QUERY_COLLECTIONS_DEPTH] = queryCollectionsDepth;
- return callFunction(functionArgs, inputMembers, zipResult);
- } else { // Query on any random member
- functionArgs[DISPLAY_MEMBERWISE] = false;
- functionArgs[QUERY] = query;
- functionArgs[REGION] = randomRegion;
- functionArgs[LIMIT] = limit;
- functionArgs[QUERY_RESULTSET_LIMIT] = queryResultSetLimit;
- functionArgs[QUERY_COLLECTIONS_DEPTH] = queryCollectionsDepth;
- return callFunction(functionArgs, associatedMembers, zipResult);
- }
-
- } else {
- return new JsonisedErrorMessage(String
- .format("Cannot find regions %s in any of the members", regionsInQuery.toString()))
- .toString();
- }
-
- } catch (QueryInvalidException qe) {
- return new JsonisedErrorMessage(
- String.format("Query is invalid due to error : %s", qe.getMessage()))
- .toString();
- }
- }
-
- private static class JsonisedErrorMessage {
-
- private static String message = "message";
-
- private GfJsonObject gFJsonObject = new GfJsonObject();
-
- public JsonisedErrorMessage(final String errorMessage) throws Exception {
- try {
- gFJsonObject.put(message, errorMessage);
- } catch (GfJsonException e) {
- throw new Exception(e);
- }
- }
-
- @Override
- public String toString() {
- return gFJsonObject.toString();
- }
- }
-
- /**
- * Compile the query and return a set of regions involved in the query It throws an
- * QueryInvalidException if the query is not proper
- *
- * @param cache current cache
- * @param query input query
- *
- * @return a set of regions involved in the query
- */
- private static Set<String> compileQuery(final InternalCache cache, final String query)
- throws QueryInvalidException {
- QCompiler compiler = new QCompiler();
- Set<String> regionsInQuery;
- try {
- CompiledValue compiledQuery = compiler.compileQuery(query);
- Set<String> regions = new HashSet<>();
- compiledQuery.getRegionsInQuery(regions, null);
- regionsInQuery = Collections.unmodifiableSet(regions);
- return regionsInQuery;
- } catch (QueryInvalidException qe) {
- logger.error("{} Failed, Error {}", query, qe.getMessage(), qe);
- throw qe;
- }
- }
/**
* Function to gather data locally. This function is required to execute query with region context
@@ -490,11 +231,10 @@ public class QueryDataFunction implements Function, InternalEntity {
private final String id;
private boolean optimizeForWrite = false;
- private boolean showMembers = false;
+ private boolean showMembers;
private String regionName;
public LocalQueryFunction(final String id, final String regionName, final boolean showMembers) {
- super();
this.id = id;
this.regionName = regionName;
this.showMembers = showMembers;
@@ -522,8 +262,7 @@ public class QueryDataFunction implements Function, InternalEntity {
@Override
public void execute(final FunctionContext context) {
- InternalCache cache =
- ((InternalCache) context.getCache()).getCacheForProcessingClientRequests();
+ InternalCache cache = (InternalCache) context.getCache();
QueryService queryService = cache.getQueryService();
String qstr = (String) context.getArguments();
Region r = cache.getRegion(regionName);
@@ -548,25 +287,4 @@ public class QueryDataFunction implements Function, InternalEntity {
return this.id;
}
}
-
- private static class QueryDataFunctionResult implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final String message;
- private final byte[] compressedBytes;
-
- public QueryDataFunctionResult(final String message, final byte[] compressedBytes) {
- this.message = message;
- this.compressedBytes = compressedBytes;
- }
-
- public String getMessage() {
- return message;
- }
-
- public byte[] getCompressedBytes() {
- return compressedBytes;
- }
- }
}
diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
index e73b202..cf04eb9 100644
--- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
+++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
@@ -489,7 +489,6 @@ org/apache/geode/management/internal/NotificationKey,false,currentTime:long,obje
org/apache/geode/management/internal/beans/FileUploader$RemoteFile,false,filename:java/lang/String,outputStream:com/healthmarketscience/rmiio/RemoteOutputStream
org/apache/geode/management/internal/beans/QueryDataFunction,true,1
org/apache/geode/management/internal/beans/QueryDataFunction$LocalQueryFunction,true,1,id:java/lang/String,optimizeForWrite:boolean,regionName:java/lang/String,showMembers:boolean,this$0:org/apache/geode/management/internal/beans/QueryDataFunction
-org/apache/geode/management/internal/beans/QueryDataFunction$QueryDataFunctionResult,true,1,compressedBytes:byte[],message:java/lang/String
org/apache/geode/management/internal/beans/stats/StatType,false
org/apache/geode/management/internal/cli/AbstractCliAroundInterceptor$Response,false,text:java/lang/String
org/apache/geode/management/internal/cli/CliUtil$DeflaterInflaterData,true,1104813333595216795,data:byte[],dataLength:int