You are viewing a plain-text rendering of this content; the canonical version is available at the original archive link.
Posted to commits@hive.apache.org by ek...@apache.org on 2015/10/10 19:42:14 UTC
[2/3] hive git commit: HIVE-12025 refactor bucketId generating code
(Eugene Koifman, reviewed by Prashanth Jayachandran, Sergey Shelukhin,
Elliot West)
HIVE-12025 refactor bucketId generating code (Eugene Koifman, reviewed by Prashanth Jayachandran, Sergey Shelukhin, Elliot West)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ba83fd7b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ba83fd7b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ba83fd7b
Branch: refs/heads/master
Commit: ba83fd7bffde4b6be8c03768a0b421c7b93f3ab1
Parents: 6edb2c2
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Oct 10 10:08:46 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Oct 10 10:08:46 2015 -0700
----------------------------------------------------------------------
.../mutate/worker/BucketIdResolverImpl.java | 16 ++++---------
.../mutate/worker/TestBucketIdResolverImpl.java | 2 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 9 ++++---
.../hadoop/hive/ql/exec/ReduceSinkOperator.java | 23 ++++++++----------
.../hive/ql/io/DefaultHivePartitioner.java | 3 ++-
.../hive/ql/udf/generic/GenericUDFHash.java | 11 ++++-----
.../hive/ql/lockmgr/TestDbTxnManager.java | 6 +++--
.../objectinspector/ObjectInspectorUtils.java | 13 ++++++----
.../TestObjectInspectorUtils.java | 25 ++++++++++++++++++++
9 files changed, 64 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
index dbed9e1..bb9462d 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
@@ -56,21 +56,15 @@ public class BucketIdResolverImpl implements BucketIdResolver {
return record;
}
- /** Based on: {@link org.apache.hadoop.hive.ql.exec.ReduceSinkOperator#computeBucketNumber(Object, int)}. */
@Override
public int computeBucketId(Object record) {
- int bucketId = 1;
-
+ Object[] bucketFieldValues = new Object[bucketFields.length];
+ ObjectInspector[] bucketFiledInspectors = new ObjectInspector[bucketFields.length];
for (int columnIndex = 0; columnIndex < bucketFields.length; columnIndex++) {
- Object columnValue = structObjectInspector.getStructFieldData(record, bucketFields[columnIndex]);
- bucketId = bucketId * 31 + ObjectInspectorUtils.hashCode(columnValue, bucketFields[columnIndex].getFieldObjectInspector());
- }
-
- if (bucketId < 0) {
- bucketId = -1 * bucketId;
+ bucketFieldValues[columnIndex] = structObjectInspector.getStructFieldData(record, bucketFields[columnIndex]);
+ bucketFiledInspectors[columnIndex] = bucketFields[columnIndex].getFieldObjectInspector();
}
-
- return bucketId % totalBuckets;
+ return ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketFiledInspectors, totalBuckets);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
index f81373e..5297c5d 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
@@ -23,7 +23,7 @@ public class TestBucketIdResolverImpl {
public void testAttachBucketIdToRecord() {
MutableRecord record = new MutableRecord(1, "hello");
capturingBucketIdResolver.attachBucketIdToRecord(record);
- assertThat(record.rowId, is(new RecordIdentifier(-1L, 8, -1L)));
+ assertThat(record.rowId, is(new RecordIdentifier(-1L, 1, -1L)));
assertThat(record.id, is(1));
assertThat(record.msg.toString(), is("hello"));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 39944a9..e247673 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -791,12 +791,11 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
if (!multiFileSpray) {
return 0;
} else {
- int keyHashCode = 0;
- for (int i = 0; i < partitionEval.length; i++) {
- Object o = partitionEval[i].evaluate(row);
- keyHashCode = keyHashCode * 31
- + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+ Object[] bucketFieldValues = new Object[partitionEval.length];
+ for(int i = 0; i < partitionEval.length; i++) {
+ bucketFieldValues[i] = partitionEval[i].evaluate(row);
}
+ int keyHashCode = ObjectInspectorUtils.getBucketHashCode(bucketFieldValues, partitionObjectInspectors);
key.setHashCode(keyHashCode);
int bucketNum = prtner.getBucket(key, null, totalFiles);
return bucketMap.get(bucketNum);
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index f1df608..dd08210 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -405,27 +405,24 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
}
private int computeBucketNumber(Object row, int numBuckets) throws HiveException {
- int buckNum = 0;
-
if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
conf.getWriteType() == AcidUtils.Operation.DELETE) {
- // We don't need to evalute the hash code. Instead read the bucket number directly from
+ // We don't need to evaluate the hash code. Instead read the bucket number directly from
// the row. I don't need to evaluate any expressions as I know I am reading the ROW__ID
// column directly.
Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField);
- buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
+ int buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
if (isLogTraceEnabled) {
LOG.trace("Acid choosing bucket number " + buckNum);
}
+ return buckNum;
} else {
+ Object[] bucketFieldValues = new Object[bucketEval.length];
for (int i = 0; i < bucketEval.length; i++) {
- Object o = bucketEval[i].evaluate(row);
- buckNum = buckNum * 31 + ObjectInspectorUtils.hashCode(o, bucketObjectInspectors[i]);
+ bucketFieldValues[i] = bucketEval[i].evaluate(row);
}
+ return ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketObjectInspectors, numBuckets);
}
-
- // similar to hive's default partitioner, refer DefaultHivePartitioner
- return (buckNum & Integer.MAX_VALUE) % numBuckets;
}
private void populateCachedDistributionKeys(Object row, int index) throws HiveException {
@@ -476,11 +473,11 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
keyHashCode = 1;
}
} else {
- for (int i = 0; i < partitionEval.length; i++) {
- Object o = partitionEval[i].evaluate(row);
- keyHashCode = keyHashCode * 31
- + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+ Object[] bucketFieldValues = new Object[partitionEval.length];
+ for(int i = 0; i < partitionEval.length; i++) {
+ bucketFieldValues[i] = partitionEval[i].evaluate(row);
}
+ keyHashCode = ObjectInspectorUtils.getBucketHashCode(bucketFieldValues, partitionObjectInspectors);
}
int hashCode = buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
if (isLogTraceEnabled) {
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java b/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
index 6a91cb8..6a14fb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.io;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.mapred.lib.HashPartitioner;
/** Partition keys by their {@link Object#hashCode()}. */
@@ -26,7 +27,7 @@ public class DefaultHivePartitioner<K2, V2> extends HashPartitioner<K2, V2> impl
/** Use {@link Object#hashCode()} to partition. */
@Override
public int getBucket(K2 key, V2 value, int numBuckets) {
- return (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
+ return ObjectInspectorUtils.getBucketNumber(key.hashCode(), numBuckets);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
index 474f404..fd1fe92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.udf.generic;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -45,13 +44,11 @@ public class GenericUDFHash extends GenericUDF {
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
- // See
- // http://java.sun.com/j2se/1.5.0/docs/api/java/util/List.html#hashCode()
- int r = 0;
- for (int i = 0; i < arguments.length; i++) {
- r = r * 31
- + ObjectInspectorUtils.hashCode(arguments[i].get(), argumentOIs[i]);
+ Object[] fieldValues = new Object[arguments.length];
+ for(int i = 0; i < arguments.length; i++) {
+ fieldValues[i] = arguments[i].get();
}
+ int r = ObjectInspectorUtils.getBucketHashCode(fieldValues, argumentOIs);
result.set(r);
return result;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 8a53ec5..68c6542 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -20,13 +20,15 @@ package org.apache.hadoop.hive.ql.lockmgr;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.metadata.DummyPartition;import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.DummyPartition;
+import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
index 54ae48e..09e9108 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -502,18 +502,23 @@ public final class ObjectInspectorUtils {
* @return the bucket number
*/
public static int getBucketNumber(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors, int totalBuckets) {
- int hashCode = getBucketHashCode(bucketFields, bucketFieldInspectors);
- int bucketID = (hashCode & Integer.MAX_VALUE) % totalBuckets;
- return bucketID;
+ return getBucketNumber(getBucketHashCode(bucketFields, bucketFieldInspectors), totalBuckets);
}
/**
+ * https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL+BucketedTables
+ * @param hashCode as produced by {@link #getBucketHashCode(Object[], ObjectInspector[])}
+ */
+ public static int getBucketNumber(int hashCode, int numberOfBuckets) {
+ return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
+ }
+ /**
* Computes the hash code for the given bucketed fields
* @param bucketFields
* @param bucketFieldInspectors
* @return
*/
- private static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors) {
+ public static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors) {
int hashCode = 0;
for (int i = 0; i < bucketFields.length; i++) {
int fieldHash = ObjectInspectorUtils.hashCode(bucketFields[i], bucketFieldInspectors[i]);
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
index ade0ef7..cf73b28 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
@@ -131,4 +131,29 @@ public class TestObjectInspectorUtils extends TestCase {
}
}
+ public void testBucketIdGeneration() {
+ ArrayList<String> fieldNames = new ArrayList<String>();
+ fieldNames.add("firstInteger");
+ fieldNames.add("secondString");
+ fieldNames.add("thirdBoolean");
+ ArrayList<ObjectInspector> fieldObjectInspectors = new ArrayList<ObjectInspector>();
+ fieldObjectInspectors
+ .add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
+ fieldObjectInspectors
+ .add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+ fieldObjectInspectors
+ .add(PrimitiveObjectInspectorFactory.javaBooleanObjectInspector);
+
+ StandardStructObjectInspector soi1 = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldObjectInspectors);
+ ArrayList<Object> struct = new ArrayList<Object>(3);
+ struct.add(1);
+ struct.add("two");
+ struct.add(true);
+
+ int hashCode = ObjectInspectorUtils.getBucketHashCode(struct.toArray(), fieldObjectInspectors.toArray(new ObjectInspector[fieldObjectInspectors.size()]));
+ assertEquals("", 3574518, hashCode);
+ int bucketId = ObjectInspectorUtils.getBucketNumber(struct.toArray(), fieldObjectInspectors.toArray(new ObjectInspector[fieldObjectInspectors.size()]), 16);
+ assertEquals("", 6, bucketId);
+ assertEquals("", bucketId, ObjectInspectorUtils.getBucketNumber(hashCode, 16));
+ }
}