You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by tk...@apache.org on 2023/11/21 19:57:37 UTC
(phoenix) branch PHOENIX-628-feature updated: PHOENIX-7073 : Parse JSON columns on the server side (#1712)
This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-628-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-628-feature by this push:
new 2bde91b1d8 PHOENIX-7073 : Parse JSON columns on the server side (#1712)
2bde91b1d8 is described below
commit 2bde91b1d851bbdf6214cce1811969c9b2127f18
Author: RanganathG <ra...@gmail.com>
AuthorDate: Wed Nov 22 01:27:31 2023 +0530
PHOENIX-7073 : Parse JSON columns on the server side (#1712)
* PHOENIX-7073 : Parse JSON columns on the server side
---
.../phoenix/end2end/json/JsonFunctionsIT.java | 241 +++++++++++++++--
.../apache/phoenix/compile/ProjectionCompiler.java | 292 ++++++++++++++-------
.../coprocessor/BaseScannerRegionObserver.java | 4 +-
.../org/apache/phoenix/iterate/ExplainTable.java | 4 +
.../iterate/NonAggregateRegionScannerFactory.java | 110 ++++++--
.../phoenix/iterate/RegionScannerFactory.java | 52 ++--
6 files changed, 534 insertions(+), 169 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/json/JsonFunctionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/json/JsonFunctionsIT.java
index f01f278b58..7dd083ea97 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/json/JsonFunctionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/json/JsonFunctionsIT.java
@@ -23,9 +23,9 @@ import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.spi.json.GsonJsonProvider;
import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.hbase.TableName;
import org.apache.phoenix.end2end.IndexToolIT;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PTable;
@@ -33,13 +33,14 @@ import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.TestUtil;
import org.junit.Before;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.io.File;
import java.io.IOException;
import java.net.URL;
+import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -48,8 +49,12 @@ import java.sql.SQLException;
import java.util.Properties;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+@Category(ParallelStatsDisabledTest.class)
public class JsonFunctionsIT extends ParallelStatsDisabledIT {
public static String BASIC_JSON = "json/json_functions_basic.json";
public static String FUNCTIONS_TEST_JSON = "json/json_functions_tests.json";
@@ -70,35 +75,43 @@ public class JsonFunctionsIT extends ParallelStatsDisabledIT {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
String tableName = generateUniqueName();
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- String ddl = "create table " + tableName + " (pk integer primary key, col integer, jsoncol json)";
+ String ddl = "create table " + tableName + " (pk integer primary key, randomVal integer ,col integer, jsoncol json)";
conn.createStatement().execute(ddl);
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?)");
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?,?)");
stmt.setInt(1, 1);
- stmt.setInt(2, 2);
- stmt.setString(3, basicJson);
+ stmt.setInt(2, 123);
+ stmt.setInt(3, 2);
+ stmt.setString(4, basicJson);
stmt.execute();
conn.commit();
- TestUtil.dumpTable(conn, TableName.valueOf(tableName));
- String queryTemplate ="SELECT JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol, '$.info.address.town'), " +
- "JSON_VALUE(jsoncol, '$.info.tags[1]'), JSON_QUERY(jsoncol, '$.info.tags'), JSON_QUERY(jsoncol, '$.info') " +
- " FROM " + tableName +
- " WHERE JSON_VALUE(jsoncol, '$.name') = '%s'";
+ String queryTemplate ="SELECT pk, randomVal, JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol, '$.info.address.town'), " +
+ "JSON_VALUE(jsoncol, '$.info.tags[0]'), JSON_QUERY(jsoncol, '$.info.tags'), JSON_QUERY(jsoncol, '$.info'), " +
+ "JSON_VALUE(jsoncol, '$.info.tags[1]') " +
+ " FROM " + tableName +
+ " WHERE JSON_VALUE(jsoncol, '$.name') = '%s'";
String query = String.format(queryTemplate, "AndersenFamily");
ResultSet rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
- assertEquals("Basic", rs.getString(1));
- assertEquals("Bristol", rs.getString(2));
- assertEquals("Water polo", rs.getString(3));
+ assertEquals("1", rs.getString(1));
+ assertEquals("123", rs.getString(2));
+ assertEquals("Basic", rs.getString(3));
+ assertEquals("Bristol", rs.getString(4));
+ assertEquals("Sport", rs.getString(5));
// returned format is different
- compareJson(rs.getString(4), basicJson, "$.info.tags");
- compareJson(rs.getString(5), basicJson, "$.info");
+ compareJson(rs.getString(6), basicJson, "$.info.tags");
+ compareJson(rs.getString(7), basicJson, "$.info");
+ assertEquals("Water polo", rs.getString(8));
assertFalse(rs.next());
// Now check for empty match
query = String.format(queryTemplate, "Windsors");
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
+
+ // check if the explain plan indicates server side execution
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ assertTrue(QueryUtil.getExplainPlan(rs).contains(" SERVER JSON FUNCTION PROJECTION"));
}
}
@@ -124,10 +137,10 @@ public class JsonFunctionsIT extends ParallelStatsDisabledIT {
conn.createStatement().execute("UPSERT INTO " + tableName + " SELECT pk, col, JSON_MODIFY(jsoncol, '$.info.tags[2]', '\"UpsertSelectVal\"') from " + tableName);
String queryTemplate ="SELECT JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol, '$.info.address.town'), " +
- "JSON_VALUE(jsoncol, '$.info.tags[1]'), JSON_QUERY(jsoncol, '$.info.tags'), JSON_QUERY(jsoncol, '$.info'), " +
- "JSON_VALUE(jsoncol, '$.info.tags[2]') " +
- " FROM " + tableName +
- " WHERE JSON_VALUE(jsoncol, '$.name') = '%s'";
+ "JSON_VALUE(jsoncol, '$.info.tags[1]'), JSON_QUERY(jsoncol, '$.info.tags'), JSON_QUERY(jsoncol, '$.info'), " +
+ "JSON_VALUE(jsoncol, '$.info.tags[2]') " +
+ " FROM " + tableName +
+ " WHERE JSON_VALUE(jsoncol, '$.name') = '%s'";
String query = String.format(queryTemplate, "AndersenFamily");
ResultSet rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
@@ -286,8 +299,8 @@ public class JsonFunctionsIT extends ParallelStatsDisabledIT {
String
selectSql =
"SELECT JSON_VALUE(JSONCOL,'$.type'), " +
- "JSON_VALUE(JSONCOL,'$.info.address.town') FROM " + tableName +
- " WHERE JSON_VALUE(JSONCOL,'$.type') = 'Basic'";
+ "JSON_VALUE(JSONCOL,'$.info.address.town') FROM " + tableName +
+ " WHERE JSON_VALUE(JSONCOL,'$.type') = 'Basic'";
ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
String actualExplainPlan = QueryUtil.getExplainPlan(rs);
IndexToolIT.assertExplainPlan(false, actualExplainPlan, tableName, indexName);
@@ -439,4 +452,186 @@ public class JsonFunctionsIT extends ParallelStatsDisabledIT {
Object read = JsonPath.using(conf).parse(json).read(jsonPath);
return read.toString();
}
+
+ /**
+ * This test case checks that the server-side execution optimization does not take place
+ * when the complete JSON column is selected. The case for optimization is covered in
+ * {@link #testSimpleJsonValue()}
+ * @throws Exception
+ */
+ @Test
+ public void testJsonFunctionOptimization() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String ddl = "create table " + tableName + " (pk integer primary key, col integer, jsoncol json)";
+ conn.createStatement().execute(ddl);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?)");
+ stmt.setInt(1, 1);
+ stmt.setInt(2, 2);
+ stmt.setString(3, basicJson);
+ stmt.execute();
+ conn.commit();
+ String queryTemplate ="SELECT jsoncol, JSON_VALUE(jsoncol, '$.type'), JSON_VALUE(jsoncol, '$.info.address.town'), " +
+ "JSON_VALUE(jsoncol, '$.info.tags[1]'), JSON_QUERY(jsoncol, '$.info.tags'), JSON_QUERY(jsoncol, '$.info') " +
+ " FROM " + tableName +
+ " WHERE JSON_VALUE(jsoncol, '$.name') = '%s'";
+ String query = String.format(queryTemplate, "AndersenFamily");
+ // check that the explain plan does NOT indicate server side execution
+ ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ assertFalse(QueryUtil.getExplainPlan(rs).contains(" SERVER JSON FUNCTION PROJECTION"));
+ }
+ }
+
+ @Test
+ public void testArrayIndexAndJsonFunctionExpressions() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String ddl = "create table " + tableName + " (pk integer primary key, col integer, jsoncol json, arr INTEGER ARRAY)";
+ conn.createStatement().execute(ddl);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?,?)");
+ stmt.setInt(1, 1);
+ stmt.setInt(2, 2);
+ stmt.setString(3, basicJson);
+ Array array = conn.createArrayOf("INTEGER", new Integer[]{1, 2});
+ stmt.setArray(4, array);
+ stmt.execute();
+ conn.commit();
+ String query ="SELECT arr, arr[1], jsoncol, JSON_VALUE(jsoncol, '$.type')" +
+ " FROM " + tableName +
+ " WHERE JSON_VALUE(jsoncol, '$.name') = 'AndersenFamily'";
+ // Since we are using complete array and json col, no server side execution
+ ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ String explainPlan = QueryUtil.getExplainPlan(rs);
+ assertFalse(explainPlan.contains(" SERVER JSON FUNCTION PROJECTION"));
+ assertFalse(explainPlan.contains(" SERVER ARRAY ELEMENT PROJECTION"));
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(conn.createArrayOf("INTEGER", new Integer[]{1, 2}), rs.getArray(1));
+ assertEquals(rs.getInt(2), 1);
+ assertEquals(rs.getString(4), "Basic");
+
+ // since we are using Array Index and Json function without full column, optimization
+ // should happen
+ query ="SELECT arr[1], JSON_VALUE(jsoncol, '$.type')" +
+ " FROM " + tableName +
+ " WHERE JSON_VALUE(jsoncol, '$.name') = 'AndersenFamily'";
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ explainPlan = QueryUtil.getExplainPlan(rs);
+ assertTrue(explainPlan.contains(" SERVER JSON FUNCTION PROJECTION"));
+ assertTrue(explainPlan.contains(" SERVER ARRAY ELEMENT PROJECTION"));
+
+ // only Array optimization and not Json
+ query ="SELECT arr[1], jsoncol, JSON_VALUE(jsoncol, '$.type')" +
+ " FROM " + tableName +
+ " WHERE JSON_VALUE(jsoncol, '$.name') = 'AndersenFamily'";
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ explainPlan = QueryUtil.getExplainPlan(rs);
+ assertFalse(explainPlan.contains(" SERVER JSON FUNCTION PROJECTION"));
+ assertTrue(explainPlan.contains(" SERVER ARRAY ELEMENT PROJECTION"));
+
+ // only Json optimization and not Array Index
+ query ="SELECT arr, arr[1], JSON_VALUE(jsoncol, '$.type')" +
+ " FROM " + tableName +
+ " WHERE JSON_VALUE(jsoncol, '$.name') = 'AndersenFamily'";
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ explainPlan = QueryUtil.getExplainPlan(rs);
+ assertTrue(explainPlan.contains(" SERVER JSON FUNCTION PROJECTION"));
+ assertFalse(explainPlan.contains(" SERVER ARRAY ELEMENT PROJECTION"));
+ }
+ }
+
+ @Test
+ public void testServerFunctionsInDifferentOrders() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String
+ ddl =
+ "create table " + tableName + " (pk integer primary key, col integer, jsoncol json, arr INTEGER ARRAY, arr2 INTEGER ARRAY)";
+ conn.createStatement().execute(ddl);
+ PreparedStatement
+ stmt =
+ conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?,?,?)");
+ stmt.setInt(1, 1);
+ stmt.setInt(2, 2);
+ stmt.setString(3, basicJson);
+ Array array = conn.createArrayOf("INTEGER", new Integer[] { 1, 2 });
+ stmt.setArray(4, array);
+ Array array2 = conn.createArrayOf("INTEGER", new Integer[] { 3, 4 });
+ stmt.setArray(5, array2);
+ stmt.execute();
+ conn.commit();
+
+ // First Array elements, then JSON_VALUE, and then JSON_QUERY
+ String
+ query =
+ "SELECT arr, arr[1], arr2, arr2[1], jsoncol, " +
+ "JSON_VALUE(jsoncol, '$.type'), " +
+ "JSON_QUERY(jsoncol, '$.info') " +
+ " FROM " + tableName + " WHERE JSON_VALUE(jsoncol, '$.name') = 'AndersenFamily'";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(rs.getArray(1), conn.createArrayOf("INTEGER", new Integer[] { 1, 2 }));
+ assertEquals(rs.getInt(2), 1);
+ assertEquals(rs.getArray(3), conn.createArrayOf("INTEGER", new Integer[] { 3, 4 }));
+ assertEquals(rs.getInt(4), 3);
+ compareJson(rs.getString(5), basicJson, "$");
+ assertEquals(rs.getString(6), "Basic");
+ compareJson(rs.getString(7), basicJson, "$.info");
+
+ // First JSON_VALUE, JSON_QUERY, ARRAY
+ query =
+ "SELECT jsoncol, JSON_VALUE(jsoncol, '$.type'), " +
+ "JSON_QUERY(jsoncol, '$.info'), arr, arr[1], arr2, arr2[1] " +
+ " FROM " + tableName + " WHERE JSON_VALUE(jsoncol, '$.name') = 'AndersenFamily'";
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ compareJson(rs.getString(1), basicJson, "$");
+ assertEquals(rs.getString(2), "Basic");
+ compareJson(rs.getString(3), basicJson, "$.info");
+ assertEquals(rs.getArray(4), conn.createArrayOf("INTEGER", new Integer[] { 1, 2 }));
+ assertEquals(rs.getInt(5), 1);
+ assertEquals(rs.getArray(6), conn.createArrayOf("INTEGER", new Integer[] { 3, 4 }));
+ assertEquals(rs.getInt(7), 3);
+
+ // First JSON_QUERY, ARRAY, JSON_VALUE
+ query =
+ "SELECT JSON_QUERY(jsoncol, '$.info'), arr, arr[1], arr2, arr2[1], jsoncol, " +
+ "JSON_VALUE(jsoncol, '$.type') " +
+ " FROM " + tableName + " WHERE JSON_VALUE(jsoncol, '$.name') = 'AndersenFamily'";
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ compareJson(rs.getString(1), basicJson, "$.info");
+ assertEquals(rs.getArray(2), conn.createArrayOf("INTEGER", new Integer[] { 1, 2 }));
+ assertEquals(rs.getInt(3), 1);
+ assertEquals(rs.getArray(4), conn.createArrayOf("INTEGER", new Integer[] { 3, 4 }));
+ assertEquals(rs.getInt(5), 3);
+ compareJson(rs.getString(6), basicJson, "$");
+ assertEquals(rs.getString(7), "Basic");
+
+ //JUMBLED FUNCTIONS
+ query =
+ "SELECT JSON_QUERY(jsoncol, '$.info.tags'), " +
+ "JSON_VALUE(jsoncol, '$.info.address.town'), arr, arr[1], " +
+ "JSON_QUERY(jsoncol, '$.info'), arr2, " +
+ "JSON_VALUE(jsoncol, '$.info.tags[0]'), arr2[1], jsoncol, " +
+ "JSON_VALUE(jsoncol, '$.type') " +
+ " FROM " + tableName + " WHERE JSON_VALUE(jsoncol, '$.name') = 'AndersenFamily'";
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ compareJson(rs.getString(1), basicJson, "$.info.tags");
+ assertEquals(rs.getString(2),"Bristol");
+ assertEquals(rs.getArray(3), conn.createArrayOf("INTEGER", new Integer[] { 1, 2 }));
+ assertEquals(rs.getInt(4), 1);
+ compareJson(rs.getString(5), basicJson, "$.info");
+ assertEquals(rs.getArray(6), conn.createArrayOf("INTEGER", new Integer[] { 3, 4 }));
+ assertEquals(rs.getString(7), "Sport");
+ assertEquals(rs.getInt(8), 3);
+ compareJson(rs.getString(9), basicJson, "$");
+ assertEquals(rs.getString(10), "Basic");
+
+ }
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
index 9e6b90cded..0db4025e1e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
@@ -47,6 +47,8 @@ import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.ProjectedColumnExpression;
import org.apache.phoenix.expression.SingleCellColumnExpression;
import org.apache.phoenix.expression.function.ArrayIndexFunction;
+import org.apache.phoenix.expression.function.JsonQueryFunction;
+import org.apache.phoenix.expression.function.JsonValueFunction;
import org.apache.phoenix.expression.visitor.ExpressionVisitor;
import org.apache.phoenix.expression.visitor.ProjectedColumnExpressionVisitor;
import org.apache.phoenix.expression.visitor.ReplaceArrayFunctionExpressionVisitor;
@@ -87,6 +89,7 @@ import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.ValueBitSet;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PJson;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -391,16 +394,18 @@ public class ProjectionCompiler {
public static RowProjector compile(StatementContext context, SelectStatement statement,
GroupBy groupBy, List<? extends PDatum> targetColumns, Expression where,
boolean wildcardIncludesDynamicCols) throws SQLException {
- List<KeyValueColumnExpression> arrayKVRefs = new ArrayList<>();
- List<ProjectedColumnExpression> arrayProjectedColumnRefs = new ArrayList<>();
- List<Expression> arrayKVFuncs = new ArrayList<>();
- List<Expression> arrayOldFuncs = new ArrayList<>();
- Map<Expression, Integer> arrayExpressionCounts = new HashMap<>();
+ List<KeyValueColumnExpression> serverParsedKVRefs = new ArrayList<>();
+ List<ProjectedColumnExpression> serverParsedProjectedColumnRefs = new ArrayList<>();
+ List<Expression> serverParsedKVFuncs = new ArrayList<>();
+ List<Expression> serverParsedOldFuncs = new ArrayList<>();
+ Map<Expression, Integer> serverParsedExpressionCounts = new HashMap<>();
List<AliasedNode> aliasedNodes = statement.getSelect();
// Setup projected columns in Scan
- SelectClauseVisitor selectVisitor = new SelectClauseVisitor(context, groupBy, arrayKVRefs,
- arrayKVFuncs, arrayExpressionCounts, arrayProjectedColumnRefs, arrayOldFuncs,
- statement);
+ SelectClauseVisitor
+ selectVisitor =
+ new SelectClauseVisitor(context, groupBy, serverParsedKVRefs, serverParsedKVFuncs,
+ serverParsedExpressionCounts, serverParsedProjectedColumnRefs,
+ serverParsedOldFuncs, statement);
List<ExpressionProjector> projectedColumns = new ArrayList<>();
ColumnResolver resolver = context.getResolver();
TableRef tableRef = context.getCurrentTable();
@@ -497,42 +502,106 @@ public class ProjectionCompiler {
index++;
}
- for (int i = arrayProjectedColumnRefs.size() - 1; i >= 0; i--) {
- Expression expression = arrayProjectedColumnRefs.get(i);
- Integer count = arrayExpressionCounts.get(expression);
+ for (int i = serverParsedProjectedColumnRefs.size() - 1; i >= 0; i--) {
+ Expression expression = serverParsedProjectedColumnRefs.get(i);
+ Integer count = serverParsedExpressionCounts.get(expression);
if (count != 0) {
- arrayKVRefs.remove(i);
- arrayKVFuncs.remove(i);
- arrayOldFuncs.remove(i);
+ serverParsedKVRefs.remove(i);
+ serverParsedKVFuncs.remove(i);
+ serverParsedOldFuncs.remove(i);
}
}
- if (arrayKVFuncs.size() > 0 && arrayKVRefs.size() > 0) {
- serailizeArrayIndexInformationAndSetInScan(context, arrayKVFuncs, arrayKVRefs);
+ if (serverParsedKVFuncs.size() > 0 && serverParsedKVRefs.size() > 0) {
+ String[]
+ scanAttributes =
+ new String[] { BaseScannerRegionObserver.SPECIFIC_ARRAY_INDEX,
+ BaseScannerRegionObserver.JSON_VALUE_FUNCTION,
+ BaseScannerRegionObserver.JSON_QUERY_FUNCTION };
+ Map<String, Class> attributeToFunctionMap = new HashMap<String, Class>() {{
+ put(scanAttributes[0], ArrayIndexFunction.class);
+ put(scanAttributes[1], JsonValueFunction.class);
+ put(scanAttributes[2], JsonQueryFunction.class);
+ }};
+ // Maps each function's original index to its position after the functions are
+ // regrouped by scan attribute for serialization to the server.
+ Map<Integer, Integer> initialToShuffledPositionMap = new HashMap<>();
+ Map<String, List<Expression>>
+ serverAttributeToFuncExpressionMap =
+ new HashMap<String, List<Expression>>() {{
+ for (String attribute : attributeToFunctionMap.keySet()) {
+ put(attribute, new ArrayList<>());
+ }
+ }};
+ Map<String, List<KeyValueColumnExpression>>
+ serverAttributeToKVExpressionMap =
+ new HashMap<String, List<KeyValueColumnExpression>>() {{
+ for (String attribute : attributeToFunctionMap.keySet()) {
+ put(attribute, new ArrayList<>());
+ }
+ }};
+ int counter = 0;
+ for (String attribute : scanAttributes) {
+ for (int i = 0; i < serverParsedKVFuncs.size(); i++) {
+ if (attributeToFunctionMap.get(attribute)
+ .isInstance(serverParsedKVFuncs.get(i))) {
+ initialToShuffledPositionMap.put(i, counter++);
+ serverAttributeToFuncExpressionMap.get(attribute)
+ .add(serverParsedKVFuncs.get(i));
+ serverAttributeToKVExpressionMap.get(attribute)
+ .add(serverParsedKVRefs.get(i));
+ }
+ }
+ }
+ for (Map.Entry<String, Class> entry : attributeToFunctionMap.entrySet()) {
+ if (serverAttributeToFuncExpressionMap.get(entry.getKey()).size() > 0) {
+ serializeServerParsedExpressionInformationAndSetInScan(context, entry.getKey(),
+ serverAttributeToFuncExpressionMap.get(entry.getKey()),
+ serverAttributeToKVExpressionMap.get(entry.getKey()));
+ }
+ }
KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
- for (Expression expression : arrayKVRefs) {
+ for (Expression expression : serverParsedKVRefs) {
builder.addField(expression);
}
KeyValueSchema kvSchema = builder.build();
ValueBitSet arrayIndexesBitSet = ValueBitSet.newInstance(kvSchema);
builder = new KeyValueSchemaBuilder(0);
- for (Expression expression : arrayKVFuncs) {
+ for (Expression expression : serverParsedKVFuncs) {
builder.addField(expression);
}
KeyValueSchema arrayIndexesSchema = builder.build();
Map<Expression, Expression> replacementMap = new HashMap<>();
- for(int i = 0; i < arrayOldFuncs.size(); i++){
- Expression function =arrayKVFuncs.get(i);
- replacementMap.put(arrayOldFuncs.get(i), new ArrayIndexExpression(i, function.getDataType(), arrayIndexesBitSet, arrayIndexesSchema));
+ for (int i = 0; i < serverParsedOldFuncs.size(); i++) {
+ Expression function = serverParsedKVFuncs.get(i);
+ if (isJsonFunction(function)) {
+ replacementMap.put(serverParsedOldFuncs.get(i),
+ new JsonPathExpression(initialToShuffledPositionMap.get(i),
+ function.getDataType(), arrayIndexesBitSet,
+ arrayIndexesSchema));
+ } else {
+ replacementMap.put(serverParsedOldFuncs.get(i),
+ new ArrayIndexExpression(initialToShuffledPositionMap.get(i),
+ function.getDataType(), arrayIndexesBitSet,
+ arrayIndexesSchema));
+ }
}
- ReplaceArrayFunctionExpressionVisitor visitor = new ReplaceArrayFunctionExpressionVisitor(replacementMap);
+ ReplaceArrayFunctionExpressionVisitor
+ visitor =
+ new ReplaceArrayFunctionExpressionVisitor(replacementMap);
for (int i = 0; i < projectedColumns.size(); i++) {
ExpressionProjector projector = projectedColumns.get(i);
- projectedColumns.set(i, new ExpressionProjector(projector.getName(),
- projector.getLabel(),
- tableRef.getTableAlias() == null ? (table.getName() == null ? "" : table.getName().getString()) : tableRef.getTableAlias(), projector.getExpression().accept(visitor), projector.isCaseSensitive()));
+ projectedColumns.set(i,
+ new ExpressionProjector(projector.getName(), projector.getLabel(),
+ tableRef.getTableAlias() == null ?
+ (table.getName() == null ?
+ "" :
+ table.getName().getString()) :
+ tableRef.getTableAlias(),
+ projector.getExpression().accept(visitor),
+ projector.isCaseSensitive()));
}
}
@@ -641,19 +710,32 @@ public class ProjectionCompiler {
return null;
}
}
- private static void serailizeArrayIndexInformationAndSetInScan(StatementContext context, List<Expression> arrayKVFuncs,
- List<KeyValueColumnExpression> arrayKVRefs) {
+
+ static class JsonPathExpression extends ArrayIndexExpression {
+
+ public JsonPathExpression(int position, PDataType type, ValueBitSet arrayIndexesBitSet,
+ KeyValueSchema arrayIndexesSchema) {
+ super(position, type, arrayIndexesBitSet, arrayIndexesSchema);
+ }
+ }
+
+ private static void serializeServerParsedExpressionInformationAndSetInScan(
+ StatementContext context, String serverParsedExpressionAttribute,
+ List<Expression> serverParsedKVFuncs,
+ List<KeyValueColumnExpression> serverParsedKVRefs) {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
try {
DataOutputStream output = new DataOutputStream(stream);
- // Write the arrayKVRef size followed by the keyvalues that needs to be of type arrayindex function
- WritableUtils.writeVInt(output, arrayKVRefs.size());
- for (Expression expression : arrayKVRefs) {
+ // Write the KVRef count followed by the key values that need to be of
+ // type arrayindex or json function based on serverParsedExpressionAttribute
+ WritableUtils.writeVInt(output, serverParsedKVRefs.size());
+ for (Expression expression : serverParsedKVRefs) {
expression.write(output);
}
- // then write the number of arrayindex functions followeed by the expression itself
- WritableUtils.writeVInt(output, arrayKVFuncs.size());
- for (Expression expression : arrayKVFuncs) {
+ // then write the number of arrayindex or json functions followed
+ // by the expression itself
+ WritableUtils.writeVInt(output, serverParsedKVFuncs.size());
+ for (Expression expression : serverParsedKVFuncs) {
expression.write(output);
}
@@ -666,7 +748,7 @@ public class ProjectionCompiler {
throw new RuntimeException(e);
}
}
- context.getScan().setAttribute(BaseScannerRegionObserver.SPECIFIC_ARRAY_INDEX, stream.toByteArray());
+ context.getScan().setAttribute(serverParsedExpressionAttribute, stream.toByteArray());
}
private static class SelectClauseVisitor extends ExpressionCompiler {
@@ -677,21 +759,27 @@ public class ProjectionCompiler {
*/
private boolean isCaseSensitive;
private int elementCount;
- private List<KeyValueColumnExpression> arrayKVRefs;
- private List<Expression> arrayKVFuncs;
- private List<Expression> arrayOldFuncs;
- private List<ProjectedColumnExpression> arrayProjectedColumnRefs;
- private Map<Expression, Integer> arrayExpressionCounts;
- private SelectStatement statement;
-
- private SelectClauseVisitor(StatementContext context, GroupBy groupBy,
- List<KeyValueColumnExpression> arrayKVRefs, List<Expression> arrayKVFuncs, Map<Expression, Integer> arrayExpressionCounts, List<ProjectedColumnExpression> arrayProjectedColumnRefs, List<Expression> arrayOldFuncs, SelectStatement statement) {
+ // See PHOENIX-2160 for the context and use of the variables below.
+ // These are used for reference counting and converting to KeyValueColumnExpressions
+ private List<KeyValueColumnExpression> serverParsedKVRefs;
+ private List<Expression> serverParsedKVFuncs;
+ private List<Expression> serverParsedOldFuncs;
+ private List<ProjectedColumnExpression> serverParsedProjectedColumnRefs;
+ private Map<Expression, Integer> serverParsedExpressionCounts;
+ private SelectStatement statement;
+
+ private SelectClauseVisitor(StatementContext context, GroupBy groupBy,
+ List<KeyValueColumnExpression> serverParsedKVRefs,
+ List<Expression> serverParsedKVFuncs,
+ Map<Expression, Integer> serverParsedExpressionCounts,
+ List<ProjectedColumnExpression> serverParsedProjectedColumnRefs,
+ List<Expression> serverParsedOldFuncs, SelectStatement statement) {
super(context, groupBy);
- this.arrayKVRefs = arrayKVRefs;
- this.arrayKVFuncs = arrayKVFuncs;
- this.arrayOldFuncs = arrayOldFuncs;
- this.arrayExpressionCounts = arrayExpressionCounts;
- this.arrayProjectedColumnRefs = arrayProjectedColumnRefs;
+ this.serverParsedKVRefs = serverParsedKVRefs;
+ this.serverParsedKVFuncs = serverParsedKVFuncs;
+ this.serverParsedOldFuncs = serverParsedOldFuncs;
+ this.serverParsedExpressionCounts = serverParsedExpressionCounts;
+ this.serverParsedProjectedColumnRefs = serverParsedProjectedColumnRefs;
this.statement = statement;
reset();
}
@@ -713,13 +801,18 @@ public class ProjectionCompiler {
@Override
public Expression visit(ColumnParseNode node) throws SQLException {
Expression expression = super.visit(node);
- if (expression.getDataType().isArrayType()) {
- Integer count = arrayExpressionCounts.get(expression);
- arrayExpressionCounts.put(expression, count != null ? (count + 1) : 1);
+ if (parseOnServer(expression)) {
+ Integer count = serverParsedExpressionCounts.get(expression);
+ serverParsedExpressionCounts.put(expression, count != null ? (count + 1) : 1);
}
return expression;
}
-
+
+ private static boolean parseOnServer(Expression expression) {
+ return expression.getDataType().isArrayType() || expression.getDataType()
+ .equals(PJson.INSTANCE);
+ }
+
@Override
public void addElement(List<Expression> l, Expression element) {
elementCount++;
@@ -740,57 +833,70 @@ public class ProjectionCompiler {
@Override
public Expression visitLeave(FunctionParseNode node, final List<Expression> children) throws SQLException {
- // this need not be done for group by clause with array. Hence the below check
- if (!statement.isAggregate() && ArrayIndexFunction.NAME.equals(node.getName()) && children.get(0) instanceof ProjectedColumnExpression) {
- final List<KeyValueColumnExpression> indexKVs = Lists.newArrayList();
- final List<ProjectedColumnExpression> indexProjectedColumns = Lists.newArrayList();
- final List<Expression> copyOfChildren = new ArrayList<>(children);
- // Create anon visitor to find reference to array in a generic way
- children.get(0).accept(new ProjectedColumnExpressionVisitor() {
- @Override
- public Void visit(ProjectedColumnExpression expression) {
- if (expression.getDataType().isArrayType()) {
- indexProjectedColumns.add(expression);
- PColumn col = expression.getColumn();
- // hack'ish... For covered columns with local indexes we defer to the server.
- if (col instanceof ProjectedColumn && ((ProjectedColumn) col)
- .getSourceColumnRef() instanceof IndexUncoveredDataColumnRef) {
- return null;
- }
- PTable table = context.getCurrentTable().getTable();
- KeyValueColumnExpression keyValueColumnExpression;
- if (table.getImmutableStorageScheme() != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+ // this need not be done for group by clause with array or json. Hence, the below check
+ if (!statement.isAggregate() && (ArrayIndexFunction.NAME.equals(
+ node.getName()) || isJsonFunction(node)) &&
+ children.get(0) instanceof ProjectedColumnExpression) {
+ final List<KeyValueColumnExpression> indexKVs = Lists.newArrayList();
+ final List<ProjectedColumnExpression> indexProjectedColumns = Lists.newArrayList();
+ final List<Expression> copyOfChildren = new ArrayList<>(children);
+ // Create anon visitor to find reference to array or json in a generic way
+ children.get(0).accept(new ProjectedColumnExpressionVisitor() {
+ @Override
+ public Void visit(ProjectedColumnExpression expression) {
+ // Same array/JSON predicate used by visit(ColumnParseNode) when counting references.
+ if (parseOnServer(expression)) {
+ indexProjectedColumns.add(expression);
+ PColumn col = expression.getColumn();
+ // hack'ish... For covered columns with local indexes we defer to the server.
+ if (col instanceof ProjectedColumn && ((ProjectedColumn) col).getSourceColumnRef() instanceof IndexUncoveredDataColumnRef) {
+ return null;
+ }
+ PTable table = context.getCurrentTable().getTable();
+ KeyValueColumnExpression keyValueColumnExpression;
+ if (table.getImmutableStorageScheme() != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
keyValueColumnExpression =
new SingleCellColumnExpression(col,
col.getName().getString(),
table.getEncodingScheme(),
table.getImmutableStorageScheme());
- } else {
- keyValueColumnExpression = new KeyValueColumnExpression(col);
- }
- indexKVs.add(keyValueColumnExpression);
- copyOfChildren.set(0, keyValueColumnExpression);
- Integer count = arrayExpressionCounts.get(expression);
- arrayExpressionCounts.put(expression, count != null ? (count - 1) : -1);
- }
- return null;
- }
- });
-
- Expression func = super.visitLeave(node,children);
- // Add the keyvalues which is of type array
- if (!indexKVs.isEmpty()) {
- arrayKVRefs.addAll(indexKVs);
- arrayProjectedColumnRefs.addAll(indexProjectedColumns);
+ } else {
+ keyValueColumnExpression = new KeyValueColumnExpression(col);
+ }
+ indexKVs.add(keyValueColumnExpression);
+ copyOfChildren.set(0, keyValueColumnExpression);
+ Integer count = serverParsedExpressionCounts.get(expression);
+ serverParsedExpressionCounts.put(expression,
+ count != null ? (count - 1) : -1);
+ }
+ return null;
+ }
+ });
+
+ Expression func = super.visitLeave(node, children);
+ // Add the key values whose type is array or json
+ if (!indexKVs.isEmpty()) {
+ serverParsedKVRefs.addAll(indexKVs);
+ serverParsedProjectedColumnRefs.addAll(indexProjectedColumns);
Expression funcModified = super.visitLeave(node, copyOfChildren);
- // Track the array index function also
- arrayKVFuncs.add(funcModified);
- arrayOldFuncs.add(func);
+ // Track the array index or json function also
+ serverParsedKVFuncs.add(funcModified);
+ serverParsedOldFuncs.add(func);
}
return func;
} else {
- return super.visitLeave(node,children);
+ return super.visitLeave(node, children);
}
}
}
+
+ private static boolean isJsonFunction(FunctionParseNode node) { // is this parse node a JSON_VALUE or JSON_QUERY call?
+ String functionName = node.getName();
+ return JsonValueFunction.NAME.equals(functionName) || JsonQueryFunction.NAME.equals(functionName);
+ }
+
+ private static boolean isJsonFunction(Expression function) { // true for compiled JSON_VALUE / JSON_QUERY expressions (false for null)
+ return function instanceof JsonValueFunction
+ || function instanceof JsonQueryFunction;
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 7493acceac..23ae0dbf8e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -106,6 +106,8 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
public static final String INDEX_LIMIT = "_IndexLimit";
public static final String INDEX_FILTER_STR = "_IndexFilterStr";
+ public static final String JSON_VALUE_FUNCTION = "_JsonValueFunction"; // scan attribute: serialized KV column refs + JSON_VALUE functions deserialized by the region scanner
+ public static final String JSON_QUERY_FUNCTION = "_JsonQueryFunction"; // scan attribute: serialized KV column refs + JSON_QUERY functions deserialized by the region scanner
/*
* Attribute to denote that the index maintainer has been serialized using its proto-buf presentation.
* Needed for backward compatibility purposes. TODO: get rid of this in next major release.
@@ -232,7 +234,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
Exception cause = new StaleRegionBoundaryCacheException(region.getRegionInfo().getTable().getNameAsString());
throw new DoNotRetryIOException(cause.getMessage(), cause);
}
- if(isLocalIndex) {
+ if (isLocalIndex) {
ScanUtil.setupLocalIndexScan(scan);
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 494226327b..83616ed3e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -297,6 +297,10 @@ public abstract class ExplainTable {
explainPlanAttributesBuilder.setServerArrayElementProjection(true);
}
}
+ if (scan.getAttribute(BaseScannerRegionObserver.JSON_VALUE_FUNCTION) != null
+ || scan.getAttribute(BaseScannerRegionObserver.JSON_QUERY_FUNCTION) != null) {
+ planSteps.add(" SERVER JSON FUNCTION PROJECTION");
+ }
}
/**
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index 325e242f31..8094b22e9c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -28,6 +28,7 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -52,6 +53,9 @@ import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.OrderByExpression;
import org.apache.phoenix.expression.SingleCellColumnExpression;
import org.apache.phoenix.expression.function.ArrayIndexFunction;
+import org.apache.phoenix.expression.function.JsonQueryFunction;
+import org.apache.phoenix.expression.function.JsonValueFunction;
+import org.apache.phoenix.expression.function.ScalarFunction;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.VersionUtil;
@@ -109,13 +113,16 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
- Set<KeyValueColumnExpression> arrayKVRefs = Sets.newHashSet();
+ Set<KeyValueColumnExpression> serverParsedKVRefs = Sets.newHashSet();
KeyValueSchema kvSchema = null;
ValueBitSet kvSchemaBitSet = null;
- Expression[] arrayFuncRefs = deserializeArrayPositionalExpressionInfoFromScan(scan, innerScanner, arrayKVRefs);
- if (arrayFuncRefs != null) {
- KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0);
- for (Expression expression : arrayFuncRefs) {
+ List<Expression> resultList = getServerParsedExpressions(scan, serverParsedKVRefs);
+ Expression[] serverParsedFuncRefs = resultList.toArray(new Expression[0]);
+ if (serverParsedFuncRefs != null && serverParsedFuncRefs.length > 0) {
+ KeyValueSchema.KeyValueSchemaBuilder
+ builder =
+ new KeyValueSchema.KeyValueSchemaBuilder(0);
+ for (Expression expression : serverParsedFuncRefs) {
builder.addField(expression);
}
kvSchema = builder.build();
@@ -150,21 +157,26 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
env.getConfiguration().get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY) != null) {
dataRegion = env.getRegion();
}
- innerScanner = getWrappedScanner(env, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan, dataColumns,
- tupleProjector, dataRegion, indexMaintainer, tx, viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null,
- ptr, useQualifierAsIndex);
+ innerScanner =
+ getWrappedScanner(env, innerScanner, serverParsedKVRefs, serverParsedFuncRefs,
+ offset, scan, dataColumns, tupleProjector, dataRegion, indexMaintainer, tx,
+ viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr,
+ useQualifierAsIndex);
final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan);
if (j != null) {
- innerScanner = new HashJoinRegionScanner(env, innerScanner, scan, arrayKVRefs, arrayFuncRefs,
- p, j, tenantId, useQualifierAsIndex,
- useNewValueColumnQualifier);
+ innerScanner =
+ new HashJoinRegionScanner(env, innerScanner, scan, serverParsedKVRefs,
+ serverParsedFuncRefs, p, j, tenantId, useQualifierAsIndex,
+ useNewValueColumnQualifier);
}
if (scanOffset != null) {
- innerScanner = getOffsetScanner(innerScanner, new OffsetResultIterator(
- new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme),
- scanOffset, getPageSizeMsForRegionScanner(scan)),
- scan.getAttribute(QueryConstants.LAST_SCAN) != null);
+ innerScanner =
+ getOffsetScanner(innerScanner, new OffsetResultIterator(
+ new RegionScannerResultIterator(innerScanner,
+ getMinMaxQualifiersFromScan(scan), encodingScheme), scanOffset,
+ getPageSizeMsForRegionScanner(scan)),
+ scan.getAttribute(QueryConstants.LAST_SCAN) != null);
}
boolean spoolingEnabled =
env.getConfiguration().getBoolean(
@@ -183,6 +195,39 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
return getTopNScanner(env, innerScanner, iterator, tenantId);
}
+ /**
+ * Collects the server-parsed function expressions serialized on the scan: array index
+ * functions first, then JSON_VALUE, then JSON_QUERY. The KV column references of each
+ * group are added to {@code serverParsedKVRefs}. Returns an empty list when the scan
+ * carries none of these attributes.
+ */
+ private List<Expression> getServerParsedExpressions(Scan scan,
+ Set<KeyValueColumnExpression> serverParsedKVRefs) {
+ List<Expression> resultList = new ArrayList<>();
+ // Order is significant: the caller builds a KeyValueSchema from these in this order.
+ for (String scanAttribute : new String[] { BaseScannerRegionObserver.SPECIFIC_ARRAY_INDEX,
+ BaseScannerRegionObserver.JSON_VALUE_FUNCTION,
+ BaseScannerRegionObserver.JSON_QUERY_FUNCTION }) {
+ // The deserializer itself returns null when the scan does not carry the attribute,
+ // so no separate scan.getAttribute(...) pre-check is needed.
+ Expression[] funcRefs =
+ deserializeServerParsedPositionalExpressionInfoFromScan(scan, scanAttribute,
+ serverParsedKVRefs);
+ if (funcRefs != null) {
+ Collections.addAll(resultList, funcRefs);
+ }
+ }
+ return resultList;
+ }
+
@VisibleForTesting
static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s,
boolean spoolingEnabled, long thresholdBytes) {
@@ -232,31 +277,40 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
}
}
- private Expression[] deserializeArrayPositionalExpressionInfoFromScan(Scan scan, RegionScanner s,
- Set<KeyValueColumnExpression> arrayKVRefs) {
- byte[] specificArrayIdx = scan.getAttribute(BaseScannerRegionObserver.SPECIFIC_ARRAY_INDEX);
+ private Expression[] deserializeServerParsedPositionalExpressionInfoFromScan(Scan scan,
+ String scanAttribute, Set<KeyValueColumnExpression> serverParsedKVRefs) {
+ byte[] specificArrayIdx = scan.getAttribute(scanAttribute);
if (specificArrayIdx == null) {
return null;
}
ByteArrayInputStream stream = new ByteArrayInputStream(specificArrayIdx);
try {
DataInputStream input = new DataInputStream(stream);
- int arrayKVRefSize = WritableUtils.readVInt(input);
- for (int i = 0; i < arrayKVRefSize; i++) {
+ int kvRefSize = WritableUtils.readVInt(input);
+ for (int i = 0; i < kvRefSize; i++) {
PTable.ImmutableStorageScheme scheme = EncodedColumnsUtil.getImmutableStorageScheme(scan);
KeyValueColumnExpression kvExp = scheme != PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN ? new SingleCellColumnExpression(scheme)
: new KeyValueColumnExpression();
kvExp.readFields(input);
- arrayKVRefs.add(kvExp);
+ serverParsedKVRefs.add(kvExp);
}
- int arrayKVFuncSize = WritableUtils.readVInt(input);
- Expression[] arrayFuncRefs = new Expression[arrayKVFuncSize];
- for (int i = 0; i < arrayKVFuncSize; i++) {
- ArrayIndexFunction arrayIdxFunc = new ArrayIndexFunction();
- arrayIdxFunc.readFields(input);
- arrayFuncRefs[i] = arrayIdxFunc;
+ int kvFuncSize = WritableUtils.readVInt(input);
+ Expression[] funcRefs = new Expression[kvFuncSize];
+ for (int i = 0; i < kvFuncSize; i++) {
+ ScalarFunction func = null;
+ if (scanAttribute.equals(BaseScannerRegionObserver.SPECIFIC_ARRAY_INDEX)) {
+ func = new ArrayIndexFunction();
+ } else if (scanAttribute.equals(BaseScannerRegionObserver.JSON_VALUE_FUNCTION)) {
+ func = new JsonValueFunction();
+ } else if (scanAttribute.equals(BaseScannerRegionObserver.JSON_QUERY_FUNCTION)) {
+ func = new JsonQueryFunction();
+ }
+ if (func != null) {
+ func.readFields(input);
+ funcRefs[i] = func;
+ }
}
- return arrayFuncRefs;
+ return funcRefs;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 59fb21f522..7ef507eeca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -104,8 +104,8 @@ public abstract class RegionScannerFactory {
* re-throws as DoNotRetryIOException to prevent needless retrying hanging the query
* for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do
* the same from a custom filter.
- * @param arrayKVRefs
- * @param arrayFuncRefs
+ * @param serverParsedKVRefs
+ * @param serverParsedFuncRefs
* @param offset starting position in the rowkey.
* @param scan
* @param tupleProjector
@@ -115,8 +115,9 @@ public abstract class RegionScannerFactory {
* @param viewConstants
*/
public RegionScanner getWrappedScanner(final RegionCoprocessorEnvironment env,
- final RegionScanner regionScanner, final Set<KeyValueColumnExpression> arrayKVRefs,
- final Expression[] arrayFuncRefs, final int offset, final Scan scan,
+ final RegionScanner regionScanner, final Set<KeyValueColumnExpression> serverParsedKVRefs,
+ final Expression[] serverParsedFuncRefs,
+ final int offset, final Scan scan,
final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
final Region dataRegion, final IndexMaintainer indexMaintainer,
PhoenixTransactionContext tx,
@@ -284,10 +285,11 @@ public abstract class RegionScannerFactory {
}
}
}
- Cell arrayElementCell = null;
- if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
- int arrayElementCellPosition = replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
- arrayElementCell = result.get(arrayElementCellPosition);
+ Cell serverParsedResultCell = null;
+ if (serverParsedFuncRefs != null && serverParsedFuncRefs.length > 0 && serverParsedKVRefs.size() > 0) {
+ int resultPosition = replaceServerParsedExpressionElement(serverParsedKVRefs,
+ serverParsedFuncRefs, result);
+ serverParsedResultCell = result.get(resultPosition);
}
if (projector != null) {
Tuple toProject = useQualifierAsListIndex ? new PositionBasedResultTuple(result) :
@@ -301,8 +303,8 @@ public abstract class RegionScannerFactory {
result.clear();
result.add(tupleWithDynColsIfReqd.mergeWithDynColsListBytesAndGetValue(0,
serializedDynColsList));
- if (arrayElementCell != null) {
- result.add(arrayElementCell);
+ if (serverParsedResultCell != null) {
+ result.add(serverParsedResultCell);
}
}
if (extraLimit >= 0 && --extraLimit == 0) {
@@ -413,38 +415,40 @@ public abstract class RegionScannerFactory {
return next;
}
- private int replaceArrayIndexElement(final Set<KeyValueColumnExpression> arrayKVRefs,
- final Expression[] arrayFuncRefs, List<Cell> result) {
+ private int replaceServerParsedExpressionElement( // swaps matched array/JSON source cells for one cell holding the evaluated function values
+ final Set<KeyValueColumnExpression> serverParsedKVRefs,
+ final Expression[] serverParsedFuncRefs, List<Cell> result) { // returns getArrayCellPosition(result) after the swap
// make a copy of the results array here, as we're modifying it below
MultiKeyValueTuple tuple = new MultiKeyValueTuple(ImmutableList.copyOf(result));
// The size of both the arrays would be same?
// Using KeyValueSchema to set and retrieve the value
// collect the first kv to get the row
Cell rowKv = result.get(0);
- for (KeyValueColumnExpression kvExp : arrayKVRefs) {
+ for (KeyValueColumnExpression kvExp : serverParsedKVRefs) { // only refs that evaluate against the row have their cell removed
if (kvExp.evaluate(tuple, ptr)) {
ListIterator<Cell> itr = result.listIterator();
while (itr.hasNext()) {
Cell kv = itr.next();
if (Bytes.equals(kvExp.getColumnFamily(), 0, kvExp.getColumnFamily().length,
- kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength())
- && Bytes.equals(kvExp.getColumnQualifier(), 0, kvExp.getColumnQualifier().length,
- kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())) {
- // remove the kv that has the full array values.
+ kv.getFamilyArray(), kv.getFamilyOffset(),
+ kv.getFamilyLength()) && Bytes.equals(kvExp.getColumnQualifier(), 0,
+ kvExp.getColumnQualifier().length, kv.getQualifierArray(),
+ kv.getQualifierOffset(), kv.getQualifierLength())) {
+ // remove the kv that has the full array/json values.
itr.remove();
break;
}
}
}
}
- byte[] value = kvSchema.toBytes(tuple, arrayFuncRefs,
- kvSchemaBitSet, ptr);
- // Add a dummy kv with the exact value of the array index
+ byte[] value = kvSchema.toBytes(tuple, serverParsedFuncRefs, kvSchemaBitSet, ptr); // evaluated against the pre-removal copy in 'tuple'
+ // Add a dummy kv with the exact value of the array index or json value
result.add(new KeyValue(rowKv.getRowArray(), rowKv.getRowOffset(), rowKv.getRowLength(),
- QueryConstants.ARRAY_VALUE_COLUMN_FAMILY, 0, QueryConstants.ARRAY_VALUE_COLUMN_FAMILY.length,
- QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER, 0,
- QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER.length, HConstants.LATEST_TIMESTAMP,
- KeyValue.Type.codeToType(rowKv.getType().getCode()), value, 0, value.length));
+ QueryConstants.ARRAY_VALUE_COLUMN_FAMILY, 0, // JSON results reuse the array-value family/qualifier
+ QueryConstants.ARRAY_VALUE_COLUMN_FAMILY.length,
+ QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER, 0,
+ QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER.length, HConstants.LATEST_TIMESTAMP,
+ KeyValue.Type.codeToType(rowKv.getType().getCode()), value, 0, value.length));
return getArrayCellPosition(result);
}