Posted to commits@hive.apache.org by ek...@apache.org on 2015/10/10 19:42:13 UTC
[1/3] hive git commit: HIVE-11914 When transactions gets a heartbeat, it doesn't update the lock heartbeat. (Eugene Koifman, reviewed by Alan Gates)
Repository: hive
Updated Branches:
refs/heads/master 86f7af66f -> ec8c793c3
HIVE-11914 When transactions gets a heartbeat, it doesn't update the lock heartbeat. (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6edb2c2f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6edb2c2f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6edb2c2f
Branch: refs/heads/master
Commit: 6edb2c2f5b22ab84ee0e4150d0982f81c39a5ccc
Parents: 86f7af6
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Oct 10 10:05:16 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Oct 10 10:05:16 2015 -0700
----------------------------------------------------------------------
.../hive/hcatalog/streaming/TestStreaming.java | 27 ++++++++++++++++
.../hadoop/hive/metastore/txn/TxnHandler.java | 4 +++
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 34 +++++++++++++-------
.../hive/ql/txn/compactor/CompactorMR.java | 8 ++---
.../hive/ql/lockmgr/TestDbTxnManager.java | 2 +-
5 files changed, 59 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/6edb2c2f/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 2f6baec..340ab6c 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
@@ -553,6 +555,31 @@ public class TestStreaming {
txnBatch.close();
connection.close();
}
+
+ @Test
+ public void testHeartbeat() throws Exception {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+ DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt);
+ StreamingConnection connection = endPt.newConnection(false, null);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer);
+ txnBatch.beginNextTransaction();
+ //todo: this should ideally check Transaction heartbeat as well, but heartbeat
+ //timestamp is not reported yet
+ //GetOpenTxnsInfoResponse txnresp = msClient.showTxns();
+ ShowLocksResponse response = msClient.showLocks();
+ Assert.assertEquals("Wrong nubmer of locks: " + response, 1, response.getLocks().size());
+ ShowLocksResponseElement lock = response.getLocks().get(0);
+ long acquiredAt = lock.getAcquiredat();
+ long heartbeatAt = lock.getLastheartbeat();
+ txnBatch.heartbeat();
+ response = msClient.showLocks();
+ Assert.assertEquals("Wrong number of locks2: " + response, 1, response.getLocks().size());
+ lock = response.getLocks().get(0);
+ Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, lock.getAcquiredat());
+ Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() +
+ ") > old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() > heartbeatAt);
+ }
@Test
public void testTransactionBatchEmptyAbort() throws Exception {
// 1) to partitioned table
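For context, the heartbeat path the new test exercises reduces to the following client-side pattern; this is a minimal sketch using only streaming API calls that appear in the test above, with placeholder database/table names:

    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "mydb", "mytbl", null);
    DelimitedInputWriter writer = new DelimitedInputWriter(new String[]{"a", "b"}, ",", endPt);
    StreamingConnection connection = endPt.newConnection(false, null);
    TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer);
    txnBatch.beginNextTransaction();
    // Before this fix, heartbeat() refreshed only the transaction in the
    // metastore; the locks it held kept their old hl_last_heartbeat and
    // could be timed out. Now both timestamps advance together.
    txnBatch.heartbeat();
    txnBatch.close();
    connection.close();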
http://git-wip-us.apache.org/repos/asf/hive/blob/6edb2c2f/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 6218a03..ca485fa 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1742,6 +1742,10 @@ public class TxnHandler {
dbConn.rollback();
throw new NoSuchTxnException("No such txn: " + txnid);
}
+ //update locks for this txn to the same heartbeat
+ s = "update HIVE_LOCKS set hl_last_heartbeat = " + now + " where hl_txnid = " + txnid;
+ LOG.debug("Going to execute update <" + s + ">");
+ stmt.executeUpdate(s);
LOG.debug("Going to commit");
dbConn.commit();
} finally {
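For context, heartbeatTxn already refreshed the transaction's own heartbeat; the bug was that the HIVE_LOCKS rows belonging to that transaction kept their old hl_last_heartbeat, so a long-lived transaction could have its locks reaped by the lock timeout. The added statement keeps them in step; schematically, with runtime values in angle brackets:

    update HIVE_LOCKS set hl_last_heartbeat = <now> where hl_txnid = <txnid>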
http://git-wip-us.apache.org/repos/asf/hive/blob/6edb2c2f/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 39b44e8..219a54a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -293,17 +293,28 @@ public class DbTxnManager extends HiveTxnManagerImpl {
@Override
public void heartbeat() throws LockException {
- LOG.debug("Heartbeating lock and transaction " + JavaUtils.txnIdToString(txnId));
- List<HiveLock> locks = lockMgr.getLocks(false, false);
- if (locks.size() == 0) {
- if (!isTxnOpen()) {
- // No locks, no txn, we outta here.
- return;
- } else {
- // Create one dummy lock so we can go through the loop below
- DbLockManager.DbHiveLock dummyLock = new DbLockManager.DbHiveLock(0L);
- locks.add(dummyLock);
+ List<HiveLock> locks;
+ if(isTxnOpen()) {
+ // Create one dummy lock so we can go through the loop below, though we only
+ //really need txnId
+ DbLockManager.DbHiveLock dummyLock = new DbLockManager.DbHiveLock(0L);
+ locks = new ArrayList<>(1);
+ locks.add(dummyLock);
+ }
+ else {
+ locks = lockMgr.getLocks(false, false);
+ }
+ if(LOG.isInfoEnabled()) {
+ StringBuilder sb = new StringBuilder("Sending heartbeat for ")
+ .append(JavaUtils.txnIdToString(txnId)).append(" and");
+ for(HiveLock lock : locks) {
+ sb.append(" ").append(lock.toString());
}
+ LOG.info(sb.toString());
+ }
+ if(!isTxnOpen() && locks.isEmpty()) {
+ // No locks, no txn, we outta here.
+ return;
}
for (HiveLock lock : locks) {
long lockId = ((DbLockManager.DbHiveLock)lock).lockId;
@@ -320,7 +331,8 @@ public class DbTxnManager extends HiveTxnManagerImpl {
throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId));
} catch (TException e) {
throw new LockException(
- ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+ ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg() + "(" + JavaUtils.txnIdToString(txnId)
+ + "," + lock.toString() + ")", e);
}
}
}
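The net behavior of the refactored heartbeat(), as a sketch (the metastore call itself happens in the loop shown above, via each lock's lockId and the current txnId):

    // txn open            -> heartbeat sent once, with txnId and a dummy lockId of 0;
    //                        the TxnHandler change above then refreshes every
    //                        HIVE_LOCKS row for that txnId in one update
    // no txn, locks held  -> heartbeat sent per lock, as before
    // no txn, no locks    -> no-op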
http://git-wip-us.apache.org/repos/asf/hive/blob/6edb2c2f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 3ee9346..391f99a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -402,10 +402,10 @@ public class CompactorMR {
dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) {
boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter);
- for (int j = 0; j < files.length; j++) {
+ for(FileStatus f : files) {
// For each file, figure out which bucket it is.
- Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(files[j].getPath().getName());
- addFileToMap(matcher, files[j].getPath(), sawBase, splitToBucketMap);
+ Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
+ addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap);
}
} else {
// Legacy file, see if it's a bucket file
@@ -434,7 +434,7 @@ public class CompactorMR {
Map<Integer, BucketTracker> splitToBucketMap) {
if (!matcher.find()) {
LOG.warn("Found a non-bucket file that we thought matched the bucket pattern! " +
- file.toString());
+ file.toString() + " Matcher=" + matcher.toString());
}
int bucketNum = Integer.valueOf(matcher.group());
BucketTracker bt = splitToBucketMap.get(bucketNum);
http://git-wip-us.apache.org/repos/asf/hive/blob/6edb2c2f/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index db119e1..8a53ec5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -236,7 +236,7 @@ public class TestDbTxnManager {
exception = ex;
}
Assert.assertNotNull("Expected exception3", exception);
- Assert.assertEquals("Wrong Exception3", ErrorMsg.LOCK_NO_SUCH_LOCK, exception.getCanonicalErrorMsg());
+ Assert.assertEquals("Wrong Exception3", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
}
@Test
[3/3] hive git commit: HIVE-12003 Hive Streaming API : Add check to ensure table is transactional (Roshan Naik via Eugene Koifman)
Posted by ek...@apache.org.
HIVE-12003 Hive Streaming API : Add check to ensure table is transactional (Roshan Naik via Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ec8c793c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ec8c793c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ec8c793c
Branch: refs/heads/master
Commit: ec8c793c3bfc6edafada2329939553e5cd6cb0f3
Parents: ba83fd7
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Oct 10 10:11:48 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Oct 10 10:11:48 2015 -0700
----------------------------------------------------------------------
.../hive/hcatalog/streaming/HiveEndPoint.java | 21 ++++++++++
.../hive/hcatalog/streaming/InvalidTable.java | 8 ++++
.../hive/hcatalog/streaming/TestStreaming.java | 41 +++++++++++++++++++-
.../hive/ql/txn/compactor/TestCompactor.java | 13 ++++---
4 files changed, 76 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 7e99008..5de3f1d 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -49,6 +49,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
/**
* Information about the hive end point (i.e. table or partition) to write to.
@@ -272,11 +273,31 @@ public class HiveEndPoint {
}
this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
+ checkEndPoint(endPoint, msClient);
if (createPart && !endPoint.partitionVals.isEmpty()) {
createPartitionIfNotExists(endPoint, msClient, conf);
}
}
+ private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient) throws InvalidTable {
+ // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
+ try {
+ Table t = msClient.getTable(endPoint.database, endPoint.table);
+ Map<String, String> params = t.getParameters();
+ if(params != null) {
+ String transactionalProp = params.get("transactional");
+ if (transactionalProp != null && transactionalProp.equalsIgnoreCase("true")) {
+ return;
+ }
+ }
+ LOG.error("'transactional' property is not set on Table " + endPoint);
+ throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property is not set on Table");
+ } catch (Exception e) {
+ LOG.warn("Unable to check if Table is transactional. " + endPoint, e);
+ throw new InvalidTable(endPoint.database, endPoint.table, e);
+ }
+ }
+
/**
* Close connection
*/
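With this check in place, a streaming endpoint is only usable against a table created with the transactional table property. A minimal sketch of DDL that passes checkEndPoint, modeled on the test DDL added later in this commit (table name, columns, and bucket count are illustrative):

    runDDL(driver, "create table mytbl (key1 string, data string)" +
        " clustered by (key1) into 4 buckets stored as orc" +
        " TBLPROPERTIES ('transactional'='true')");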
http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
index 903c37e..98ef688 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
@@ -27,4 +27,12 @@ public class InvalidTable extends StreamingException {
public InvalidTable(String db, String table) {
super(makeMsg(db,table), null);
}
+
+ public InvalidTable(String db, String table, String msg) {
+ super(msg);
+ }
+
+ public InvalidTable(String db, String table, Exception inner) {
+ super(inner.getMessage(), inner);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 340ab6c..d9a7eae 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -388,6 +388,44 @@ public class TestStreaming {
}
+ @Test
+ public void testTableValidation() throws Exception {
+ int bucketCount = 100;
+
+ String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString();
+ String tbl1 = "validation1";
+ String tbl2 = "validation2";
+
+ String tableLoc = "'" + dbUri + Path.SEPARATOR + tbl1 + "'";
+ String tableLoc2 = "'" + dbUri + Path.SEPARATOR + tbl2 + "'";
+
+ runDDL(driver, "create database testBucketing3");
+ runDDL(driver, "use testBucketing3");
+
+ runDDL(driver, "create table " + tbl1 + " ( key1 string, data string ) clustered by ( key1 ) into "
+ + bucketCount + " buckets stored as orc location " + tableLoc) ;
+
+ runDDL(driver, "create table " + tbl2 + " ( key1 string, data string ) clustered by ( key1 ) into "
+ + bucketCount + " buckets stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='false')") ;
+
+
+ try {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation1", null);
+ endPt.newConnection(false);
+ Assert.assertTrue("InvalidTable exception was not thrown", false);
+ } catch (InvalidTable e) {
+ // expecting this exception
+ }
+ try {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation2", null);
+ endPt.newConnection(false);
+ Assert.assertTrue("InvalidTable exception was not thrown", false);
+ } catch (InvalidTable e) {
+ // expecting this exception
+ }
+ }
+
+
private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
String... records) throws Exception {
ValidTxnList txns = msClient.getValidTxns();
@@ -1191,7 +1229,8 @@ public class TestStreaming {
" clustered by ( " + join(bucketCols, ",") + " )" +
" into " + bucketCount + " buckets " +
" stored as orc " +
- " location '" + tableLoc + "'";
+ " location '" + tableLoc + "'" +
+ " TBLPROPERTIES ('transactional'='true') ";
runDDL(driver, crtTbl);
if(partNames!=null && partNames.length!=0) {
return addPartition(driver, tableName, partVals, partNames);
http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index abca1ce..e2910dd 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -154,11 +154,12 @@ public class TestCompactor {
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" PARTITIONED BY(bkt INT)" +
" CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC", driver);
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
executeStatementOnDriver("CREATE EXTERNAL TABLE " + tblNameStg + "(a INT, b STRING)" +
" ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'" +
" STORED AS TEXTFILE" +
- " LOCATION '" + stagingFolder.newFolder().toURI().getPath() + "'", driver);
+ " LOCATION '" + stagingFolder.newFolder().toURI().getPath() + "'" +
+ " TBLPROPERTIES ('transactional'='true')", driver);
executeStatementOnDriver("load data local inpath '" + BASIC_FILE_NAME +
"' overwrite into table " + tblNameStg, driver);
@@ -411,7 +412,7 @@ public class TestCompactor {
executeStatementOnDriver("drop table if exists " + tblName, driver);
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC", driver);
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -468,7 +469,7 @@ public class TestCompactor {
executeStatementOnDriver("drop table if exists " + tblName, driver);
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC", driver);
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true') ", driver);
HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -516,7 +517,7 @@ public class TestCompactor {
executeStatementOnDriver("drop table if exists " + tblName, driver);
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC", driver);
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -576,7 +577,7 @@ public class TestCompactor {
executeStatementOnDriver("drop table if exists " + tblName, driver);
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC", driver);
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
[2/3] hive git commit: HIVE-12025 refactor bucketId generating code (Eugene Koifman, reviewed by Prashanth Jayachandran, Sergey Shelukhin, Elliot West)
Posted by ek...@apache.org.
HIVE-12025 refactor bucketId generating code (Eugene Koifman, reviewed by Prashanth Jayachandran, Sergey Shelukhin, Elliot West)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ba83fd7b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ba83fd7b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ba83fd7b
Branch: refs/heads/master
Commit: ba83fd7bffde4b6be8c03768a0b421c7b93f3ab1
Parents: 6edb2c2
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Oct 10 10:08:46 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Oct 10 10:08:46 2015 -0700
----------------------------------------------------------------------
.../mutate/worker/BucketIdResolverImpl.java | 16 ++++---------
.../mutate/worker/TestBucketIdResolverImpl.java | 2 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 9 ++++---
.../hadoop/hive/ql/exec/ReduceSinkOperator.java | 23 ++++++++----------
.../hive/ql/io/DefaultHivePartitioner.java | 3 ++-
.../hive/ql/udf/generic/GenericUDFHash.java | 11 ++++-----
.../hive/ql/lockmgr/TestDbTxnManager.java | 6 +++--
.../objectinspector/ObjectInspectorUtils.java | 13 ++++++----
.../TestObjectInspectorUtils.java | 25 ++++++++++++++++++++
9 files changed, 64 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
index dbed9e1..bb9462d 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
@@ -56,21 +56,15 @@ public class BucketIdResolverImpl implements BucketIdResolver {
return record;
}
- /** Based on: {@link org.apache.hadoop.hive.ql.exec.ReduceSinkOperator#computeBucketNumber(Object, int)}. */
@Override
public int computeBucketId(Object record) {
- int bucketId = 1;
-
+ Object[] bucketFieldValues = new Object[bucketFields.length];
+ ObjectInspector[] bucketFieldInspectors = new ObjectInspector[bucketFields.length];
for (int columnIndex = 0; columnIndex < bucketFields.length; columnIndex++) {
- Object columnValue = structObjectInspector.getStructFieldData(record, bucketFields[columnIndex]);
- bucketId = bucketId * 31 + ObjectInspectorUtils.hashCode(columnValue, bucketFields[columnIndex].getFieldObjectInspector());
- }
-
- if (bucketId < 0) {
- bucketId = -1 * bucketId;
+ bucketFieldValues[columnIndex] = structObjectInspector.getStructFieldData(record, bucketFields[columnIndex]);
+ bucketFieldInspectors[columnIndex] = bucketFields[columnIndex].getFieldObjectInspector();
}
-
- return bucketId % totalBuckets;
+ return ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketFieldInspectors, totalBuckets);
}
}
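Worth noting: this is not a hash-preserving refactor. The removed loop seeded its accumulator with 1 and negated negative results, while the shared ObjectInspectorUtils code (later in this commit) seeds with 0 and masks the sign bit, so some records map to different buckets; that is why the expected bucket in TestBucketIdResolverImpl changes in the next hunk. Schematically (a sketch, not code from the commit):

    // old: id = 1;  id = id*31 + h(field) ...;  if (id < 0) id = -id;  id % totalBuckets
    // new: h  = 0;  h  = h*31  + h(field) ...;  (h & Integer.MAX_VALUE) % totalBuckets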
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
index f81373e..5297c5d 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
@@ -23,7 +23,7 @@ public class TestBucketIdResolverImpl {
public void testAttachBucketIdToRecord() {
MutableRecord record = new MutableRecord(1, "hello");
capturingBucketIdResolver.attachBucketIdToRecord(record);
- assertThat(record.rowId, is(new RecordIdentifier(-1L, 8, -1L)));
+ assertThat(record.rowId, is(new RecordIdentifier(-1L, 1, -1L)));
assertThat(record.id, is(1));
assertThat(record.msg.toString(), is("hello"));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 39944a9..e247673 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -791,12 +791,11 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
if (!multiFileSpray) {
return 0;
} else {
- int keyHashCode = 0;
- for (int i = 0; i < partitionEval.length; i++) {
- Object o = partitionEval[i].evaluate(row);
- keyHashCode = keyHashCode * 31
- + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+ Object[] bucketFieldValues = new Object[partitionEval.length];
+ for(int i = 0; i < partitionEval.length; i++) {
+ bucketFieldValues[i] = partitionEval[i].evaluate(row);
}
+ int keyHashCode = ObjectInspectorUtils.getBucketHashCode(bucketFieldValues, partitionObjectInspectors);
key.setHashCode(keyHashCode);
int bucketNum = prtner.getBucket(key, null, totalFiles);
return bucketMap.get(bucketNum);
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index f1df608..dd08210 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -405,27 +405,24 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
}
private int computeBucketNumber(Object row, int numBuckets) throws HiveException {
- int buckNum = 0;
-
if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
conf.getWriteType() == AcidUtils.Operation.DELETE) {
- // We don't need to evalute the hash code. Instead read the bucket number directly from
+ // We don't need to evaluate the hash code. Instead read the bucket number directly from
// the row. I don't need to evaluate any expressions as I know I am reading the ROW__ID
// column directly.
Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField);
- buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
+ int buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
if (isLogTraceEnabled) {
LOG.trace("Acid choosing bucket number " + buckNum);
}
+ return buckNum;
} else {
+ Object[] bucketFieldValues = new Object[bucketEval.length];
for (int i = 0; i < bucketEval.length; i++) {
- Object o = bucketEval[i].evaluate(row);
- buckNum = buckNum * 31 + ObjectInspectorUtils.hashCode(o, bucketObjectInspectors[i]);
+ bucketFieldValues[i] = bucketEval[i].evaluate(row);
}
+ return ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketObjectInspectors, numBuckets);
}
-
- // similar to hive's default partitioner, refer DefaultHivePartitioner
- return (buckNum & Integer.MAX_VALUE) % numBuckets;
}
private void populateCachedDistributionKeys(Object row, int index) throws HiveException {
@@ -476,11 +473,11 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
keyHashCode = 1;
}
} else {
- for (int i = 0; i < partitionEval.length; i++) {
- Object o = partitionEval[i].evaluate(row);
- keyHashCode = keyHashCode * 31
- + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+ Object[] bucketFieldValues = new Object[partitionEval.length];
+ for(int i = 0; i < partitionEval.length; i++) {
+ bucketFieldValues[i] = partitionEval[i].evaluate(row);
}
+ keyHashCode = ObjectInspectorUtils.getBucketHashCode(bucketFieldValues, partitionObjectInspectors);
}
int hashCode = buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
if (isLogTraceEnabled) {
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java b/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
index 6a91cb8..6a14fb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.io;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.mapred.lib.HashPartitioner;
/** Partition keys by their {@link Object#hashCode()}. */
@@ -26,7 +27,7 @@ public class DefaultHivePartitioner<K2, V2> extends HashPartitioner<K2, V2> impl
/** Use {@link Object#hashCode()} to partition. */
@Override
public int getBucket(K2 key, V2 value, int numBuckets) {
- return (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
+ return ObjectInspectorUtils.getBucketNumber(key.hashCode(), numBuckets);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
index 474f404..fd1fe92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFHash.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.udf.generic;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -45,13 +44,11 @@ public class GenericUDFHash extends GenericUDF {
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
- // See
- // http://java.sun.com/j2se/1.5.0/docs/api/java/util/List.html#hashCode()
- int r = 0;
- for (int i = 0; i < arguments.length; i++) {
- r = r * 31
- + ObjectInspectorUtils.hashCode(arguments[i].get(), argumentOIs[i]);
+ Object[] fieldValues = new Object[arguments.length];
+ for(int i = 0; i < arguments.length; i++) {
+ fieldValues[i] = arguments[i].get();
}
+ int r = ObjectInspectorUtils.getBucketHashCode(fieldValues, argumentOIs);
result.set(r);
return result;
}
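Since hash() now delegates to the same getBucketHashCode used for bucketing, the UDF and bucket assignment agree by construction. An illustrative query, assuming the primitive hash functions behave as in the TestObjectInspectorUtils case added at the end of this commit:

    //   SELECT hash(1, 'two', true);   -- expected 3574518, matching getBucketHashCode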
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 8a53ec5..68c6542 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -20,13 +20,15 @@ package org.apache.hadoop.hive.ql.lockmgr;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.metadata.DummyPartition;import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.DummyPartition;
+import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
index 54ae48e..09e9108 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -502,18 +502,23 @@ public final class ObjectInspectorUtils {
* @return the bucket number
*/
public static int getBucketNumber(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors, int totalBuckets) {
- int hashCode = getBucketHashCode(bucketFields, bucketFieldInspectors);
- int bucketID = (hashCode & Integer.MAX_VALUE) % totalBuckets;
- return bucketID;
+ return getBucketNumber(getBucketHashCode(bucketFields, bucketFieldInspectors), totalBuckets);
}
/**
+ * https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL+BucketedTables
+ * @param hashCode as produced by {@link #getBucketHashCode(Object[], ObjectInspector[])}
+ */
+ public static int getBucketNumber(int hashCode, int numberOfBuckets) {
+ return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
+ }
+ /**
* Computes the hash code for the given bucketed fields
* @param bucketFields
* @param bucketFieldInspectors
* @return
*/
- private static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors) {
+ public static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors) {
int hashCode = 0;
for (int i = 0; i < bucketFields.length; i++) {
int fieldHash = ObjectInspectorUtils.hashCode(bucketFields[i], bucketFieldInspectors[i]);
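Taken together, the two helpers give every caller the same bucket computation; a condensed sketch of the shared logic shown above:

    // getBucketHashCode: Horner-style accumulation over the bucketed columns
    int hash = 0;
    for (int i = 0; i < bucketFields.length; i++) {
      hash = hash * 31 + ObjectInspectorUtils.hashCode(bucketFields[i], bucketFieldInspectors[i]);
    }
    // getBucketNumber(hash, n): clear the sign bit, then take the remainder
    int bucket = (hash & Integer.MAX_VALUE) % totalBuckets;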
http://git-wip-us.apache.org/repos/asf/hive/blob/ba83fd7b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
index ade0ef7..cf73b28 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestObjectInspectorUtils.java
@@ -131,4 +131,29 @@ public class TestObjectInspectorUtils extends TestCase {
}
}
+ public void testBucketIdGeneration() {
+ ArrayList<String> fieldNames = new ArrayList<String>();
+ fieldNames.add("firstInteger");
+ fieldNames.add("secondString");
+ fieldNames.add("thirdBoolean");
+ ArrayList<ObjectInspector> fieldObjectInspectors = new ArrayList<ObjectInspector>();
+ fieldObjectInspectors
+ .add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
+ fieldObjectInspectors
+ .add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+ fieldObjectInspectors
+ .add(PrimitiveObjectInspectorFactory.javaBooleanObjectInspector);
+
+ StandardStructObjectInspector soi1 = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldObjectInspectors);
+ ArrayList<Object> struct = new ArrayList<Object>(3);
+ struct.add(1);
+ struct.add("two");
+ struct.add(true);
+
+ int hashCode = ObjectInspectorUtils.getBucketHashCode(struct.toArray(), fieldObjectInspectors.toArray(new ObjectInspector[fieldObjectInspectors.size()]));
+ assertEquals("", 3574518, hashCode);
+ int bucketId = ObjectInspectorUtils.getBucketNumber(struct.toArray(), fieldObjectInspectors.toArray(new ObjectInspector[fieldObjectInspectors.size()]), 16);
+ assertEquals("", 6, bucketId);
+ assertEquals("", bucketId, ObjectInspectorUtils.getBucketNumber(hashCode, 16));
+ }
}
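The expected constants in this test can be checked by hand, assuming ObjectInspectorUtils.hashCode returns the int value itself for an Integer, a String.hashCode-style value for an ASCII string, and 1 for Boolean true (the asserted values are consistent with that assumption):

    // hash("two") = ('t'*31 + 'w')*31 + 'o' = (116*31 + 119)*31 + 111 = 115276
    // getBucketHashCode(1, "two", true) = ((0*31 + 1)*31 + 115276)*31 + 1 = 3574518
    // getBucketNumber(3574518, 16) = (3574518 & Integer.MAX_VALUE) % 16 = 6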