Posted to commits@hive.apache.org by ek...@apache.org on 2015/10/10 19:42:13 UTC

[1/3] hive git commit: HIVE-11914 When a transaction gets a heartbeat, it doesn't update the lock heartbeat. (Eugene Koifman, reviewed by Alan Gates)

Repository: hive
Updated Branches:
  refs/heads/master 86f7af66f -> ec8c793c3


HIVE-11914 When a transaction gets a heartbeat, it doesn't update the lock heartbeat. (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6edb2c2f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6edb2c2f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6edb2c2f

Branch: refs/heads/master
Commit: 6edb2c2f5b22ab84ee0e4150d0982f81c39a5ccc
Parents: 86f7af6
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Oct 10 10:05:16 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Oct 10 10:05:16 2015 -0700

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/TestStreaming.java  | 27 ++++++++++++++++
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  4 +++
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    | 34 +++++++++++++-------
 .../hive/ql/txn/compactor/CompactorMR.java      |  8 ++---
 .../hive/ql/lockmgr/TestDbTxnManager.java       |  2 +-
 5 files changed, 59 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6edb2c2f/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 2f6baec..340ab6c 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
@@ -553,6 +555,31 @@ public class TestStreaming {
     txnBatch.close();
     connection.close();
   }
+
+  @Test
+  public void testHeartbeat() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt);
+    StreamingConnection connection = endPt.newConnection(false, null);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(5, writer);
+    txnBatch.beginNextTransaction();
+    //todo: this should ideally check Transaction heartbeat as well, but heartbeat
+    //timestamp is not reported yet
+    //GetOpenTxnsInfoResponse txnresp = msClient.showTxns();
+    ShowLocksResponse response = msClient.showLocks();
+    Assert.assertEquals("Wrong number of locks: " + response, 1, response.getLocks().size());
+    ShowLocksResponseElement lock = response.getLocks().get(0);
+    long acquiredAt = lock.getAcquiredat();
+    long heartbeatAt = lock.getLastheartbeat();
+    txnBatch.heartbeat();
+    response = msClient.showLocks();
+    Assert.assertEquals("Wrong number of locks2: " + response, 1, response.getLocks().size());
+    lock = response.getLocks().get(0);
+    Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, lock.getAcquiredat());
+    Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() +
+      ") > old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() > heartbeatAt);
+  }
   @Test
   public void testTransactionBatchEmptyAbort() throws Exception {
     // 1) to partitioned table

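The new test above exercises the behavior HIVE-11914 fixes: a heartbeat sent through the streaming API should refresh the lock's hl_last_heartbeat, not only the transaction record. The sketch below shows the same pattern from a client's point of view; it is a minimal illustration only, and the metastore URI, database/table names ("mydb"/"mytable") and column list are placeholder assumptions for an existing transactional table.

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class HeartbeatSketch {
  // Writes rows in one transaction batch, heartbeating between writes so that
  // neither the transaction nor its locks time out during slow ingestion.
  public static void writeWithHeartbeat(String metaStoreUri, String[] fieldNames,
                                        byte[][] rows) throws Exception {
    HiveEndPoint endPt = new HiveEndPoint(metaStoreUri, "mydb", "mytable", null);
    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);
    StreamingConnection connection = endPt.newConnection(false, null);
    try {
      TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer);
      try {
        txnBatch.beginNextTransaction();
        for (byte[] row : rows) {
          txnBatch.write(row);
          // With HIVE-11914 this also advances hl_last_heartbeat for the txn's locks.
          txnBatch.heartbeat();
        }
        txnBatch.commit();
      } finally {
        txnBatch.close();
      }
    } finally {
      connection.close();
    }
  }
}
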
http://git-wip-us.apache.org/repos/asf/hive/blob/6edb2c2f/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 6218a03..ca485fa 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1742,6 +1742,10 @@ public class TxnHandler {
         dbConn.rollback();
         throw new NoSuchTxnException("No such txn: " + txnid);
       }
+      //update locks for this txn to the same heartbeat
+      s = "update HIVE_LOCKS set hl_last_heartbeat = " + now + " where hl_txnid = " + txnid;
+      LOG.debug("Going to execute update <" + s + ">");
+      stmt.executeUpdate(s);
       LOG.debug("Going to commit");
       dbConn.commit();
     } finally {

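The server-side fix is the single UPDATE added above: heartbeating a transaction now stamps every HIVE_LOCKS row whose hl_txnid matches that transaction with the same fresh timestamp. Below is a minimal JDBC sketch for checking this directly against the metastore backing database; the JDBC URL and credentials are assumptions, while the table and column names (HIVE_LOCKS, hl_txnid, hl_last_heartbeat) come from the statement above.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class LockHeartbeatProbe {
  // Returns the newest lock heartbeat recorded for the given transaction id
  // (0 if it holds no locks: max() over no rows is NULL, which getLong maps to 0).
  public static long lastLockHeartbeat(String jdbcUrl, String user, String password,
                                       long txnId) throws Exception {
    try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
         PreparedStatement ps = conn.prepareStatement(
             "select max(hl_last_heartbeat) from HIVE_LOCKS where hl_txnid = ?")) {
      ps.setLong(1, txnId);
      try (ResultSet rs = ps.executeQuery()) {
        return rs.next() ? rs.getLong(1) : 0L;
      }
    }
  }
}
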
http://git-wip-us.apache.org/repos/asf/hive/blob/6edb2c2f/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 39b44e8..219a54a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -293,17 +293,28 @@ public class DbTxnManager extends HiveTxnManagerImpl {
 
   @Override
   public void heartbeat() throws LockException {
-    LOG.debug("Heartbeating lock and transaction " + JavaUtils.txnIdToString(txnId));
-    List<HiveLock> locks = lockMgr.getLocks(false, false);
-    if (locks.size() == 0) {
-      if (!isTxnOpen()) {
-        // No locks, no txn, we outta here.
-        return;
-      } else {
-        // Create one dummy lock so we can go through the loop below
-        DbLockManager.DbHiveLock dummyLock = new DbLockManager.DbHiveLock(0L);
-        locks.add(dummyLock);
+    List<HiveLock> locks;
+    if(isTxnOpen()) {
+      // Create one dummy lock so we can go through the loop below, though we only
+      //really need txnId
+      DbLockManager.DbHiveLock dummyLock = new DbLockManager.DbHiveLock(0L);
+      locks = new ArrayList<>(1);
+      locks.add(dummyLock);
+    }
+    else {
+      locks = lockMgr.getLocks(false, false);
+    }
+    if(LOG.isInfoEnabled()) {
+      StringBuilder sb = new StringBuilder("Sending heartbeat for ")
+        .append(JavaUtils.txnIdToString(txnId)).append(" and");
+      for(HiveLock lock : locks) {
+        sb.append(" ").append(lock.toString());
       }
+      LOG.info(sb.toString());
+    }
+    if(!isTxnOpen() && locks.isEmpty()) {
+      // No locks, no txn, we outta here.
+      return;
     }
     for (HiveLock lock : locks) {
       long lockId = ((DbLockManager.DbHiveLock)lock).lockId;
@@ -320,7 +331,8 @@ public class DbTxnManager extends HiveTxnManagerImpl {
         throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId));
       } catch (TException e) {
         throw new LockException(
-            ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+            ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg() + "(" + JavaUtils.txnIdToString(txnId)
+              + "," + lock.toString() + ")", e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/6edb2c2f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 3ee9346..391f99a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -402,10 +402,10 @@ public class CompactorMR {
             dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) {
           boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
           FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter);
-          for (int j = 0; j < files.length; j++) {
+          for(FileStatus f : files) {
             // For each file, figure out which bucket it is.
-            Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(files[j].getPath().getName());
-            addFileToMap(matcher, files[j].getPath(), sawBase, splitToBucketMap);
+            Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
+            addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap);
           }
         } else {
           // Legacy file, see if it's a bucket file
@@ -434,7 +434,7 @@ public class CompactorMR {
                               Map<Integer, BucketTracker> splitToBucketMap) {
       if (!matcher.find()) {
         LOG.warn("Found a non-bucket file that we thought matched the bucket pattern! " +
-            file.toString());
+            file.toString() + " Matcher=" + matcher.toString());
       }
       int bucketNum = Integer.valueOf(matcher.group());
       BucketTracker bt = splitToBucketMap.get(bucketNum);

http://git-wip-us.apache.org/repos/asf/hive/blob/6edb2c2f/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index db119e1..8a53ec5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -236,7 +236,7 @@ public class TestDbTxnManager {
       exception = ex;
     }
     Assert.assertNotNull("Expected exception3", exception);
-    Assert.assertEquals("Wrong Exception3", ErrorMsg.LOCK_NO_SUCH_LOCK, exception.getCanonicalErrorMsg());
+    Assert.assertEquals("Wrong Exception3", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
   }
 
   @Test


[3/3] hive git commit: HIVE-12003 Hive Streaming API : Add check to ensure table is transactional (Roshan Naik via Eugene Koifman)

Posted by ek...@apache.org.
HIVE-12003 Hive Streaming API : Add check to ensure table is transactional (Roshan Naik via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ec8c793c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ec8c793c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ec8c793c

Branch: refs/heads/master
Commit: ec8c793c3bfc6edafada2329939553e5cd6cb0f3
Parents: ba83fd7
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Oct 10 10:11:48 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Oct 10 10:11:48 2015 -0700

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/HiveEndPoint.java   | 21 ++++++++++
 .../hive/hcatalog/streaming/InvalidTable.java   |  8 ++++
 .../hive/hcatalog/streaming/TestStreaming.java  | 41 +++++++++++++++++++-
 .../hive/ql/txn/compactor/TestCompactor.java    | 13 ++++---
 4 files changed, 76 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 7e99008..5de3f1d 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -49,6 +49,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Information about the hive end point (i.e. table or partition) to write to.
@@ -272,11 +273,31 @@ public class HiveEndPoint {
       }
       this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
       this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
+      checkEndPoint(endPoint, msClient);
       if (createPart  &&  !endPoint.partitionVals.isEmpty()) {
         createPartitionIfNotExists(endPoint, msClient, conf);
       }
     }
 
+    private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient) throws InvalidTable {
+      // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
+      try {
+        Table t = msClient.getTable(endPoint.database, endPoint.table);
+        Map<String, String> params = t.getParameters();
+        if(params != null) {
+          String transactionalProp = params.get("transactional");
+          if (transactionalProp != null && transactionalProp.equalsIgnoreCase("true")) {
+            return;
+          }
+        }
+        LOG.error("'transactional' property is not set on Table " + endPoint);
+        throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property is not set on Table");
+      } catch (Exception e) {
+        LOG.warn("Unable to check if Table is transactional. " + endPoint, e);
+        throw new InvalidTable(endPoint.database, endPoint.table, e);
+      }
+    }
+
     /**
      * Close connection
      */

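With checkEndPoint in place, newConnection() fails fast with InvalidTable unless the target table carries TBLPROPERTIES ('transactional'='true'). The sketch below shows the caller-side effect; it is illustrative only, and the database/table names ("mydb", "plain_orc_table") are placeholder assumptions.

import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.InvalidTable;
import org.apache.hive.hcatalog.streaming.StreamingConnection;

public class EndPointCheckSketch {
  public static void connect(String metaStoreUri) throws Exception {
    HiveEndPoint endPt = new HiveEndPoint(metaStoreUri, "mydb", "plain_orc_table", null);
    try {
      StreamingConnection connection = endPt.newConnection(false);
      connection.close();
    } catch (InvalidTable e) {
      // Thrown since HIVE-12003 when the table was not declared transactional,
      // i.e. it is missing TBLPROPERTIES ('transactional'='true'). Fix the DDL:
      //   ALTER TABLE plain_orc_table SET TBLPROPERTIES ('transactional'='true');
      throw e;
    }
  }
}
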
http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
index 903c37e..98ef688 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
@@ -27,4 +27,12 @@ public class InvalidTable extends StreamingException {
   public InvalidTable(String db, String table) {
     super(makeMsg(db,table), null);
   }
+
+  public InvalidTable(String db, String table, String msg) {
+    super(msg);
+  }
+
+  public InvalidTable(String db, String table, Exception inner) {
+    super(inner.getMessage(), inner);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 340ab6c..d9a7eae 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -388,6 +388,44 @@ public class TestStreaming {
   }
 
 
+  @Test
+  public void testTableValidation() throws Exception {
+    int bucketCount = 100;
+
+    String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString();
+    String tbl1 = "validation1";
+    String tbl2 = "validation2";
+
+    String tableLoc  = "'" + dbUri + Path.SEPARATOR + tbl1 + "'";
+    String tableLoc2 = "'" + dbUri + Path.SEPARATOR + tbl2 + "'";
+
+    runDDL(driver, "create database testBucketing3");
+    runDDL(driver, "use testBucketing3");
+
+    runDDL(driver, "create table " + tbl1 + " ( key1 string, data string ) clustered by ( key1 ) into "
+            + bucketCount + " buckets  stored as orc  location " + tableLoc) ;
+
+    runDDL(driver, "create table " + tbl2 + " ( key1 string, data string ) clustered by ( key1 ) into "
+            + bucketCount + " buckets  stored as orc  location " + tableLoc2 + " TBLPROPERTIES ('transactional'='false')") ;
+
+
+    try {
+      HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation1", null);
+      endPt.newConnection(false);
+      Assert.fail("InvalidTable exception was not thrown");
+    } catch (InvalidTable e) {
+      // expecting this exception
+    }
+    try {
+      HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation2", null);
+      endPt.newConnection(false);
+      Assert.fail("InvalidTable exception was not thrown");
+    } catch (InvalidTable e) {
+      // expecting this exception
+    }
+  }
+
+
   private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
                                 String... records) throws Exception {
     ValidTxnList txns = msClient.getValidTxns();
@@ -1191,7 +1229,8 @@ public class TestStreaming {
             " clustered by ( " + join(bucketCols, ",") + " )" +
             " into " + bucketCount + " buckets " +
             " stored as orc " +
-            " location '" + tableLoc +  "'";
+            " location '" + tableLoc +  "'" +
+            " TBLPROPERTIES ('transactional'='true') ";
     runDDL(driver, crtTbl);
     if(partNames!=null && partNames.length!=0) {
       return addPartition(driver, tableName, partVals, partNames);

http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index abca1ce..e2910dd 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -154,11 +154,12 @@ public class TestCompactor {
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
       " PARTITIONED BY(bkt INT)" +
       " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
-      " STORED AS ORC", driver);
+      " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
     executeStatementOnDriver("CREATE EXTERNAL TABLE " + tblNameStg + "(a INT, b STRING)" +
       " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'" +
       " STORED AS TEXTFILE" +
-      " LOCATION '" + stagingFolder.newFolder().toURI().getPath() + "'", driver);
+      " LOCATION '" + stagingFolder.newFolder().toURI().getPath() + "'" +
+      " TBLPROPERTIES ('transactional'='true')", driver);
 
     executeStatementOnDriver("load data local inpath '" + BASIC_FILE_NAME +
       "' overwrite into table " + tblNameStg, driver);
@@ -411,7 +412,7 @@ public class TestCompactor {
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
         " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-        " STORED AS ORC", driver);
+        " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
 
     HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -468,7 +469,7 @@ public class TestCompactor {
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
         " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-        " STORED AS ORC", driver);
+        " STORED AS ORC  TBLPROPERTIES ('transactional'='true') ", driver);
 
     HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -516,7 +517,7 @@ public class TestCompactor {
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
         " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-        " STORED AS ORC", driver);
+        " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
 
     HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -576,7 +577,7 @@ public class TestCompactor {
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
         " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-        " STORED AS ORC", driver);
+        " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
 
     HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);


[2/3] hive git commit: HIVE-12025 refactor bucketId generating code (Eugene Koifman, reviewed by Prashanth Jayachandran, Sergey Shelukhin, Elliot West)

Posted by ek...@apache.org.
HIVE-12025 refactor bucketId generating code (Eugene Koifman, reviewed by Prashanth Jayachandran, Sergey Shelukhin, Elliot West)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ba83fd7b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ba83fd7b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ba83fd7b

Branch: refs/heads/master
Commit: ba83fd7bffde4b6be8c03768a0b421c7b93f3ab1
Parents: 6edb2c2
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Oct 10 10:08:46 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Oct 10 10:08:46 2015 -0700

----------------------------------------------------------------------
 .../mutate/worker/BucketIdResolverImpl.java     | 16 ++++---------
 .../mutate/worker/TestBucketIdResolverImpl.java |  2 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |  9 ++++---
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java | 23 ++++++++----------
 .../hive/ql/io/DefaultHivePartitioner.java      |  3 ++-
 .../hive/ql/udf/generic/GenericUDFHash.java     | 11 ++++-----
 .../hive/ql/lockmgr/TestDbTxnManager.java       |  6 +++--
 .../objectinspector/ObjectInspectorUtils.java   | 13 ++++++----
 .../TestObjectInspectorUtils.java               | 25 ++++++++++++++++++++
 9 files changed, 64 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
index dbed9e1..bb9462d 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
@@ -56,21 +56,15 @@ public class BucketIdResolverImpl implements BucketIdResolver {
     return record;
   }
 
-  /** Based on: {@link org.apache.hadoop.hive.ql.exec.ReduceSinkOperator#computeBucketNumber(Object, int)}. */
   @Override
   public int computeBucketId(Object record) {
-    int bucketId = 1;
-
+    Object[] bucketFieldValues = new Object[bucketFields.length];
+    ObjectInspector[] bucketFieldInspectors = new ObjectInspector[bucketFields.length];
     for (int columnIndex = 0; columnIndex < bucketFields.length; columnIndex++) {
-      Object columnValue = structObjectInspector.getStructFieldData(record, bucketFields[columnIndex]);
-      bucketId = bucketId * 31 + ObjectInspectorUtils.hashCode(columnValue, bucketFields[columnIndex].getFieldObjectInspector());
-    }
-
-    if (bucketId < 0) {
-      bucketId = -1 * bucketId;
+      bucketFieldValues[columnIndex] = structObjectInspector.getStructFieldData(record, bucketFields[columnIndex]);
+      bucketFieldInspectors[columnIndex] = bucketFields[columnIndex].getFieldObjectInspector();
     }
-
-    return bucketId % totalBuckets;
+    return ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketFieldInspectors, totalBuckets);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
index f81373e..5297c5d 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
@@ -23,7 +23,7 @@ public class TestBucketIdResolverImpl {
   public void testAttachBucketIdToRecord() {
     MutableRecord record = new MutableRecord(1, "hello");
     capturingBucketIdResolver.attachBucketIdToRecord(record);
-    assertThat(record.rowId, is(new RecordIdentifier(-1L, 8, -1L)));
+    assertThat(record.rowId, is(new RecordIdentifier(-1L, 1, -1L)));
     assertThat(record.id, is(1));
     assertThat(record.msg.toString(), is("hello"));
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 39944a9..e247673 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -791,12 +791,11 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     if (!multiFileSpray) {
       return 0;
     } else {
-      int keyHashCode = 0;
-      for (int i = 0; i < partitionEval.length; i++) {
-        Object o = partitionEval[i].evaluate(row);
-        keyHashCode = keyHashCode * 31
-            + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+      Object[] bucketFieldValues = new Object[partitionEval.length];
+      for(int i = 0; i < partitionEval.length; i++) {
+        bucketFieldValues[i] = partitionEval[i].evaluate(row);
       }
+      int keyHashCode = ObjectInspectorUtils.getBucketHashCode(bucketFieldValues, partitionObjectInspectors);
       key.setHashCode(keyHashCode);
       int bucketNum = prtner.getBucket(key, null, totalFiles);
       return bucketMap.get(bucketNum);

http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index f1df608..dd08210 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -405,27 +405,24 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
   }
 
   private int computeBucketNumber(Object row, int numBuckets) throws HiveException {
-    int buckNum = 0;
-
     if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
         conf.getWriteType() == AcidUtils.Operation.DELETE) {
-      // We don't need to evalute the hash code.  Instead read the bucket number directly from
+      // We don't need to evaluate the hash code.  Instead read the bucket number directly from
       // the row.  I don't need to evaluate any expressions as I know I am reading the ROW__ID
       // column directly.
       Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField);
-      buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
+      int buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
       if (isLogTraceEnabled) {
         LOG.trace("Acid choosing bucket number " + buckNum);
       }
+      return buckNum;
     } else {
+      Object[] bucketFieldValues = new Object[bucketEval.length];
       for (int i = 0; i < bucketEval.length; i++) {
-        Object o = bucketEval[i].evaluate(row);
-        buckNum = buckNum * 31 + ObjectInspectorUtils.hashCode(o, bucketObjectInspectors[i]);
+        bucketFieldValues[i] = bucketEval[i].evaluate(row);
       }
+      return ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketObjectInspectors, numBuckets);
     }
-
-    // similar to hive's default partitioner, refer DefaultHivePartitioner
-    return (buckNum & Integer.MAX_VALUE) % numBuckets;
   }
 
   private void populateCachedDistributionKeys(Object row, int index) throws HiveException {
@@ -476,11 +473,11 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
         keyHashCode = 1;
       }
     } else {
-      for (int i = 0; i < partitionEval.length; i++) {
-        Object o = partitionEval[i].evaluate(row);
-        keyHashCode = keyHashCode * 31
-            + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+      Object[] bucketFieldValues = new Object[partitionEval.length];
+      for(int i = 0; i < partitionEval.length; i++) {
+        bucketFieldValues[i] = partitionEval[i].evaluate(row);
       }
+      keyHashCode = ObjectInspectorUtils.getBucketHashCode(bucketFieldValues, partitionObjectInspectors);
     }
     int hashCode = buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
     if (isLogTraceEnabled) {

http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java b/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
index 6a91cb8..6a14fb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 
 /** Partition keys by their {@link Object#hashCode()}. */
@@ -26,7 +27,7 @@ public class DefaultHivePartitioner<K2, V2> extends HashPartitioner<K2, V2> impl
   /** Use {@link Object#hashCode()} to partition. */
   @Override
   public int getBucket(K2 key, V2 value, int numBuckets) {
-    return (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
+    return ObjectInspectorUtils.getBucketNumber(key.hashCode(), numBuckets);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
index 474f404..fd1fe92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.udf.generic;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -45,13 +44,11 @@ public class GenericUDFHash extends GenericUDF {
 
   @Override
   public Object evaluate(DeferredObject[] arguments) throws HiveException {
-    // See
-    // http://java.sun.com/j2se/1.5.0/docs/api/java/util/List.html#hashCode()
-    int r = 0;
-    for (int i = 0; i < arguments.length; i++) {
-      r = r * 31
-          + ObjectInspectorUtils.hashCode(arguments[i].get(), argumentOIs[i]);
+    Object[] fieldValues = new Object[arguments.length];
+    for(int i = 0; i < arguments.length; i++) {
+      fieldValues[i] = arguments[i].get();
     }
+    int r = ObjectInspectorUtils.getBucketHashCode(fieldValues, argumentOIs);
     result.set(r);
     return result;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 8a53ec5..68c6542 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -20,13 +20,15 @@ package org.apache.hadoop.hive.ql.lockmgr;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.metadata.DummyPartition;import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.DummyPartition;
+import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;

http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
index 54ae48e..09e9108 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -502,18 +502,23 @@ public final class ObjectInspectorUtils {
    * @return the bucket number
    */
   public static int getBucketNumber(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors, int totalBuckets) {
-    int hashCode = getBucketHashCode(bucketFields, bucketFieldInspectors);
-    int bucketID = (hashCode & Integer.MAX_VALUE) % totalBuckets;
-    return bucketID;
+    return getBucketNumber(getBucketHashCode(bucketFields, bucketFieldInspectors), totalBuckets);
   }
 
   /**
+   * https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL+BucketedTables
+   * @param hashCode as produced by {@link #getBucketHashCode(Object[], ObjectInspector[])}
+   */
+  public static int getBucketNumber(int hashCode, int numberOfBuckets) {
+    return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
+  }
+  /**
    * Computes the hash code for the given bucketed fields
    * @param bucketFields
    * @param bucketFieldInspectors
    * @return
    */
-  private static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors) {
+  public static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors) {
     int hashCode = 0;
     for (int i = 0; i < bucketFields.length; i++) {
       int fieldHash = ObjectInspectorUtils.hashCode(bucketFields[i], bucketFieldInspectors[i]);

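The bucket computation that FileSinkOperator, ReduceSinkOperator, GenericUDFHash and the streaming mutate API each re-implemented is now centralized here: getBucketHashCode folds per-field hashes with a multiply-by-31 accumulator, and getBucketNumber masks the sign bit and takes the hash modulo the bucket count. A minimal sketch of the composed call, assuming two bucketing columns with plain Java primitive object inspectors:

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class BucketNumberSketch {
  // Computes the bucket for an (int, string) bucketing key, mirroring
  // ObjectInspectorUtils.getBucketNumber(values, inspectors, totalBuckets).
  public static int bucketFor(int intKey, String stringKey, int totalBuckets) {
    Object[] values = { intKey, stringKey };
    ObjectInspector[] inspectors = {
        PrimitiveObjectInspectorFactory.javaIntObjectInspector,
        PrimitiveObjectInspectorFactory.javaStringObjectInspector
    };
    int hashCode = ObjectInspectorUtils.getBucketHashCode(values, inspectors);
    return (hashCode & Integer.MAX_VALUE) % totalBuckets; // == getBucketNumber(hashCode, totalBuckets)
  }
}
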
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
index ade0ef7..cf73b28 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
@@ -131,4 +131,29 @@ public class TestObjectInspectorUtils extends TestCase {
     }
 
   }
+  public void testBucketIdGeneration() {
+    ArrayList<String> fieldNames = new ArrayList<String>();
+    fieldNames.add("firstInteger");
+    fieldNames.add("secondString");
+    fieldNames.add("thirdBoolean");
+    ArrayList<ObjectInspector> fieldObjectInspectors = new ArrayList<ObjectInspector>();
+    fieldObjectInspectors
+      .add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
+    fieldObjectInspectors
+      .add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+    fieldObjectInspectors
+      .add(PrimitiveObjectInspectorFactory.javaBooleanObjectInspector);
+
+    StandardStructObjectInspector soi1 = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldObjectInspectors);
+    ArrayList<Object> struct = new ArrayList<Object>(3);
+    struct.add(1);
+    struct.add("two");
+    struct.add(true);
+
+    int hashCode = ObjectInspectorUtils.getBucketHashCode(struct.toArray(), fieldObjectInspectors.toArray(new ObjectInspector[fieldObjectInspectors.size()]));
+    assertEquals("", 3574518, hashCode);
+    int bucketId = ObjectInspectorUtils.getBucketNumber(struct.toArray(), fieldObjectInspectors.toArray(new ObjectInspector[fieldObjectInspectors.size()]), 16);
+    assertEquals("", 6, bucketId);
+    assertEquals("", bucketId, ObjectInspectorUtils.getBucketNumber(hashCode, 16));
+  }
 }