You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/11/12 04:59:40 UTC
[16/55] [abbrv] hive git commit: HIVE-12252 Streaming API HiveEndPoint can be created w/o partitionVals for partitioned table (Wei Zheng via Eugene Koifman)
HIVE-12252 Streaming API HiveEndPoint can be created w/o partitionVals for partitioned table (Wei Zheng via Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0918ff95
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0918ff95
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0918ff95
Branch: refs/heads/spark
Commit: 0918ff959e6b0fd67a6b8b478290436af9532a31
Parents: 6d936b5
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu Nov 5 10:07:30 2015 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu Nov 5 10:07:30 2015 -0800
----------------------------------------------------------------------
.../hcatalog/streaming/ConnectionError.java | 4 ++
.../hive/hcatalog/streaming/HiveEndPoint.java | 51 +++++++++++++++-----
.../hive/hcatalog/streaming/TestStreaming.java | 35 +++++++++++---
3 files changed, 71 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0918ff95/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
index 1aeef76..ffa51c9 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/ConnectionError.java
@@ -20,6 +20,10 @@ package org.apache.hive.hcatalog.streaming;
public class ConnectionError extends StreamingException {
+ public ConnectionError(String msg) {
+ super(msg);
+ }
+
public ConnectionError(String msg, Exception innerEx) {
super(msg, innerEx);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0918ff95/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 306c93d..2f2d44a 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -279,23 +279,48 @@ public class HiveEndPoint {
}
}
- private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient) throws InvalidTable {
- // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
+ /**
+ * Checks the validity of endpoint
+ * @param endPoint the HiveEndPoint to be checked
+ * @param msClient the metastore client
+ * @throws InvalidTable
+ */
+ private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient)
+ throws InvalidTable, ConnectionError {
+ Table t;
try {
- Table t = msClient.getTable(endPoint.database, endPoint.table);
- Map<String, String> params = t.getParameters();
- if(params != null) {
- String transactionalProp = params.get("transactional");
- if (transactionalProp != null && transactionalProp.equalsIgnoreCase("true")) {
- return;
- }
- }
- LOG.error("'transactional' property is not set on Table " + endPoint);
- throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property is not set on Table");
+ t = msClient.getTable(endPoint.database, endPoint.table);
} catch (Exception e) {
- LOG.warn("Unable to check if Table is transactional. " + endPoint, e);
+ LOG.warn("Unable to check the endPoint: " + endPoint, e);
throw new InvalidTable(endPoint.database, endPoint.table, e);
}
+
+ // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
+ Map<String, String> params = t.getParameters();
+ if (params != null) {
+ String transactionalProp = params.get("transactional");
+ if (transactionalProp == null || !transactionalProp.equalsIgnoreCase("true")) {
+ LOG.error("'transactional' property is not set on Table " + endPoint);
+ throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property" +
+ " is not set on Table"); }
+ }
+
+ // 2 - check if partitionvals are legitimate
+ if (t.getPartitionKeys() != null && !t.getPartitionKeys().isEmpty()
+ && endPoint.partitionVals.isEmpty()) {
+ // Invalid if table is partitioned, but endPoint's partitionVals is empty
+ String errMsg = "HiveEndPoint " + endPoint + " doesn't specify any partitions for " +
+ "partitioned table";
+ LOG.error(errMsg);
+ throw new ConnectionError(errMsg);
+ }
+ if ((t.getPartitionKeys() == null || t.getPartitionKeys().isEmpty())
+ && !endPoint.partitionVals.isEmpty()) {
+ // Invalid if table is not partitioned, but endPoint's partitionVals is not empty
+ String errMsg = "HiveEndPoint" + endPoint + " specifies partitions for unpartitioned table";
+ LOG.error(errMsg);
+ throw new ConnectionError(errMsg);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/0918ff95/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index d9a7eae..58cfbaa 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -204,7 +204,7 @@ public class TestStreaming {
dropDB(msClient, dbName2);
String loc2 = dbFolder.newFolder(dbName2 + ".db").toString();
- partLoc2 = createDbAndTable(driver, dbName2, tblName2, partitionVals, colNames, colTypes, bucketCols, partNames, loc2, 2);
+ partLoc2 = createDbAndTable(driver, dbName2, tblName2, null, colNames, colTypes, bucketCols, null, loc2, 2);
String loc3 = dbFolder.newFolder("testing5.db").toString();
createStoreSales("testing5", loc3);
@@ -477,15 +477,38 @@ public class TestStreaming {
@Test
public void testEndpointConnection() throws Exception {
- // 1) Basic
- HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
- , partitionVals);
+ // For partitioned table, partitionVals are specified
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
StreamingConnection connection = endPt.newConnection(false, null); //shouldn't throw
connection.close();
- // 2) Leave partition unspecified
- endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
+ // For unpartitioned table, partitionVals are not specified
+ endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
endPt.newConnection(false, null).close(); // should not throw
+
+ // For partitioned table, partitionVals are not specified
+ try {
+ endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
+ connection = endPt.newConnection(true);
+ Assert.assertTrue("ConnectionError was not thrown", false);
+ connection.close();
+ } catch (ConnectionError e) {
+ // expecting this exception
+ String errMsg = "doesn't specify any partitions for partitioned table";
+ Assert.assertTrue(e.toString().endsWith(errMsg));
+ }
+
+ // For unpartitioned table, partition values are specified
+ try {
+ endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, partitionVals);
+ connection = endPt.newConnection(false);
+ Assert.assertTrue("ConnectionError was not thrown", false);
+ connection.close();
+ } catch (ConnectionError e) {
+ // expecting this exception
+ String errMsg = "specifies partitions for unpartitioned table";
+ Assert.assertTrue(e.toString().endsWith(errMsg));
+ }
}
@Test