You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by od...@apache.org on 2016/11/22 23:37:33 UTC

[2/2] incubator-hawq git commit: HAWQ-1115. Implement filter push-down for IN on the PXF service side.

HAWQ-1115. Implement filter push-down for IN on the PXF service side.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/3988fa9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/3988fa9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/3988fa9d

Branch: refs/heads/HAWQ-1114_
Commit: 3988fa9d650bfce07c9951abfe52f093f148b3ee
Parents: f536900
Author: Oleksandr Diachenko <od...@pivotal.io>
Authored: Tue Nov 22 15:37:15 2016 -0800
Committer: Oleksandr Diachenko <od...@pivotal.io>
Committed: Tue Nov 22 15:37:15 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hawq/pxf/api/FilterParser.java   | 56 ++++++++++++++++++--
 .../org/apache/hawq/pxf/api/io/DataType.java    | 19 +++++++
 .../apache/hawq/pxf/api/FilterParserTest.java   | 22 ++++++++
 .../hawq/pxf/plugins/hive/HiveORCAccessor.java  |  9 ++++
 .../pxf/plugins/hive/HiveORCAccessorTest.java   | 15 ++++++
 .../plugins/hive/HiveORCSearchArgumentTest.java | 42 ++++++++++++---
 6 files changed, 151 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FilterParser.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FilterParser.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FilterParser.java
index e362eed..ff0d972 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FilterParser.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FilterParser.java
@@ -25,6 +25,8 @@ import org.apache.hawq.pxf.api.io.DataType;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Stack;
 
 /**
@@ -65,11 +67,13 @@ public class FilterParser {
     private Stack<Object> operandsStack;
     private FilterBuilder filterBuilder;
     public static final char COL_OP = 'a';
-    public static final char CONST_OP = 'c';
+    public static final char SCALAR_CONST_OP = 'c';
+    public static final char LIST_CONST_OP = 'm';
     public static final char CONST_LEN = 's';
     public static final char CONST_DATA = 'd';
     public static final char COMP_OP = 'o';
     public static final char LOG_OP = 'l';
+
     public static final String DEFAULT_CHARSET = "UTF-8";
 
     /** Supported operations by the parser. */
@@ -83,7 +87,8 @@ public class FilterParser {
         HDOP_NE,
         HDOP_LIKE,
         HDOP_IS_NULL,
-        HDOP_IS_NOT_NULL
+        HDOP_IS_NOT_NULL,
+        HDOP_IN
     }
 
     /**
@@ -213,8 +218,11 @@ public class FilterParser {
                 case COL_OP:
                     operandsStack.push(new ColumnIndex(safeToInt(parseNumber())));
                     break;
-                case CONST_OP:
-                    operandsStack.push(new Constant(parseParameter()));
+                case SCALAR_CONST_OP:
+                    operandsStack.push(new Constant(parseScalarParameter()));
+                    break;
+                case LIST_CONST_OP:
+                    operandsStack.push(new Constant(parseListParameter()));
                     break;
                 case COMP_OP:
                     opNumber = safeToInt(parseNumber());
@@ -354,6 +362,10 @@ public class FilterParser {
     }
 
     private Object convertDataType(byte[] byteData, int start, int end, DataType dataType) throws Exception {
+
+        if (byteData.length < end)
+            throw new FilterStringSyntaxException("filter string is shorter than expected");
+
         String data = new String(byteData, start, end-start, DEFAULT_CHARSET);
         try {
             switch (dataType) {
@@ -391,7 +403,7 @@ public class FilterParser {
     /**
      * Parses either a number or a string.
      */
-    private Object parseParameter() throws Exception {
+    private Object parseScalarParameter() throws Exception {
         if (index == filterByteArr.length) {
             throw new FilterStringSyntaxException("argument should follow at " + index);
         }
@@ -418,6 +430,54 @@ public class FilterParser {
         return data;
     }
 
+    /**
+     * Parses a list constant: a data type OID followed by one or more
+     * "s<length>d<data>" elements, each converted to the list's element type.
+     *
+     * @return the parsed elements, in filter-string order
+     * @throws FilterStringSyntaxException if the OID is unknown or not an array
+     *         type, an element is malformed or truncated, or the list is empty
+     */
+    private Object parseListParameter() throws Exception {
+        if (index == filterByteArr.length) {
+            throw new FilterStringSyntaxException("argument should follow at " + index);
+        }
+
+        DataType dataType = DataType.get(parseConstDataType());
+        List<Object> data = new ArrayList<>();
+        if (dataType == DataType.UNSUPPORTED_TYPE) {
+            throw new FilterStringSyntaxException("invalid DataType OID at " + (index - 1));
+        }
+
+        if (dataType.getTypeElem() == null) {
+            throw new FilterStringSyntaxException("expected non-scalar datatype, but got datatype with oid = " + dataType.getOID());
+        }
+
+        // Check bounds before peeking: a list constant may end the filter string.
+        while (index < filterByteArr.length && ((char) filterByteArr[index]) == CONST_LEN) {
+            int dataLength = parseDataLength();
+
+            if (index + dataLength > filterByteArr.length) {
+                throw new FilterStringSyntaxException("data size larger than filter string starting at " + index);
+            }
+
+            if (index >= filterByteArr.length || ((char) filterByteArr[index]) != CONST_DATA) {
+                throw new FilterStringSyntaxException("data delimiter 'd' expected at " + index);
+            }
+
+            index++;
+            data.add(convertDataType(filterByteArr, index, index + dataLength, dataType.getTypeElem()));
+            index += dataLength;
+        }
+
+        // A list constant must carry at least one "s<length>d<data>" element.
+        if (data.isEmpty()) {
+            throw new FilterStringSyntaxException("list constant without elements at " + index);
+        }
+
+        return data;
+    }
+
     private Long parseNumber() throws Exception {
         if (index == filterByteArr.length) {
             throw new FilterStringSyntaxException("numeric argument expected at " + index);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/io/DataType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/io/DataType.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/io/DataType.java
index cac700c..d3db038 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/io/DataType.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/io/DataType.java
@@ -49,17 +49,32 @@ public enum DataType {
     TIME(1083),
     TIMESTAMP(1114),
     NUMERIC(1700),
+
+    INT2ARRAY(1005),
+    INT4ARRAY(1007),
+    INT8ARRAY(1016),
+    BOOLARRAY(1000),
+    TEXTARRAY(1009),
+
     UNSUPPORTED_TYPE(-1);
 
     private static final Map<Integer, DataType> lookup = new HashMap<>();
 
     static {
+
+        INT2ARRAY.typeElem = SMALLINT;
+        INT4ARRAY.typeElem = INTEGER;
+        INT8ARRAY.typeElem = BIGINT;
+        BOOLARRAY.typeElem = BOOLEAN;
+        TEXTARRAY.typeElem = TEXT;
+
         for (DataType dt : EnumSet.allOf(DataType.class)) {
             lookup.put(dt.getOID(), dt);
         }
     }
 
     private final int OID;
+    private DataType typeElem;
 
     DataType(int OID) {
         this.OID = OID;
@@ -81,4 +96,8 @@ public enum DataType {
     public int getOID() {
         return OID;
     }
+
+    public DataType getTypeElem() { // element type for array types (e.g. INT4ARRAY -> INTEGER); null for all other types
+        return typeElem;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FilterParserTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FilterParserTest.java b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FilterParserTest.java
index 46f60f1..b754253 100644
--- a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FilterParserTest.java
+++ b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FilterParserTest.java
@@ -143,6 +143,24 @@ public class FilterParserTest {
         index = 6;
         exception = "failed to parse number data type starting at " + index;
         runParseNegative("const operand with an invalid value", filter, exception);
+
+        filter = "m1122";
+        index = 4;
+        exception = "invalid DataType OID at " + index;
+        runParseNegative("const operand with an invalid datatype", filter, exception);
+
+        filter = "m20";
+        exception = "expected non-scalar datatype, but got datatype with oid = 20";
+        runParseNegative("const operand with an scalar datatype instead of list", filter, exception);
+
+        filter = "m1007s1d1s1d2s2d3";
+        exception = "filter string is shorter than expected";
+        runParseNegative("const operand with list datatype, and \"d\" tag has less data than indicated in \"s\" tag", filter, exception);
+
+        filter = "m1007s1d1s1d2s2d123";
+        index = 18;
+        exception = "unknown opcode 3(51) at " + index;
+        runParseNegative("const operand with list datatype, and \"d\" tag has more data than indicated in \"s\" tag", filter, exception);
     }
 
     @Test
@@ -272,6 +290,10 @@ public class FilterParserTest {
         op = Operation.HDOP_IS_NOT_NULL;
         runParseOneUnaryOperation("this filter was build from HDOP_IS_NULL", filter, op);
 
+        filter = "a1m1005s1d1s1d2s1d3o10";
+        op = Operation.HDOP_IN;
+        runParseOneOperation("this filter was built from HDOP_IN", filter, op);
+
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
index 9d79f97..dc195f4 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
@@ -195,6 +195,15 @@ public class HiveORCAccessor extends HiveAccessor {
             case HDOP_IS_NOT_NULL:
                 builder.startNot().isNull(filterColumnName).end();
                 break;
+            case HDOP_IN:
+                if (filterValue instanceof List) {
+                    @SuppressWarnings("unchecked")
+                    List<Object> l = (List<Object>)filterValue;
+                    builder.in(filterColumnName, l.toArray());
+                } else {
+                    throw new IllegalArgumentException("filterValue should be instace of List for HDOP_IN operation");
+                }
+                break;
             default: {
                 LOG.debug("Filter push-down is not supported for " + filter.getOperation() + "operation.");
                 return false;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
index c5700b6..7bbe811 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
@@ -103,4 +103,19 @@ public class HiveORCAccessorTest {
         assertEquals(sarg.toKryo(), jobConf.get(SARG_PUSHDOWN));
     }
 
+    @Test
+    public void parseFilterWithIn() throws Exception {
+        // Column 1 IN (1, 2, 3): "m1007" is an INT4ARRAY list constant and "o10" is HDOP_IN.
+        when(inputData.hasFilter()).thenReturn(true);
+        when(inputData.getFilterString()).thenReturn("a1m1007s1d1s1d2s1d3o10");
+        when(columnDesc.columnName()).thenReturn("FOO");
+        when(inputData.getColumn(1)).thenReturn(columnDesc);
+
+        accessor.openForRead();
+
+        SearchArgument sarg = SearchArgumentFactory.newBuilder().startAnd().in("FOO", 1, 2, 3).end().build();
+
+        assertEquals(sarg.toKryo(), jobConf.get(SARG_PUSHDOWN));
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3988fa9d/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCSearchArgumentTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCSearchArgumentTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCSearchArgumentTest.java
index 382f065..74d87e7 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCSearchArgumentTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCSearchArgumentTest.java
@@ -40,14 +40,25 @@ public class HiveORCSearchArgumentTest {
         HiveFilterBuilder eval = new HiveFilterBuilder(null);
         Object filter = eval.getFilterObject(filterStr);
 
-        Object current = filter;
         SearchArgument.Builder filterBuilder = SearchArgumentFactory.newBuilder();
         buildExpression(filterBuilder, Arrays.asList(filter));
         SearchArgument sarg = filterBuilder.build();
         Assert.assertEquals("and(or(lt(col1, 5), not(lteq(col1, 1))), or(lt(col1, 5), lteq(col1, 3)))", sarg.toFilterPredicate().toString());
     }
 
-    private void buildExpression(SearchArgument.Builder builder, List<Object> filterList) {
+    @Test
+    public void buildIn() throws Exception {
+        String filterStr = "a0m1009s4drow1s4drow2o10a1m1009s3ds_6s3ds_7o10l0"; // TEXTARRAY (OID 1009) IN lists on a0 and a1, joined by l0. NOTE(review): the expected predicate names col1 for both; confirm column naming in buildArgument.
+        HiveFilterBuilder eval = new HiveFilterBuilder(null);
+        Object filter = eval.getFilterObject(filterStr);
+
+        SearchArgument.Builder filterBuilder = SearchArgumentFactory.newBuilder();
+        buildExpression(filterBuilder, Arrays.asList(filter));
+        SearchArgument sarg = filterBuilder.build();
+        Assert.assertEquals("and(or(eq(col1, Binary{\"row1\"}), eq(col1, Binary{\"row2\"})), or(eq(col1, Binary{\"s_6\"}), eq(col1, Binary{\"s_7\"})))", sarg.toFilterPredicate().toString());
+    }
+
+    private boolean buildExpression(SearchArgument.Builder builder, List<Object> filterList) {
         for (Object f : filterList) {
             if (f instanceof LogicalFilter) {
                 switch(((LogicalFilter) f).getOperator()) {
@@ -61,15 +72,21 @@ public class HiveORCSearchArgumentTest {
                         builder.startNot();
                         break;
                 }
-                buildExpression(builder, ((LogicalFilter) f).getFilterList());
-                builder.end();
+                if (buildExpression(builder, ((LogicalFilter) f).getFilterList())) {
+                    builder.end();
+                } else {
+                    return false;
+                }
             } else {
-                buildArgument(builder, f);
+                if (!buildArgument(builder, f)) {
+                    return false;
+                }
             }
         }
+        return true;
     }
 
-    private void buildArgument(SearchArgument.Builder builder, Object filterObj) {
+    private boolean buildArgument(SearchArgument.Builder builder, Object filterObj) {
         /* The below functions will not be compatible and requires update  with Hive 2.0 APIs */
         BasicFilter filter = (BasicFilter) filterObj;
         int filterColumnIndex = filter.getColumn().index();
@@ -97,7 +114,18 @@ public class HiveORCSearchArgumentTest {
             case HDOP_NE:
                 builder.startNot().equals(filterColumnName, filterValue).end();
                 break;
+            case HDOP_IN:
+                if (filterValue instanceof List) {
+                    @SuppressWarnings("unchecked")
+                    List<Object> l = (List<Object>)filterValue;
+                    builder.in(filterColumnName, l.toArray());
+                } else {
+                    throw new IllegalArgumentException("filterValue should be instace of List for HDOP_IN operation");
+                }
+                break;
+            default:
+                return false;
         }
-        return;
+        return true;
     }
 }