Posted to commits@hawq.apache.org by rl...@apache.org on 2018/01/31 09:26:43 UTC

incubator-hawq git commit: HAWQ-1527 Added feature to enable partition filtering for integral data types

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 9ce5be82d -> 6fae88a57


HAWQ-1527 Added feature to enable partition filtering for integral data types


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/6fae88a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/6fae88a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/6fae88a5

Branch: refs/heads/master
Commit: 6fae88a5781de4ffad3cc1cd7fdb6966be59e67e
Parents: 9ce5be8
Author: Shubham Sharma <sh...@gmail.com>
Authored: Fri Sep 15 16:05:19 2017 -0700
Committer: rlei <rl...@pivotal.io>
Committed: Wed Jan 31 17:26:17 2018 +0800

----------------------------------------------------------------------
 .../pxf/plugins/hive/HiveDataFragmenter.java    |  24 ++++-
 .../plugins/hive/HiveDataFragmenterTest.java    | 107 +++++++++++++++++++
 2 files changed, 127 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fae88a5/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
index c24b552..76b83e6 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
@@ -60,6 +60,7 @@ import org.apache.hawq.pxf.api.utilities.ProfilesConf;
 import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 import org.apache.hawq.pxf.plugins.hive.utilities.ProfileFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 
 /**
  * Fragmenter class for HIVE tables. <br>
@@ -100,6 +101,7 @@ public class HiveDataFragmenter extends Fragmenter {
     private Set<String> setPartitions = new TreeSet<String>(
             String.CASE_INSENSITIVE_ORDER);
     private Map<String, String> partitionkeyTypes = new HashMap<>();
+    private boolean canPushDownIntegral;
 
     /**
      * Constructs a HiveDataFragmenter object.
@@ -120,6 +122,9 @@ public class HiveDataFragmenter extends Fragmenter {
         super(inputData);
         jobConf = new JobConf(new Configuration(), clazz);
         client = HiveUtilities.initHiveClient();
+        // canPushDownIntegral reflects the hive.metastore.integral.jdo.pushdown property in hive-site.xml
+        canPushDownIntegral =
+                HiveConf.getBoolVar(new HiveConf(), HiveConf.ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN);
     }
 
     @Override
@@ -409,7 +414,11 @@ public class HiveDataFragmenter extends Fragmenter {
         String filterValue = bFilter.getConstant()!= null ? bFilter.getConstant().constant().toString() : "";
         ColumnDescriptor filterColumn = inputData.getColumn(filterColumnIndex);
         String filterColumnName = filterColumn.columnName();
-
+        FilterParser.Operation operation = ((BasicFilter) filter).getOperation();
+        String colType = partitionkeyTypes.get(filterColumnName);
+        boolean isIntegralSupported =
+                canPushDownIntegral &&
+                        (operation == FilterParser.Operation.HDOP_EQ || operation == FilterParser.Operation.HDOP_NE);
         // In case this filter is not a partition, we ignore this filter (no add
         // to filter list)
         if (!setPartitions.contains(filterColumnName)) {
@@ -418,8 +427,15 @@ public class HiveDataFragmenter extends Fragmenter {
             return false;
         }
 
-		if (!partitionkeyTypes.get(filterColumnName).equalsIgnoreCase(serdeConstants.STRING_TYPE_NAME)) {
-            LOG.debug("Filter type is not string type , ignore this filter for hive: "
+        /* 
+         * HAWQ-1527 - Filtering is only supported for partition columns of string or
+         * integral type. Integral data types include TINYINT, SMALLINT, INT, and BIGINT.
+         * Note that with integral data types only the equals ("=") and not-equals ("!=")
+         * operators are supported. There are no operator restrictions for string columns.
+         */
+        if (!colType.equalsIgnoreCase(serdeConstants.STRING_TYPE_NAME)
+                && (!isIntegralSupported || !serdeConstants.IntegralTypes.contains(colType))) {
+            LOG.debug("Column type is neither string nor an integral data type, ignore this filter for hive: "
                     + filter);
             return false;
         }
@@ -428,7 +444,7 @@ public class HiveDataFragmenter extends Fragmenter {
             filtersString.append(prefix);
         filtersString.append(filterColumnName);
 
-        switch(((BasicFilter) filter).getOperation()) {
+        switch(operation) {
             case HDOP_EQ:
                 filtersString.append(HIVE_API_EQ);
                 break;
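The constructor change above gates the whole feature on a single Hive setting. A
minimal standalone sketch of reading that flag, assuming the Hive client libraries
are on the classpath and hive-site.xml is discoverable (the class name
IntegralPushdownFlag is illustrative, not part of the commit):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class IntegralPushdownFlag {
        public static void main(String[] args) {
            // HiveConf loads hive-site.xml from the classpath; the enum constant
            // METASTORE_INTEGER_JDO_PUSHDOWN maps to the
            // hive.metastore.integral.jdo.pushdown property.
            boolean canPushDownIntegral = HiveConf.getBoolVar(
                    new HiveConf(), HiveConf.ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN);
            System.out.println("Integral partition filter pushdown enabled: "
                    + canPushDownIntegral);
        }
    }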

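The new check in buildSingleFilter reduces to a small predicate: push a partition
filter down when the column is a string, or when integral pushdown is enabled, the
column type is one of TINYINT/SMALLINT/INT/BIGINT, and the operator is "=" or "!=".
A hedged sketch of that rule in isolation (PartitionFilterEligibility and isEligible
are hypothetical names; Op stands in for FilterParser.Operation):

    import java.util.Arrays;
    import java.util.List;

    public class PartitionFilterEligibility {
        // Mirrors serdeConstants.IntegralTypes from the patch: TINYINT, SMALLINT, INT, BIGINT.
        private static final List<String> INTEGRAL_TYPES =
                Arrays.asList("tinyint", "smallint", "int", "bigint");

        enum Op { EQ, NE, GE } // stand-ins for the FilterParser.Operation HDOP_* values

        // canPushDownIntegral corresponds to hive.metastore.integral.jdo.pushdown.
        static boolean isEligible(String colType, Op op, boolean canPushDownIntegral) {
            if (colType.equalsIgnoreCase("string")) {
                return true; // no operator restrictions for string partitions
            }
            boolean integralOpSupported = (op == Op.EQ || op == Op.NE);
            return canPushDownIntegral
                    && integralOpSupported
                    && INTEGRAL_TYPES.contains(colType.toLowerCase());
        }

        public static void main(String[] args) {
            System.out.println(isEligible("int", Op.EQ, true));  // true
            System.out.println(isEligible("int", Op.GE, true));  // false: only = and != push down
            System.out.println(isEligible("date", Op.EQ, true)); // false: neither string nor integral
        }
    }
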
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fae88a5/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenterTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenterTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenterTest.java
index 2c28500..1a04b17 100755
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenterTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenterTest.java
@@ -145,6 +145,113 @@ public class HiveDataFragmenterTest {
         }
     }
 
+    @Test
+    public void testIntegralPushdown() throws Exception {
+        prepareConstruction();
+        fragmenter = new HiveDataFragmenter(inputData);
+        // Access the private field partitionkeyTypes via reflection
+        Field partitionkeyTypes = PowerMockito.field(HiveDataFragmenter.class, "partitionkeyTypes");
+        // Access the private method buildSingleFilter via reflection
+        Method method = PowerMockito.method(HiveDataFragmenter.class, "buildSingleFilter",
+                new Class[]{Object.class, StringBuilder.class, String.class});
+        // Access the private field setPartitions via reflection
+        Field setPartitions = PowerMockito.field(HiveDataFragmenter.class, "setPartitions");
+        // Access the private field canPushDownIntegral via reflection and force pushdown on
+        Field canPushDownIntegral = PowerMockito.field(HiveDataFragmenter.class, "canPushDownIntegral");
+        canPushDownIntegral.set(fragmenter, true);
+
+        ColumnDescriptor dateColumnDescriptor =
+                new ColumnDescriptor("dateColumn", 1082, 1, "date", null, true);
+        ColumnDescriptor stringColumnDescriptor =
+                new ColumnDescriptor("stringColumn", 25, 1, "string", null, true);
+        ColumnDescriptor intColumnDescriptor =
+                new ColumnDescriptor("intColumn", 23, 1, "int", null, true);
+        ColumnDescriptor bigIntColumnDescriptor =
+                new ColumnDescriptor("bigIntColumn", 20, 1, "bigint", null, true);
+        ColumnDescriptor smallIntColumnDescriptor =
+                new ColumnDescriptor("smallIntColumn", 21, 1, "smallint", null, true);
+        List<ColumnDescriptor> columnDescriptors = new ArrayList<>();
+
+        columnDescriptors.add(dateColumnDescriptor);
+        columnDescriptors.add(stringColumnDescriptor);
+        columnDescriptors.add(intColumnDescriptor);
+        columnDescriptors.add(bigIntColumnDescriptor);
+        columnDescriptors.add(smallIntColumnDescriptor);
+
+        for (ColumnDescriptor cd : columnDescriptors) {
+
+            checkPushDownFilter(fragmenter, cd, method, partitionkeyTypes, setPartitions);
+        }
+    }
+
+    private void checkPushDownFilter(HiveDataFragmenter fragmenter, ColumnDescriptor columnDescriptor, Method method,
+                                     Field partitionkeyTypes, Field setPartitions) throws Exception{
+        String filterColumnName = columnDescriptor.columnName();
+        int filterColumnIndex = columnDescriptor.columnIndex();
+        String typeName = columnDescriptor.columnTypeName();
+        int typeCode = columnDescriptor.columnTypeCode();
+        String data = "2016-08-11";
+        String dataIntegralDataTypes = "126";
+        String filterString = "a" + filterColumnIndex + "c" + typeCode + "s" + data.length() + "d" + data + "o";
+        String filterIntegralDataTypes =
+                "a" + filterColumnIndex + "c" + typeCode + "s" + dataIntegralDataTypes.length() + "d" + dataIntegralDataTypes + "o";
+        int notEquals = 6;
+        int equals = 5;
+        int greaterEquals = 4;
+
+        when(inputData.getColumn(filterColumnIndex)).thenReturn(columnDescriptor);
+        // Set partition key type
+        Map<String, String> localpartitionkeyTypes = new HashMap<>();
+        localpartitionkeyTypes.put(filterColumnName, typeName);
+        partitionkeyTypes.set(fragmenter, localpartitionkeyTypes);
+        // Set column as partition
+        Set<String> localSetPartitions = new TreeSet<String>(
+                String.CASE_INSENSITIVE_ORDER);
+        localSetPartitions.add(filterColumnName);
+        setPartitions.set(fragmenter, localSetPartitions);
+
+        switch (typeName) {
+
+            case "date":
+                assertFalse(isColumnStringOrIntegral(method, filterString + notEquals));
+                assertFalse(isColumnStringOrIntegral(method, filterString + equals));
+                assertFalse(isColumnStringOrIntegral(method, filterString + greaterEquals));
+                break;
+            case "string":
+                assertTrue(isColumnStringOrIntegral(method, filterString + notEquals));
+                assertTrue(isColumnStringOrIntegral(method, filterString + equals));
+                assertTrue(isColumnStringOrIntegral(method, filterString + greaterEquals));
+                break;
+            case "int":
+                assertTrue(isColumnStringOrIntegral(method, filterIntegralDataTypes + notEquals));
+                assertTrue(isColumnStringOrIntegral(method, filterIntegralDataTypes + equals));
+                assertFalse(isColumnStringOrIntegral(method, filterIntegralDataTypes + greaterEquals));
+                break;
+            case "bigint":
+                assertTrue(isColumnStringOrIntegral(method, filterIntegralDataTypes + notEquals));
+                assertTrue(isColumnStringOrIntegral(method, filterIntegralDataTypes + equals));
+                assertFalse(isColumnStringOrIntegral(method, filterIntegralDataTypes + greaterEquals));
+                break;
+            case "smallint":
+                assertTrue(isColumnStringOrIntegral(method, filterIntegralDataTypes + notEquals));
+                assertTrue(isColumnStringOrIntegral(method, filterIntegralDataTypes + equals));
+                assertFalse(isColumnStringOrIntegral(method, filterIntegralDataTypes + greaterEquals));
+                break;
+        }
+    }
+
+    private boolean isColumnStringOrIntegral(Method method, String filterString) throws Exception{
+        BasicFilter bFilter;
+        String prefix = "";
+        StringBuilder localFilterString = new StringBuilder();
+        boolean result;
+        HiveFilterBuilder builder = new HiveFilterBuilder(null);
+
+        bFilter = (BasicFilter) builder.getFilterObject(filterString);
+        result = (Boolean) method.invoke(fragmenter, new Object[]{bFilter, localFilterString, prefix});
+        return result;
+    }
+
     private void checkFilters(HiveDataFragmenter fragmenter, BasicFilter bFilter, FilterParser.Operation operation)
             throws Exception{
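
A note on the serialized filter strings the test assembles: going by the
construction in checkPushDownFilter, "a" prefixes the attribute (column) index,
"c" the column type code, "s" the constant's length, "d" the constant itself, and
"o" the operation code, with 5, 6 and 4 denoting "=", "!=" and ">=" respectively.
A hedged sketch of the same construction (PxfFilterStrings and buildFilterString
are hypothetical names, not part of the commit):

    public class PxfFilterStrings {
        // Hypothetical helper mirroring the construction in checkPushDownFilter.
        static String buildFilterString(int columnIndex, int typeCode, String data, int opCode) {
            return "a" + columnIndex    // attribute (column) index
                 + "c" + typeCode       // column type code
                 + "s" + data.length()  // length of the constant that follows
                 + "d" + data           // the constant itself
                 + "o" + opCode;        // operation code: 5 "=", 6 "!=", 4 ">="
        }

        public static void main(String[] args) {
            // An equality filter on intColumn (index 1, type code 23) with value 126:
            System.out.println(buildFilterString(1, 23, "126", 5)); // a1c23s3d126o5
        }
    }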