Posted to commits@hive.apache.org by se...@apache.org on 2015/09/22 23:32:01 UTC

[20/52] [abbrv] hive git commit: HIVE-10289: Support filter on non-first partition key and non-string partition key (Daniel Dai reviewed by Alan Gates)

http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java
index ec99685..9762309 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java
@@ -20,15 +20,30 @@ package org.apache.hadoop.hive.metastore.hbase;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.hbase.PartitionKeyComparator.Operator;
 import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
 import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LeafNode;
 import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode;
 import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
@@ -147,7 +162,7 @@ class HBaseFilterPlanUtil {
   public static class ScanPlan extends FilterPlan {
 
     public static class ScanMarker {
-      final byte[] bytes;
+      final String value;
       /**
        * If inclusive = true, it means that the
        * marker includes those bytes.
@@ -155,20 +170,24 @@ class HBaseFilterPlanUtil {
        * or ends at the next possible byte array
        */
       final boolean isInclusive;
-      ScanMarker(byte [] b, boolean i){
-        this.bytes = b;
+      final String type;
+      ScanMarker(String obj, boolean i, String type){
+        this.value = obj;
         this.isInclusive = i;
+        this.type = type;
       }
       @Override
       public String toString() {
-        return "ScanMarker [bytes=" + Arrays.toString(bytes) + ", isInclusive=" + isInclusive + "]";
+        return "ScanMarker [" + "value=" + value.toString() + ", isInclusive=" + isInclusive +
+            ", type=" + type + "]";
       }
       @Override
       public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result = prime * result + Arrays.hashCode(bytes);
+        result = prime * result + value.hashCode();
         result = prime * result + (isInclusive ? 1231 : 1237);
+        result = prime * result + type.hashCode();
         return result;
       }
       @Override
@@ -180,48 +199,118 @@ class HBaseFilterPlanUtil {
         if (getClass() != obj.getClass())
           return false;
         ScanMarker other = (ScanMarker) obj;
-        if (!Arrays.equals(bytes, other.bytes))
+        if (!value.equals(other.value))
           return false;
         if (isInclusive != other.isInclusive)
           return false;
+        if (!type.equals(other.type))
+          return false;
         return true;
       }
     }
-    // represent Scan start
-    private ScanMarker startMarker = new ScanMarker(null, false);
-    // represent Scan end
-    private ScanMarker endMarker = new ScanMarker(null, false);
-
-    private ScanFilter filter;
-
-    public ScanFilter getFilter() {
-      return filter;
+    public static class ScanMarkerPair {
+      public ScanMarkerPair(ScanMarker startMarker, ScanMarker endMarker) {
+        this.startMarker = startMarker;
+        this.endMarker = endMarker;
+      }
+      ScanMarker startMarker;
+      ScanMarker endMarker;
+    }
+    // per-partition-key scan markers: partition key name -> ScanMarkerPair (start and end)
+    Map<String, ScanMarkerPair> markers = new HashMap<String, ScanMarkerPair>();
+    List<Operator> ops = new ArrayList<Operator>();
+
+    // Get the number of leading partition keys (the key prefix) that can be used in the scan range.
+    // For example, if partition key is (year, month, state)
+    // 1. year = 2015 and month >= 1 and month < 5
+    //    year + month can be used in scan range, majorParts = 2
+    // 2. year = 2015 and state = 'CA'
+    //    only year can be used in scan range, majorParts = 1
+    // 3. month = 10 and state = 'CA'
+    //    nothing can be used in scan range, majorParts = 0
+    private int getMajorPartsCount(List<FieldSchema> parts) {
+      int majorPartsCount = 0;
+      while (majorPartsCount<parts.size() && markers.containsKey(parts.get(majorPartsCount).getName())) {
+        ScanMarkerPair pair = markers.get(parts.get(majorPartsCount).getName());
+        majorPartsCount++;
+        if (pair.startMarker!=null && pair.endMarker!=null && pair.startMarker.value.equals(pair
+            .endMarker.value) && pair.startMarker.isInclusive && pair.endMarker.isInclusive) {
+          // is equal
+          continue;
+        } else {
+          break;
+        }
+      }
+      return majorPartsCount;
     }
+    public Filter getFilter(List<FieldSchema> parts) {
+      int majorPartsCount = getMajorPartsCount(parts);
+      Set<String> majorKeys = new HashSet<String>();
+      for (int i=0;i<majorPartsCount;i++) {
+        majorKeys.add(parts.get(i).getName());
+      }
 
-    public void setFilter(ScanFilter filter) {
-      this.filter = filter;
-    }
+      List<String> names = HBaseUtils.getPartitionNames(parts);
+      List<PartitionKeyComparator.Range> ranges = new ArrayList<PartitionKeyComparator.Range>();
+      for (Map.Entry<String, ScanMarkerPair> entry : markers.entrySet()) {
+        if (names.contains(entry.getKey()) && !majorKeys.contains(entry.getKey())) {
+          PartitionKeyComparator.Mark startMark = null;
+          if (entry.getValue().startMarker != null) {
+            startMark = new PartitionKeyComparator.Mark(entry.getValue().startMarker.value,
+                entry.getValue().startMarker.isInclusive);
+          }
+          PartitionKeyComparator.Mark endMark = null;
+          if (entry.getValue().endMarker != null) {
+            endMark = new PartitionKeyComparator.Mark(entry.getValue().endMarker.value,
+                entry.getValue().endMarker.isInclusive);
+          }
+          PartitionKeyComparator.Range range = new PartitionKeyComparator.Range(
+              entry.getKey(), startMark, endMark);
+          ranges.add(range);
+        }
+      }
 
-    public ScanMarker getStartMarker() {
-      return startMarker;
+      if (ranges.isEmpty() && ops.isEmpty()) {
+        return null;
+      } else {
+        return new RowFilter(CompareFilter.CompareOp.EQUAL, new PartitionKeyComparator(
+            StringUtils.join(names, ","), StringUtils.join(HBaseUtils.getPartitionKeyTypes(parts), ","),
+            ranges, ops));
+      }
     }
 
-    public void setStartMarker(ScanMarker startMarker) {
-      this.startMarker = startMarker;
-    }
-    public void setStartMarker(byte[] start, boolean isInclusive) {
-      setStartMarker(new ScanMarker(start, isInclusive));
+    public void setStartMarker(String keyName, String keyType, String start, boolean isInclusive) {
+      if (markers.containsKey(keyName)) {
+        markers.get(keyName).startMarker = new ScanMarker(start, isInclusive, keyType);
+      } else {
+        ScanMarkerPair marker = new ScanMarkerPair(new ScanMarker(start, isInclusive, keyType), null);
+        markers.put(keyName, marker);
+      }
     }
 
-    public ScanMarker getEndMarker() {
-      return endMarker;
+    public ScanMarker getStartMarker(String keyName) {
+      if (markers.containsKey(keyName)) {
+        return markers.get(keyName).startMarker;
+      } else {
+        return null;
+      }
     }
 
-    public void setEndMarker(ScanMarker endMarker) {
-      this.endMarker = endMarker;
+    public void setEndMarker(String keyName, String keyType, String end, boolean isInclusive) {
+      if (markers.containsKey(keyName)) {
+        markers.get(keyName).endMarker = new ScanMarker(end, isInclusive, keyType);
+      } else {
+        ScanMarkerPair marker = new ScanMarkerPair(null, new ScanMarker(end, isInclusive, keyType));
+        markers.put(keyName, marker);
+      }
     }
-    public void setEndMarker(byte[] end, boolean isInclusive) {
-      setEndMarker(new ScanMarker(end, isInclusive));
+
+    public ScanMarker getEndMarker(String keyName) {
+      if (markers.containsKey(keyName)) {
+        return markers.get(keyName).endMarker;
+      } else {
+        return null;
+      }
     }
 
     @Override
@@ -236,28 +325,33 @@ class HBaseFilterPlanUtil {
     private ScanPlan and(ScanPlan other) {
       // create combined FilterPlan based on existing lhs and rhs plan
       ScanPlan newPlan = new ScanPlan();
+      newPlan.markers.putAll(markers);
+
+      for (String keyName : other.markers.keySet()) {
+        if (newPlan.markers.containsKey(keyName)) {
+          // create new scan start
+          ScanMarker greaterStartMarker = getComparedMarker(this.getStartMarker(keyName),
+              other.getStartMarker(keyName), true);
+          if (greaterStartMarker != null) {
+            newPlan.setStartMarker(keyName, greaterStartMarker.type, greaterStartMarker.value, greaterStartMarker.isInclusive);
+          }
+
+          // create new scan end
+          ScanMarker lesserEndMarker = getComparedMarker(this.getEndMarker(keyName), other.getEndMarker(keyName),
+              false);
+          if (lesserEndMarker != null) {
+            newPlan.setEndMarker(keyName, lesserEndMarker.type, lesserEndMarker.value, lesserEndMarker.isInclusive);
+          }
+        } else {
+          newPlan.markers.put(keyName, other.markers.get(keyName));
+        }
+      }
 
-      // create new scan start
-      ScanMarker greaterStartMarker = getComparedMarker(this.getStartMarker(),
-          other.getStartMarker(), true);
-      newPlan.setStartMarker(greaterStartMarker);
-
-      // create new scan end
-      ScanMarker lesserEndMarker = getComparedMarker(this.getEndMarker(), other.getEndMarker(),
-          false);
-      newPlan.setEndMarker(lesserEndMarker);
-
-      // create new filter plan
-      newPlan.setFilter(createCombinedFilter(this.getFilter(), other.getFilter()));
-
+      newPlan.ops.addAll(ops);
+      newPlan.ops.addAll(other.ops);
       return newPlan;
     }
 
-    private ScanFilter createCombinedFilter(ScanFilter filter1, ScanFilter filter2) {
-      // TODO create combined filter - filter1 && filter2
-      return null;
-    }
-
     /**
      * @param lStartMarker
      * @param rStartMarker
@@ -268,13 +362,23 @@ class HBaseFilterPlanUtil {
     static ScanMarker getComparedMarker(ScanMarker lStartMarker, ScanMarker rStartMarker,
         boolean getGreater) {
       // if one of them has null bytes, just return other
-      if(lStartMarker.bytes == null) {
+      if(lStartMarker == null) {
         return rStartMarker;
-      } else if (rStartMarker.bytes == null) {
+      } else if (rStartMarker == null) {
         return lStartMarker;
       }
-
-      int compareRes = compare(lStartMarker.bytes, rStartMarker.bytes);
+      TypeInfo expectedType =
+          TypeInfoUtils.getTypeInfoFromTypeString(lStartMarker.type);
+      ObjectInspector outputOI =
+          TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(expectedType);
+      Converter lConverter = ObjectInspectorConverters.getConverter(
+          PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI);
+      Converter rConverter = ObjectInspectorConverters.getConverter(
+          PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI);
+      Comparable lValue = (Comparable)lConverter.convert(lStartMarker.value);
+      Comparable rValue = (Comparable)rConverter.convert(rStartMarker.value);
+
+      int compareRes = lValue.compareTo(rValue);
       if (compareRes == 0) {
         // bytes are equal, now compare the isInclusive flags
         if (lStartMarker.isInclusive == rStartMarker.isInclusive) {
@@ -287,7 +391,7 @@ class HBaseFilterPlanUtil {
           isInclusive = false;
         }
         // else
-        return new ScanMarker(lStartMarker.bytes, isInclusive);
+        return new ScanMarker(lStartMarker.value, isInclusive, lStartMarker.type);
       }
       if (getGreater) {
         return compareRes == 1 ? lStartMarker : rStartMarker;
@@ -313,42 +417,74 @@ class HBaseFilterPlanUtil {
     /**
      * @return row suffix - This is appended to db + table, to generate start row for the Scan
      */
-    public byte[] getStartRowSuffix() {
-      if (startMarker.isInclusive) {
-        return startMarker.bytes;
-      } else {
-        return HBaseUtils.getEndPrefix(startMarker.bytes);
+    public byte[] getStartRowSuffix(String dbName, String tableName, List<FieldSchema> parts) {
+      int majorPartsCount = getMajorPartsCount(parts);
+      List<String> majorPartTypes = new ArrayList<String>();
+      List<String> components = new ArrayList<String>();
+      boolean endPrefix = false;
+      for (int i=0;i<majorPartsCount;i++) {
+        majorPartTypes.add(parts.get(i).getType());
+        ScanMarker marker = markers.get(parts.get(i).getName()).startMarker;
+        if (marker != null) {
+          components.add(marker.value);
+          if (i==majorPartsCount-1) {
+            endPrefix = !marker.isInclusive;
+          }
+        } else {
+          components.add(null);
+          if (i==majorPartsCount-1) {
+            endPrefix = false;
+          }
+        }
       }
+      byte[] bytes = HBaseUtils.buildPartitionKey(dbName, tableName, majorPartTypes, components, endPrefix);
+      return bytes;
     }
 
     /**
      * @return row suffix - This is appended to db + table, to generate end row for the Scan
      */
-    public byte[] getEndRowSuffix() {
-      if (endMarker.isInclusive) {
-        return HBaseUtils.getEndPrefix(endMarker.bytes);
-      } else {
-        return endMarker.bytes;
+    public byte[] getEndRowSuffix(String dbName, String tableName, List<FieldSchema> parts) {
+      int majorPartsCount = getMajorPartsCount(parts);
+      List<String> majorPartTypes = new ArrayList<String>();
+      List<String> components = new ArrayList<String>();
+      boolean endPrefix = false;
+      for (int i=0;i<majorPartsCount;i++) {
+        majorPartTypes.add(parts.get(i).getType());
+        ScanMarker marker = markers.get(parts.get(i).getName()).endMarker;
+        if (marker != null) {
+          components.add(marker.value);
+          if (i==majorPartsCount-1) {
+            endPrefix = marker.isInclusive;
+          }
+        } else {
+          components.add(null);
+          if (i==majorPartsCount-1) {
+            endPrefix = true;
+          }
+        }
+      }
+      byte[] bytes = HBaseUtils.buildPartitionKey(dbName, tableName, majorPartTypes, components, endPrefix);
+      if (components.isEmpty()) {
+        bytes[bytes.length-1]++;
       }
+      return bytes;
     }
 
     @Override
     public String toString() {
-      return "ScanPlan [startMarker=" + startMarker + ", endMarker=" + endMarker + ", filter="
-          + filter + "]";
+      StringBuffer sb = new StringBuffer();
+      sb.append("ScanPlan:\n");
+      for (Map.Entry<String, ScanMarkerPair> entry : markers.entrySet()) {
+        sb.append("key=" + entry.getKey() + "[startMarker=" + entry.getValue().startMarker
+            + ", endMarker=" + entry.getValue().endMarker + "]");
+      }
+      return sb.toString();
     }
 
   }
 
   /**
-   * represent a plan that can be used to create a hbase filter and then set in
-   * Scan.setFilter()
-   */
-  public static class ScanFilter {
-    // TODO: implement this
-  }
-
-  /**
    * Visitor for ExpressionTree.
    * It first generates the ScanPlan for the leaf nodes. The higher level nodes are
    * either AND or OR operations. It then calls FilterPlan.and and FilterPlan.or with
@@ -369,9 +505,12 @@ class HBaseFilterPlanUtil {
     // temporary params for current left and right side plans, for AND, OR
     private FilterPlan rPlan;
 
-    private final String firstPartcolumn;
-    public PartitionFilterGenerator(String firstPartitionColumn) {
-      this.firstPartcolumn = firstPartitionColumn;
+    private Map<String, String> nameToType = new HashMap<String, String>();
+
+    public PartitionFilterGenerator(List<FieldSchema> parts) {
+      for (FieldSchema part : parts) {
+        nameToType.put(part.getName(), part.getType());
+      }
     }
 
     FilterPlan getPlan() {
@@ -414,63 +553,37 @@ class HBaseFilterPlanUtil {
     public void visit(LeafNode node) throws MetaException {
       ScanPlan leafPlan = new ScanPlan();
       curPlan = leafPlan;
-      if (!isFirstParitionColumn(node.keyName)) {
-        leafPlan.setFilter(generateScanFilter(node));
-        return;
-      }
-      if (!(node.value instanceof String)) {
-        // only string type is supported currently
-        // treat conditions on other types as true
-        return;
-      }
 
       // this is a condition on a partition column, so it might influence the
       // start and end of the scan
       final boolean INCLUSIVE = true;
       switch (node.operator) {
       case EQUALS:
-        leafPlan.setStartMarker(toBytes(node.value), INCLUSIVE);
-        leafPlan.setEndMarker(toBytes(node.value), INCLUSIVE);
+        leafPlan.setStartMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE);
+        leafPlan.setEndMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE);
         break;
       case GREATERTHAN:
-        leafPlan.setStartMarker(toBytes(node.value), !INCLUSIVE);
+        leafPlan.setStartMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), !INCLUSIVE);
         break;
       case GREATERTHANOREQUALTO:
-        leafPlan.setStartMarker(toBytes(node.value), INCLUSIVE);
+        leafPlan.setStartMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE);
         break;
       case LESSTHAN:
-        leafPlan.setEndMarker(toBytes(node.value), !INCLUSIVE);
+        leafPlan.setEndMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), !INCLUSIVE);
         break;
       case LESSTHANOREQUALTO:
-        leafPlan.setEndMarker(toBytes(node.value), INCLUSIVE);
+        leafPlan.setEndMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE);
         break;
       case LIKE:
+        leafPlan.ops.add(new Operator(Operator.Type.LIKE, node.keyName, node.value.toString()));
+        break;
       case NOTEQUALS:
       case NOTEQUALS2:
-        // TODO: create filter plan for these
-        hasUnsupportedCondition = true;
+        leafPlan.ops.add(new Operator(Operator.Type.NOTEQUALS, node.keyName, node.value.toString()));
         break;
       }
     }
 
-    @VisibleForTesting
-    static byte[] toBytes(Object value) {
-      // TODO: actually implement this
-      // We need to determine the actual type and use appropriate
-      // serialization format for that type
-      return ((String) value).getBytes(HBaseUtils.ENCODING);
-    }
-
-    private ScanFilter generateScanFilter(LeafNode node) {
-      // TODO Auto-generated method stub
-      hasUnsupportedCondition = true;
-      return null;
-    }
-
-    private boolean isFirstParitionColumn(String keyName) {
-      return keyName.equalsIgnoreCase(firstPartcolumn);
-    }
-
     private boolean hasUnsupportedCondition() {
       return hasUnsupportedCondition;
     }
@@ -486,12 +599,12 @@ class HBaseFilterPlanUtil {
     }
   }
 
-  public static PlanResult getFilterPlan(ExpressionTree exprTree, String firstPartitionColumn) throws MetaException {
+  public static PlanResult getFilterPlan(ExpressionTree exprTree, List<FieldSchema> parts) throws MetaException {
     if (exprTree == null) {
       // TODO: if exprTree is null, we should do what ObjectStore does. See HIVE-10102
       return new PlanResult(new ScanPlan(), true);
     }
-    PartitionFilterGenerator pGenerator = new PartitionFilterGenerator(firstPartitionColumn);
+    PartitionFilterGenerator pGenerator = new PartitionFilterGenerator(parts);
     exprTree.accept(pGenerator);
     return new PlanResult(pGenerator.getPlan(), pGenerator.hasUnsupportedCondition());
   }
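
A note on the getComparedMarker() change above: scan markers now carry the partition key's declared type together with a string-encoded value, and comparisons go through ObjectInspector converters so that ordering follows the typed value (for example 9 < 10 for an int key) instead of raw string or byte order. A minimal standalone sketch of that conversion step; the class and method names here are illustrative only and not part of the patch:

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class TypedMarkerCompare {
  // Convert both string-encoded values to the writable form of the declared type,
  // then compare, mirroring what getComparedMarker does for scan markers.
  @SuppressWarnings({"unchecked", "rawtypes"})
  static int compareTyped(String lValue, String rValue, String type) {
    TypeInfo expectedType = TypeInfoUtils.getTypeInfoFromTypeString(type);
    ObjectInspector outputOI =
        TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(expectedType);
    Converter lConverter = ObjectInspectorConverters.getConverter(
        PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI);
    Converter rConverter = ObjectInspectorConverters.getConverter(
        PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI);
    Comparable lValueTyped = (Comparable) lConverter.convert(lValue);
    Comparable rValueTyped = (Comparable) rConverter.convert(rValue);
    return lValueTyped.compareTo(rValueTyped);
  }

  public static void main(String[] args) {
    System.out.println(compareTyped("9", "10", "int"));    // negative: 9 < 10 as ints
    System.out.println(compareTyped("9", "10", "string")); // positive: "9" > "10" as strings
  }
}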

http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
index ca1582e..66c46a5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
@@ -19,8 +19,10 @@
 package org.apache.hadoop.hive.metastore.hbase;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,6 +45,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -51,6 +54,7 @@ import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.Role;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.hbase.PartitionKeyComparator.Operator;
 import org.apache.hive.common.util.BloomFilter;
 
 import java.io.IOException;
@@ -493,12 +497,12 @@ public class HBaseReadWrite {
    * @return a list of partition objects.
    * @throws IOException
    */
-   List<Partition> getPartitions(String dbName, String tableName, List<List<String>> partValLists)
-       throws IOException {
+   List<Partition> getPartitions(String dbName, String tableName, List<String> partTypes,
+       List<List<String>> partValLists) throws IOException {
      List<Partition> parts = new ArrayList<>(partValLists.size());
      List<Get> gets = new ArrayList<>(partValLists.size());
      for (List<String> partVals : partValLists) {
-       byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partVals);
+       byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partTypes, partVals);
        Get get = new Get(key);
        get.addColumn(CATALOG_CF, CATALOG_COL);
        gets.add(get);
@@ -526,7 +530,8 @@ public class HBaseReadWrite {
    */
   void putPartition(Partition partition) throws IOException {
     byte[] hash = putStorageDescriptor(partition.getSd());
-    byte[][] serialized = HBaseUtils.serializePartition(partition, hash);
+    byte[][] serialized = HBaseUtils.serializePartition(partition,
+        HBaseUtils.getPartitionKeyTypes(getTable(partition.getDbName(), partition.getTableName()).getPartitionKeys()), hash);
     store(PART_TABLE, serialized[0], CATALOG_CF, CATALOG_COL, serialized[1]);
     partCache.put(partition.getDbName(), partition.getTableName(), partition);
   }
@@ -547,7 +552,8 @@ public class HBaseReadWrite {
       decrementStorageDescriptorRefCount(oldPart.getSd());
       hash = putStorageDescriptor(newPart.getSd());
     }
-    byte[][] serialized = HBaseUtils.serializePartition(newPart, hash);
+    byte[][] serialized = HBaseUtils.serializePartition(newPart,
+        HBaseUtils.getPartitionKeyTypes(getTable(newPart.getDbName(), newPart.getTableName()).getPartitionKeys()), hash);
     store(PART_TABLE, serialized[0], CATALOG_CF, CATALOG_COL, serialized[1]);
     partCache.put(newPart.getDbName(), newPart.getTableName(), newPart);
     if (!oldPart.getTableName().equals(newPart.getTableName())) {
@@ -565,7 +571,9 @@ public class HBaseReadWrite {
     List<Put> puts = new ArrayList<>(partitions.size());
     for (Partition partition : partitions) {
       byte[] hash = putStorageDescriptor(partition.getSd());
-      byte[][] serialized = HBaseUtils.serializePartition(partition, hash);
+      List<String> partTypes = HBaseUtils.getPartitionKeyTypes(
+          getTable(partition.getDbName(), partition.getTableName()).getPartitionKeys());
+      byte[][] serialized = HBaseUtils.serializePartition(partition, partTypes, hash);
       Put p = new Put(serialized[0]);
       p.add(CATALOG_CF, CATALOG_COL, serialized[1]);
       puts.add(p);
@@ -591,7 +599,9 @@ public class HBaseReadWrite {
         decrementStorageDescriptorRefCount(oldParts.get(i).getSd());
         hash = putStorageDescriptor(newParts.get(i).getSd());
       }
-      byte[][] serialized = HBaseUtils.serializePartition(newParts.get(i), hash);
+      Partition newPart = newParts.get(i);
+      byte[][] serialized = HBaseUtils.serializePartition(newPart,
+          HBaseUtils.getPartitionKeyTypes(getTable(newPart.getDbName(), newPart.getTableName()).getPartitionKeys()), hash);
       Put p = new Put(serialized[0]);
       p.add(CATALOG_CF, CATALOG_COL, serialized[1]);
       puts.add(p);
@@ -624,8 +634,9 @@ public class HBaseReadWrite {
           ? new ArrayList<>(cached).subList(0, maxPartitions)
           : new ArrayList<>(cached);
     }
-    byte[] keyPrefix = HBaseUtils.buildKeyWithTrailingSeparator(dbName, tableName);
-    List<Partition> parts = scanPartitionsWithFilter(keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), -1, null);
+    byte[] keyPrefix = HBaseUtils.buildPartitionKey(dbName, tableName, new ArrayList<String>(),
+        new ArrayList<String>(), false);
+    List<Partition> parts = scanPartitionsWithFilter(dbName, tableName, keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), -1, null);
     partCache.put(dbName, tableName, parts, true);
     return maxPartitions < parts.size() ? parts.subList(0, maxPartitions) : parts;
   }
@@ -672,72 +683,68 @@ public class HBaseReadWrite {
     if (table == null) {
       throw new NoSuchObjectException("Unable to find table " + dbName + "." + tableName);
     }
-    if (partVals.size() == table.getPartitionKeys().size()) {
-      keyPrefix = HBaseUtils.buildKey(keyElements.toArray(new String[keyElements.size()]));
-    } else {
-      keyPrefix = HBaseUtils.buildKeyWithTrailingSeparator(keyElements.toArray(
-          new String[keyElements.size()]));
-    }
+    keyPrefix = HBaseUtils.buildPartitionKey(dbName, tableName,
+        HBaseUtils.getPartitionKeyTypes(table.getPartitionKeys().subList(0, keyElements.size()-2)),
+          keyElements.subList(2, keyElements.size()));
 
     // Now, build a filter out of the remaining keys
-    String regex = null;
+    List<PartitionKeyComparator.Range> ranges = new ArrayList<PartitionKeyComparator.Range>();
+    List<Operator> ops = new ArrayList<Operator>();
     if (!(partVals.size() == table.getPartitionKeys().size() && firstStar == -1)) {
-      StringBuilder buf = new StringBuilder(".*");
+
       for (int i = Math.max(0, firstStar);
            i < table.getPartitionKeys().size() && i < partVals.size(); i++) {
-        buf.append(HBaseUtils.KEY_SEPARATOR);
+
         if ("*".equals(partVals.get(i))) {
-          buf.append("[^");
-          buf.append(HBaseUtils.KEY_SEPARATOR);
-          buf.append("]+");
+          // "*" matches any value for this key
+          ops.add(new PartitionKeyComparator.Operator(
+              PartitionKeyComparator.Operator.Type.LIKE,
+              table.getPartitionKeys().get(i).getName(),
+              ".*"));
         } else {
-          buf.append(partVals.get(i));
+          // a concrete value constrains this key to the inclusive range [value, value]
+          ranges.add(new PartitionKeyComparator.Range(
+              table.getPartitionKeys().get(i).getName(),
+              new PartitionKeyComparator.Mark(partVals.get(i), true),
+              new PartitionKeyComparator.Mark(partVals.get(i), true)));
         }
       }
-      if (partVals.size() < table.getPartitionKeys().size()) {
-        buf.append(HBaseUtils.KEY_SEPARATOR);
-        buf.append(".*");
-      }
-      regex = buf.toString();
     }
 
     Filter filter = null;
-    if (regex != null) {
-      filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regex));
+    if (!ranges.isEmpty() || !ops.isEmpty()) {
+      filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new PartitionKeyComparator(
+          StringUtils.join(HBaseUtils.getPartitionNames(table.getPartitionKeys()), ","),
+          StringUtils.join(HBaseUtils.getPartitionKeyTypes(table.getPartitionKeys()), ","),
+          ranges, ops));
     }
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Scanning partitions with prefix <" + new String(keyPrefix) + "> and filter <" +
-          regex + ">");
+          filter + ">");
     }
 
-    List<Partition> parts = scanPartitionsWithFilter(keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), maxPartitions, filter);
+    List<Partition> parts = scanPartitionsWithFilter(dbName, tableName, keyPrefix,
+        HBaseUtils.getEndPrefix(keyPrefix), maxPartitions, filter);
     partCache.put(dbName, tableName, parts, false);
     return parts;
   }
 
   List<Partition> scanPartitions(String dbName, String tableName, byte[] keyStart, byte[] keyEnd,
       Filter filter, int maxPartitions) throws IOException, NoSuchObjectException {
-    List<String> keyElements = new ArrayList<>();
-    keyElements.add(dbName);
-    keyElements.add(tableName);
-
-    byte[] keyPrefix =
-        HBaseUtils.buildKeyWithTrailingSeparator(keyElements.toArray(new String[keyElements.size()]));
-    byte[] startRow = ArrayUtils.addAll(keyPrefix, keyStart);
+    byte[] startRow = keyStart;
     byte[] endRow;
     if (keyEnd == null || keyEnd.length == 0) {
       // stop when current db+table entries are over
-      endRow = HBaseUtils.getEndPrefix(keyPrefix);
+      endRow = HBaseUtils.getEndPrefix(startRow);
     } else {
-      endRow = ArrayUtils.addAll(keyPrefix, keyEnd);
+      endRow = keyEnd;
     }
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Scanning partitions with start row <" + new String(startRow) + "> and end row <"
           + new String(endRow) + ">");
     }
-    return scanPartitionsWithFilter(startRow, endRow, maxPartitions, filter);
+    return scanPartitionsWithFilter(dbName, tableName, startRow, endRow, maxPartitions, filter);
   }
 
 
@@ -762,7 +769,8 @@ public class HBaseReadWrite {
       Partition p = getPartition(dbName, tableName, partVals, false);
       decrementStorageDescriptorRefCount(p.getSd());
     }
-    byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partVals);
+    byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName,
+        HBaseUtils.getPartitionKeyTypes(getTable(dbName, tableName).getPartitionKeys()), partVals);
     delete(PART_TABLE, key, null, null);
   }
 
@@ -770,7 +778,8 @@ public class HBaseReadWrite {
                                  boolean populateCache) throws IOException {
     Partition cached = partCache.get(dbName, tableName, partVals);
     if (cached != null) return cached;
-    byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partVals);
+    byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName,
+        HBaseUtils.getPartitionKeyTypes(getTable(dbName, tableName).getPartitionKeys()), partVals);
     byte[] serialized = read(PART_TABLE, key, CATALOG_CF, CATALOG_COL);
     if (serialized == null) return null;
     HBaseUtils.StorageDescriptorParts sdParts =
@@ -781,17 +790,18 @@ public class HBaseReadWrite {
     return sdParts.containingPartition;
   }
 
-  private List<Partition> scanPartitionsWithFilter(byte[] startRow, byte [] endRow,
-      int maxResults, Filter filter)
+  private List<Partition> scanPartitionsWithFilter(String dbName, String tableName,
+      byte[] startRow, byte [] endRow, int maxResults, Filter filter)
       throws IOException {
     Iterator<Result> iter =
         scan(PART_TABLE, startRow, endRow, CATALOG_CF, CATALOG_COL, filter);
+    List<FieldSchema> tablePartitions = getTable(dbName, tableName).getPartitionKeys();
     List<Partition> parts = new ArrayList<>();
     int numToFetch = maxResults < 0 ? Integer.MAX_VALUE : maxResults;
     for (int i = 0; i < numToFetch && iter.hasNext(); i++) {
       Result result = iter.next();
-      HBaseUtils.StorageDescriptorParts sdParts = HBaseUtils.deserializePartition(result.getRow(),
-          result.getValue(CATALOG_CF, CATALOG_COL));
+      HBaseUtils.StorageDescriptorParts sdParts = HBaseUtils.deserializePartition(dbName, tableName,
+          tablePartitions, result.getRow(), result.getValue(CATALOG_CF, CATALOG_COL));
       StorageDescriptor sd = getStorageDescriptor(sdParts.sdHash);
       HBaseUtils.assembleStorageDescriptor(sd, sdParts);
       parts.add(sdParts.containingPartition);
@@ -1558,7 +1568,9 @@ public class HBaseReadWrite {
 
     for (int i = 0; i < partNames.size(); i++) {
       valToPartMap.put(partVals.get(i), partNames.get(i));
-      byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName, partVals.get(i));
+      byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName,
+          HBaseUtils.getPartitionKeyTypes(getTable(dbName, tblName).getPartitionKeys()),
+          partVals.get(i));
       Get get = new Get(partKey);
       for (byte[] colName : colNameBytes) {
         get.addColumn(STATS_CF, colName);
@@ -1690,9 +1702,11 @@ public class HBaseReadWrite {
     return keys;
   }
 
-  private byte[] getStatisticsKey(String dbName, String tableName, List<String> partVals) {
+  private byte[] getStatisticsKey(String dbName, String tableName, List<String> partVals) throws IOException {
     return partVals == null ? HBaseUtils.buildKey(dbName, tableName) : HBaseUtils
-        .buildPartitionKey(dbName, tableName, partVals);
+        .buildPartitionKey(dbName, tableName,
+            HBaseUtils.getPartitionKeyTypes(getTable(dbName, tableName).getPartitionKeys()),
+            partVals);
   }
 
   private String getStatisticsTable(List<String> partVals) {
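
The regex-based RowFilter that scanPartitions used to build is replaced above by a typed PartitionKeyComparator. Below is a rough usage sketch of how such a filter is assembled; it assumes the package-private Mark/Range/Operator classes introduced later in this patch (so it would have to live in org.apache.hadoop.hive.metastore.hbase), and the key names, types, and values are made up for illustration:

package org.apache.hadoop.hive.metastore.hbase;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RowFilter;

class PartitionFilterSketch {
  // Build a filter equivalent to "state = 'CA' and month != 12" on partition keys
  // that are not covered by the scan range prefix.
  static Filter exampleFilter() {
    List<PartitionKeyComparator.Range> ranges = new ArrayList<PartitionKeyComparator.Range>();
    ranges.add(new PartitionKeyComparator.Range("state",
        new PartitionKeyComparator.Mark("CA", true),   // inclusive start
        new PartitionKeyComparator.Mark("CA", true))); // inclusive end => equality
    List<PartitionKeyComparator.Operator> ops = new ArrayList<PartitionKeyComparator.Operator>();
    ops.add(new PartitionKeyComparator.Operator(
        PartitionKeyComparator.Operator.Type.NOTEQUALS, "month", "12"));
    // names/types list the full set of partition keys, in table order
    return new RowFilter(CompareFilter.CompareOp.EQUAL,
        new PartitionKeyComparator("year,month,state", "int,int,string", ranges, ops));
  }
}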

http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index 0204f37..717e094 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -541,7 +541,8 @@ public class HBaseStore implements RawStore {
     boolean commit = false;
     openTransaction();
     try {
-      List<Partition> oldParts = getHBase().getPartitions(db_name, tbl_name, part_vals_list);
+      List<Partition> oldParts = getHBase().getPartitions(db_name, tbl_name,
+          HBaseUtils.getPartitionKeyTypes(getTable(db_name, tbl_name).getPartitionKeys()), part_vals_list);
       getHBase().replacePartitions(oldParts, new_parts);
       for (List<String> part_vals : part_vals_list) {
         getHBase().getStatsCache().invalidate(db_name, tbl_name,
@@ -634,10 +635,8 @@ public class HBaseStore implements RawStore {
     if (table == null) {
       throw new NoSuchObjectException("Unable to find table " + dbName + "." + tblName);
     }
-    String firstPartitionColumn = table.getPartitionKeys().get(0).getName();
     // general hbase filter plan from expression tree
-    PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, firstPartitionColumn);
-
+    PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, table.getPartitionKeys());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Hbase Filter Plan generated : " + planRes.plan);
     }
@@ -648,7 +647,9 @@ public class HBaseStore implements RawStore {
     for (ScanPlan splan : planRes.plan.getPlans()) {
       try {
         List<Partition> parts = getHBase().scanPartitions(dbName, tblName,
-            splan.getStartRowSuffix(), splan.getEndRowSuffix(), null, -1);
+            splan.getStartRowSuffix(dbName, tblName, table.getPartitionKeys()),
+            splan.getEndRowSuffix(dbName, tblName, table.getPartitionKeys()),
+            splan.getFilter(table.getPartitionKeys()), -1);
         boolean reachedMax = false;
         for (Partition part : parts) {
           mergedParts.put(part.getValues(), part);
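
For the HBaseStore change above: a ScanPlan now yields typed start/end row suffixes computed from per-key markers instead of a single byte range on the first partition key. The sketch below shows how the (year, month, state) example from the HBaseFilterPlanUtil comments turns into a scan range; it assumes same-package access to the package-private HBaseFilterPlanUtil class, and the database/table names are made up:

package org.apache.hadoop.hive.metastore.hbase;

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.metastore.api.FieldSchema;

class ScanPlanSketch {
  public static void main(String[] args) {
    // Partition keys (year, month, state); filter: year = 2015 and month >= 1 and month < 5
    List<FieldSchema> parts = Arrays.asList(
        new FieldSchema("year", "int", ""),
        new FieldSchema("month", "int", ""),
        new FieldSchema("state", "string", ""));
    HBaseFilterPlanUtil.ScanPlan plan = new HBaseFilterPlanUtil.ScanPlan();
    plan.setStartMarker("year", "int", "2015", true);
    plan.setEndMarker("year", "int", "2015", true);
    plan.setStartMarker("month", "int", "1", true);
    plan.setEndMarker("month", "int", "5", false);

    // year + month form the scan range (majorParts = 2); any remaining conditions
    // would be returned as a PartitionKeyComparator filter by getFilter(parts).
    byte[] startRow = plan.getStartRowSuffix("default", "sales", parts);
    byte[] endRow = plan.getEndRowSuffix("default", "sales", parts);
    System.out.println(startRow.length + " " + endRow.length + " " + plan.getFilter(parts));
  }
}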

http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
index 62bb4de..b6fa591 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
@@ -18,11 +18,14 @@
  */
 package org.apache.hadoop.hive.metastore.hbase;
 
+import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
@@ -50,6 +53,19 @@ import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDeWithEndPrefix;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hive.common.util.BloomFilter;
 
 import java.io.IOException;
@@ -63,6 +79,7 @@ import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
@@ -712,15 +729,31 @@ class HBaseUtils {
     return sd;
   }
 
+  static List<String> getPartitionKeyTypes(List<FieldSchema> parts) {
+    com.google.common.base.Function<FieldSchema, String> fieldSchemaToType =
+        new com.google.common.base.Function<FieldSchema, String>() {
+      public String apply(FieldSchema fs) { return fs.getType(); }
+    };
+    return Lists.transform(parts, fieldSchemaToType);
+  }
+
+  static List<String> getPartitionNames(List<FieldSchema> parts) {
+    com.google.common.base.Function<FieldSchema, String> fieldSchemaToName =
+        new com.google.common.base.Function<FieldSchema, String>() {
+      public String apply(FieldSchema fs) { return fs.getName(); }
+    };
+    return Lists.transform(parts, fieldSchemaToName);
+  }
+
   /**
    * Serialize a partition
    * @param part partition object
    * @param sdHash hash that is being used as a key for the enclosed storage descriptor
    * @return First element is the key, second is the serialized partition
    */
-  static byte[][] serializePartition(Partition part, byte[] sdHash) {
+  static byte[][] serializePartition(Partition part, List<String> partTypes, byte[] sdHash) {
     byte[][] result = new byte[2][];
-    result[0] = buildPartitionKey(part.getDbName(), part.getTableName(), part.getValues());
+    result[0] = buildPartitionKey(part.getDbName(), part.getTableName(), partTypes, part.getValues());
     HbaseMetastoreProto.Partition.Builder builder = HbaseMetastoreProto.Partition.newBuilder();
     builder
         .setCreateTime(part.getCreateTime())
@@ -735,11 +768,54 @@ class HBaseUtils {
     return result;
   }
 
-  static byte[] buildPartitionKey(String dbName, String tableName, List<String> partVals) {
-    Deque<String> keyParts = new ArrayDeque<>(partVals);
-    keyParts.addFirst(tableName);
-    keyParts.addFirst(dbName);
-    return buildKey(keyParts.toArray(new String[keyParts.size()]));
+  static byte[] buildPartitionKey(String dbName, String tableName, List<String> partTypes, List<String> partVals) {
+    return buildPartitionKey(dbName, tableName, partTypes, partVals, false);
+  }
+
+  static byte[] buildPartitionKey(String dbName, String tableName, List<String> partTypes, List<String> partVals, boolean endPrefix) {
+    Object[] components = new Object[partVals.size()];
+    for (int i=0;i<partVals.size();i++) {
+      TypeInfo expectedType =
+          TypeInfoUtils.getTypeInfoFromTypeString(partTypes.get(i));
+      ObjectInspector outputOI =
+          TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(expectedType);
+      Converter converter = ObjectInspectorConverters.getConverter(
+          PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI);
+      components[i] = converter.convert(partVals.get(i));
+    }
+
+    return buildSerializedPartitionKey(dbName, tableName, partTypes, components, endPrefix);
+  }
+
+  static byte[] buildSerializedPartitionKey(String dbName, String tableName, List<String> partTypes, Object[] components, boolean endPrefix) {
+    ObjectInspector javaStringOI =
+        PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveCategory.STRING);
+    Object[] data = new Object[components.length+2];
+    List<ObjectInspector> fois = new ArrayList<ObjectInspector>(components.length+2);
+    boolean[] endPrefixes = new boolean[components.length+2];
+
+    data[0] = dbName;
+    fois.add(javaStringOI);
+    endPrefixes[0] = false;
+    data[1] = tableName;
+    fois.add(javaStringOI);
+    endPrefixes[1] = false;
+
+    for (int i = 0; i < components.length; i++) {
+      data[i+2] = components[i];
+      TypeInfo expectedType =
+          TypeInfoUtils.getTypeInfoFromTypeString(partTypes.get(i));
+      ObjectInspector outputOI =
+          TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(expectedType);
+      fois.add(outputOI);
+    }
+    Output output = new Output();
+    try {
+      BinarySortableSerDeWithEndPrefix.serializeStruct(output, data, fois, endPrefix);
+    } catch (SerDeException e) {
+      throw new RuntimeException("Cannot serialize partition " + StringUtils.join(components, ","));
+    }
+    return Arrays.copyOf(output.getData(), output.getLength());
   }
 
   static class StorageDescriptorParts {
@@ -771,11 +847,10 @@ class HBaseUtils {
    * @param serialized the value fetched from HBase
    * @return A struct that contains the partition plus parts of the storage descriptor
    */
-  static StorageDescriptorParts deserializePartition(byte[] key, byte[] serialized)
-      throws InvalidProtocolBufferException {
-    String[] keys = deserializeKey(key);
-    return deserializePartition(keys[0], keys[1],
-        Arrays.asList(Arrays.copyOfRange(keys, 2, keys.length)), serialized);
+  static StorageDescriptorParts deserializePartition(String dbName, String tableName, List<FieldSchema> partitions,
+      byte[] key, byte[] serialized) throws InvalidProtocolBufferException {
+    List keys = deserializePartitionKey(partitions, key);
+    return deserializePartition(dbName, tableName, keys, serialized);
   }
 
   /**
@@ -811,6 +886,36 @@ class HBaseUtils {
     return k.split(KEY_SEPARATOR_STR);
   }
 
+  private static List<String> deserializePartitionKey(List<FieldSchema> partitions, byte[] key) {
+    StringBuffer names = new StringBuffer();
+    names.append("dbName,tableName,");
+    StringBuffer types = new StringBuffer();
+    types.append("string,string,");
+    for (int i=0;i<partitions.size();i++) {
+      names.append(partitions.get(i).getName());
+      types.append(TypeInfoUtils.getTypeInfoFromTypeString(partitions.get(i).getType()));
+      if (i!=partitions.size()-1) {
+        names.append(",");
+        types.append(",");
+      }
+    }
+    BinarySortableSerDe serDe = new BinarySortableSerDe();
+    Properties props = new Properties();
+    props.setProperty(serdeConstants.LIST_COLUMNS, names.toString());
+    props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types.toString());
+    try {
+      serDe.initialize(new Configuration(), props);
+      List deserializedkeys = ((List)serDe.deserialize(new BytesWritable(key))).subList(2, partitions.size()+2);
+      List<String> partitionKeys = new ArrayList<String>();
+      for (Object deserializedKey : deserializedkeys) {
+        partitionKeys.add(deserializedKey.toString());
+      }
+      return partitionKeys;
+    } catch (SerDeException e) {
+      throw new RuntimeException("Error when deserialize key", e);
+    }
+  }
+
   /**
    * Serialize a table
    * @param table table object
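
The new buildPartitionKey/deserializePartitionKey pair above leans on BinarySortableSerDe, so serialized partition keys sort in the same order as their typed values, which is what makes typed scan ranges possible. A minimal round-trip sketch of that key layout (standalone, with made-up database, table, and column names; not part of the patch):

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.BytesWritable;

public class PartitionKeyRoundTrip {
  public static void main(String[] args) throws Exception {
    // Row layout used by the patch: dbName, tableName, then one field per partition key.
    Properties props = new Properties();
    props.setProperty(serdeConstants.LIST_COLUMNS, "dbName,tableName,year,month");
    props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string,string,int,int");
    BinarySortableSerDe serDe = new BinarySortableSerDe();
    serDe.initialize(new Configuration(), props);

    // Serialize one key; the binary-sortable encoding preserves typed sort order.
    List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
        PrimitiveObjectInspectorFactory.javaStringObjectInspector,
        PrimitiveObjectInspectorFactory.javaStringObjectInspector,
        PrimitiveObjectInspectorFactory.javaIntObjectInspector,
        PrimitiveObjectInspectorFactory.javaIntObjectInspector);
    ObjectInspector rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("dbName", "tableName", "year", "month"), fieldOIs);
    BytesWritable key = (BytesWritable) serDe.serialize(
        Arrays.asList("default", "sales", 2015, 9), rowOI);

    // Deserialize it back, skipping dbName/tableName as deserializePartitionKey does.
    List<?> fields = (List<?>) serDe.deserialize(key);
    System.out.println(fields.subList(2, fields.size())); // the partition values
  }
}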

http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/PartitionKeyComparator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/PartitionKeyComparator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/PartitionKeyComparator.java
new file mode 100644
index 0000000..01fe403
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/PartitionKeyComparator.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hive.metastore.hbase.HbaseMetastoreProto.PartitionKeyComparator.Operator.Type;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+public class PartitionKeyComparator extends ByteArrayComparable {
+  private static final Log LOG = LogFactory.getLog(PartitionKeyComparator.class);
+  static class Mark {
+    Mark(String value, boolean inclusive) {
+      this.value = value;
+      this.inclusive = inclusive;
+    }
+    String value;
+    boolean inclusive;
+    public String toString() {
+      return value + (inclusive?"_":"");
+    }
+  }
+  static class Range {
+    Range(String keyName, Mark start, Mark end) {
+      this.keyName = keyName;
+      this.start = start;
+      this.end = end;
+    }
+    String keyName;
+    Mark start;
+    Mark end;
+    public String toString() {
+      return "" + keyName + ":" + (start!=null?start.toString():"") + (end!=null?end.toString():"");
+    }
+  }
+  // Cache the information derived from ranges for performance, including
+  // range in native datatype
+  static class NativeRange {
+    int pos;
+    Comparable start;
+    Comparable end;
+  }
+  static class Operator {
+    public Operator(Type type, String keyName, String val) {
+      this.type = type;
+      this.keyName = keyName;
+      this.val = val;
+    }
+    enum Type {
+      LIKE, NOTEQUALS
+    };
+    Type type;
+    String keyName;
+    String val;
+  }
+  static class NativeOperator {
+    int pos;
+    Comparable val;
+  }
+  String names;
+  String types;
+  List<Range> ranges;
+  List<NativeRange> nativeRanges;
+  List<Operator> ops;
+  List<NativeOperator> nativeOps;
+  Properties serdeProps;
+  public PartitionKeyComparator(String names, String types, List<Range> ranges, List<Operator> ops) {
+    super(null);
+    this.names = names;
+    this.types = types;
+    this.ranges = ranges;
+    this.ops = ops;
+    serdeProps = new Properties();
+    serdeProps.setProperty(serdeConstants.LIST_COLUMNS, "dbName,tableName," + names);
+    serdeProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string,string," + types);
+
+    this.nativeRanges = new ArrayList<NativeRange>(this.ranges.size());
+    for (int i=0;i<ranges.size();i++) {
+      Range range = ranges.get(i);
+      NativeRange nativeRange = new NativeRange();
+      nativeRanges.add(i, nativeRange);
+      nativeRange.pos = Arrays.asList(names.split(",")).indexOf(range.keyName);
+      TypeInfo expectedType =
+          TypeInfoUtils.getTypeInfoFromTypeString(types.split(",")[nativeRange.pos]);
+      ObjectInspector outputOI =
+          TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(expectedType);
+      nativeRange.start = null;
+      if (range.start != null) {
+        Converter converter = ObjectInspectorConverters.getConverter(
+            PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI);
+        nativeRange.start = (Comparable)converter.convert(range.start.value);
+      }
+      nativeRange.end = null;
+      if (range.end != null) {
+        Converter converter = ObjectInspectorConverters.getConverter(
+            PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI);
+        nativeRange.end = (Comparable)converter.convert(range.end.value);
+      }
+    }
+
+    this.nativeOps = new ArrayList<NativeOperator>(this.ops.size());
+    for (int i=0;i<ops.size();i++) {
+      Operator op = ops.get(i);
+      NativeOperator nativeOp = new NativeOperator();
+      nativeOps.add(i, nativeOp);
+      nativeOp.pos = ArrayUtils.indexOf(names.split(","), op.keyName);
+      TypeInfo expectedType =
+          TypeInfoUtils.getTypeInfoFromTypeString(types.split(",")[nativeOp.pos]);
+      ObjectInspector outputOI =
+          TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(expectedType);
+      Converter converter = ObjectInspectorConverters.getConverter(
+          PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI);
+      nativeOp.val = (Comparable)converter.convert(op.val);
+    }
+  }
+
+  public static PartitionKeyComparator parseFrom(final byte [] bytes) {
+    HbaseMetastoreProto.PartitionKeyComparator proto;
+    try {
+      proto = HbaseMetastoreProto.PartitionKeyComparator.parseFrom(bytes);
+    } catch (InvalidProtocolBufferException e) {
+      throw new RuntimeException(e);
+    }
+    List<Range> ranges = new ArrayList<Range>();
+    for (HbaseMetastoreProto.PartitionKeyComparator.Range range : proto.getRangeList()) {
+      Mark start = null;
+      if (range.hasStart()) {
+        start = new Mark(range.getStart().getValue(), range.getStart().getInclusive());
+      }
+      Mark end = null;
+      if (range.hasEnd()) {
+        end = new Mark(range.getEnd().getValue(), range.getEnd().getInclusive());
+      }
+      ranges.add(new Range(range.getKey(), start, end));
+    }
+    List<Operator> ops = new ArrayList<Operator>();
+    for (HbaseMetastoreProto.PartitionKeyComparator.Operator op : proto.getOpList()) {
+      ops.add(new Operator(Operator.Type.valueOf(op.getType().name()), op.getKey(),
+          op.getVal()));
+    }
+    return new PartitionKeyComparator(proto.getNames(), proto.getTypes(), ranges, ops);
+  }
+
+  @Override
+  public byte[] toByteArray() {
+    HbaseMetastoreProto.PartitionKeyComparator.Builder builder = 
+        HbaseMetastoreProto.PartitionKeyComparator.newBuilder();
+    builder.setNames(names);
+    builder.setTypes(types);
+    for (int i=0;i<ranges.size();i++) {
+      Range range = ranges.get(i);
+      HbaseMetastoreProto.PartitionKeyComparator.Mark startMark = null;
+      if (range.start != null) {
+        startMark = HbaseMetastoreProto.PartitionKeyComparator.Mark.newBuilder()
+          .setValue(range.start.value)
+          .setInclusive(range.start.inclusive)
+          .build();
+      }
+      HbaseMetastoreProto.PartitionKeyComparator.Mark endMark = null;
+      if (range.end != null) {
+        endMark = HbaseMetastoreProto.PartitionKeyComparator.Mark.newBuilder()
+          .setValue(range.end.value)
+          .setInclusive(range.end.inclusive)
+          .build();
+      }
+        
+      HbaseMetastoreProto.PartitionKeyComparator.Range.Builder rangeBuilder = 
+        HbaseMetastoreProto.PartitionKeyComparator.Range.newBuilder();
+      rangeBuilder.setKey(range.keyName);
+      if (startMark != null) {
+        rangeBuilder.setStart(startMark);
+      }
+      if (endMark != null) {
+        rangeBuilder.setEnd(endMark);
+      }
+      builder.addRange(rangeBuilder.build());
+    }
+    for (int i=0;i<ops.size();i++) {
+      Operator op = ops.get(i);
+      builder.addOp(HbaseMetastoreProto.PartitionKeyComparator.Operator.newBuilder()
+        .setKey(op.keyName)
+        .setType(Type.valueOf(op.type.toString()))
+        .setVal(op.val).build());
+    }
+    return builder.build().toByteArray();
+  }
+
+  @Override
+  public int compareTo(byte[] value, int offset, int length) {
+    byte[] bytes = Arrays.copyOfRange(value, offset, offset + length);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Get key " + new String(bytes));
+    }
+    BinarySortableSerDe serDe = new BinarySortableSerDe();
+    List deserializedkeys = null;
+    try {
+      serDe.initialize(new Configuration(), serdeProps);
+      // the first two fields of the serialized row key are the database and table
+      // name; keep only the partition key values
+      deserializedkeys = ((List)serDe.deserialize(new BytesWritable(bytes))).subList(2, 2 + names.split(",").length);
+    } catch (SerDeException e) {
+      // don't bother with failed deserialization, continue with next key
+      return 1;
+    }
+    for (int i=0;i<ranges.size();i++) {
+      Range range = ranges.get(i);
+      NativeRange nativeRange = nativeRanges.get(i);
+
+      Comparable partVal = (Comparable)deserializedkeys.get(nativeRange.pos);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Try to match range " + partVal + ", start " + nativeRange.start + ", end "
+            + nativeRange.end);
+      }
+      // the value matches when it is above the start marker (or the start is open)
+      // and below the end marker (or the end is open); otherwise fall through and fail
+      if (range.start == null || range.start.inclusive && partVal.compareTo(nativeRange.start)>=0 ||
+          !range.start.inclusive && partVal.compareTo(nativeRange.start)>0) {
+        if (range.end == null || range.end.inclusive && partVal.compareTo(nativeRange.end)<=0 ||
+            !range.end.inclusive && partVal.compareTo(nativeRange.end)<0) {
+          continue;
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Fail to match range " + range.keyName + "-" + partVal + "[" + nativeRange.start
+            + "," + nativeRange.end + "]");
+      }
+      return 1;
+    }
+
+    for (int i=0;i<ops.size();i++) {
+      Operator op = ops.get(i);
+      NativeOperator nativeOp = nativeOps.get(i);
+      switch (op.type) {
+      case LIKE:
+        if (!deserializedkeys.get(nativeOp.pos).toString().matches(op.val)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Fail to match operator " + op.keyName + "(" + deserializedkeys.get(nativeOp.pos)
+                + ") LIKE " + nativeOp.val);
+          }
+          return 1;
+        }
+        break;
+      case NOTEQUALS:
+        if (nativeOp.val.equals(deserializedkeys.get(nativeOp.pos))) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Fail to match operator " + op.keyName + "(" + deserializedkeys.get(nativeOp.pos)
+                + ")!=" + nativeOp.val);
+          }
+          return 1;
+        }
+        break;
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("All conditions satisfied:" + deserializedkeys);
+    }
+    return 0;
+  }
+
+}
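
The comparator above returns 0 only when every range and every operator condition holds for the deserialized partition values, which is how the enclosing RowFilter decides to keep a row; any failed condition returns 1 and the row is skipped. A minimal, self-contained sketch of that matching rule follows; the class and helper names are invented for illustration and are not part of the metastore code (note that the patch applies a LIKE value directly as a Java regex via String.matches):

// Self-contained sketch of the matching rule above; names are illustrative only.
import java.util.Arrays;
import java.util.List;

public class PartitionFilterSketch {
  // true when val is above the start marker (or start is open) and below the
  // end marker (or end is open) -- the same shape as the range check above
  static boolean inRange(Comparable val, Comparable start, boolean startInclusive,
      Comparable end, boolean endInclusive) {
    boolean aboveStart = start == null
        || (startInclusive ? val.compareTo(start) >= 0 : val.compareTo(start) > 0);
    boolean belowEnd = end == null
        || (endInclusive ? val.compareTo(end) <= 0 : val.compareTo(end) < 0);
    return aboveStart && belowEnd;
  }

  public static void main(String[] args) {
    // deserialized partition values for one row: year, month, state
    List<String> partVals = Arrays.asList("2015", "11", "CA");
    // month > 10 and month <= 11 (a Range), state LIKE 'C%' (an Operator)
    boolean monthOk = inRange(Integer.valueOf(partVals.get(1)), 10, false, 11, true);
    boolean stateOk = partVals.get(2).matches("C.*");  // LIKE applied as a regex
    // the comparator returns 0 (keep the row) only when every condition holds
    System.out.println((monthOk && stateOk) ? 0 : 1);
  }
}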

http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
----------------------------------------------------------------------
diff --git a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
index cba3671..0d0ef89 100644
--- a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
+++ b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
@@ -255,3 +255,28 @@ message Table {
   optional PrincipalPrivilegeSet privileges = 13;
   optional bool is_temporary = 14;
 }
+
+message PartitionKeyComparator {
+  required string names = 1;
+  required string types = 2;
+  message Mark {
+    required string value = 1;
+    required bool inclusive = 2;
+  }
+  message Range {
+    required string key = 1;
+    optional Mark start = 2;
+    optional Mark end = 3;
+  }
+  message Operator {
+    enum Type {
+      LIKE = 0;
+      NOTEQUALS = 1;
+    }
+    required Type type = 1;
+    required string key = 2;
+    required string val = 3;
+  }
+  repeated Operator op = 3;
+  repeated Range range = 4;
+}
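
This message is what carries the comparator state to the HBase region servers: names and types hold the comma-separated partition key names and types, range carries the per-key bounds, and op carries the residual LIKE/NOTEQUALS conditions. A hedged sketch of how a single condition such as state <> 'CA' might be encoded with the generated builders (the same calls toByteArray() uses above; the literal values are illustrative only, not taken from a real plan):

import org.apache.hadoop.hive.metastore.hbase.HbaseMetastoreProto;

public class ComparatorProtoSketch {
  public static byte[] encodeStateNotCA() {
    return HbaseMetastoreProto.PartitionKeyComparator.newBuilder()
        .setNames("year,month,state")   // comma-separated partition key names
        .setTypes("int,int,string")     // matching comma-separated types
        .addOp(HbaseMetastoreProto.PartitionKeyComparator.Operator.newBuilder()
            .setType(HbaseMetastoreProto.PartitionKeyComparator.Operator.Type.NOTEQUALS)
            .setKey("state")
            .setVal("CA"))
        .build()
        .toByteArray();                 // the wire bytes toByteArray()/parseFrom() exchange
  }
}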

http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java
index 5943d14..06884b3 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java
@@ -18,12 +18,17 @@
  */
 package org.apache.hadoop.hive.metastore.hbase;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.metastore.PartFilterExprUtil;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.FilterPlan;
 import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.MultiScanPlan;
-import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.PartitionFilterGenerator;
 import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.PlanResult;
 import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.ScanPlan;
 import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.ScanPlan.ScanMarker;
@@ -35,6 +40,8 @@ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.primitives.Shorts;
+
 public class TestHBaseFilterPlanUtil {
   final boolean INCLUSIVE = true;
 
@@ -68,31 +75,28 @@ public class TestHBaseFilterPlanUtil {
     ScanMarker r;
 
     // equal plans
-    l = new ScanMarker(new byte[] { 1, 2 }, INCLUSIVE);
-    r = new ScanMarker(new byte[] { 1, 2 }, INCLUSIVE);
+    l = new ScanMarker("1", INCLUSIVE, "int");
+    r = new ScanMarker("1", INCLUSIVE, "int");
     assertFirstGreater(l, r);
 
-    l = new ScanMarker(new byte[] { 1, 2 }, !INCLUSIVE);
-    r = new ScanMarker(new byte[] { 1, 2 }, !INCLUSIVE);
+    l = new ScanMarker("1", !INCLUSIVE, "int");
+    r = new ScanMarker("1", !INCLUSIVE, "int");
     assertFirstGreater(l, r);
 
-    l = new ScanMarker(null, !INCLUSIVE);
-    r = new ScanMarker(null, !INCLUSIVE);
-    assertFirstGreater(l, r);
+    assertFirstGreater(null, null);
 
     // create l is greater because of inclusive flag
-    l = new ScanMarker(new byte[] { 1, 2 }, !INCLUSIVE);
-    r = new ScanMarker(null, !INCLUSIVE);
+    l = new ScanMarker("1", !INCLUSIVE, "int");
     // the rule for null vs non-null is different
     // non-null is both smaller and greater than null
-    Assert.assertEquals(l, ScanPlan.getComparedMarker(l, r, true));
-    Assert.assertEquals(l, ScanPlan.getComparedMarker(r, l, true));
-    Assert.assertEquals(l, ScanPlan.getComparedMarker(l, r, false));
-    Assert.assertEquals(l, ScanPlan.getComparedMarker(r, l, false));
+    Assert.assertEquals(l, ScanPlan.getComparedMarker(l, null, true));
+    Assert.assertEquals(l, ScanPlan.getComparedMarker(null, l, true));
+    Assert.assertEquals(l, ScanPlan.getComparedMarker(l, null, false));
+    Assert.assertEquals(l, ScanPlan.getComparedMarker(null, l, false));
 
     // create l that is greater because of the bytes
-    l = new ScanMarker(new byte[] { 1, 2, 0 }, INCLUSIVE);
-    r = new ScanMarker(new byte[] { 1, 2 }, INCLUSIVE);
+    l = new ScanMarker("2", INCLUSIVE, "int");
+    r = new ScanMarker("1", INCLUSIVE, "int");
     assertFirstGreater(l, r);
 
   }
@@ -111,36 +115,30 @@ public class TestHBaseFilterPlanUtil {
   public void testScanPlanAnd() {
     ScanPlan l = new ScanPlan();
     ScanPlan r = new ScanPlan();
-    l.setStartMarker(new ScanMarker(new byte[] { 10 }, INCLUSIVE));
-    r.setStartMarker(new ScanMarker(new byte[] { 10 }, INCLUSIVE));
+    l.setStartMarker("a", "int", "10", INCLUSIVE);
+    r.setStartMarker("a", "int", "10", INCLUSIVE);
 
     ScanPlan res;
     // both equal
     res = l.and(r).getPlans().get(0);
-    Assert.assertEquals(new ScanMarker(new byte[] { 10 }, INCLUSIVE), res.getStartMarker());
+    Assert.assertEquals(new ScanMarker("10", INCLUSIVE, "int"), res.markers.get("a").startMarker);
 
     // add equal end markers as well, and test AND again
-    l.setEndMarker(new ScanMarker(new byte[] { 20 }, INCLUSIVE));
-    r.setEndMarker(new ScanMarker(new byte[] { 20 }, INCLUSIVE));
+    l.setEndMarker("a", "int", "20", INCLUSIVE);
+    r.setEndMarker("a", "int", "20", INCLUSIVE);
     res = l.and(r).getPlans().get(0);
-    Assert.assertEquals(new ScanMarker(new byte[] { 10 }, INCLUSIVE), res.getStartMarker());
-    Assert.assertEquals(new ScanMarker(new byte[] { 20 }, INCLUSIVE), res.getEndMarker());
-
-    l.setEndMarker(new ScanMarker(null, INCLUSIVE));
-    r.setStartMarker(new ScanMarker(null, !INCLUSIVE));
-    // markers with non null bytes are both lesser and greator
-    Assert.assertEquals(l.getStartMarker(), res.getStartMarker());
-    Assert.assertEquals(r.getEndMarker(), res.getEndMarker());
+    Assert.assertEquals(new ScanMarker("10", INCLUSIVE, "int"), res.markers.get("a").startMarker);
+    Assert.assertEquals(new ScanMarker("20", INCLUSIVE, "int"), res.markers.get("a").endMarker);
 
-    l.setStartMarker(new ScanMarker(new byte[] { 10, 11 }, !INCLUSIVE));
-    l.setEndMarker(new ScanMarker(new byte[] { 20, 21 }, INCLUSIVE));
+    l.setStartMarker("a", "int", "10", !INCLUSIVE);
+    l.setEndMarker("a", "int", "20", INCLUSIVE);
 
-    r.setStartMarker(new ScanMarker(new byte[] { 10, 10 }, INCLUSIVE));
-    r.setEndMarker(new ScanMarker(new byte[] { 15 }, INCLUSIVE));
+    r.setStartMarker("a", "int", "10", INCLUSIVE);
+    r.setEndMarker("a", "int", "15", INCLUSIVE);
     res = l.and(r).getPlans().get(0);
     // start of l is greater, end of r is smaller
-    Assert.assertEquals(l.getStartMarker(), res.getStartMarker());
-    Assert.assertEquals(r.getEndMarker(), res.getEndMarker());
+    Assert.assertEquals(l.markers.get("a").startMarker, res.markers.get("a").startMarker);
+    Assert.assertEquals(r.markers.get("a").endMarker, res.markers.get("a").endMarker);
 
   }
 
@@ -151,13 +149,13 @@ public class TestHBaseFilterPlanUtil {
   public void testScanPlanOr() {
     ScanPlan l = new ScanPlan();
     ScanPlan r = new ScanPlan();
-    l.setStartMarker(new ScanMarker(new byte[] { 10 }, INCLUSIVE));
-    r.setStartMarker(new ScanMarker(new byte[] { 11 }, INCLUSIVE));
+    l.setStartMarker("a", "int", "1", INCLUSIVE);
+    r.setStartMarker("a", "int", "11", INCLUSIVE);
 
     FilterPlan res1 = l.or(r);
     Assert.assertEquals(2, res1.getPlans().size());
-    res1.getPlans().get(0).getStartMarker().equals(l.getStartMarker());
-    res1.getPlans().get(1).getStartMarker().equals(r.getStartMarker());
+    res1.getPlans().get(0).markers.get("a").startMarker.equals(l.markers.get("a").startMarker);
+    res1.getPlans().get(1).markers.get("a").startMarker.equals(r.markers.get("a").startMarker);
 
     FilterPlan res2 = res1.or(r);
     Assert.assertEquals(3, res2.getPlans().size());
@@ -223,72 +221,71 @@ public class TestHBaseFilterPlanUtil {
 
     final String KEY = "k1";
     final String VAL = "v1";
-    final byte[] VAL_BYTES = PartitionFilterGenerator.toBytes(VAL);
+    final String OTHERKEY = "k2";
     LeafNode l = new LeafNode();
     l.keyName = KEY;
     l.value = VAL;
-    final ScanMarker DEFAULT_SCANMARKER = new ScanMarker(null, false);
+    final ScanMarker DEFAULT_SCANMARKER = null;
+    List<FieldSchema> parts = new ArrayList<FieldSchema>();
+    parts.add(new FieldSchema(KEY, "int", null));
+    parts.add(new FieldSchema(OTHERKEY, "int", null));
 
     l.operator = Operator.EQUALS;
-    verifyPlan(l, KEY, new ScanMarker(VAL_BYTES, INCLUSIVE), new ScanMarker(VAL_BYTES, INCLUSIVE));
+    verifyPlan(l, parts, KEY, new ScanMarker(VAL, INCLUSIVE, "int"), new ScanMarker(VAL, INCLUSIVE, "int"));
 
     l.operator = Operator.GREATERTHAN;
-    verifyPlan(l, KEY, new ScanMarker(VAL_BYTES, !INCLUSIVE), DEFAULT_SCANMARKER);
+    verifyPlan(l, parts, KEY, new ScanMarker(VAL, !INCLUSIVE, "int"), DEFAULT_SCANMARKER);
 
     l.operator = Operator.GREATERTHANOREQUALTO;
-    verifyPlan(l, KEY, new ScanMarker(VAL_BYTES, INCLUSIVE), DEFAULT_SCANMARKER);
+    verifyPlan(l, parts, KEY, new ScanMarker(VAL, INCLUSIVE, "int"), DEFAULT_SCANMARKER);
 
     l.operator = Operator.LESSTHAN;
-    verifyPlan(l, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL_BYTES, !INCLUSIVE));
+    verifyPlan(l, parts, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL, !INCLUSIVE, "int"));
 
     l.operator = Operator.LESSTHANOREQUALTO;
-    verifyPlan(l, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL_BYTES, INCLUSIVE));
-
-    // following leaf node plans should currently have true for 'has unsupported condition',
-    // because of the unsupported operator
-    l.operator = Operator.NOTEQUALS;
-    verifyPlan(l, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
-
-    l.operator = Operator.NOTEQUALS2;
-    verifyPlan(l, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
-
-    l.operator = Operator.LIKE;
-    verifyPlan(l, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
+    verifyPlan(l, parts, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL, INCLUSIVE, "int"));
 
-    // following leaf node plans should currently have true for 'has unsupported condition',
-    // because of the condition is not on first key
+    // a condition on a key other than the first partition key is now supported,
+    // so 'has unsupported condition' should be false
     l.operator = Operator.EQUALS;
-    verifyPlan(l, "NOT_FIRST_PART", DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
-
-    l.operator = Operator.NOTEQUALS;
-    verifyPlan(l, "NOT_FIRST_PART", DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
+    verifyPlan(l, parts, OTHERKEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, false);
 
     // if tree is null, it should return equivalent of full scan, and true
     // for 'has unsupported condition'
-    verifyPlan(null, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
+    verifyPlan(null, parts, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
 
   }
 
-  private void verifyPlan(TreeNode l, String keyName, ScanMarker startMarker, ScanMarker endMarker)
+  private void verifyPlan(TreeNode l, List<FieldSchema> parts, String keyName, ScanMarker startMarker, ScanMarker endMarker)
       throws MetaException {
-    verifyPlan(l, keyName, startMarker, endMarker, false);
+    verifyPlan(l, parts, keyName, startMarker, endMarker, false);
   }
 
-  private void verifyPlan(TreeNode l, String keyName, ScanMarker startMarker, ScanMarker endMarker,
+  private void verifyPlan(TreeNode l, List<FieldSchema> parts, String keyName, ScanMarker startMarker, ScanMarker endMarker,
       boolean hasUnsupportedCondition) throws MetaException {
     ExpressionTree e = null;
     if (l != null) {
       e = new ExpressionTree();
       e.setRootForTest(l);
     }
-    PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, keyName);
+    PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, parts);
     FilterPlan plan = planRes.plan;
     Assert.assertEquals("Has unsupported condition", hasUnsupportedCondition,
         planRes.hasUnsupportedCondition);
     Assert.assertEquals(1, plan.getPlans().size());
     ScanPlan splan = plan.getPlans().get(0);
-    Assert.assertEquals(startMarker, splan.getStartMarker());
-    Assert.assertEquals(endMarker, splan.getEndMarker());
+    if (startMarker != null) {
+      Assert.assertEquals(startMarker, splan.markers.get(keyName).startMarker);
+    } else {
+      Assert.assertTrue(splan.markers.get(keyName)==null ||
+          splan.markers.get(keyName).startMarker==null);
+    }
+    if (endMarker != null) {
+      Assert.assertEquals(endMarker, splan.markers.get(keyName).endMarker);
+    } else {
+      Assert.assertTrue(splan.markers.get(keyName)==null ||
+          splan.markers.get(keyName).endMarker==null);
+    }
   }
 
   /**
@@ -302,12 +299,13 @@ public class TestHBaseFilterPlanUtil {
     final String KEY = "k1";
     final String VAL1 = "10";
     final String VAL2 = "11";
-    final byte[] VAL1_BYTES = PartitionFilterGenerator.toBytes(VAL1);
-    final byte[] VAL2_BYTES = PartitionFilterGenerator.toBytes(VAL2);
     LeafNode l = new LeafNode();
     l.keyName = KEY;
     l.value = VAL1;
-    final ScanMarker DEFAULT_SCANMARKER = new ScanMarker(null, false);
+    final ScanMarker DEFAULT_SCANMARKER = null;
+
+    List<FieldSchema> parts = new ArrayList<FieldSchema>();
+    parts.add(new FieldSchema("k1", "int", null));
 
     LeafNode r = new LeafNode();
     r.keyName = KEY;
@@ -318,19 +316,19 @@ public class TestHBaseFilterPlanUtil {
     // verify plan for - k1 >= '10' and k1 < '11'
     l.operator = Operator.GREATERTHANOREQUALTO;
     r.operator = Operator.LESSTHAN;
-    verifyPlan(tn, KEY, new ScanMarker(VAL1_BYTES, INCLUSIVE), new ScanMarker(VAL2_BYTES,
-        !INCLUSIVE));
+    verifyPlan(tn, parts, KEY, new ScanMarker(VAL1, INCLUSIVE, "int"), new ScanMarker(VAL2,
+        !INCLUSIVE, "int"));
 
     // verify plan for - k1 >= '10' and k1 > '11'
     l.operator = Operator.GREATERTHANOREQUALTO;
     r.operator = Operator.GREATERTHAN;
-    verifyPlan(tn, KEY, new ScanMarker(VAL2_BYTES, !INCLUSIVE), DEFAULT_SCANMARKER);
+    verifyPlan(tn, parts, KEY, new ScanMarker(VAL2, !INCLUSIVE, "int"), DEFAULT_SCANMARKER);
 
     // verify plan for - k1 >= '10' or k1 > '11'
     tn = new TreeNode(l, LogicalOperator.OR, r);
     ExpressionTree e = new ExpressionTree();
     e.setRootForTest(tn);
-    PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, KEY);
+    PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, parts);
     Assert.assertEquals(2, planRes.plan.getPlans().size());
     Assert.assertEquals(false, planRes.hasUnsupportedCondition);
 
@@ -338,7 +336,7 @@ public class TestHBaseFilterPlanUtil {
     TreeNode tn2 = new TreeNode(l, LogicalOperator.AND, tn);
     e = new ExpressionTree();
     e.setRootForTest(tn2);
-    planRes = HBaseFilterPlanUtil.getFilterPlan(e, KEY);
+    planRes = HBaseFilterPlanUtil.getFilterPlan(e, parts);
     Assert.assertEquals(2, planRes.plan.getPlans().size());
     Assert.assertEquals(false, planRes.hasUnsupportedCondition);
 
@@ -351,11 +349,135 @@ public class TestHBaseFilterPlanUtil {
     TreeNode tn3 = new TreeNode(tn2, LogicalOperator.OR, klike);
     e = new ExpressionTree();
     e.setRootForTest(tn3);
-    planRes = HBaseFilterPlanUtil.getFilterPlan(e, KEY);
+    planRes = HBaseFilterPlanUtil.getFilterPlan(e, parts);
     Assert.assertEquals(3, planRes.plan.getPlans().size());
-    Assert.assertEquals(true, planRes.hasUnsupportedCondition);
+    Assert.assertEquals(false, planRes.hasUnsupportedCondition);
+
 
+  }
 
+  @Test
+  public void testPartitionKeyScannerAllString() throws Exception {
+    List<FieldSchema> parts = new ArrayList<FieldSchema>();
+    parts.add(new FieldSchema("year", "string", null));
+    parts.add(new FieldSchema("month", "string", null));
+    parts.add(new FieldSchema("state", "string", null));
+
+    // One prefix key and one minor key range
+    ExpressionTree exprTree = PartFilterExprUtil.getFilterParser("year = 2015 and state = 'CA'").tree;
+    PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts);
+
+    Assert.assertEquals(planRes.plan.getPlans().size(), 1);
+
+    ScanPlan sp = planRes.plan.getPlans().get(0);
+    byte[] startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts);
+    byte[] endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts);
+    RowFilter filter = (RowFilter)sp.getFilter(parts);
+
+    // scan range contains the major key year, rowfilter contains minor key state
+    Assert.assertTrue(Bytes.contains(startRowSuffix, "2015".getBytes()));
+    Assert.assertTrue(Bytes.contains(endRowSuffix, "2015".getBytes()));
+    Assert.assertFalse(Bytes.contains(startRowSuffix, "CA".getBytes()));
+    Assert.assertFalse(Bytes.contains(endRowSuffix, "CA".getBytes()));
+
+    PartitionKeyComparator comparator = (PartitionKeyComparator)filter.getComparator();
+    Assert.assertEquals(comparator.ranges.size(), 1);
+    Assert.assertEquals(comparator.ranges.get(0).keyName, "state");
+
+    // Two prefix key and one LIKE operator
+    exprTree = PartFilterExprUtil.getFilterParser("year = 2015 and month > 10 "
+        + "and month <= 11 and state like 'C%'").tree;
+    planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts);
+
+    Assert.assertEquals(planRes.plan.getPlans().size(), 1);
+
+    sp = planRes.plan.getPlans().get(0);
+    startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts);
+    endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts);
+    filter = (RowFilter)sp.getFilter(parts);
+
+    // scan range contains the major key value year/month, rowfilter contains LIKE operator
+    Assert.assertTrue(Bytes.contains(startRowSuffix, "2015".getBytes()));
+    Assert.assertTrue(Bytes.contains(endRowSuffix, "2015".getBytes()));
+    Assert.assertTrue(Bytes.contains(startRowSuffix, "10".getBytes()));
+    Assert.assertTrue(Bytes.contains(endRowSuffix, "11".getBytes()));
+
+    comparator = (PartitionKeyComparator)filter.getComparator();
+    Assert.assertEquals(comparator.ops.size(), 1);
+    Assert.assertEquals(comparator.ops.get(0).keyName, "state");
+
+    // One prefix key, one minor key range and one LIKE operator
+    exprTree = PartFilterExprUtil.getFilterParser("year >= 2014 and month > 10 "
+        + "and month <= 11 and state like 'C%'").tree;
+    planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts);
+
+    Assert.assertEquals(planRes.plan.getPlans().size(), 1);
+
+    sp = planRes.plan.getPlans().get(0);
+    startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts);
+    endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts);
+    filter = (RowFilter)sp.getFilter(parts);
+
+    // scan range contains the major key value year (low bound), rowfilter contains minor key state
+    // and LIKE operator
+    Assert.assertTrue(Bytes.contains(startRowSuffix, "2014".getBytes()));
+
+    comparator = (PartitionKeyComparator)filter.getComparator();
+    Assert.assertEquals(comparator.ranges.size(), 1);
+    Assert.assertEquals(comparator.ranges.get(0).keyName, "month");
+    Assert.assertEquals(comparator.ops.size(), 1);
+    Assert.assertEquals(comparator.ops.get(0).keyName, "state");
+
+    // Condition contains or
+    exprTree = PartFilterExprUtil.getFilterParser("year = 2014 and (month > 10 "
+        + "or month < 3)").tree;
+    planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts);
+
+    sp = planRes.plan.getPlans().get(0);
+    startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts);
+    endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts);
+    filter = (RowFilter)sp.getFilter(parts);
+
+    // The first ScanPlan contains year = 2014 and month > 10
+    Assert.assertTrue(Bytes.contains(startRowSuffix, "2014".getBytes()));
+    Assert.assertTrue(Bytes.contains(endRowSuffix, "2014".getBytes()));
+    Assert.assertTrue(Bytes.contains(startRowSuffix, "10".getBytes()));
+
+    sp = planRes.plan.getPlans().get(1);
+    startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts);
+    endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts);
+    filter = (RowFilter)sp.getFilter(parts);
+
+    // The second ScanPlan contains year = 2014 and month < 3
+    Assert.assertTrue(Bytes.contains(startRowSuffix, "2014".getBytes()));
+    Assert.assertTrue(Bytes.contains(endRowSuffix, "2014".getBytes()));
+    Assert.assertTrue(Bytes.contains(endRowSuffix, "3".getBytes()));
   }
 
+  @Test
+  public void testPartitionKeyScannerMixedType() throws Exception {
+    List<FieldSchema> parts = new ArrayList<FieldSchema>();
+    parts.add(new FieldSchema("year", "int", null));
+    parts.add(new FieldSchema("month", "int", null));
+    parts.add(new FieldSchema("state", "string", null));
+
+    // One prefix key and one minor key range
+    ExpressionTree exprTree = PartFilterExprUtil.getFilterParser("year = 2015 and state = 'CA'").tree;
+    PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts);
+
+    Assert.assertEquals(planRes.plan.getPlans().size(), 1);
+
+    ScanPlan sp = planRes.plan.getPlans().get(0);
+    byte[] startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts);
+    byte[] endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts);
+    RowFilter filter = (RowFilter)sp.getFilter(parts);
+
+    // scan range contains the major key year, rowfilter contains minor key state
+    Assert.assertTrue(Bytes.contains(startRowSuffix, Shorts.toByteArray((short)2015)));
+    Assert.assertTrue(Bytes.contains(endRowSuffix, Shorts.toByteArray((short)2016)));
+
+    PartitionKeyComparator comparator = (PartitionKeyComparator)filter.getComparator();
+    Assert.assertEquals(comparator.ranges.size(), 1);
+    Assert.assertEquals(comparator.ranges.get(0).keyName, "state");
+  }
 }
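
A note on the mixed-type assertions just above: assuming the usual BinarySortableSerDe int encoding (sign bit of the first byte flipped, remaining bytes big-endian), 2015 is serialized as 0x80 0x00 0x07 0xDF, so its low two bytes 0x07 0xDF equal the big-endian short 2015, which is why Shorts.toByteArray((short)2015) is found in the start row suffix. Bumping the last byte to build the exclusive end key turns 0xDF into 0xE0, i.e. the bytes of the short 2016, which is what the end row suffix assertion checks.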

http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDeWithEndPrefix.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDeWithEndPrefix.java b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDeWithEndPrefix.java
new file mode 100644
index 0000000..ec43ae3
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDeWithEndPrefix.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.binarysortable;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+public class BinarySortableSerDeWithEndPrefix extends BinarySortableSerDe {
+  public static void serializeStruct(Output byteStream, Object[] fieldData,
+      List<ObjectInspector> fieldOis, boolean endPrefix) throws SerDeException {
+    for (int i = 0; i < fieldData.length; i++) {
+      serialize(byteStream, fieldData[i], fieldOis.get(i), false);
+    }
+    if (endPrefix) {
+      // bump the last serialized byte so the result sorts immediately after every
+      // key sharing this prefix; it then serves as an exclusive scan end key
+      if (fieldData[fieldData.length-1]!=null) {
+        byteStream.getData()[byteStream.getLength()-1]++;
+      } else {
+        // a null last field ends with the null marker byte; add 2 so the end key
+        // also sorts after the marker used for non-null values
+        byteStream.getData()[byteStream.getLength()-1]+=2;
+      }
+    }
+    }
+  }
+}
\ No newline at end of file
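
serializeStruct with endPrefix = true is evidently what produces the exclusive end of a prefix scan (see the getEndRowSuffix calls in the test above): after serializing the prefix fields it bumps the final byte, yielding the smallest byte string that sorts after every row key beginning with that prefix. A small self-contained sketch of why that works under HBase's unsigned lexicographic ordering; the byte values are illustrative, not real row keys:

import java.util.Arrays;

public class EndPrefixSketch {
  // unsigned lexicographic comparison, the order HBase uses for row keys
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int d = (a[i] & 0xff) - (b[i] & 0xff);
      if (d != 0) {
        return d;
      }
    }
    return a.length - b.length;
  }

  public static void main(String[] args) {
    byte[] prefix = {0x01, 0x32, 0x30, 0x31, 0x35};                 // some serialized prefix
    byte[] endPrefix = Arrays.copyOf(prefix, prefix.length);
    endPrefix[endPrefix.length - 1]++;                              // the bump serializeStruct(..., true) applies
    byte[] longerKey = {0x01, 0x32, 0x30, 0x31, 0x35, 0x00, 0x41};  // the prefix plus more fields
    System.out.println(compareUnsigned(longerKey, endPrefix) < 0);  // true: still inside the scan range
  }
}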