You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by od...@apache.org on 2016/11/22 23:37:33 UTC
[2/2] incubator-hawq git commit: HAWQ-1115. Implement filter
push-down for IN on PXF service side.
HAWQ-1115. Implement filter push-down for IN on PXF service side.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/3988fa9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/3988fa9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/3988fa9d
Branch: refs/heads/HAWQ-1114_
Commit: 3988fa9d650bfce07c9951abfe52f093f148b3ee
Parents: f536900
Author: Oleksandr Diachenko <od...@pivotal.io>
Authored: Tue Nov 22 15:37:15 2016 -0800
Committer: Oleksandr Diachenko <od...@pivotal.io>
Committed: Tue Nov 22 15:37:15 2016 -0800
----------------------------------------------------------------------
.../org/apache/hawq/pxf/api/FilterParser.java | 56 ++++++++++++++++++--
.../org/apache/hawq/pxf/api/io/DataType.java | 19 +++++++
.../apache/hawq/pxf/api/FilterParserTest.java | 22 ++++++++
.../hawq/pxf/plugins/hive/HiveORCAccessor.java | 9 ++++
.../pxf/plugins/hive/HiveORCAccessorTest.java | 15 ++++++
.../plugins/hive/HiveORCSearchArgumentTest.java | 42 ++++++++++++---
6 files changed, 151 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FilterParser.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FilterParser.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FilterParser.java
index e362eed..ff0d972 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FilterParser.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FilterParser.java
@@ -25,6 +25,8 @@ import org.apache.hawq.pxf.api.io.DataType;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Stack;
/**
@@ -65,11 +67,13 @@ public class FilterParser {
private Stack<Object> operandsStack;
private FilterBuilder filterBuilder;
public static final char COL_OP = 'a';
- public static final char CONST_OP = 'c';
+ public static final char SCALAR_CONST_OP = 'c';
+ public static final char LIST_CONST_OP = 'm';
public static final char CONST_LEN = 's';
public static final char CONST_DATA = 'd';
public static final char COMP_OP = 'o';
public static final char LOG_OP = 'l';
+
public static final String DEFAULT_CHARSET = "UTF-8";
/** Supported operations by the parser. */
@@ -83,7 +87,8 @@ public class FilterParser {
HDOP_NE,
HDOP_LIKE,
HDOP_IS_NULL,
- HDOP_IS_NOT_NULL
+ HDOP_IS_NOT_NULL,
+ HDOP_IN
}
/**
@@ -213,8 +218,11 @@ public class FilterParser {
case COL_OP:
operandsStack.push(new ColumnIndex(safeToInt(parseNumber())));
break;
- case CONST_OP:
- operandsStack.push(new Constant(parseParameter()));
+ case SCALAR_CONST_OP:
+ operandsStack.push(new Constant(parseScalarParameter()));
+ break;
+ case LIST_CONST_OP:
+ operandsStack.push(new Constant(parseListParameter()));
break;
case COMP_OP:
opNumber = safeToInt(parseNumber());
@@ -354,6 +362,10 @@ public class FilterParser {
}
private Object convertDataType(byte[] byteData, int start, int end, DataType dataType) throws Exception {
+
+ if (byteData.length < end)
+ throw new FilterStringSyntaxException("filter string is shorter than expected");
+
String data = new String(byteData, start, end-start, DEFAULT_CHARSET);
try {
switch (dataType) {
@@ -391,7 +403,7 @@ public class FilterParser {
/**
* Parses either a number or a string.
*/
- private Object parseParameter() throws Exception {
+ private Object parseScalarParameter() throws Exception {
if (index == filterByteArr.length) {
throw new FilterStringSyntaxException("argument should follow at " + index);
}
@@ -418,6 +430,40 @@ public class FilterParser {
return data;
}
+ private Object parseListParameter() throws Exception {
+ if (index == filterByteArr.length) {
+ throw new FilterStringSyntaxException("argument should follow at " + index);
+ }
+
+ DataType dataType = DataType.get(parseConstDataType());
+ List<Object> data = new ArrayList<Object>();
+ if (dataType == DataType.UNSUPPORTED_TYPE) {
+ throw new FilterStringSyntaxException("invalid DataType OID at " + (index - 1));
+ }
+
+ if (dataType.getTypeElem() == null) {
+ throw new FilterStringSyntaxException("expected non-scalar datatype, but got datatype with oid = " + dataType.getOID());
+ }
+
+ while (((char) filterByteArr[index]) == CONST_LEN) {
+ int dataLength = parseDataLength();
+
+ if (index + dataLength > filterByteArr.length) {
+ throw new FilterStringSyntaxException("data size larger than filter string starting at " + index);
+ }
+
+ if (((char) filterByteArr[index]) != CONST_DATA) {
+ throw new FilterStringSyntaxException("data delimiter 'd' expected at " + index);
+ }
+
+ index++;
+ data.add(convertDataType(filterByteArr, index, index+dataLength, dataType.getTypeElem()));
+ index += dataLength;
+ }
+
+ return data;
+ }
+
private Long parseNumber() throws Exception {
if (index == filterByteArr.length) {
throw new FilterStringSyntaxException("numeric argument expected at " + index);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/io/DataType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/io/DataType.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/io/DataType.java
index cac700c..d3db038 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/io/DataType.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/io/DataType.java
@@ -49,17 +49,32 @@ public enum DataType {
TIME(1083),
TIMESTAMP(1114),
NUMERIC(1700),
+
+ INT2ARRAY(1005),
+ INT4ARRAY(1007),
+ INT8ARRAY(1016),
+ BOOLARRAY(1000),
+ TEXTARRAY(1009),
+
UNSUPPORTED_TYPE(-1);
private static final Map<Integer, DataType> lookup = new HashMap<>();
static {
+
+ INT2ARRAY.typeElem = SMALLINT;
+ INT4ARRAY.typeElem = INTEGER;
+ INT8ARRAY.typeElem = BIGINT;
+ BOOLARRAY.typeElem = BOOLEAN;
+ TEXTARRAY.typeElem = TEXT;
+
for (DataType dt : EnumSet.allOf(DataType.class)) {
lookup.put(dt.getOID(), dt);
}
}
private final int OID;
+ private DataType typeElem;
DataType(int OID) {
this.OID = OID;
@@ -81,4 +96,8 @@ public enum DataType {
public int getOID() {
return OID;
}
+
+ public DataType getTypeElem() {
+ return typeElem;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FilterParserTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FilterParserTest.java b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FilterParserTest.java
index 46f60f1..b754253 100644
--- a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FilterParserTest.java
+++ b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FilterParserTest.java
@@ -143,6 +143,24 @@ public class FilterParserTest {
index = 6;
exception = "failed to parse number data type starting at " + index;
runParseNegative("const operand with an invalid value", filter, exception);
+
+ filter = "m1122";
+ index = 4;
+ exception = "invalid DataType OID at " + index;
+ runParseNegative("const operand with an invalid datatype", filter, exception);
+
+ filter = "m20";
+ exception = "expected non-scalar datatype, but got datatype with oid = 20";
+ runParseNegative("const operand with an scalar datatype instead of list", filter, exception);
+
+ filter = "m1007s1d1s1d2s2d3";
+ exception = "filter string is shorter than expected";
+ runParseNegative("const operand with list datatype, and \"d\" tag has less data than indicated in \"s\" tag", filter, exception);
+
+ filter = "m1007s1d1s1d2s2d123";
+ index = 18;
+ exception = "unknown opcode 3(51) at " + index;
+ runParseNegative("const operand with list datatype, and \"d\" tag has more data than indicated in \"s\" tag", filter, exception);
}
@Test
@@ -272,6 +290,10 @@ public class FilterParserTest {
op = Operation.HDOP_IS_NOT_NULL;
runParseOneUnaryOperation("this filter was build from HDOP_IS_NULL", filter, op);
+ filter = "a1m1005s1d1s1d2s1d3o10";
+ op = Operation.HDOP_IN;
+ runParseOneOperation("this filter was built from HDOP_IN", filter, op);
+
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
index 9d79f97..dc195f4 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
@@ -195,6 +195,15 @@ public class HiveORCAccessor extends HiveAccessor {
case HDOP_IS_NOT_NULL:
builder.startNot().isNull(filterColumnName).end();
break;
+ case HDOP_IN:
+ if (filterValue instanceof List) {
+ @SuppressWarnings("unchecked")
+ List<Object> l = (List<Object>)filterValue;
+ builder.in(filterColumnName, l.toArray());
+ } else {
+ throw new IllegalArgumentException("filterValue should be instace of List for HDOP_IN operation");
+ }
+ break;
default: {
LOG.debug("Filter push-down is not supported for " + filter.getOperation() + "operation.");
return false;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
index c5700b6..7bbe811 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
@@ -103,4 +103,19 @@ public class HiveORCAccessorTest {
assertEquals(sarg.toKryo(), jobConf.get(SARG_PUSHDOWN));
}
+ @Test
+ public void parseFilterWithIn() throws Exception {
+
+ when(inputData.hasFilter()).thenReturn(true);
+ when(inputData.getFilterString()).thenReturn("a1m1007s1d1s1d2s1d3o10");
+ when(columnDesc.columnName()).thenReturn("FOO");
+ when(inputData.getColumn(1)).thenReturn(columnDesc);
+
+ accessor.openForRead();
+
+ SearchArgument sarg = SearchArgumentFactory.newBuilder().startAnd().in("FOO", 1, 2, 3).end().build();
+
+ assertEquals(sarg.toKryo(), jobConf.get(SARG_PUSHDOWN));
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCSearchArgumentTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCSearchArgumentTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCSearchArgumentTest.java
index 382f065..74d87e7 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCSearchArgumentTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCSearchArgumentTest.java
@@ -40,14 +40,25 @@ public class HiveORCSearchArgumentTest {
HiveFilterBuilder eval = new HiveFilterBuilder(null);
Object filter = eval.getFilterObject(filterStr);
- Object current = filter;
SearchArgument.Builder filterBuilder = SearchArgumentFactory.newBuilder();
buildExpression(filterBuilder, Arrays.asList(filter));
SearchArgument sarg = filterBuilder.build();
Assert.assertEquals("and(or(lt(col1, 5), not(lteq(col1, 1))), or(lt(col1, 5), lteq(col1, 3)))", sarg.toFilterPredicate().toString());
}
- private void buildExpression(SearchArgument.Builder builder, List<Object> filterList) {
+ @Test
+ public void buildIn() throws Exception {
+ String filterStr = "a0m1009s4drow1s4drow2o10a1m1009s3ds_6s3ds_7o10l0";
+ HiveFilterBuilder eval = new HiveFilterBuilder(null);
+ Object filter = eval.getFilterObject(filterStr);
+
+ SearchArgument.Builder filterBuilder = SearchArgumentFactory.newBuilder();
+ buildExpression(filterBuilder, Arrays.asList(filter));
+ SearchArgument sarg = filterBuilder.build();
+ Assert.assertEquals("and(or(eq(col1, Binary{\"row1\"}), eq(col1, Binary{\"row2\"})), or(eq(col1, Binary{\"s_6\"}), eq(col1, Binary{\"s_7\"})))", sarg.toFilterPredicate().toString());
+ }
+
+ private boolean buildExpression(SearchArgument.Builder builder, List<Object> filterList) {
for (Object f : filterList) {
if (f instanceof LogicalFilter) {
switch(((LogicalFilter) f).getOperator()) {
@@ -61,15 +72,21 @@ public class HiveORCSearchArgumentTest {
builder.startNot();
break;
}
- buildExpression(builder, ((LogicalFilter) f).getFilterList());
- builder.end();
+ if (buildExpression(builder, ((LogicalFilter) f).getFilterList())) {
+ builder.end();
+ } else {
+ return false;
+ }
} else {
- buildArgument(builder, f);
+ if (!buildArgument(builder, f)) {
+ return false;
+ }
}
}
+ return true;
}
- private void buildArgument(SearchArgument.Builder builder, Object filterObj) {
+ private boolean buildArgument(SearchArgument.Builder builder, Object filterObj) {
/* The below functions will not be compatible and requires update with Hive 2.0 APIs */
BasicFilter filter = (BasicFilter) filterObj;
int filterColumnIndex = filter.getColumn().index();
@@ -97,7 +114,18 @@ public class HiveORCSearchArgumentTest {
case HDOP_NE:
builder.startNot().equals(filterColumnName, filterValue).end();
break;
+ case HDOP_IN:
+ if (filterValue instanceof List) {
+ @SuppressWarnings("unchecked")
+ List<Object> l = (List<Object>)filterValue;
+ builder.in(filterColumnName, l.toArray());
+ } else {
+ throw new IllegalArgumentException("filterValue should be instace of List for HDOP_IN operation");
+ }
+ break;
+ default:
+ return false;
}
- return;
+ return true;
}
}