You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/10/12 19:51:16 UTC
[08/10] hive git commit: HIVE-12003 Hive Streaming API : Add check to
ensure table is transactional(Roshan Naik via Eugene Koifman)
HIVE-12003 Hive Streaming API : Add check to ensure table is transactional(Roshan Naik via Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ec8c793c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ec8c793c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ec8c793c
Branch: refs/heads/llap
Commit: ec8c793c3bfc6edafada2329939553e5cd6cb0f3
Parents: ba83fd7
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Oct 10 10:11:48 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Oct 10 10:11:48 2015 -0700
----------------------------------------------------------------------
.../hive/hcatalog/streaming/HiveEndPoint.java | 21 ++++++++++
.../hive/hcatalog/streaming/InvalidTable.java | 8 ++++
.../hive/hcatalog/streaming/TestStreaming.java | 41 +++++++++++++++++++-
.../hive/ql/txn/compactor/TestCompactor.java | 13 ++++---
4 files changed, 76 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 7e99008..5de3f1d 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -49,6 +49,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
/**
* Information about the hive end point (i.e. table or partition) to write to.
@@ -272,11 +273,31 @@ public class HiveEndPoint {
}
this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
+ checkEndPoint(endPoint, msClient);
if (createPart && !endPoint.partitionVals.isEmpty()) {
createPartitionIfNotExists(endPoint, msClient, conf);
}
}
+ private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient) throws InvalidTable {
+ // Validates the end point's table. Only the metastore lookup is guarded, so the InvalidTable thrown below is not re-caught and re-wrapped.
+ Map<String, String> params;
+ try {
+ Table t = msClient.getTable(endPoint.database, endPoint.table);
+ params = t.getParameters();
+ } catch (Exception e) {
+ LOG.warn("Unable to check if Table is transactional. " + endPoint, e);
+ throw new InvalidTable(endPoint.database, endPoint.table, e);
+ }
+ // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
+ String transactionalProp = params == null ? null : params.get("transactional");
+ if ("true".equalsIgnoreCase(transactionalProp)) {
+ return;
+ }
+ LOG.error("'transactional' property is not set on Table " + endPoint);
+ throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property is not set on Table");
+ }
+
/**
* Close connection
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
index 903c37e..98ef688 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
@@ -27,4 +27,12 @@ public class InvalidTable extends StreamingException {
public InvalidTable(String db, String table) {
super(makeMsg(db,table), null);
}
+
+ public InvalidTable(String db, String table, String msg) {
+ super(makeMsg(db, table) + ": " + msg); // prefix with makeMsg(db, table) so the message names the offending table
+ }
+
+ public InvalidTable(String db, String table, Exception inner) {
+ super(makeMsg(db, table) + ": " + inner.getMessage(), inner); // keep the cause; prefix with makeMsg(db, table) to name the table
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 340ab6c..d9a7eae 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -388,6 +388,44 @@ public class TestStreaming {
}
+ @Test
+ public void testTableValidation() throws Exception {
+ int bucketCount = 100;
+ // tbl1 is created without a 'transactional' property; tbl2 sets it explicitly to 'false'.
+ String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString();
+ String tbl1 = "validation1";
+ String tbl2 = "validation2";
+
+ String tableLoc = "'" + dbUri + Path.SEPARATOR + tbl1 + "'";
+ String tableLoc2 = "'" + dbUri + Path.SEPARATOR + tbl2 + "'";
+
+ runDDL(driver, "create database testBucketing3");
+ runDDL(driver, "use testBucketing3");
+
+ runDDL(driver, "create table " + tbl1 + " ( key1 string, data string ) clustered by ( key1 ) into "
+ + bucketCount + " buckets stored as orc location " + tableLoc) ;
+
+ runDDL(driver, "create table " + tbl2 + " ( key1 string, data string ) clustered by ( key1 ) into "
+ + bucketCount + " buckets stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='false')") ;
+
+ // Neither table is transactional, so opening a connection to either must throw InvalidTable.
+ try {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", tbl1, null);
+ endPt.newConnection(false);
+ Assert.fail("InvalidTable exception was not thrown");
+ } catch (InvalidTable e) {
+ // expected: table has no 'transactional' property
+ }
+ try {
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", tbl2, null);
+ endPt.newConnection(false);
+ Assert.fail("InvalidTable exception was not thrown");
+ } catch (InvalidTable e) {
+ // expected: table has 'transactional'='false'
+ }
+ }
+
+
private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
String... records) throws Exception {
ValidTxnList txns = msClient.getValidTxns();
@@ -1191,7 +1229,8 @@ public class TestStreaming {
" clustered by ( " + join(bucketCols, ",") + " )" +
" into " + bucketCount + " buckets " +
" stored as orc " +
- " location '" + tableLoc + "'";
+ " location '" + tableLoc + "'" +
+ " TBLPROPERTIES ('transactional'='true') ";
runDDL(driver, crtTbl);
if(partNames!=null && partNames.length!=0) {
return addPartition(driver, tableName, partVals, partNames);
http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index abca1ce..e2910dd 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -154,11 +154,12 @@ public class TestCompactor {
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" PARTITIONED BY(bkt INT)" +
" CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC", driver);
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
executeStatementOnDriver("CREATE EXTERNAL TABLE " + tblNameStg + "(a INT, b STRING)" +
" ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'" +
" STORED AS TEXTFILE" +
- " LOCATION '" + stagingFolder.newFolder().toURI().getPath() + "'", driver);
+ " LOCATION '" + stagingFolder.newFolder().toURI().getPath() + "'" +
+ " TBLPROPERTIES ('transactional'='true')", driver);
executeStatementOnDriver("load data local inpath '" + BASIC_FILE_NAME +
"' overwrite into table " + tblNameStg, driver);
@@ -411,7 +412,7 @@ public class TestCompactor {
executeStatementOnDriver("drop table if exists " + tblName, driver);
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC", driver);
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -468,7 +469,7 @@ public class TestCompactor {
executeStatementOnDriver("drop table if exists " + tblName, driver);
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC", driver);
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true') ", driver);
HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -516,7 +517,7 @@ public class TestCompactor {
executeStatementOnDriver("drop table if exists " + tblName, driver);
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC", driver);
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -576,7 +577,7 @@ public class TestCompactor {
executeStatementOnDriver("drop table if exists " + tblName, driver);
executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC", driver);
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);