Posted to commits@hive.apache.org by ek...@apache.org on 2018/04/25 19:02:34 UTC
hive git commit: HIVE-19260 - Streaming Ingest API doesn't normalize db.table names (Eugene Koifman, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/branch-3 656af1411 -> cfdb272c7
HIVE-19260 - Streaming Ingest API doesn't normalize db.table names (Eugene Koifman, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cfdb272c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cfdb272c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cfdb272c
Branch: refs/heads/branch-3
Commit: cfdb272c79bbdfa8b3c0c6481ec057cd91d53ad7
Parents: 656af14
Author: Eugene Koifman <ek...@apache.org>
Authored: Wed Apr 25 12:02:19 2018 -0700
Committer: Eugene Koifman <ek...@apache.org>
Committed: Wed Apr 25 12:02:19 2018 -0700
----------------------------------------------------------------------
.../hive/hcatalog/streaming/HiveEndPoint.java | 4 +-
.../hive/hcatalog/streaming/TestStreaming.java | 11 +-
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 119 +++++++++++++++++--
.../hadoop/hive/ql/txn/compactor/Initiator.java | 2 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 22 ++--
.../apache/hive/streaming/TestStreaming.java | 9 +-
6 files changed, 139 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
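The gist of the fix: Hive QL treats database and table names case-insensitively, but the streaming ingest API stored them exactly as supplied, so a lock taken through an endpoint created with mixed-case names could fail to match the lower-cased names the compactor works with, and the Cleaner would not see the reader's lock. A rough sketch of the pre-patch hazard (hypothetical names; the failure path is inferred from the diff below):

    // Pre-patch (sketch): names were stored on the endpoint verbatim...
    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "Default", "StreamingNoBuckets", null);
    // ...so the metastore lock rows carried "Default"/"StreamingNoBuckets",
    // while the compaction queue entry for the same table used
    // "default"/"streamingnobuckets". Cleaner.findRelatedLocks() compared
    // the two with String.equals() and missed the reader's lock, allowing
    // obsolete directories to be removed while a reader still needed them.

The patch normalizes at both ends: HiveEndPoint lower-cases on construction, TxnHandler lower-cases what it persists, and the Cleaner compares db/table names case-insensitively as a belt-and-braces measure.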
http://git-wip-us.apache.org/repos/asf/hive/blob/cfdb272c/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 6d248ea..8582e9a 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -88,13 +88,13 @@ public class HiveEndPoint {
if (database==null) {
throw new IllegalArgumentException("Database cannot be null for HiveEndPoint");
}
- this.database = database;
- this.table = table;
+ this.database = database.toLowerCase();
if (table==null) {
throw new IllegalArgumentException("Table cannot be null for HiveEndPoint");
}
this.partitionVals = partitionVals==null ? new ArrayList<String>()
: new ArrayList<String>( partitionVals );
+ this.table = table.toLowerCase();
}
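Note that each assignment now sits below its null check, so a null argument still fails fast with IllegalArgumentException instead of an NPE from toLowerCase(), and the stored names come out normalized regardless of input case. A quick sketch of the resulting behavior (hypothetical URI; this assumes database and table are the public final fields on HiveEndPoint):

    HiveEndPoint ep = new HiveEndPoint("thrift://localhost:9083", "Default", "StreamingNoBuckets", null);
    // ep.database == "default", ep.table == "streamingnobuckets"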
http://git-wip-us.apache.org/repos/asf/hive/blob/cfdb272c/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 3733e3d..fe2b1c1 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -67,6 +67,8 @@ import org.apache.hadoop.hive.metastore.api.TxnState;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -353,10 +355,10 @@ public class TestStreaming {
//todo: why does it need transactional_properties?
queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true', 'transactional_properties'='default')");
queryTable(driver, "insert into default.streamingnobuckets values('foo','bar')");
- List<String> rs = queryTable(driver, "select * from default.streamingnobuckets");
+ List<String> rs = queryTable(driver, "select * from default.streamingNoBuckets");
Assert.assertEquals(1, rs.size());
Assert.assertEquals("foo\tbar", rs.get(0));
- HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "default", "streamingnobuckets", null);
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "Default", "StreamingNoBuckets", null);
String[] colNames1 = new String[] { "a", "b" };
StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt, connection);
@@ -365,6 +367,11 @@ public class TestStreaming {
txnBatch.beginNextTransaction();
txnBatch.write("a1,b2".getBytes());
txnBatch.write("a3,b4".getBytes());
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ ShowLocksResponse resp = txnHandler.showLocks(new ShowLocksRequest());
+ Assert.assertEquals(resp.getLocksSize(), 1);
+ Assert.assertEquals("streamingnobuckets", resp.getLocks().get(0).getTablename());
+ Assert.assertEquals("default", resp.getLocks().get(0).getDbname());
txnBatch.commit();
txnBatch.beginNextTransaction();
txnBatch.write("a5,b6".getBytes());
http://git-wip-us.apache.org/repos/asf/hive/blob/cfdb272c/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 4d51bbc..fe6d2d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.slf4j.Logger;
@@ -43,8 +44,11 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -125,7 +129,9 @@ public class Cleaner extends CompactorThread {
}
if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());
-
+ if(LOG.isDebugEnabled()) {
+ dumpLockState(locksResponse);
+ }
for (CompactionInfo ci : toClean) {
// Check to see if we have seen this request before. If so, ignore it. If not,
// add it to our queue.
@@ -163,6 +169,9 @@ public class Cleaner extends CompactorThread {
for (Long lockId : expiredLocks) {
queueEntry.getValue().remove(lockId);
}
+ LOG.info("Skipping cleaning of " +
+ idWatermark(compactId2CompactInfoMap.get(queueEntry.getKey())) +
+ " due to reader present: " + queueEntry.getValue());
}
}
} finally {
@@ -203,9 +212,18 @@ public class Cleaner extends CompactorThread {
private Set<Long> findRelatedLocks(CompactionInfo ci, ShowLocksResponse locksResponse) {
Set<Long> relatedLocks = new HashSet<Long>();
for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
- if (ci.dbname.equals(lock.getDbname())) {
+ /**
+ * Hive QL is not case sensitive wrt db/table/column names
+ * Partition names get
+ * normalized (as far as I can tell) by lower casing column name but not partition value.
+ * {@link org.apache.hadoop.hive.metastore.Warehouse#makePartName(List, List, String)}
+ * {@link org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer#getPartSpec(ASTNode)}
+ * Since user input may start out in any case, compare here case-insensitive for db/table
+ * but leave partition name as is.
+ */
+ if (ci.dbname.equalsIgnoreCase(lock.getDbname())) {
if ((ci.tableName == null && lock.getTablename() == null) ||
- (ci.tableName != null && ci.tableName.equals(lock.getTablename()))) {
+ (ci.tableName != null && ci.tableName.equalsIgnoreCase(lock.getTablename()))) {
if ((ci.partName == null && lock.getPartname() == null) ||
(ci.partName != null && ci.partName.equals(lock.getPartname()))) {
relatedLocks.add(lock.getLockid());
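As the new comment says, partition names are only partially normalized: when a partition name is built, the column part is lower-cased but the value keeps its case, which is why the partition comparison above intentionally stays case-sensitive. A sketch of the behavior the comment describes, hedged the same way the comment is ("as far as I can tell"):

    // Assuming Warehouse.makePartName behaves as the comment above says:
    List<FieldSchema> partCols =
        Collections.singletonList(new FieldSchema("Ds", "string", null));
    String partName = Warehouse.makePartName(partCols,
        Collections.singletonList("2018-04-25_Load1"));
    // partName is expected to be "ds=2018-04-25_Load1": the column name is
    // lower-cased, the value's case is preserved (special chars escaped).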
@@ -226,12 +244,13 @@ public class Cleaner extends CompactorThread {
}
private void clean(CompactionInfo ci) throws MetaException {
- LOG.info("Starting cleaning for " + ci.getFullPartitionName());
+ LOG.info("Starting cleaning for " + ci);
try {
Table t = resolveTable(ci);
if (t == null) {
// The table was dropped before we got around to cleaning it.
- LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped");
+ LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." +
+ idWatermark(ci));
txnHandler.markCleaned(ci);
return;
}
@@ -241,7 +260,7 @@ public class Cleaner extends CompactorThread {
if (p == null) {
// The partition was dropped before we got around to cleaning it.
LOG.info("Unable to find partition " + ci.getFullPartitionName() +
- ", assuming it was dropped");
+ ", assuming it was dropped." + idWatermark(ci));
txnHandler.markCleaned(ci);
return;
}
@@ -271,7 +290,7 @@ public class Cleaner extends CompactorThread {
: new ValidReaderWriteIdList();
if (runJobAsSelf(ci.runAs)) {
- removeFiles(location, validWriteIdList);
+ removeFiles(location, validWriteIdList, ci);
} else {
LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName());
UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
@@ -279,7 +298,7 @@ public class Cleaner extends CompactorThread {
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
- removeFiles(location, validWriteIdList);
+ removeFiles(location, validWriteIdList, ci);
return null;
}
});
@@ -287,7 +306,7 @@ public class Cleaner extends CompactorThread {
FileSystem.closeAllForUGI(ugi);
} catch (IOException exception) {
LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
- ci.getFullPartitionName(), exception);
+ ci.getFullPartitionName() + idWatermark(ci), exception);
}
}
txnHandler.markCleaned(ci);
@@ -297,20 +316,35 @@ public class Cleaner extends CompactorThread {
txnHandler.markFailed(ci);
}
}
-
- private void removeFiles(String location, ValidWriteIdList writeIdList) throws IOException {
- AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, writeIdList);
+ private static String idWatermark(CompactionInfo ci) {
+ return " id=" + ci.id;
+ }
+ private void removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci)
+ throws IOException {
+ Path locPath = new Path(location);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList);
List<FileStatus> obsoleteDirs = dir.getObsolete();
List<Path> filesToDelete = new ArrayList<Path>(obsoleteDirs.size());
+ StringBuilder extraDebugInfo = new StringBuilder("[");
for (FileStatus stat : obsoleteDirs) {
filesToDelete.add(stat.getPath());
+ extraDebugInfo.append(stat.getPath().getName()).append(",");
+ if(!FileUtils.isPathWithinSubtree(stat.getPath(), locPath)) {
+ LOG.info(idWatermark(ci) + " found unexpected file: " + stat.getPath());
+ }
}
+ extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']');
+ List<Long> compactIds = new ArrayList<>(compactId2CompactInfoMap.keySet());
+ Collections.sort(compactIds);
+ extraDebugInfo.append("compactId2CompactInfoMap.keySet(").append(compactIds).append(")");
+ LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() +
+ " obsolete directories from " + location + ". " + extraDebugInfo.toString());
if (filesToDelete.size() < 1) {
LOG.warn("Hmm, nothing to delete in the cleaner for directory " + location +
", that hardly seems right.");
return;
}
- LOG.info("About to remove " + filesToDelete.size() + " obsolete directories from " + location);
+
FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
for (Path dead : filesToDelete) {
@@ -319,4 +353,63 @@ public class Cleaner extends CompactorThread {
fs.delete(dead, true);
}
}
+ private static class LockComparator implements Comparator<ShowLocksResponseElement> {
+ //sort ascending by resource, nulls first
+ @Override
+ public int compare(ShowLocksResponseElement o1, ShowLocksResponseElement o2) {
+ if(o1 == o2) {
+ return 0;
+ }
+ if(o1 == null) {
+ return -1;
+ }
+ if(o2 == null) {
+ return 1;
+ }
+ int v = o1.getDbname().compareToIgnoreCase(o2.getDbname());
+ if(v != 0) {
+ return v;
+ }
+ if(o1.getTablename() == null) {
+ return -1;
+ }
+ if(o2.getTablename() == null) {
+ return 1;
+ }
+ v = o1.getTablename().compareToIgnoreCase(o2.getTablename());
+ if(v != 0) {
+ return v;
+ }
+ if(o1.getPartname() == null) {
+ return -1;
+ }
+ if(o2.getPartname() == null) {
+ return 1;
+ }
+ v = o1.getPartname().compareToIgnoreCase(o2.getPartname());
+ if(v != 0) {
+ return v;
+ }
+ //if still equal, compare by lock ids
+ v = Long.compare(o1.getLockid(), o2.getLockid());
+ if(v != 0) {
+ return v;
+ }
+ return Long.compare(o1.getLockIdInternal(), o2.getLockIdInternal());
+
+ }
+ }
+ private void dumpLockState(ShowLocksResponse slr) {
+ Iterator<ShowLocksResponseElement> l = slr.getLocksIterator();
+ List<ShowLocksResponseElement> sortedList = new ArrayList<>();
+ while(l.hasNext()) {
+ sortedList.add(l.next());
+ }
+ //sort for readability
+ sortedList.sort(new LockComparator());
+ LOG.info("dumping locks");
+ for(ShowLocksResponseElement lock : sortedList) {
+ LOG.info(lock.toString());
+ }
+ }
}
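A side note on the hand-rolled comparator above: when both tablenames (or both partnames) are null it returns -1 for either argument order, which technically violates the Comparator contract; TimSort can reject such comparators with "Comparison method violates its general contract!" in unlucky orderings. Since branch-3 is Java 8 code, a chained, null-safe equivalent is possible (a sketch, not part of the patch; it assumes non-null list elements, as showLocks() returns):

    Comparator<ShowLocksResponseElement> byResource = Comparator
        .comparing(ShowLocksResponseElement::getDbname, String.CASE_INSENSITIVE_ORDER)
        .thenComparing(ShowLocksResponseElement::getTablename,
            Comparator.nullsFirst(String.CASE_INSENSITIVE_ORDER))
        .thenComparing(ShowLocksResponseElement::getPartname,
            Comparator.nullsFirst(String.CASE_INSENSITIVE_ORDER))
        .thenComparingLong(ShowLocksResponseElement::getLockid)
        .thenComparingLong(ShowLocksResponseElement::getLockIdInternal);
    // usable directly as sortedList.sort(byResource) in dumpLockState()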
http://git-wip-us.apache.org/repos/asf/hive/blob/cfdb272c/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 22765b8..c95daaf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -248,7 +248,7 @@ public class Initiator extends CompactorThread {
if (runJobAsSelf(runAs)) {
return determineCompactionType(ci, writeIds, sd, tblproperties);
} else {
- LOG.info("Going to initiate as user " + runAs);
+ LOG.info("Going to initiate as user " + runAs + " for " + ci.getFullPartitionName());
UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs,
UserGroupInformation.getLoginUser());
CompactionType compactionType = ugi.doAs(new PrivilegedExceptionAction<CompactionType>() {
http://git-wip-us.apache.org/repos/asf/hive/blob/cfdb272c/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 39a0f31..db596a6 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1656,9 +1656,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if(!updateTxnComponents) {
continue;
}
- String dbName = lc.getDbname();
- String tblName = lc.getTablename();
- String partName = lc.getPartitionname();
+ String dbName = normalizeCase(lc.getDbname());
+ String tblName = normalizeCase(lc.getTablename());
+ String partName = normalizeCase(lc.getPartitionname());
Long writeId = null;
if (tblName != null) {
// It is assumed the caller have already allocated write id for adding/updating data to
@@ -1666,8 +1666,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// may return empty result sets.
// Get the write id allocated by this txn for the given table writes
s = "select t2w_writeid from TXN_TO_WRITE_ID where"
- + " t2w_database = " + quoteString(dbName.toLowerCase())
- + " and t2w_table = " + quoteString(tblName.toLowerCase())
+ + " t2w_database = " + quoteString(dbName)
+ + " and t2w_table = " + quoteString(tblName)
+ " and t2w_txnid = " + txnid;
LOG.debug("Going to execute query <" + s + ">");
rs = stmt.executeQuery(s);
@@ -1704,9 +1704,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
+ lc + " agentInfo=" + rqst.getAgentInfo());
}
intLockId++;
- String dbName = lc.getDbname();
- String tblName = lc.getTablename();
- String partName = lc.getPartitionname();
+ String dbName = normalizeCase(lc.getDbname());
+ String tblName = normalizeCase(lc.getTablename());
+ String partName = normalizeCase(lc.getPartitionname());
LockType lockType = lc.getType();
char lockChar = 'z';
switch (lockType) {
@@ -1764,6 +1764,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
return enqueueLockWithRetry(rqst);
}
}
+ private static String normalizeCase(String s) {
+ return s == null ? null : s.toLowerCase();
+ }
private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId)
throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException {
try {
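One caveat that applies equally to the HiveEndPoint change above: String.toLowerCase() with no argument uses the JVM's default locale, and under a Turkish locale the dotted/dotless-i rules change the result (for example "LINEITEM".toLowerCase() yields dotless characters). A locale-pinned variant would look like this (a sketch of a defensive alternative, not what the patch does):

    private static String normalizeCase(String s) {
      // Locale.ROOT makes the lower-casing independent of the JVM's
      // default locale (avoids the Turkish dotless-i surprise).
      return s == null ? null : s.toLowerCase(java.util.Locale.ROOT);
    }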
@@ -2385,7 +2388,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
Long writeId = rqst.getWriteid();
List<String> rows = new ArrayList<>();
for (String partName : rqst.getPartitionnames()) {
- rows.add(rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) +
+ rows.add(rqst.getTxnid() + "," + quoteString(normalizeCase(rqst.getDbname()))
+ + "," + quoteString(normalizeCase(rqst.getTablename())) +
"," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + "," + writeId);
}
int modCount = 0;
http://git-wip-us.apache.org/repos/asf/hive/blob/cfdb272c/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index e5dd3b3..3343d10 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -69,6 +69,8 @@ import org.apache.hadoop.hive.metastore.api.TxnState;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -356,7 +358,7 @@ public class TestStreaming {
List<String> rs = queryTable(driver, "select * from default.streamingnobuckets");
Assert.assertEquals(1, rs.size());
Assert.assertEquals("foo\tbar", rs.get(0));
- HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "default", "streamingnobuckets", null);
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "Default", "streamingNoBuckets", null);
String[] colNames1 = new String[] { "a", "b" };
StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt, connection);
@@ -365,6 +367,11 @@ public class TestStreaming {
txnBatch.beginNextTransaction();
txnBatch.write("a1,b2".getBytes());
txnBatch.write("a3,b4".getBytes());
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ ShowLocksResponse resp = txnHandler.showLocks(new ShowLocksRequest());
+ Assert.assertEquals(resp.getLocksSize(), 1);
+ Assert.assertEquals("streamingnobuckets", resp.getLocks().get(0).getTablename());
+ Assert.assertEquals("default", resp.getLocks().get(0).getDbname());
txnBatch.commit();
txnBatch.beginNextTransaction();
txnBatch.write("a5,b6".getBytes());