You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2015/10/10 19:42:15 UTC

[3/3] hive git commit: HIVE-12003 Hive Streaming API : Add check to ensure table is transactional(Roshan Naik via Eugene Koifman)

HIVE-12003 Hive Streaming API : Add check to ensure table is transactional(Roshan Naik via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ec8c793c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ec8c793c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ec8c793c

Branch: refs/heads/master
Commit: ec8c793c3bfc6edafada2329939553e5cd6cb0f3
Parents: ba83fd7
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Oct 10 10:11:48 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Oct 10 10:11:48 2015 -0700

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/HiveEndPoint.java   | 21 ++++++++++
 .../hive/hcatalog/streaming/InvalidTable.java   |  8 ++++
 .../hive/hcatalog/streaming/TestStreaming.java  | 41 +++++++++++++++++++-
 .../hive/ql/txn/compactor/TestCompactor.java    | 13 ++++---
 4 files changed, 76 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 7e99008..5de3f1d 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -49,6 +49,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Information about the hive end point (i.e. table or partition) to write to.
@@ -272,11 +273,31 @@ public class HiveEndPoint {
       }
       this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
       this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
+      checkEndPoint(endPoint, msClient);
       if (createPart  &&  !endPoint.partitionVals.isEmpty()) {
         createPartitionIfNotExists(endPoint, msClient, conf);
       }
     }
 
+    private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient) throws InvalidTable {
+      // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
+      try {
+        Table t = msClient.getTable(endPoint.database, endPoint.table);
+        Map<String, String> params = t.getParameters();
+        if(params != null) {
+          String transactionalProp = params.get("transactional");
+          if (transactionalProp != null && transactionalProp.equalsIgnoreCase("true")) {
+            return;
+          }
+        }
+        LOG.error("'transactional' property is not set on Table " + endPoint);
+        throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property is not set on Table");
+      } catch (Exception e) {
+        LOG.warn("Unable to check if Table is transactional. " + endPoint, e);
+        throw new InvalidTable(endPoint.database, endPoint.table, e);
+      }
+    }
+
     /**
      * Close connection
      */

http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
index 903c37e..98ef688 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/InvalidTable.java
@@ -27,4 +27,12 @@ public class InvalidTable extends StreamingException {
   public InvalidTable(String db, String table) {
     super(makeMsg(db,table), null);
   }
+
+  public InvalidTable(String db, String table, String msg) {
+    super(msg);
+  }
+
+  public InvalidTable(String db, String table, Exception inner) {
+    super(inner.getMessage(), inner);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 340ab6c..d9a7eae 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -388,6 +388,44 @@ public class TestStreaming {
   }
 
 
+  @Test
+  public void testTableValidation() throws Exception {
+    int bucketCount = 100;
+
+    String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString();
+    String tbl1 = "validation1";
+    String tbl2 = "validation2";
+
+    String tableLoc  = "'" + dbUri + Path.SEPARATOR + tbl1 + "'";
+    String tableLoc2 = "'" + dbUri + Path.SEPARATOR + tbl2 + "'";
+
+    runDDL(driver, "create database testBucketing3");
+    runDDL(driver, "use testBucketing3");
+
+    runDDL(driver, "create table " + tbl1 + " ( key1 string, data string ) clustered by ( key1 ) into "
+            + bucketCount + " buckets  stored as orc  location " + tableLoc) ;
+
+    runDDL(driver, "create table " + tbl2 + " ( key1 string, data string ) clustered by ( key1 ) into "
+            + bucketCount + " buckets  stored as orc  location " + tableLoc2 + " TBLPROPERTIES ('transactional'='false')") ;
+
+
+    try {
+      HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation1", null);
+      endPt.newConnection(false);
+      Assert.assertTrue("InvalidTable exception was not thrown", false);
+    } catch (InvalidTable e) {
+      // expecting this exception
+    }
+    try {
+      HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation2", null);
+      endPt.newConnection(false);
+      Assert.assertTrue("InvalidTable exception was not thrown", false);
+    } catch (InvalidTable e) {
+      // expecting this exception
+    }
+  }
+
+
   private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
                                 String... records) throws Exception {
     ValidTxnList txns = msClient.getValidTxns();
@@ -1191,7 +1229,8 @@ public class TestStreaming {
             " clustered by ( " + join(bucketCols, ",") + " )" +
             " into " + bucketCount + " buckets " +
             " stored as orc " +
-            " location '" + tableLoc +  "'";
+            " location '" + tableLoc +  "'" +
+            " TBLPROPERTIES ('transactional'='true') ";
     runDDL(driver, crtTbl);
     if(partNames!=null && partNames.length!=0) {
       return addPartition(driver, tableName, partVals, partNames);

http://git-wip-us.apache.org/repos/asf/hive/blob/ec8c793c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index abca1ce..e2910dd 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -154,11 +154,12 @@ public class TestCompactor {
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
       " PARTITIONED BY(bkt INT)" +
       " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
-      " STORED AS ORC", driver);
+      " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
     executeStatementOnDriver("CREATE EXTERNAL TABLE " + tblNameStg + "(a INT, b STRING)" +
       " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'" +
       " STORED AS TEXTFILE" +
-      " LOCATION '" + stagingFolder.newFolder().toURI().getPath() + "'", driver);
+      " LOCATION '" + stagingFolder.newFolder().toURI().getPath() + "'" +
+      " TBLPROPERTIES ('transactional'='true')", driver);
 
     executeStatementOnDriver("load data local inpath '" + BASIC_FILE_NAME +
       "' overwrite into table " + tblNameStg, driver);
@@ -411,7 +412,7 @@ public class TestCompactor {
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
         " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-        " STORED AS ORC", driver);
+        " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
 
     HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -468,7 +469,7 @@ public class TestCompactor {
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
         " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-        " STORED AS ORC", driver);
+        " STORED AS ORC  TBLPROPERTIES ('transactional'='true') ", driver);
 
     HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -516,7 +517,7 @@ public class TestCompactor {
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
         " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-        " STORED AS ORC", driver);
+        " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
 
     HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
@@ -576,7 +577,7 @@ public class TestCompactor {
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
         " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-        " STORED AS ORC", driver);
+        " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
 
     HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);