You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/09/22 23:32:01 UTC
[20/52] [abbrv] hive git commit: HIVE-10289: Support filter on
non-first partition key and non-string partition key (Daniel Dai reviewed by
Alan Gates)
http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java
index ec99685..9762309 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java
@@ -20,15 +20,30 @@ package org.apache.hadoop.hive.metastore.hbase;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.hbase.PartitionKeyComparator.Operator;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LeafNode;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
@@ -147,7 +162,7 @@ class HBaseFilterPlanUtil {
public static class ScanPlan extends FilterPlan {
public static class ScanMarker {
- final byte[] bytes;
+ final String value;
/**
* If inclusive = true, it means that the
* marker includes those bytes.
@@ -155,20 +170,24 @@ class HBaseFilterPlanUtil {
* or ends at the next possible byte array
*/
final boolean isInclusive;
- ScanMarker(byte [] b, boolean i){
- this.bytes = b;
+ final String type;
+ ScanMarker(String obj, boolean i, String type){
+ this.value = obj;
this.isInclusive = i;
+ this.type = type;
}
@Override
public String toString() {
- return "ScanMarker [bytes=" + Arrays.toString(bytes) + ", isInclusive=" + isInclusive + "]";
+ return "ScanMarker [" + "value=" + value.toString() + ", isInclusive=" + isInclusive +
+ ", type=" + type + "]";
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + Arrays.hashCode(bytes);
+ result = prime * result + value.hashCode();
result = prime * result + (isInclusive ? 1231 : 1237);
+ result = prime * result + type.hashCode();
return result;
}
@Override
@@ -180,48 +199,118 @@ class HBaseFilterPlanUtil {
if (getClass() != obj.getClass())
return false;
ScanMarker other = (ScanMarker) obj;
- if (!Arrays.equals(bytes, other.bytes))
+ if (!value.equals(other.value))
return false;
if (isInclusive != other.isInclusive)
return false;
+ if (type == null ? other.type != null : !type.equals(other.type)) // compare by value, consistent with hashCode()
+ return false;
return true;
}
}
- // represent Scan start
- private ScanMarker startMarker = new ScanMarker(null, false);
- // represent Scan end
- private ScanMarker endMarker = new ScanMarker(null, false);
-
- private ScanFilter filter;
-
- public ScanFilter getFilter() {
- return filter;
+ public static class ScanMarkerPair {
+ public ScanMarkerPair(ScanMarker startMarker, ScanMarker endMarker) {
+ this.startMarker = startMarker;
+ this.endMarker = endMarker;
+ }
+ ScanMarker startMarker;
+ ScanMarker endMarker;
+ }
+ // represent Scan start, partition key name -> scanMarkerPair
+ Map<String, ScanMarkerPair> markers = new HashMap<String, ScanMarkerPair>();
+ List<Operator> ops = new ArrayList<Operator>();
+
+ // Get the number of partition key prefixes which can be used in the scan range.
+ // For example, if partition key is (year, month, state)
+ // 1. year = 2015 and month >= 1 and month < 5
+ // year + month can be used in scan range, majorParts = 2
+ // 2. year = 2015 and state = 'CA'
+ // only year can be used in scan range, majorParts = 1
+ // 3. month = 10 and state = 'CA'
+ // nothing can be used in scan range, majorParts = 0
+ private int getMajorPartsCount(List<FieldSchema> parts) {
+ int majorPartsCount = 0;
+ while (majorPartsCount<parts.size() && markers.containsKey(parts.get(majorPartsCount).getName())) {
+ ScanMarkerPair pair = markers.get(parts.get(majorPartsCount).getName());
+ majorPartsCount++;
+ if (pair.startMarker!=null && pair.endMarker!=null && pair.startMarker.value.equals(pair
+ .endMarker.value) && pair.startMarker.isInclusive && pair.endMarker.isInclusive) {
+ // is equal
+ continue;
+ } else {
+ break;
+ }
+ }
+ return majorPartsCount;
}
+ public Filter getFilter(List<FieldSchema> parts) {
+ int majorPartsCount = getMajorPartsCount(parts);
+ Set<String> majorKeys = new HashSet<String>();
+ for (int i=0;i<majorPartsCount;i++) {
+ majorKeys.add(parts.get(i).getName());
+ }
- public void setFilter(ScanFilter filter) {
- this.filter = filter;
- }
+ List<String> names = HBaseUtils.getPartitionNames(parts);
+ List<PartitionKeyComparator.Range> ranges = new ArrayList<PartitionKeyComparator.Range>();
+ for (Map.Entry<String, ScanMarkerPair> entry : markers.entrySet()) {
+ if (names.contains(entry.getKey()) && !majorKeys.contains(entry.getKey())) {
+ PartitionKeyComparator.Mark startMark = null;
+ if (entry.getValue().startMarker != null) {
+ startMark = new PartitionKeyComparator.Mark(entry.getValue().startMarker.value,
+ entry.getValue().startMarker.isInclusive);
+ }
+ PartitionKeyComparator.Mark endMark = null; // end bound of the range; startMark keeps the start bound
+ if (entry.getValue().endMarker != null) {
+ endMark = new PartitionKeyComparator.Mark(entry.getValue().endMarker.value,
+ entry.getValue().endMarker.isInclusive);
+ }
+ PartitionKeyComparator.Range range = new PartitionKeyComparator.Range(
+ entry.getKey(), startMark, endMark);
+ ranges.add(range);
+ }
+ }
- public ScanMarker getStartMarker() {
- return startMarker;
+ if (ranges.isEmpty() && ops.isEmpty()) {
+ return null;
+ } else {
+ return new RowFilter(CompareFilter.CompareOp.EQUAL, new PartitionKeyComparator(
+ StringUtils.join(names, ","), StringUtils.join(HBaseUtils.getPartitionKeyTypes(parts), ","),
+ ranges, ops));
+ }
}
- public void setStartMarker(ScanMarker startMarker) {
- this.startMarker = startMarker;
- }
- public void setStartMarker(byte[] start, boolean isInclusive) {
- setStartMarker(new ScanMarker(start, isInclusive));
+ public void setStartMarker(String keyName, String keyType, String start, boolean isInclusive) {
+ if (markers.containsKey(keyName)) {
+ markers.get(keyName).startMarker = new ScanMarker(start, isInclusive, keyType);
+ } else {
+ ScanMarkerPair marker = new ScanMarkerPair(new ScanMarker(start, isInclusive, keyType), null);
+ markers.put(keyName, marker);
+ }
}
- public ScanMarker getEndMarker() {
- return endMarker;
+ public ScanMarker getStartMarker(String keyName) {
+ if (markers.containsKey(keyName)) {
+ return markers.get(keyName).startMarker;
+ } else {
+ return null;
+ }
}
- public void setEndMarker(ScanMarker endMarker) {
- this.endMarker = endMarker;
+ public void setEndMarker(String keyName, String keyType, String end, boolean isInclusive) {
+ if (markers.containsKey(keyName)) {
+ markers.get(keyName).endMarker = new ScanMarker(end, isInclusive, keyType);
+ } else {
+ ScanMarkerPair marker = new ScanMarkerPair(null, new ScanMarker(end, isInclusive, keyType));
+ markers.put(keyName, marker);
+ }
}
- public void setEndMarker(byte[] end, boolean isInclusive) {
- setEndMarker(new ScanMarker(end, isInclusive));
+
+ public ScanMarker getEndMarker(String keyName) {
+ if (markers.containsKey(keyName)) {
+ return markers.get(keyName).endMarker;
+ } else {
+ return null;
+ }
}
@Override
@@ -236,28 +325,33 @@ class HBaseFilterPlanUtil {
private ScanPlan and(ScanPlan other) {
// create combined FilterPlan based on existing lhs and rhs plan
ScanPlan newPlan = new ScanPlan();
+ // Copy each pair instead of sharing it: setStartMarker/setEndMarker update an existing
+ // pair in place, which would otherwise modify this plan's own markers.
+ for (Map.Entry<String, ScanMarkerPair> entry : markers.entrySet()) {
+ newPlan.markers.put(entry.getKey(),
+ new ScanMarkerPair(entry.getValue().startMarker, entry.getValue().endMarker));
+ }
+
+ for (String keyName : other.markers.keySet()) {
+ if (newPlan.markers.containsKey(keyName)) {
+ // create new scan start
+ ScanMarker greaterStartMarker = getComparedMarker(this.getStartMarker(keyName),
+ other.getStartMarker(keyName), true);
+ if (greaterStartMarker != null) {
+ newPlan.setStartMarker(keyName, greaterStartMarker.type, greaterStartMarker.value, greaterStartMarker.isInclusive);
+ }
+
+ // create new scan end
+ ScanMarker lesserEndMarker = getComparedMarker(this.getEndMarker(keyName), other.getEndMarker(keyName),
+ false);
+ if (lesserEndMarker != null) {
+ newPlan.setEndMarker(keyName, lesserEndMarker.type, lesserEndMarker.value, lesserEndMarker.isInclusive);
+ }
+ } else {
+ newPlan.markers.put(keyName, other.markers.get(keyName));
+ }
+ }
- // create new scan start
- ScanMarker greaterStartMarker = getComparedMarker(this.getStartMarker(),
- other.getStartMarker(), true);
- newPlan.setStartMarker(greaterStartMarker);
-
- // create new scan end
- ScanMarker lesserEndMarker = getComparedMarker(this.getEndMarker(), other.getEndMarker(),
- false);
- newPlan.setEndMarker(lesserEndMarker);
-
- // create new filter plan
- newPlan.setFilter(createCombinedFilter(this.getFilter(), other.getFilter()));
-
+ newPlan.ops.addAll(ops);
+ newPlan.ops.addAll(other.ops);
return newPlan;
}
- private ScanFilter createCombinedFilter(ScanFilter filter1, ScanFilter filter2) {
- // TODO create combined filter - filter1 && filter2
- return null;
- }
-
/**
* @param lStartMarker
* @param rStartMarker
@@ -268,13 +362,23 @@ class HBaseFilterPlanUtil {
static ScanMarker getComparedMarker(ScanMarker lStartMarker, ScanMarker rStartMarker,
boolean getGreater) {
// if one of them has null bytes, just return other
- if(lStartMarker.bytes == null) {
+ if(lStartMarker == null) {
return rStartMarker;
- } else if (rStartMarker.bytes == null) {
+ } else if (rStartMarker == null) {
return lStartMarker;
}
-
- int compareRes = compare(lStartMarker.bytes, rStartMarker.bytes);
+ TypeInfo expectedType =
+ TypeInfoUtils.getTypeInfoFromTypeString(lStartMarker.type);
+ ObjectInspector outputOI =
+ TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(expectedType);
+ Converter lConverter = ObjectInspectorConverters.getConverter(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI);
+ Converter rConverter = ObjectInspectorConverters.getConverter(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI);
+ Comparable lValue = (Comparable)lConverter.convert(lStartMarker.value);
+ Comparable rValue = (Comparable)rConverter.convert(rStartMarker.value);
+
+ int compareRes = lValue.compareTo(rValue);
if (compareRes == 0) {
// bytes are equal, now compare the isInclusive flags
if (lStartMarker.isInclusive == rStartMarker.isInclusive) {
@@ -287,7 +391,7 @@ class HBaseFilterPlanUtil {
isInclusive = false;
}
// else
- return new ScanMarker(lStartMarker.bytes, isInclusive);
+ return new ScanMarker(lStartMarker.value, isInclusive, lStartMarker.type);
}
if (getGreater) {
- return compareRes == 1 ? lStartMarker : rStartMarker;
+ return compareRes > 0 ? lStartMarker : rStartMarker; // compareTo() guarantees only the sign of its result
@@ -313,42 +417,74 @@ class HBaseFilterPlanUtil {
/**
* @return row suffix - This is appended to db + table, to generate start row for the Scan
*/
- public byte[] getStartRowSuffix() {
- if (startMarker.isInclusive) {
- return startMarker.bytes;
- } else {
- return HBaseUtils.getEndPrefix(startMarker.bytes);
+ public byte[] getStartRowSuffix(String dbName, String tableName, List<FieldSchema> parts) {
+ int majorPartsCount = getMajorPartsCount(parts);
+ List<String> majorPartTypes = new ArrayList<String>();
+ List<String> components = new ArrayList<String>();
+ boolean endPrefix = false;
+ for (int i=0;i<majorPartsCount;i++) {
+ majorPartTypes.add(parts.get(i).getType());
+ ScanMarker marker = markers.get(parts.get(i).getName()).startMarker;
+ if (marker != null) {
+ components.add(marker.value);
+ if (i==majorPartsCount-1) {
+ endPrefix = !marker.isInclusive;
+ }
+ } else {
+ components.add(null);
+ if (i==majorPartsCount-1) {
+ endPrefix = false;
+ }
+ }
}
+ byte[] bytes = HBaseUtils.buildPartitionKey(dbName, tableName, majorPartTypes, components, endPrefix);
+ return bytes;
}
/**
* @return row suffix - This is appended to db + table, to generate end row for the Scan
*/
- public byte[] getEndRowSuffix() {
- if (endMarker.isInclusive) {
- return HBaseUtils.getEndPrefix(endMarker.bytes);
- } else {
- return endMarker.bytes;
+ public byte[] getEndRowSuffix(String dbName, String tableName, List<FieldSchema> parts) {
+ int majorPartsCount = getMajorPartsCount(parts);
+ List<String> majorPartTypes = new ArrayList<String>();
+ List<String> components = new ArrayList<String>();
+ boolean endPrefix = false;
+ for (int i=0;i<majorPartsCount;i++) {
+ majorPartTypes.add(parts.get(i).getType());
+ ScanMarker marker = markers.get(parts.get(i).getName()).endMarker;
+ if (marker != null) {
+ components.add(marker.value);
+ if (i==majorPartsCount-1) {
+ endPrefix = marker.isInclusive;
+ }
+ } else {
+ components.add(null);
+ if (i==majorPartsCount-1) {
+ endPrefix = true;
+ }
+ }
+ }
+ byte[] bytes = HBaseUtils.buildPartitionKey(dbName, tableName, majorPartTypes, components, endPrefix);
+ if (components.isEmpty()) {
+ bytes[bytes.length-1]++;
}
+ return bytes;
}
@Override
public String toString() {
- return "ScanPlan [startMarker=" + startMarker + ", endMarker=" + endMarker + ", filter="
- + filter + "]";
+ StringBuffer sb = new StringBuffer();
+ sb.append("ScanPlan:\n");
+ for (Map.Entry<String, ScanMarkerPair> entry : markers.entrySet()) {
+ sb.append("key=" + entry.getKey() + "[startMarker=" + entry.getValue().startMarker
+ + ", endMarker=" + entry.getValue().endMarker + "]");
+ }
+ return sb.toString();
}
}
/**
- * represent a plan that can be used to create a hbase filter and then set in
- * Scan.setFilter()
- */
- public static class ScanFilter {
- // TODO: implement this
- }
-
- /**
* Visitor for ExpressionTree.
* It first generates the ScanPlan for the leaf nodes. The higher level nodes are
* either AND or OR operations. It then calls FilterPlan.and and FilterPlan.or with
@@ -369,9 +505,12 @@ class HBaseFilterPlanUtil {
// temporary params for current left and right side plans, for AND, OR
private FilterPlan rPlan;
- private final String firstPartcolumn;
- public PartitionFilterGenerator(String firstPartitionColumn) {
- this.firstPartcolumn = firstPartitionColumn;
+ private Map<String, String> nameToType = new HashMap<String, String>();
+
+ public PartitionFilterGenerator(List<FieldSchema> parts) {
+ for (FieldSchema part : parts) {
+ nameToType.put(part.getName(), part.getType());
+ }
}
FilterPlan getPlan() {
@@ -414,63 +553,37 @@ class HBaseFilterPlanUtil {
public void visit(LeafNode node) throws MetaException {
ScanPlan leafPlan = new ScanPlan();
curPlan = leafPlan;
- if (!isFirstParitionColumn(node.keyName)) {
- leafPlan.setFilter(generateScanFilter(node));
- return;
- }
- if (!(node.value instanceof String)) {
- // only string type is supported currently
- // treat conditions on other types as true
- return;
- }
// this is a condition on first partition column, so might influence the
// start and end of the scan
final boolean INCLUSIVE = true;
switch (node.operator) {
case EQUALS:
- leafPlan.setStartMarker(toBytes(node.value), INCLUSIVE);
- leafPlan.setEndMarker(toBytes(node.value), INCLUSIVE);
+ leafPlan.setStartMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE);
+ leafPlan.setEndMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE);
break;
case GREATERTHAN:
- leafPlan.setStartMarker(toBytes(node.value), !INCLUSIVE);
+ leafPlan.setStartMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), !INCLUSIVE);
break;
case GREATERTHANOREQUALTO:
- leafPlan.setStartMarker(toBytes(node.value), INCLUSIVE);
+ leafPlan.setStartMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE);
break;
case LESSTHAN:
- leafPlan.setEndMarker(toBytes(node.value), !INCLUSIVE);
+ leafPlan.setEndMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), !INCLUSIVE);
break;
case LESSTHANOREQUALTO:
- leafPlan.setEndMarker(toBytes(node.value), INCLUSIVE);
+ leafPlan.setEndMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE);
break;
case LIKE:
+ leafPlan.ops.add(new Operator(Operator.Type.LIKE, node.keyName, node.value.toString()));
+ break;
case NOTEQUALS:
case NOTEQUALS2:
- // TODO: create filter plan for these
- hasUnsupportedCondition = true;
+ leafPlan.ops.add(new Operator(Operator.Type.NOTEQUALS, node.keyName, node.value.toString()));
break;
}
}
- @VisibleForTesting
- static byte[] toBytes(Object value) {
- // TODO: actually implement this
- // We need to determine the actual type and use appropriate
- // serialization format for that type
- return ((String) value).getBytes(HBaseUtils.ENCODING);
- }
-
- private ScanFilter generateScanFilter(LeafNode node) {
- // TODO Auto-generated method stub
- hasUnsupportedCondition = true;
- return null;
- }
-
- private boolean isFirstParitionColumn(String keyName) {
- return keyName.equalsIgnoreCase(firstPartcolumn);
- }
-
private boolean hasUnsupportedCondition() {
return hasUnsupportedCondition;
}
@@ -486,12 +599,12 @@ class HBaseFilterPlanUtil {
}
}
- public static PlanResult getFilterPlan(ExpressionTree exprTree, String firstPartitionColumn) throws MetaException {
+ public static PlanResult getFilterPlan(ExpressionTree exprTree, List<FieldSchema> parts) throws MetaException {
if (exprTree == null) {
// TODO: if exprTree is null, we should do what ObjectStore does. See HIVE-10102
return new PlanResult(new ScanPlan(), true);
}
- PartitionFilterGenerator pGenerator = new PartitionFilterGenerator(firstPartitionColumn);
+ PartitionFilterGenerator pGenerator = new PartitionFilterGenerator(parts);
exprTree.accept(pGenerator);
return new PlanResult(pGenerator.getPlan(), pGenerator.hasUnsupportedCondition());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
index ca1582e..66c46a5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
@@ -19,8 +19,10 @@
package org.apache.hadoop.hive.metastore.hbase;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -43,6 +45,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
@@ -51,6 +54,7 @@ import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.Role;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.hbase.PartitionKeyComparator.Operator;
import org.apache.hive.common.util.BloomFilter;
import java.io.IOException;
@@ -493,12 +497,12 @@ public class HBaseReadWrite {
* @return a list of partition objects.
* @throws IOException
*/
- List<Partition> getPartitions(String dbName, String tableName, List<List<String>> partValLists)
- throws IOException {
+ List<Partition> getPartitions(String dbName, String tableName, List<String> partTypes,
+ List<List<String>> partValLists) throws IOException {
List<Partition> parts = new ArrayList<>(partValLists.size());
List<Get> gets = new ArrayList<>(partValLists.size());
for (List<String> partVals : partValLists) {
- byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partVals);
+ byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partTypes, partVals);
Get get = new Get(key);
get.addColumn(CATALOG_CF, CATALOG_COL);
gets.add(get);
@@ -526,7 +530,8 @@ public class HBaseReadWrite {
*/
void putPartition(Partition partition) throws IOException {
byte[] hash = putStorageDescriptor(partition.getSd());
- byte[][] serialized = HBaseUtils.serializePartition(partition, hash);
+ byte[][] serialized = HBaseUtils.serializePartition(partition,
+ HBaseUtils.getPartitionKeyTypes(getTable(partition.getDbName(), partition.getTableName()).getPartitionKeys()), hash);
store(PART_TABLE, serialized[0], CATALOG_CF, CATALOG_COL, serialized[1]);
partCache.put(partition.getDbName(), partition.getTableName(), partition);
}
@@ -547,7 +552,8 @@ public class HBaseReadWrite {
decrementStorageDescriptorRefCount(oldPart.getSd());
hash = putStorageDescriptor(newPart.getSd());
}
- byte[][] serialized = HBaseUtils.serializePartition(newPart, hash);
+ byte[][] serialized = HBaseUtils.serializePartition(newPart,
+ HBaseUtils.getPartitionKeyTypes(getTable(newPart.getDbName(), newPart.getTableName()).getPartitionKeys()), hash);
store(PART_TABLE, serialized[0], CATALOG_CF, CATALOG_COL, serialized[1]);
partCache.put(newPart.getDbName(), newPart.getTableName(), newPart);
if (!oldPart.getTableName().equals(newPart.getTableName())) {
@@ -565,7 +571,9 @@ public class HBaseReadWrite {
List<Put> puts = new ArrayList<>(partitions.size());
for (Partition partition : partitions) {
byte[] hash = putStorageDescriptor(partition.getSd());
- byte[][] serialized = HBaseUtils.serializePartition(partition, hash);
+ List<String> partTypes = HBaseUtils.getPartitionKeyTypes(
+ getTable(partition.getDbName(), partition.getTableName()).getPartitionKeys());
+ byte[][] serialized = HBaseUtils.serializePartition(partition, partTypes, hash);
Put p = new Put(serialized[0]);
p.add(CATALOG_CF, CATALOG_COL, serialized[1]);
puts.add(p);
@@ -591,7 +599,9 @@ public class HBaseReadWrite {
decrementStorageDescriptorRefCount(oldParts.get(i).getSd());
hash = putStorageDescriptor(newParts.get(i).getSd());
}
- byte[][] serialized = HBaseUtils.serializePartition(newParts.get(i), hash);
+ Partition newPart = newParts.get(i);
+ byte[][] serialized = HBaseUtils.serializePartition(newPart,
+ HBaseUtils.getPartitionKeyTypes(getTable(newPart.getDbName(), newPart.getTableName()).getPartitionKeys()), hash);
Put p = new Put(serialized[0]);
p.add(CATALOG_CF, CATALOG_COL, serialized[1]);
puts.add(p);
@@ -624,8 +634,9 @@ public class HBaseReadWrite {
? new ArrayList<>(cached).subList(0, maxPartitions)
: new ArrayList<>(cached);
}
- byte[] keyPrefix = HBaseUtils.buildKeyWithTrailingSeparator(dbName, tableName);
- List<Partition> parts = scanPartitionsWithFilter(keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), -1, null);
+ byte[] keyPrefix = HBaseUtils.buildPartitionKey(dbName, tableName, new ArrayList<String>(),
+ new ArrayList<String>(), false);
+ List<Partition> parts = scanPartitionsWithFilter(dbName, tableName, keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), -1, null);
partCache.put(dbName, tableName, parts, true);
return maxPartitions < parts.size() ? parts.subList(0, maxPartitions) : parts;
}
@@ -672,72 +683,68 @@ public class HBaseReadWrite {
if (table == null) {
throw new NoSuchObjectException("Unable to find table " + dbName + "." + tableName);
}
- if (partVals.size() == table.getPartitionKeys().size()) {
- keyPrefix = HBaseUtils.buildKey(keyElements.toArray(new String[keyElements.size()]));
- } else {
- keyPrefix = HBaseUtils.buildKeyWithTrailingSeparator(keyElements.toArray(
- new String[keyElements.size()]));
- }
+ keyPrefix = HBaseUtils.buildPartitionKey(dbName, tableName,
+ HBaseUtils.getPartitionKeyTypes(table.getPartitionKeys().subList(0, keyElements.size()-2)),
+ keyElements.subList(0, keyElements.size()-2));
// Now, build a filter out of the remaining keys
- String regex = null;
+ List<PartitionKeyComparator.Range> ranges = new ArrayList<PartitionKeyComparator.Range>();
+ List<Operator> ops = new ArrayList<Operator>();
if (!(partVals.size() == table.getPartitionKeys().size() && firstStar == -1)) {
- StringBuilder buf = new StringBuilder(".*");
+
for (int i = Math.max(0, firstStar);
i < table.getPartitionKeys().size() && i < partVals.size(); i++) {
- buf.append(HBaseUtils.KEY_SEPARATOR);
+
if ("*".equals(partVals.get(i))) {
- buf.append("[^");
- buf.append(HBaseUtils.KEY_SEPARATOR);
- buf.append("]+");
+ // wildcard: accept any value for this partition key
+ PartitionKeyComparator.Operator op = new PartitionKeyComparator.Operator(
+ PartitionKeyComparator.Operator.Type.LIKE,
+ table.getPartitionKeys().get(i).getName(),
+ ".*");
+ ops.add(op);
} else {
- buf.append(partVals.get(i));
+ // concrete value: inclusive range whose start and end are both that value
+ PartitionKeyComparator.Range range = new PartitionKeyComparator.Range(
+ table.getPartitionKeys().get(i).getName(),
+ new PartitionKeyComparator.Mark(partVals.get(i), true),
+ new PartitionKeyComparator.Mark(partVals.get(i), true));
+ ranges.add(range);
}
}
- if (partVals.size() < table.getPartitionKeys().size()) {
- buf.append(HBaseUtils.KEY_SEPARATOR);
- buf.append(".*");
- }
- regex = buf.toString();
}
Filter filter = null;
- if (regex != null) {
- filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regex));
+ if (!ranges.isEmpty() || !ops.isEmpty()) {
+ filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new PartitionKeyComparator(
+ StringUtils.join(HBaseUtils.getPartitionNames(table.getPartitionKeys()), ","),
+ StringUtils.join(HBaseUtils.getPartitionKeyTypes(table.getPartitionKeys()), ","),
+ ranges, ops));
}
if (LOG.isDebugEnabled()) {
LOG.debug("Scanning partitions with prefix <" + new String(keyPrefix) + "> and filter <" +
- regex + ">");
+ filter + ">");
}
- List<Partition> parts = scanPartitionsWithFilter(keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), maxPartitions, filter);
+ List<Partition> parts = scanPartitionsWithFilter(dbName, tableName, keyPrefix,
+ HBaseUtils.getEndPrefix(keyPrefix), maxPartitions, filter);
partCache.put(dbName, tableName, parts, false);
return parts;
}
List<Partition> scanPartitions(String dbName, String tableName, byte[] keyStart, byte[] keyEnd,
Filter filter, int maxPartitions) throws IOException, NoSuchObjectException {
- List<String> keyElements = new ArrayList<>();
- keyElements.add(dbName);
- keyElements.add(tableName);
-
- byte[] keyPrefix =
- HBaseUtils.buildKeyWithTrailingSeparator(keyElements.toArray(new String[keyElements.size()]));
- byte[] startRow = ArrayUtils.addAll(keyPrefix, keyStart);
+ byte[] startRow = keyStart;
byte[] endRow;
if (keyEnd == null || keyEnd.length == 0) {
// stop when current db+table entries are over
- endRow = HBaseUtils.getEndPrefix(keyPrefix);
+ endRow = HBaseUtils.getEndPrefix(startRow);
} else {
- endRow = ArrayUtils.addAll(keyPrefix, keyEnd);
+ endRow = keyEnd;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Scanning partitions with start row <" + new String(startRow) + "> and end row <"
+ new String(endRow) + ">");
}
- return scanPartitionsWithFilter(startRow, endRow, maxPartitions, filter);
+ return scanPartitionsWithFilter(dbName, tableName, startRow, endRow, maxPartitions, filter);
}
@@ -762,7 +769,8 @@ public class HBaseReadWrite {
Partition p = getPartition(dbName, tableName, partVals, false);
decrementStorageDescriptorRefCount(p.getSd());
}
- byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partVals);
+ byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName,
+ HBaseUtils.getPartitionKeyTypes(getTable(dbName, tableName).getPartitionKeys()), partVals);
delete(PART_TABLE, key, null, null);
}
@@ -770,7 +778,8 @@ public class HBaseReadWrite {
boolean populateCache) throws IOException {
Partition cached = partCache.get(dbName, tableName, partVals);
if (cached != null) return cached;
- byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partVals);
+ byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName,
+ HBaseUtils.getPartitionKeyTypes(getTable(dbName, tableName).getPartitionKeys()), partVals);
byte[] serialized = read(PART_TABLE, key, CATALOG_CF, CATALOG_COL);
if (serialized == null) return null;
HBaseUtils.StorageDescriptorParts sdParts =
@@ -781,17 +790,18 @@ public class HBaseReadWrite {
return sdParts.containingPartition;
}
- private List<Partition> scanPartitionsWithFilter(byte[] startRow, byte [] endRow,
- int maxResults, Filter filter)
+ private List<Partition> scanPartitionsWithFilter(String dbName, String tableName,
+ byte[] startRow, byte [] endRow, int maxResults, Filter filter)
throws IOException {
Iterator<Result> iter =
scan(PART_TABLE, startRow, endRow, CATALOG_CF, CATALOG_COL, filter);
+ List<FieldSchema> tablePartitions = getTable(dbName, tableName).getPartitionKeys();
List<Partition> parts = new ArrayList<>();
int numToFetch = maxResults < 0 ? Integer.MAX_VALUE : maxResults;
for (int i = 0; i < numToFetch && iter.hasNext(); i++) {
Result result = iter.next();
- HBaseUtils.StorageDescriptorParts sdParts = HBaseUtils.deserializePartition(result.getRow(),
- result.getValue(CATALOG_CF, CATALOG_COL));
+ HBaseUtils.StorageDescriptorParts sdParts = HBaseUtils.deserializePartition(dbName, tableName,
+ tablePartitions, result.getRow(), result.getValue(CATALOG_CF, CATALOG_COL));
StorageDescriptor sd = getStorageDescriptor(sdParts.sdHash);
HBaseUtils.assembleStorageDescriptor(sd, sdParts);
parts.add(sdParts.containingPartition);
@@ -1558,7 +1568,9 @@ public class HBaseReadWrite {
for (int i = 0; i < partNames.size(); i++) {
valToPartMap.put(partVals.get(i), partNames.get(i));
- byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName, partVals.get(i));
+ byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName,
+ HBaseUtils.getPartitionKeyTypes(getTable(dbName, tblName).getPartitionKeys()),
+ partVals.get(i));
Get get = new Get(partKey);
for (byte[] colName : colNameBytes) {
get.addColumn(STATS_CF, colName);
@@ -1690,9 +1702,11 @@ public class HBaseReadWrite {
return keys;
}
- private byte[] getStatisticsKey(String dbName, String tableName, List<String> partVals) {
+ private byte[] getStatisticsKey(String dbName, String tableName, List<String> partVals) throws IOException {
return partVals == null ? HBaseUtils.buildKey(dbName, tableName) : HBaseUtils
- .buildPartitionKey(dbName, tableName, partVals);
+ .buildPartitionKey(dbName, tableName,
+ HBaseUtils.getPartitionKeyTypes(getTable(dbName, tableName).getPartitionKeys()),
+ partVals);
}
private String getStatisticsTable(List<String> partVals) {
http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index 0204f37..717e094 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -541,7 +541,8 @@ public class HBaseStore implements RawStore {
boolean commit = false;
openTransaction();
try {
- List<Partition> oldParts = getHBase().getPartitions(db_name, tbl_name, part_vals_list);
+ List<Partition> oldParts = getHBase().getPartitions(db_name, tbl_name,
+ HBaseUtils.getPartitionKeyTypes(getTable(db_name, tbl_name).getPartitionKeys()), part_vals_list);
getHBase().replacePartitions(oldParts, new_parts);
for (List<String> part_vals : part_vals_list) {
getHBase().getStatsCache().invalidate(db_name, tbl_name,
@@ -634,10 +635,8 @@ public class HBaseStore implements RawStore {
if (table == null) {
throw new NoSuchObjectException("Unable to find table " + dbName + "." + tblName);
}
- String firstPartitionColumn = table.getPartitionKeys().get(0).getName();
// general hbase filter plan from expression tree
- PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, firstPartitionColumn);
-
+ PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, table.getPartitionKeys());
if (LOG.isDebugEnabled()) {
LOG.debug("Hbase Filter Plan generated : " + planRes.plan);
}
@@ -648,7 +647,9 @@ public class HBaseStore implements RawStore {
for (ScanPlan splan : planRes.plan.getPlans()) {
try {
List<Partition> parts = getHBase().scanPartitions(dbName, tblName,
- splan.getStartRowSuffix(), splan.getEndRowSuffix(), null, -1);
+ splan.getStartRowSuffix(dbName, tblName, table.getPartitionKeys()),
+ splan.getEndRowSuffix(dbName, tblName, table.getPartitionKeys()),
+ splan.getFilter(table.getPartitionKeys()), -1);
boolean reachedMax = false;
for (Partition part : parts) {
mergedParts.put(part.getValues(), part);
http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
index 62bb4de..b6fa591 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
@@ -18,11 +18,14 @@
*/
package org.apache.hadoop.hive.metastore.hbase;
+import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
+
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
@@ -50,6 +53,19 @@ import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDeWithEndPrefix;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hive.common.util.BloomFilter;
import java.io.IOException;
@@ -63,6 +79,7 @@ import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
@@ -712,15 +729,31 @@ class HBaseUtils {
return sd;
}
+ static List<String> getPartitionKeyTypes(List<FieldSchema> parts) { // maps each partition column to its type string (FieldSchema.getType())
+ com.google.common.base.Function<FieldSchema, String> fieldSchemaToType =
+ new com.google.common.base.Function<FieldSchema, String>() {
+ public String apply(FieldSchema fs) { return fs.getType(); }
+ };
+ return Lists.transform(parts, fieldSchemaToType); // Guava transform: a lazy view over parts, not a copy
+ }
+
+ static List<String> getPartitionNames(List<FieldSchema> parts) { // maps each partition column to its name (FieldSchema.getName())
+ com.google.common.base.Function<FieldSchema, String> fieldSchemaToName =
+ new com.google.common.base.Function<FieldSchema, String>() {
+ public String apply(FieldSchema fs) { return fs.getName(); }
+ };
+ return Lists.transform(parts, fieldSchemaToName); // Guava transform: a lazy view over parts, not a copy
+ }
+
/**
* Serialize a partition
* @param part partition object
* @param sdHash hash that is being used as a key for the enclosed storage descriptor
* @return First element is the key, second is the serialized partition
*/
- static byte[][] serializePartition(Partition part, byte[] sdHash) {
+ static byte[][] serializePartition(Partition part, List<String> partTypes, byte[] sdHash) {
byte[][] result = new byte[2][];
- result[0] = buildPartitionKey(part.getDbName(), part.getTableName(), part.getValues());
+ result[0] = buildPartitionKey(part.getDbName(), part.getTableName(), partTypes, part.getValues());
HbaseMetastoreProto.Partition.Builder builder = HbaseMetastoreProto.Partition.newBuilder();
builder
.setCreateTime(part.getCreateTime())
@@ -735,11 +768,54 @@ class HBaseUtils {
return result;
}
- static byte[] buildPartitionKey(String dbName, String tableName, List<String> partVals) {
- Deque<String> keyParts = new ArrayDeque<>(partVals);
- keyParts.addFirst(tableName);
- keyParts.addFirst(dbName);
- return buildKey(keyParts.toArray(new String[keyParts.size()]));
+ static byte[] buildPartitionKey(String dbName, String tableName, List<String> partTypes, List<String> partVals) { // convenience overload
+ return buildPartitionKey(dbName, tableName, partTypes, partVals, false); // delegates with endPrefix = false
+ }
+
+ static byte[] buildPartitionKey(String dbName, String tableName, List<String> partTypes, List<String> partVals, boolean endPrefix) { // endPrefix is passed through unchanged to buildSerializedPartitionKey
+ Object[] components = new Object[partVals.size()];
+ for (int i=0;i<partVals.size();i++) { // partTypes is read at the same index as partVals, so it needs at least partVals.size() entries
+ TypeInfo expectedType =
+ TypeInfoUtils.getTypeInfoFromTypeString(partTypes.get(i));
+ ObjectInspector outputOI =
+ TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(expectedType);
+ Converter converter = ObjectInspectorConverters.getConverter(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI);
+ components[i] = converter.convert(partVals.get(i)); // string partition value -> object of the declared partition type
+ }
+
+ return buildSerializedPartitionKey(dbName, tableName, partTypes, components, endPrefix);
+ }
+
+ static byte[] buildSerializedPartitionKey(String dbName, String tableName, List<String> partTypes, Object[] components, boolean endPrefix) { // serializes (dbName, tableName, components...) as one struct key
+ ObjectInspector javaStringOI =
+ PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveCategory.STRING);
+ Object[] data = new Object[components.length+2];
+ List<ObjectInspector> fois = new ArrayList<ObjectInspector>(components.length+2);
+ boolean[] endPrefixes = new boolean[components.length+2]; // NOTE(review): filled below but never read in this method
+
+ data[0] = dbName;
+ fois.add(javaStringOI);
+ endPrefixes[0] = false;
+ data[1] = tableName;
+ fois.add(javaStringOI);
+ endPrefixes[1] = false;
+
+ for (int i = 0; i < components.length; i++) { // partition components follow the two leading string fields
+ data[i+2] = components[i];
+ TypeInfo expectedType =
+ TypeInfoUtils.getTypeInfoFromTypeString(partTypes.get(i));
+ ObjectInspector outputOI =
+ TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(expectedType);
+ fois.add(outputOI);
+ }
+ Output output = new Output();
+ try {
+ BinarySortableSerDeWithEndPrefix.serializeStruct(output, data, fois, endPrefix);
+ } catch (SerDeException e) {
+ throw new RuntimeException("Cannot serialize partition " + StringUtils.join(components, ","), e); // keep the SerDeException as the cause
+ }
+ return Arrays.copyOf(output.getData(), output.getLength()); // copy only the bytes actually written
}
static class StorageDescriptorParts {
@@ -771,11 +847,10 @@ class HBaseUtils {
* @param serialized the value fetched from HBase
* @return A struct that contains the partition plus parts of the storage descriptor
*/
- static StorageDescriptorParts deserializePartition(byte[] key, byte[] serialized)
- throws InvalidProtocolBufferException {
- String[] keys = deserializeKey(key);
- return deserializePartition(keys[0], keys[1],
- Arrays.asList(Arrays.copyOfRange(keys, 2, keys.length)), serialized);
+ static StorageDescriptorParts deserializePartition(String dbName, String tableName, List<FieldSchema> partitions,
+ byte[] key, byte[] serialized) throws InvalidProtocolBufferException {
+ List keys = deserializePartitionKey(partitions, key);
+ return deserializePartition(dbName, tableName, keys, serialized);
}
/**
@@ -811,6 +886,36 @@ class HBaseUtils {
return k.split(KEY_SEPARATOR_STR);
}
+ private static List<String> deserializePartitionKey(List<FieldSchema> partitions, byte[] key) { // decodes a serialized row key back into partition value strings
+ StringBuilder names = new StringBuilder(); // method-local buffer, so StringBuffer's synchronization is not needed
+ names.append("dbName,tableName,");
+ StringBuilder types = new StringBuilder();
+ types.append("string,string,");
+ for (int i=0;i<partitions.size();i++) { // build comma-separated column name and type lists for the SerDe
+ names.append(partitions.get(i).getName());
+ types.append(TypeInfoUtils.getTypeInfoFromTypeString(partitions.get(i).getType()));
+ if (i!=partitions.size()-1) {
+ names.append(",");
+ types.append(",");
+ }
+ }
+ BinarySortableSerDe serDe = new BinarySortableSerDe();
+ Properties props = new Properties();
+ props.setProperty(serdeConstants.LIST_COLUMNS, names.toString());
+ props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types.toString());
+ try {
+ serDe.initialize(new Configuration(), props);
+ List deserializedkeys = ((List)serDe.deserialize(new BytesWritable(key))).subList(2, partitions.size()+2); // skip the leading dbName and tableName fields
+ List<String> partitionKeys = new ArrayList<String>();
+ for (Object deserializedKey : deserializedkeys) {
+ partitionKeys.add(deserializedKey.toString()); // NOTE(review): would throw if a deserialized field were null - confirm keys never hold nulls
+ }
+ return partitionKeys;
+ } catch (SerDeException e) {
+ throw new RuntimeException("Error when deserialize key", e);
+ }
+ }
+
/**
* Serialize a table
* @param table table object
http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/PartitionKeyComparator.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/PartitionKeyComparator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/PartitionKeyComparator.java
new file mode 100644
index 0000000..01fe403
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/PartitionKeyComparator.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hive.metastore.hbase.HbaseMetastoreProto.PartitionKeyComparator.Operator.Type;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+public class PartitionKeyComparator extends ByteArrayComparable { // evaluates per-partition-key ranges and LIKE / != operators against a serialized partition row key
+ private static final Log LOG = LogFactory.getLog(PartitionKeyComparator.class);
+ static class Mark { // one end of a range: the bound as a string plus whether the bound itself is included
+ Mark(String value, boolean inclusive) {
+ this.value = value;
+ this.inclusive = inclusive;
+ }
+ String value;
+ boolean inclusive;
+ public String toString() {
+ return value + (inclusive?"_":""); // a trailing "_" marks an inclusive bound
+ }
+ }
+ static class Range { // optional start and end marks for one partition key; a null mark means unbounded on that side
+ Range(String keyName, Mark start, Mark end) {
+ this.keyName = keyName;
+ this.start = start;
+ this.end = end;
+ }
+ String keyName;
+ Mark start;
+ Mark end;
+ public String toString() {
+ return "" + keyName + ":" + (start!=null?start.toString():"") + (end!=null?end.toString():"");
+ }
+ }
+ // Cache the information derived from ranges for performance, including
+ // range in native datatype
+ static class NativeRange {
+ int pos; // index of the key within the comma-separated names list
+ Comparable start; // converted start bound, or null when the range has no start
+ Comparable end; // converted end bound, or null when the range has no end
+ }
+ static class Operator { // a non-range condition (LIKE or !=) on one partition key
+ public Operator(Type type, String keyName, String val) {
+ this.type = type;
+ this.keyName = keyName;
+ this.val = val;
+ }
+ enum Type {
+ LIKE, NOTEQUALS
+ };
+ Type type;
+ String keyName;
+ String val;
+ }
+ static class NativeOperator { // cached position and converted operand of an Operator
+ int pos;
+ Comparable val;
+ }
+ String names; // comma-separated partition key names
+ String types; // comma-separated partition key types, parallel to names
+ List<Range> ranges;
+ List<NativeRange> nativeRanges; // parallel to ranges
+ List<Operator> ops;
+ List<NativeOperator> nativeOps; // parallel to ops
+ Properties serdeProps; // column names/types handed to BinarySortableSerDe in compareTo
+ public PartitionKeyComparator(String names, String types, List<Range> ranges, List<Operator> ops) {
+ super(null);
+ this.names = names;
+ this.types = types;
+ this.ranges = ranges;
+ this.ops = ops;
+ serdeProps = new Properties();
+ serdeProps.setProperty(serdeConstants.LIST_COLUMNS, "dbName,tableName," + names); // the row key starts with dbName and tableName
+ serdeProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string,string," + types);
+
+ this.nativeRanges = new ArrayList<NativeRange>(this.ranges.size());
+ for (int i=0;i<ranges.size();i++) { // pre-convert every range bound from its string form once, up front
+ Range range = ranges.get(i);
+ NativeRange nativeRange = new NativeRange();
+ nativeRanges.add(i, nativeRange);
+ nativeRange.pos = Arrays.asList(names.split(",")).indexOf(range.keyName); // NOTE(review): -1 when keyName is not in names; the index below would then throw
+ TypeInfo expectedType =
+ TypeInfoUtils.getTypeInfoFromTypeString(types.split(",")[nativeRange.pos]);
+ ObjectInspector outputOI =
+ TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(expectedType);
+ nativeRange.start = null;
+ if (range.start != null) {
+ Converter converter = ObjectInspectorConverters.getConverter(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI);
+ nativeRange.start = (Comparable)converter.convert(range.start.value);
+ }
+ nativeRange.end = null;
+ if (range.end != null) {
+ Converter converter = ObjectInspectorConverters.getConverter(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI);
+ nativeRange.end = (Comparable)converter.convert(range.end.value);
+ }
+ }
+
+ this.nativeOps = new ArrayList<NativeOperator>(this.ops.size());
+ for (int i=0;i<ops.size();i++) { // same pre-conversion for the operator operands
+ Operator op = ops.get(i);
+ NativeOperator nativeOp = new NativeOperator();
+ nativeOps.add(i, nativeOp);
+ nativeOp.pos = ArrayUtils.indexOf(names.split(","), op.keyName); // NOTE(review): -1 when keyName is not in names
+ TypeInfo expectedType =
+ TypeInfoUtils.getTypeInfoFromTypeString(types.split(",")[nativeOp.pos]);
+ ObjectInspector outputOI =
+ TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(expectedType);
+ Converter converter = ObjectInspectorConverters.getConverter(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI);
+ nativeOp.val = (Comparable)converter.convert(op.val);
+ }
+ }
+
+ public static PartitionKeyComparator parseFrom(final byte [] bytes) { // rebuilds a comparator from the protobuf bytes written by toByteArray()
+ HbaseMetastoreProto.PartitionKeyComparator proto;
+ try {
+ proto = HbaseMetastoreProto.PartitionKeyComparator.parseFrom(bytes);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ List<Range> ranges = new ArrayList<Range>();
+ for (HbaseMetastoreProto.PartitionKeyComparator.Range range : proto.getRangeList()) {
+ Mark start = null; // stays null when the message carries no start mark
+ if (range.hasStart()) {
+ start = new Mark(range.getStart().getValue(), range.getStart().getInclusive());
+ }
+ Mark end = null; // stays null when the message carries no end mark
+ if (range.hasEnd()) {
+ end = new Mark(range.getEnd().getValue(), range.getEnd().getInclusive());
+ }
+ ranges.add(new Range(range.getKey(), start, end));
+ }
+ List<Operator> ops = new ArrayList<Operator>();
+ for (HbaseMetastoreProto.PartitionKeyComparator.Operator op : proto.getOpList()) {
+ ops.add(new Operator(Operator.Type.valueOf(op.getType().name()), op.getKey(), // proto enum -> local enum by constant name
+ op.getVal()));
+ }
+ return new PartitionKeyComparator(proto.getNames(), proto.getTypes(), ranges, ops);
+ }
+
+ @Override
+ public byte[] toByteArray() { // serializes names, types, ranges and ops into the protobuf message
+ HbaseMetastoreProto.PartitionKeyComparator.Builder builder =
+ HbaseMetastoreProto.PartitionKeyComparator.newBuilder();
+ builder.setNames(names);
+ builder.setTypes(types);
+ for (int i=0;i<ranges.size();i++) {
+ Range range = ranges.get(i);
+ HbaseMetastoreProto.PartitionKeyComparator.Mark startMark = null;
+ if (range.start != null) {
+ startMark = HbaseMetastoreProto.PartitionKeyComparator.Mark.newBuilder()
+ .setValue(range.start.value)
+ .setInclusive(range.start.inclusive)
+ .build();
+ }
+ HbaseMetastoreProto.PartitionKeyComparator.Mark endMark = null;
+ if (range.end != null) {
+ endMark = HbaseMetastoreProto.PartitionKeyComparator.Mark.newBuilder()
+ .setValue(range.end.value)
+ .setInclusive(range.end.inclusive)
+ .build();
+ }
+
+ HbaseMetastoreProto.PartitionKeyComparator.Range.Builder rangeBuilder =
+ HbaseMetastoreProto.PartitionKeyComparator.Range.newBuilder();
+ rangeBuilder.setKey(range.keyName);
+ if (startMark != null) { // optional fields are set only when the bound exists
+ rangeBuilder.setStart(startMark);
+ }
+ if (endMark != null) {
+ rangeBuilder.setEnd(endMark);
+ }
+ builder.addRange(rangeBuilder.build());
+ }
+ for (int i=0;i<ops.size();i++) {
+ Operator op = ops.get(i);
+ builder.addOp(HbaseMetastoreProto.PartitionKeyComparator.Operator.newBuilder()
+ .setKey(op.keyName)
+ .setType(Type.valueOf(op.type.toString())) // Type here is the imported proto enum, matched by constant name
+ .setVal(op.val).build());
+ }
+ return builder.build().toByteArray();
+ }
+
+ @Override
+ public int compareTo(byte[] value, int offset, int length) { // returns 0 when the key satisfies every range and operator, 1 otherwise
+ byte[] bytes = Arrays.copyOfRange(value, offset, offset + length);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Get key " + new String(bytes));
+ }
+ BinarySortableSerDe serDe = new BinarySortableSerDe();
+ List deserializedkeys = null;
+ try {
+ serDe.initialize(new Configuration(), serdeProps);
+ deserializedkeys = ((List)serDe.deserialize(new BytesWritable(bytes))).subList(2, 2 + names.split(",").length); // drop the leading dbName and tableName fields
+ } catch (SerDeException e) {
+ LOG.debug("Failed to deserialize partition key, treating it as not matching", e); // best effort: skip this key, continue with next
+ return 1;
+ }
+ for (int i=0;i<ranges.size();i++) {
+ Range range = ranges.get(i);
+ NativeRange nativeRange = nativeRanges.get(i);
+
+ Comparable partVal = (Comparable)deserializedkeys.get(nativeRange.pos); // NOTE(review): a null field would throw in the comparisons below
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Try to match range " + partVal + ", start " + nativeRange.start + ", end "
+ + nativeRange.end);
+ }
+ if (range.start == null || range.start.inclusive && partVal.compareTo(nativeRange.start)>=0 ||
+ !range.start.inclusive && partVal.compareTo(nativeRange.start)>0) {
+ if (range.end == null || range.end.inclusive && partVal.compareTo(nativeRange.end)<=0 ||
+ !range.end.inclusive && partVal.compareTo(nativeRange.end)<0) {
+ continue; // within both bounds: this range is satisfied
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Fail to match range " + range.keyName + "-" + partVal + "[" + nativeRange.start
+ + "," + nativeRange.end + "]");
+ }
+ return 1;
+ }
+
+ for (int i=0;i<ops.size();i++) {
+ Operator op = ops.get(i);
+ NativeOperator nativeOp = nativeOps.get(i);
+ switch (op.type) {
+ case LIKE:
+ if (!deserializedkeys.get(nativeOp.pos).toString().matches(op.val)) { // op.val is used as a regex; String.matches recompiles it per call
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Fail to match operator " + op.keyName + "(" + deserializedkeys.get(nativeOp.pos)
+ + ") LIKE " + nativeOp.val);
+ }
+ return 1;
+ }
+ break;
+ case NOTEQUALS:
+ if (nativeOp.val.equals(deserializedkeys.get(nativeOp.pos))) { // equal values violate the != condition
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Fail to match operator " + op.keyName + "(" + deserializedkeys.get(nativeOp.pos)
+ + ")!=" + nativeOp.val);
+ }
+ return 1;
+ }
+ break;
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("All conditions satisfied:" + deserializedkeys);
+ }
+ return 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
----------------------------------------------------------------------
diff --git a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
index cba3671..0d0ef89 100644
--- a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
+++ b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
@@ -255,3 +255,28 @@ message Table {
optional PrincipalPrivilegeSet privileges = 13;
optional bool is_temporary = 14;
}
+
+message PartitionKeyComparator { // wire form of the Java PartitionKeyComparator (see its toByteArray / parseFrom)
+ required string names = 1; // comma-separated partition key names
+ required string types = 2; // comma-separated partition key types, parallel to names
+ message Mark { // one bound of a range
+ required string value = 1;
+ required bool inclusive = 2;
+ }
+ message Range { // bounds for one partition key; an absent mark leaves that side unbounded
+ required string key = 1;
+ optional Mark start = 2;
+ optional Mark end = 3;
+ }
+ message Operator { // non-range condition on one partition key
+ enum Type {
+ LIKE = 0;
+ NOTEQUALS = 1;
+ }
+ required Type type = 1;
+ required string key = 2;
+ required string val = 3;
+ }
+ repeated Operator op = 3;
+ repeated Range range = 4;
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java
index 5943d14..06884b3 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java
@@ -18,12 +18,17 @@
*/
package org.apache.hadoop.hive.metastore.hbase;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.metastore.PartFilterExprUtil;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.FilterPlan;
import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.MultiScanPlan;
-import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.PartitionFilterGenerator;
import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.PlanResult;
import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.ScanPlan;
import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.ScanPlan.ScanMarker;
@@ -35,6 +40,8 @@ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode;
import org.junit.Assert;
import org.junit.Test;
+import com.google.common.primitives.Shorts;
+
public class TestHBaseFilterPlanUtil {
final boolean INCLUSIVE = true;
@@ -68,31 +75,28 @@ public class TestHBaseFilterPlanUtil {
ScanMarker r;
// equal plans
- l = new ScanMarker(new byte[] { 1, 2 }, INCLUSIVE);
- r = new ScanMarker(new byte[] { 1, 2 }, INCLUSIVE);
+ l = new ScanMarker("1", INCLUSIVE, "int");
+ r = new ScanMarker("1", INCLUSIVE, "int");
assertFirstGreater(l, r);
- l = new ScanMarker(new byte[] { 1, 2 }, !INCLUSIVE);
- r = new ScanMarker(new byte[] { 1, 2 }, !INCLUSIVE);
+ l = new ScanMarker("1", !INCLUSIVE, "int");
+ r = new ScanMarker("1", !INCLUSIVE, "int");
assertFirstGreater(l, r);
- l = new ScanMarker(null, !INCLUSIVE);
- r = new ScanMarker(null, !INCLUSIVE);
- assertFirstGreater(l, r);
+ assertFirstGreater(null, null);
// create l is greater because of inclusive flag
- l = new ScanMarker(new byte[] { 1, 2 }, !INCLUSIVE);
- r = new ScanMarker(null, !INCLUSIVE);
+ l = new ScanMarker("1", !INCLUSIVE, "int");
// the rule for null vs non-null is different
// non-null is both smaller and greater than null
- Assert.assertEquals(l, ScanPlan.getComparedMarker(l, r, true));
- Assert.assertEquals(l, ScanPlan.getComparedMarker(r, l, true));
- Assert.assertEquals(l, ScanPlan.getComparedMarker(l, r, false));
- Assert.assertEquals(l, ScanPlan.getComparedMarker(r, l, false));
+ Assert.assertEquals(l, ScanPlan.getComparedMarker(l, null, true));
+ Assert.assertEquals(l, ScanPlan.getComparedMarker(null, l, true));
+ Assert.assertEquals(l, ScanPlan.getComparedMarker(l, null, false));
+ Assert.assertEquals(l, ScanPlan.getComparedMarker(null, l, false));
// create l that is greater because of the bytes
- l = new ScanMarker(new byte[] { 1, 2, 0 }, INCLUSIVE);
- r = new ScanMarker(new byte[] { 1, 2 }, INCLUSIVE);
+ l = new ScanMarker("2", INCLUSIVE, "int");
+ r = new ScanMarker("1", INCLUSIVE, "int");
assertFirstGreater(l, r);
}
@@ -111,36 +115,30 @@ public class TestHBaseFilterPlanUtil {
public void testScanPlanAnd() {
ScanPlan l = new ScanPlan();
ScanPlan r = new ScanPlan();
- l.setStartMarker(new ScanMarker(new byte[] { 10 }, INCLUSIVE));
- r.setStartMarker(new ScanMarker(new byte[] { 10 }, INCLUSIVE));
+ l.setStartMarker("a", "int", "10", INCLUSIVE);
+ r.setStartMarker("a", "int", "10", INCLUSIVE);
ScanPlan res;
// both equal
res = l.and(r).getPlans().get(0);
- Assert.assertEquals(new ScanMarker(new byte[] { 10 }, INCLUSIVE), res.getStartMarker());
+ Assert.assertEquals(new ScanMarker("10", INCLUSIVE, "int"), res.markers.get("a").startMarker);
// add equal end markers as well, and test AND again
- l.setEndMarker(new ScanMarker(new byte[] { 20 }, INCLUSIVE));
- r.setEndMarker(new ScanMarker(new byte[] { 20 }, INCLUSIVE));
+ l.setEndMarker("a", "int", "20", INCLUSIVE);
+ r.setEndMarker("a", "int", "20", INCLUSIVE);
res = l.and(r).getPlans().get(0);
- Assert.assertEquals(new ScanMarker(new byte[] { 10 }, INCLUSIVE), res.getStartMarker());
- Assert.assertEquals(new ScanMarker(new byte[] { 20 }, INCLUSIVE), res.getEndMarker());
-
- l.setEndMarker(new ScanMarker(null, INCLUSIVE));
- r.setStartMarker(new ScanMarker(null, !INCLUSIVE));
- // markers with non null bytes are both lesser and greator
- Assert.assertEquals(l.getStartMarker(), res.getStartMarker());
- Assert.assertEquals(r.getEndMarker(), res.getEndMarker());
+ Assert.assertEquals(new ScanMarker("10", INCLUSIVE, "int"), res.markers.get("a").startMarker);
+ Assert.assertEquals(new ScanMarker("20", INCLUSIVE, "int"), res.markers.get("a").endMarker);
- l.setStartMarker(new ScanMarker(new byte[] { 10, 11 }, !INCLUSIVE));
- l.setEndMarker(new ScanMarker(new byte[] { 20, 21 }, INCLUSIVE));
+ l.setStartMarker("a", "int", "10", !INCLUSIVE);
+ l.setEndMarker("a", "int", "20", INCLUSIVE);
- r.setStartMarker(new ScanMarker(new byte[] { 10, 10 }, INCLUSIVE));
- r.setEndMarker(new ScanMarker(new byte[] { 15 }, INCLUSIVE));
+ r.setStartMarker("a", "int", "10", INCLUSIVE);
+ r.setEndMarker("a", "int", "15", INCLUSIVE);
res = l.and(r).getPlans().get(0);
// start of l is greater, end of r is smaller
- Assert.assertEquals(l.getStartMarker(), res.getStartMarker());
- Assert.assertEquals(r.getEndMarker(), res.getEndMarker());
+ Assert.assertEquals(l.markers.get("a").startMarker, res.markers.get("a").startMarker);
+ Assert.assertEquals(r.markers.get("a").endMarker, res.markers.get("a").endMarker);
}
@@ -151,13 +149,13 @@ public class TestHBaseFilterPlanUtil {
public void testScanPlanOr() {
ScanPlan l = new ScanPlan();
ScanPlan r = new ScanPlan();
- l.setStartMarker(new ScanMarker(new byte[] { 10 }, INCLUSIVE));
- r.setStartMarker(new ScanMarker(new byte[] { 11 }, INCLUSIVE));
+ l.setStartMarker("a", "int", "1", INCLUSIVE);
+ r.setStartMarker("a", "int", "11", INCLUSIVE);
FilterPlan res1 = l.or(r);
Assert.assertEquals(2, res1.getPlans().size());
- res1.getPlans().get(0).getStartMarker().equals(l.getStartMarker());
- res1.getPlans().get(1).getStartMarker().equals(r.getStartMarker());
+ res1.getPlans().get(0).markers.get("a").startMarker.equals(l.markers.get("a").startMarker);
+ res1.getPlans().get(1).markers.get("a").startMarker.equals(r.markers.get("a").startMarker);
FilterPlan res2 = res1.or(r);
Assert.assertEquals(3, res2.getPlans().size());
@@ -223,72 +221,71 @@ public class TestHBaseFilterPlanUtil {
final String KEY = "k1";
final String VAL = "v1";
- final byte[] VAL_BYTES = PartitionFilterGenerator.toBytes(VAL);
+ final String OTHERKEY = "k2";
LeafNode l = new LeafNode();
l.keyName = KEY;
l.value = VAL;
- final ScanMarker DEFAULT_SCANMARKER = new ScanMarker(null, false);
+ final ScanMarker DEFAULT_SCANMARKER = null;
+ List<FieldSchema> parts = new ArrayList<FieldSchema>();
+ parts.add(new FieldSchema(KEY, "int", null));
+ parts.add(new FieldSchema(OTHERKEY, "int", null));
l.operator = Operator.EQUALS;
- verifyPlan(l, KEY, new ScanMarker(VAL_BYTES, INCLUSIVE), new ScanMarker(VAL_BYTES, INCLUSIVE));
+ verifyPlan(l, parts, KEY, new ScanMarker(VAL, INCLUSIVE, "int"), new ScanMarker(VAL, INCLUSIVE, "int"));
l.operator = Operator.GREATERTHAN;
- verifyPlan(l, KEY, new ScanMarker(VAL_BYTES, !INCLUSIVE), DEFAULT_SCANMARKER);
+ verifyPlan(l, parts, KEY, new ScanMarker(VAL, !INCLUSIVE, "int"), DEFAULT_SCANMARKER);
l.operator = Operator.GREATERTHANOREQUALTO;
- verifyPlan(l, KEY, new ScanMarker(VAL_BYTES, INCLUSIVE), DEFAULT_SCANMARKER);
+ verifyPlan(l, parts, KEY, new ScanMarker(VAL, INCLUSIVE, "int"), DEFAULT_SCANMARKER);
l.operator = Operator.LESSTHAN;
- verifyPlan(l, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL_BYTES, !INCLUSIVE));
+ verifyPlan(l, parts, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL, !INCLUSIVE, "int"));
l.operator = Operator.LESSTHANOREQUALTO;
- verifyPlan(l, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL_BYTES, INCLUSIVE));
-
- // following leaf node plans should currently have true for 'has unsupported condition',
- // because of the unsupported operator
- l.operator = Operator.NOTEQUALS;
- verifyPlan(l, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
-
- l.operator = Operator.NOTEQUALS2;
- verifyPlan(l, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
-
- l.operator = Operator.LIKE;
- verifyPlan(l, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
+ verifyPlan(l, parts, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL, INCLUSIVE, "int"));
// following leaf node plans should currently have true for 'has unsupported condition',
// because of the condition is not on first key
l.operator = Operator.EQUALS;
- verifyPlan(l, "NOT_FIRST_PART", DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
-
- l.operator = Operator.NOTEQUALS;
- verifyPlan(l, "NOT_FIRST_PART", DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
+ verifyPlan(l, parts, OTHERKEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, false);
// if tree is null, it should return equivalent of full scan, and true
// for 'has unsupported condition'
- verifyPlan(null, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
+ verifyPlan(null, parts, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true);
}
- private void verifyPlan(TreeNode l, String keyName, ScanMarker startMarker, ScanMarker endMarker)
+ private void verifyPlan(TreeNode l, List<FieldSchema> parts, String keyName, ScanMarker startMarker, ScanMarker endMarker)
throws MetaException {
- verifyPlan(l, keyName, startMarker, endMarker, false);
+ verifyPlan(l, parts, keyName, startMarker, endMarker, false);
}
- private void verifyPlan(TreeNode l, String keyName, ScanMarker startMarker, ScanMarker endMarker,
+ private void verifyPlan(TreeNode l, List<FieldSchema> parts, String keyName, ScanMarker startMarker, ScanMarker endMarker,
boolean hasUnsupportedCondition) throws MetaException {
ExpressionTree e = null;
if (l != null) {
e = new ExpressionTree();
e.setRootForTest(l);
}
- PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, keyName);
+ PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, parts);
FilterPlan plan = planRes.plan;
Assert.assertEquals("Has unsupported condition", hasUnsupportedCondition,
planRes.hasUnsupportedCondition);
Assert.assertEquals(1, plan.getPlans().size());
ScanPlan splan = plan.getPlans().get(0);
- Assert.assertEquals(startMarker, splan.getStartMarker());
- Assert.assertEquals(endMarker, splan.getEndMarker());
+ if (startMarker != null) {
+ Assert.assertEquals(startMarker, splan.markers.get(keyName).startMarker);
+ } else {
+ Assert.assertTrue(splan.markers.get(keyName)==null ||
+ splan.markers.get(keyName).startMarker==null);
+ }
+ if (endMarker != null) {
+ Assert.assertEquals(endMarker, splan.markers.get(keyName).endMarker);
+ } else {
+ Assert.assertTrue(splan.markers.get(keyName)==null ||
+ splan.markers.get(keyName).endMarker==null);
+ }
}
/**
@@ -302,12 +299,13 @@ public class TestHBaseFilterPlanUtil {
final String KEY = "k1";
final String VAL1 = "10";
final String VAL2 = "11";
- final byte[] VAL1_BYTES = PartitionFilterGenerator.toBytes(VAL1);
- final byte[] VAL2_BYTES = PartitionFilterGenerator.toBytes(VAL2);
LeafNode l = new LeafNode();
l.keyName = KEY;
l.value = VAL1;
- final ScanMarker DEFAULT_SCANMARKER = new ScanMarker(null, false);
+ final ScanMarker DEFAULT_SCANMARKER = null;
+
+ List<FieldSchema> parts = new ArrayList<FieldSchema>();
+ parts.add(new FieldSchema("k1", "int", null));
LeafNode r = new LeafNode();
r.keyName = KEY;
@@ -318,19 +316,19 @@ public class TestHBaseFilterPlanUtil {
// verify plan for - k1 >= '10' and k1 < '11'
l.operator = Operator.GREATERTHANOREQUALTO;
r.operator = Operator.LESSTHAN;
- verifyPlan(tn, KEY, new ScanMarker(VAL1_BYTES, INCLUSIVE), new ScanMarker(VAL2_BYTES,
- !INCLUSIVE));
+ verifyPlan(tn, parts, KEY, new ScanMarker(VAL1, INCLUSIVE, "int"), new ScanMarker(VAL2,
+ !INCLUSIVE, "int"));
// verify plan for - k1 >= '10' and k1 > '11'
l.operator = Operator.GREATERTHANOREQUALTO;
r.operator = Operator.GREATERTHAN;
- verifyPlan(tn, KEY, new ScanMarker(VAL2_BYTES, !INCLUSIVE), DEFAULT_SCANMARKER);
+ verifyPlan(tn, parts, KEY, new ScanMarker(VAL2, !INCLUSIVE, "int"), DEFAULT_SCANMARKER);
// verify plan for - k1 >= '10' or k1 > '11'
tn = new TreeNode(l, LogicalOperator.OR, r);
ExpressionTree e = new ExpressionTree();
e.setRootForTest(tn);
- PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, KEY);
+ PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, parts);
Assert.assertEquals(2, planRes.plan.getPlans().size());
Assert.assertEquals(false, planRes.hasUnsupportedCondition);
@@ -338,7 +336,7 @@ public class TestHBaseFilterPlanUtil {
TreeNode tn2 = new TreeNode(l, LogicalOperator.AND, tn);
e = new ExpressionTree();
e.setRootForTest(tn2);
- planRes = HBaseFilterPlanUtil.getFilterPlan(e, KEY);
+ planRes = HBaseFilterPlanUtil.getFilterPlan(e, parts);
Assert.assertEquals(2, planRes.plan.getPlans().size());
Assert.assertEquals(false, planRes.hasUnsupportedCondition);
@@ -351,11 +349,135 @@ public class TestHBaseFilterPlanUtil {
TreeNode tn3 = new TreeNode(tn2, LogicalOperator.OR, klike);
e = new ExpressionTree();
e.setRootForTest(tn3);
- planRes = HBaseFilterPlanUtil.getFilterPlan(e, KEY);
+ planRes = HBaseFilterPlanUtil.getFilterPlan(e, parts);
Assert.assertEquals(3, planRes.plan.getPlans().size());
- Assert.assertEquals(true, planRes.hasUnsupportedCondition);
+ Assert.assertEquals(false, planRes.hasUnsupportedCondition);
+
+ }
+ @Test
+ public void testPartitionKeyScannerAllString() throws Exception {
+ List<FieldSchema> parts = new ArrayList<FieldSchema>();
+ parts.add(new FieldSchema("year", "string", null));
+ parts.add(new FieldSchema("month", "string", null));
+ parts.add(new FieldSchema("state", "string", null));
+
+ // One prefix key and one minor key range
+ ExpressionTree exprTree = PartFilterExprUtil.getFilterParser("year = 2015 and state = 'CA'").tree;
+ PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts);
+
+ Assert.assertEquals(planRes.plan.getPlans().size(), 1);
+
+ ScanPlan sp = planRes.plan.getPlans().get(0);
+ byte[] startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts);
+ byte[] endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts);
+ RowFilter filter = (RowFilter)sp.getFilter(parts);
+
+ // scan range contains the major key year, rowfilter contains minor key state
+ Assert.assertTrue(Bytes.contains(startRowSuffix, "2015".getBytes()));
+ Assert.assertTrue(Bytes.contains(endRowSuffix, "2015".getBytes()));
+ Assert.assertFalse(Bytes.contains(startRowSuffix, "CA".getBytes()));
+ Assert.assertFalse(Bytes.contains(endRowSuffix, "CA".getBytes()));
+
+ PartitionKeyComparator comparator = (PartitionKeyComparator)filter.getComparator();
+ Assert.assertEquals(comparator.ranges.size(), 1);
+ Assert.assertEquals(comparator.ranges.get(0).keyName, "state");
+
+ // Two prefix keys and one LIKE operator
+ exprTree = PartFilterExprUtil.getFilterParser("year = 2015 and month > 10 "
+ + "and month <= 11 and state like 'C%'").tree;
+ planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts);
+
+ Assert.assertEquals(planRes.plan.getPlans().size(), 1);
+
+ sp = planRes.plan.getPlans().get(0);
+ startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts);
+ endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts);
+ filter = (RowFilter)sp.getFilter(parts);
+
+ // scan range contains the major key value year/month, rowfilter contains LIKE operator
+ Assert.assertTrue(Bytes.contains(startRowSuffix, "2015".getBytes()));
+ Assert.assertTrue(Bytes.contains(endRowSuffix, "2015".getBytes()));
+ Assert.assertTrue(Bytes.contains(startRowSuffix, "10".getBytes()));
+ Assert.assertTrue(Bytes.contains(endRowSuffix, "11".getBytes()));
+
+ comparator = (PartitionKeyComparator)filter.getComparator();
+ Assert.assertEquals(comparator.ops.size(), 1);
+ Assert.assertEquals(comparator.ops.get(0).keyName, "state");
+
+ // One prefix key, one minor key range and one LIKE operator
+ exprTree = PartFilterExprUtil.getFilterParser("year >= 2014 and month > 10 "
+ + "and month <= 11 and state like 'C%'").tree;
+ planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts);
+
+ Assert.assertEquals(planRes.plan.getPlans().size(), 1);
+
+ sp = planRes.plan.getPlans().get(0);
+ startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts);
+ endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts);
+ filter = (RowFilter)sp.getFilter(parts);
+
+ // scan range contains the major key value year (low bound), rowfilter contains minor key state
+ // and LIKE operator
+ Assert.assertTrue(Bytes.contains(startRowSuffix, "2014".getBytes()));
+
+ comparator = (PartitionKeyComparator)filter.getComparator();
+ Assert.assertEquals(comparator.ranges.size(), 1);
+ Assert.assertEquals(comparator.ranges.get(0).keyName, "month");
+ Assert.assertEquals(comparator.ops.size(), 1);
+ Assert.assertEquals(comparator.ops.get(0).keyName, "state");
+
+ // Condition contains or
+ exprTree = PartFilterExprUtil.getFilterParser("year = 2014 and (month > 10 "
+ + "or month < 3)").tree;
+ planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts);
+
+ sp = planRes.plan.getPlans().get(0);
+ startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts);
+ endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts);
+ filter = (RowFilter)sp.getFilter(parts);
+
+ // The first ScanPlan contains year = 2014 and month > 10
+ Assert.assertTrue(Bytes.contains(startRowSuffix, "2014".getBytes()));
+ Assert.assertTrue(Bytes.contains(endRowSuffix, "2014".getBytes()));
+ Assert.assertTrue(Bytes.contains(startRowSuffix, "10".getBytes()));
+
+ sp = planRes.plan.getPlans().get(1);
+ startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts);
+ endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts);
+ filter = (RowFilter)sp.getFilter(parts);
+
+ // The second ScanPlan contains year = 2014 and month < 3
+ Assert.assertTrue(Bytes.contains(startRowSuffix, "2014".getBytes()));
+ Assert.assertTrue(Bytes.contains(endRowSuffix, "2014".getBytes()));
+ Assert.assertTrue(Bytes.contains(endRowSuffix, "3".getBytes()));
}
+ @Test
+ public void testPartitionKeyScannerMixedType() throws Exception {
+ List<FieldSchema> parts = new ArrayList<FieldSchema>();
+ parts.add(new FieldSchema("year", "int", null));
+ parts.add(new FieldSchema("month", "int", null));
+ parts.add(new FieldSchema("state", "string", null));
+
+ // One prefix key and one minor key range
+ ExpressionTree exprTree = PartFilterExprUtil.getFilterParser("year = 2015 and state = 'CA'").tree;
+ PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts);
+
+ Assert.assertEquals(planRes.plan.getPlans().size(), 1);
+
+ ScanPlan sp = planRes.plan.getPlans().get(0);
+ byte[] startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts);
+ byte[] endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts);
+ RowFilter filter = (RowFilter)sp.getFilter(parts);
+
+ // scan range contains the major key year, rowfilter contains minor key state
+ Assert.assertTrue(Bytes.contains(startRowSuffix, Shorts.toByteArray((short)2015)));
+ Assert.assertTrue(Bytes.contains(endRowSuffix, Shorts.toByteArray((short)2016)));
+
+ PartitionKeyComparator comparator = (PartitionKeyComparator)filter.getComparator();
+ Assert.assertEquals(comparator.ranges.size(), 1);
+ Assert.assertEquals(comparator.ranges.get(0).keyName, "state");
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDeWithEndPrefix.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDeWithEndPrefix.java b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDeWithEndPrefix.java
new file mode 100644
index 0000000..ec43ae3
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDeWithEndPrefix.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.binarysortable;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+public class BinarySortableSerDeWithEndPrefix extends BinarySortableSerDe {
+ public static void serializeStruct(Output byteStream, Object[] fieldData,
+ List<ObjectInspector> fieldOis, boolean endPrefix) throws SerDeException {
+ for (int i = 0; i < fieldData.length; i++) {
+ serialize(byteStream, fieldData[i], fieldOis.get(i), false);
+ }
+ if (endPrefix) {
+ if (fieldData[fieldData.length-1]!=null) {
+ byteStream.getData()[byteStream.getLength()-1]++;
+ } else {
+ byteStream.getData()[byteStream.getLength()-1]+=2;
+ }
+ }
+ }
+}
\ No newline at end of file