You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2015/10/10 19:42:14 UTC

[2/3] hive git commit: HIVE-12025 refactor bucketId generating code (Eugene Koifman, reviewed by Prashanth Jayachandran, Sergey Shelukhin, Elliot West)

HIVE-12025 refactor bucketId generating code (Eugene Koifman, reviewed by Prashanth Jayachandran, Sergey Shelukhin, Elliot West)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ba83fd7b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ba83fd7b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ba83fd7b

Branch: refs/heads/master
Commit: ba83fd7bffde4b6be8c03768a0b421c7b93f3ab1
Parents: 6edb2c2
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Oct 10 10:08:46 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Oct 10 10:08:46 2015 -0700

----------------------------------------------------------------------
 .../mutate/worker/BucketIdResolverImpl.java     | 16 ++++---------
 .../mutate/worker/TestBucketIdResolverImpl.java |  2 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |  9 ++++---
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java | 23 ++++++++----------
 .../hive/ql/io/DefaultHivePartitioner.java      |  3 ++-
 .../hive/ql/udf/generic/GenericUDFHash.java     | 11 ++++-----
 .../hive/ql/lockmgr/TestDbTxnManager.java       |  6 +++--
 .../objectinspector/ObjectInspectorUtils.java   | 13 ++++++----
 .../TestObjectInspectorUtils.java               | 25 ++++++++++++++++++++
 9 files changed, 64 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
index dbed9e1..bb9462d 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
@@ -56,21 +56,15 @@ public class BucketIdResolverImpl implements BucketIdResolver {
     return record;
   }
 
-  /** Based on: {@link org.apache.hadoop.hive.ql.exec.ReduceSinkOperator#computeBucketNumber(Object, int)}. */
   @Override
   public int computeBucketId(Object record) {
-    int bucketId = 1;
-
+    Object[] bucketFieldValues = new Object[bucketFields.length];
+    ObjectInspector[] bucketFiledInspectors = new ObjectInspector[bucketFields.length];
     for (int columnIndex = 0; columnIndex < bucketFields.length; columnIndex++) {
-      Object columnValue = structObjectInspector.getStructFieldData(record, bucketFields[columnIndex]);
-      bucketId = bucketId * 31 + ObjectInspectorUtils.hashCode(columnValue, bucketFields[columnIndex].getFieldObjectInspector());
-    }
-
-    if (bucketId < 0) {
-      bucketId = -1 * bucketId;
+      bucketFieldValues[columnIndex] = structObjectInspector.getStructFieldData(record, bucketFields[columnIndex]);
+      bucketFiledInspectors[columnIndex] = bucketFields[columnIndex].getFieldObjectInspector();
     }
-
-    return bucketId % totalBuckets;
+    return ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketFiledInspectors, totalBuckets);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
index f81373e..5297c5d 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
@@ -23,7 +23,7 @@ public class TestBucketIdResolverImpl {
   public void testAttachBucketIdToRecord() {
     MutableRecord record = new MutableRecord(1, "hello");
     capturingBucketIdResolver.attachBucketIdToRecord(record);
-    assertThat(record.rowId, is(new RecordIdentifier(-1L, 8, -1L)));
+    assertThat(record.rowId, is(new RecordIdentifier(-1L, 1, -1L)));
     assertThat(record.id, is(1));
     assertThat(record.msg.toString(), is("hello"));
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 39944a9..e247673 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -791,12 +791,11 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     if (!multiFileSpray) {
       return 0;
     } else {
-      int keyHashCode = 0;
-      for (int i = 0; i < partitionEval.length; i++) {
-        Object o = partitionEval[i].evaluate(row);
-        keyHashCode = keyHashCode * 31
-            + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+      Object[] bucketFieldValues = new Object[partitionEval.length];
+      for(int i = 0; i < partitionEval.length; i++) {
+        bucketFieldValues[i] = partitionEval[i].evaluate(row);
       }
+      int keyHashCode = ObjectInspectorUtils.getBucketHashCode(bucketFieldValues, partitionObjectInspectors);
       key.setHashCode(keyHashCode);
       int bucketNum = prtner.getBucket(key, null, totalFiles);
       return bucketMap.get(bucketNum);

http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index f1df608..dd08210 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -405,27 +405,24 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
   }
 
   private int computeBucketNumber(Object row, int numBuckets) throws HiveException {
-    int buckNum = 0;
-
     if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
         conf.getWriteType() == AcidUtils.Operation.DELETE) {
-      // We don't need to evalute the hash code.  Instead read the bucket number directly from
+      // We don't need to evaluate the hash code.  Instead read the bucket number directly from
       // the row.  I don't need to evaluate any expressions as I know I am reading the ROW__ID
       // column directly.
       Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField);
-      buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
+      int buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
       if (isLogTraceEnabled) {
         LOG.trace("Acid choosing bucket number " + buckNum);
       }
+      return buckNum;
     } else {
+      Object[] bucketFieldValues = new Object[bucketEval.length];
       for (int i = 0; i < bucketEval.length; i++) {
-        Object o = bucketEval[i].evaluate(row);
-        buckNum = buckNum * 31 + ObjectInspectorUtils.hashCode(o, bucketObjectInspectors[i]);
+        bucketFieldValues[i] = bucketEval[i].evaluate(row);
       }
+      return ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketObjectInspectors, numBuckets);
     }
-
-    // similar to hive's default partitioner, refer DefaultHivePartitioner
-    return (buckNum & Integer.MAX_VALUE) % numBuckets;
   }
 
   private void populateCachedDistributionKeys(Object row, int index) throws HiveException {
@@ -476,11 +473,11 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
         keyHashCode = 1;
       }
     } else {
-      for (int i = 0; i < partitionEval.length; i++) {
-        Object o = partitionEval[i].evaluate(row);
-        keyHashCode = keyHashCode * 31
-            + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+      Object[] bucketFieldValues = new Object[partitionEval.length];
+      for(int i = 0; i < partitionEval.length; i++) {
+        bucketFieldValues[i] = partitionEval[i].evaluate(row);
       }
+      keyHashCode = ObjectInspectorUtils.getBucketHashCode(bucketFieldValues, partitionObjectInspectors);
     }
     int hashCode = buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
     if (isLogTraceEnabled) {

http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java b/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
index 6a91cb8..6a14fb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 
 /** Partition keys by their {@link Object#hashCode()}. */
@@ -26,7 +27,7 @@ public class DefaultHivePartitioner<K2, V2> extends HashPartitioner<K2, V2> impl
   /** Use {@link Object#hashCode()} to partition. */
   @Override
   public int getBucket(K2 key, V2 value, int numBuckets) {
-    return (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
+    return ObjectInspectorUtils.getBucketNumber(key.hashCode(), numBuckets);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
index 474f404..fd1fe92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.udf.generic;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -45,13 +44,11 @@ public class GenericUDFHash extends GenericUDF {
 
   @Override
   public Object evaluate(DeferredObject[] arguments) throws HiveException {
-    // See
-    // http://java.sun.com/j2se/1.5.0/docs/api/java/util/List.html#hashCode()
-    int r = 0;
-    for (int i = 0; i < arguments.length; i++) {
-      r = r * 31
-          + ObjectInspectorUtils.hashCode(arguments[i].get(), argumentOIs[i]);
+    Object[] fieldValues = new Object[arguments.length];
+    for(int i = 0; i < arguments.length; i++) {
+      fieldValues[i] = arguments[i].get();
     }
+    int r = ObjectInspectorUtils.getBucketHashCode(fieldValues, argumentOIs);
     result.set(r);
     return result;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 8a53ec5..68c6542 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -20,13 +20,15 @@ package org.apache.hadoop.hive.ql.lockmgr;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.metadata.DummyPartition;import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.DummyPartition;
+import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;

http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
index 54ae48e..09e9108 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -502,18 +502,23 @@ public final class ObjectInspectorUtils {
    * @return the bucket number
    */
   public static int getBucketNumber(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors, int totalBuckets) {
-    int hashCode = getBucketHashCode(bucketFields, bucketFieldInspectors);
-    int bucketID = (hashCode & Integer.MAX_VALUE) % totalBuckets;
-    return bucketID;
+    return getBucketNumber(getBucketHashCode(bucketFields, bucketFieldInspectors), totalBuckets);
   }
 
   /**
+   * https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL+BucketedTables
+   * @param hashCode as produced by {@link #getBucketHashCode(Object[], ObjectInspector[])}
+   */
+  public static int getBucketNumber(int hashCode, int numberOfBuckets) {
+    return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
+  }
+  /**
    * Computes the hash code for the given bucketed fields
    * @param bucketFields
    * @param bucketFieldInspectors
    * @return
    */
-  private static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors) {
+  public static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors) {
     int hashCode = 0;
     for (int i = 0; i < bucketFields.length; i++) {
       int fieldHash = ObjectInspectorUtils.hashCode(bucketFields[i], bucketFieldInspectors[i]);

http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
index ade0ef7..cf73b28 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
@@ -131,4 +131,29 @@ public class TestObjectInspectorUtils extends TestCase {
     }
 
   }
+  public void testBucketIdGeneration() {
+    ArrayList<String> fieldNames = new ArrayList<String>();
+    fieldNames.add("firstInteger");
+    fieldNames.add("secondString");
+    fieldNames.add("thirdBoolean");
+    ArrayList<ObjectInspector> fieldObjectInspectors = new ArrayList<ObjectInspector>();
+    fieldObjectInspectors
+      .add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
+    fieldObjectInspectors
+      .add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+    fieldObjectInspectors
+      .add(PrimitiveObjectInspectorFactory.javaBooleanObjectInspector);
+
+    StandardStructObjectInspector soi1 = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldObjectInspectors);
+    ArrayList<Object> struct = new ArrayList<Object>(3);
+    struct.add(1);
+    struct.add("two");
+    struct.add(true);
+
+    int hashCode = ObjectInspectorUtils.getBucketHashCode(struct.toArray(), fieldObjectInspectors.toArray(new ObjectInspector[fieldObjectInspectors.size()]));
+    assertEquals("", 3574518, hashCode);
+    int bucketId = ObjectInspectorUtils.getBucketNumber(struct.toArray(), fieldObjectInspectors.toArray(new ObjectInspector[fieldObjectInspectors.size()]), 16);
+    assertEquals("", 6, bucketId);
+    assertEquals("", bucketId, ObjectInspectorUtils.getBucketNumber(hashCode, 16));
+  }
 }