You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/10/30 17:22:48 UTC
svn commit: r1635536 [4/28] - in /hive/branches/spark: ./ accumulo-handler/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/test/org/apache/hadoo...
Modified: hive/branches/spark/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf (original)
+++ hive/branches/spark/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf Thu Oct 30 16:22:33 2014
@@ -524,7 +524,7 @@ $cfg =
'num' => 1,
'method' => 'POST',
'url' => ':TEMPLETON_URL:/templeton/v1/sqoop?user.name=:UNAME:',
- 'post_options' => ['command=export --connect :DB_CONNECTION_STRING: --username :DB_USER_NAME: --password :DB_PASSWORD: --export-dir :INPDIR_HDFS:/sqoop --table person','statusdir=sqoop.output' ],
+ 'post_options' => ['libdir=hdfs:///apps/templeton/jdbc', 'command=export --connect :DB_CONNECTION_STRING: --username :DB_USER_NAME: --password :DB_PASSWORD: --export-dir :INPDIR_HDFS:/sqoop --table person','statusdir=TestSqoop_:TNUM:' ],
'json_field_substr_match' => { 'id' => '\d+'},
#results
'status_code' => 200,
@@ -539,7 +539,7 @@ $cfg =
'num' => 2,
'method' => 'POST',
'url' => ':TEMPLETON_URL:/templeton/v1/sqoop?user.name=:UNAME:',
- 'post_options' => ['files=:INPDIR_HDFS:/sqoopcommand.txt','command=import --connect :DB_CONNECTION_STRING: --username :DB_USER_NAME: --password :DB_PASSWORD: --options-file sqoopcommand.txt','statusdir=sqoop.output' ],
+ 'post_options' => ['libdir=hdfs:///apps/templeton/jdbc', 'files=:INPDIR_HDFS:/sqoopcommand.txt','command=import --connect :DB_CONNECTION_STRING: --username :DB_USER_NAME: --password :DB_PASSWORD: --options-file sqoopcommand.txt','statusdir=TestSqoop_:TNUM:' ],
'json_field_substr_match' => { 'id' => '\d+'},
#results
'status_code' => 200,
@@ -554,7 +554,7 @@ $cfg =
'num' => 3,
'method' => 'POST',
'url' => ':TEMPLETON_URL:/templeton/v1/sqoop?user.name=:UNAME:',
- 'post_options' => ['files=:INPDIR_HDFS:/sqoopcommand.txt','statusdir=sqoop.output' ],
+ 'post_options' => ['files=:INPDIR_HDFS:/sqoopcommand.txt','statusdir=TestSqoop_:TNUM:' ],
'json_field_substr_match' => { 'error' => 'Must define Sqoop command or a optionsfile contains Sqoop command to run Sqoop job.'},
#results
'status_code' => 400,
@@ -564,7 +564,7 @@ $cfg =
'num' => 4,
'method' => 'POST',
'url' => ':TEMPLETON_URL:/templeton/v1/sqoop?user.name=:UNAME:',
- 'post_options' => ['optionsfile=:INPDIR_HDFS:/sqoopcommand.txt','command=import --connect :DB_CONNECTION_STRING: --username :DB_USER_NAME: --password :DB_PASSWORD: --options-file sqoopcommand.txt','statusdir=sqoop.output' ],
+ 'post_options' => ['optionsfile=:INPDIR_HDFS:/sqoopcommand.txt','command=import --connect :DB_CONNECTION_STRING: --username :DB_USER_NAME: --password :DB_PASSWORD: --options-file sqoopcommand.txt','statusdir=TestSqoop_:TNUM:' ],
'json_field_substr_match' => { 'error' => 'Cannot set command and optionsfile at the same time.'},
#results
'status_code' => 400,
Modified: hive/branches/spark/hcatalog/streaming/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/streaming/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/streaming/pom.xml (original)
+++ hive/branches/spark/hcatalog/streaming/pom.xml Thu Oct 30 16:22:33 2014
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java (original)
+++ hive/branches/spark/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java Thu Oct 30 16:22:33 2014
@@ -130,11 +130,16 @@ public class HiveEndPoint {
}
/**
- * Acquire a new connection to MetaStore for streaming
+ * Acquire a new connection to MetaStore for streaming. To connect using Kerberos, the
+ * 'authenticatedUser' argument must already have been used to do a Kerberos login. Additionally, the
+ * 'hive.metastore.kerberos.principal' setting must be set correctly either in hive-site.xml or
+ * in the 'conf' argument (if not null). If using hive-site.xml, it must be on the classpath.
+ *
* @param createPartIfNotExists If true, the partition specified in the endpoint
* will be auto created if it does not exist
+ * @param conf HiveConf object to be used for the connection. Can be null.
* @param authenticatedUser UserGroupInformation object obtained from successful authentication.
- * Uses insecure mode if this argument is null.
+ * Uses non-secure mode if this argument is null.
* @return
* @throws ConnectionError if there is a connection problem
* @throws InvalidPartition if specified partition is not valid (createPartIfNotExists = false)
@@ -240,6 +245,7 @@ public class HiveEndPoint {
private final HiveEndPoint endPt;
private final UserGroupInformation ugi;
private final String username;
+ private final boolean secureMode;
/**
* @param endPoint end point to connect to
@@ -261,7 +267,11 @@ public class HiveEndPoint {
if (conf==null) {
conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri);
}
- this.msClient = getMetaStoreClient(endPoint, conf);
+ else {
+ overrideConfSettings(conf);
+ }
+ this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
+ this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
if (createPart && !endPoint.partitionVals.isEmpty()) {
createPartitionIfNotExists(endPoint, msClient, conf);
}
@@ -425,13 +435,15 @@ public class HiveEndPoint {
return buff.toString();
}
- private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf)
+ private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf, boolean secureMode)
throws ConnectionError {
if (endPoint.metaStoreUri!= null) {
conf.setVar(HiveConf.ConfVars.METASTOREURIS, endPoint.metaStoreUri);
}
-
+ if(secureMode) {
+ conf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL,true);
+ }
try {
return new HiveMetaStoreClient(conf);
} catch (MetaException e) {
@@ -828,14 +840,35 @@ public class HiveEndPoint {
static HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) {
HiveConf conf = new HiveConf(clazz);
- conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER,
- "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
- conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
- conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
if (metaStoreUri!= null) {
- conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
+ setHiveConf(conf, HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
}
+ HiveEndPoint.overrideConfSettings(conf);
return conf;
}
+ private static void overrideConfSettings(HiveConf conf) {
+ setHiveConf(conf, HiveConf.ConfVars.HIVE_TXN_MANAGER,
+ "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+ setHiveConf(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+ setHiveConf(conf, HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+ // Force the "mr" execution engine to avoid creating Tez client sessions internally, which currently takes much longer
+ setHiveConf(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
+ }
+
+ private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, String value) {
+ if( LOG.isDebugEnabled() ) {
+ LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
+ }
+ conf.setVar(var, value);
+ }
+
+ private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean value) {
+ if( LOG.isDebugEnabled() ) {
+ LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
+ }
+ conf.setBoolVar(var, value);
+ }
+
+
} // class HiveEndPoint
Modified: hive/branches/spark/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java (original)
+++ hive/branches/spark/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java Thu Oct 30 16:22:33 2014
@@ -731,7 +731,7 @@ public class TestStreaming {
throws Exception {
Database db = new Database();
db.setName(databaseName);
- String dbLocation = "raw://" + dbFolder.newFolder(databaseName + ".db").getCanonicalPath();
+ String dbLocation = "raw://" + dbFolder.newFolder(databaseName + ".db").toURI().getPath();
db.setLocationUri(dbLocation);
client.createDatabase(db);
Modified: hive/branches/spark/hcatalog/webhcat/java-client/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/java-client/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/java-client/pom.xml (original)
+++ hive/branches/spark/hcatalog/webhcat/java-client/pom.xml Thu Oct 30 16:22:33 2014
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java (original)
+++ hive/branches/spark/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java Thu Oct 30 16:22:33 2014
@@ -329,8 +329,7 @@ public class HCatClientHMSImpl extends H
throws HCatException {
List<HCatPartition> hcatPtns = new ArrayList<HCatPartition>();
try {
- Table table = hmsClient.getTable(dbName, tblName);
- HCatTable hcatTable = new HCatTable(table);
+ HCatTable hcatTable = getTable(dbName, tblName);
List<Partition> hivePtns = hmsClient.listPartitions(
checkDB(dbName), tblName, (short) -1);
for (Partition ptn : hivePtns) {
@@ -374,7 +373,7 @@ public class HCatClientHMSImpl extends H
Map<String, String> partitionSpec) throws HCatException {
HCatPartition partition = null;
try {
- HCatTable hcatTable = getTable(checkDB(dbName), tableName);
+ HCatTable hcatTable = getTable(dbName, tableName);
List<HCatFieldSchema> partitionColumns = hcatTable.getPartCols();
if (partitionColumns.size() != partitionSpec.size()) {
throw new HCatException("Partition-spec doesn't have the right number of partition keys.");
@@ -494,7 +493,7 @@ public class HCatClientHMSImpl extends H
try {
HCatTable table = getTable(dbName, tblName);
List<Partition> hivePtns = hmsClient.listPartitionsByFilter(
- checkDB(dbName), tblName, filter, (short) -1);
+ table.getDbName(), table.getTableName(), filter, (short) -1);
for (Partition ptn : hivePtns) {
hcatPtns.add(new HCatPartition(table, ptn));
}
Modified: hive/branches/spark/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartition.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartition.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartition.java (original)
+++ hive/branches/spark/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartition.java Thu Oct 30 16:22:33 2014
@@ -188,6 +188,15 @@ public class HCatPartition {
}
/**
+ * Gets the partition columns of the table.
+ *
+ * @return the partition columns
+ */
+ public List<HCatFieldSchema> getPartColumns() {
+ return hcatTable.getPartCols();
+ }
+
+ /**
* Gets the input format.
*
* @return the input format
Modified: hive/branches/spark/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java (original)
+++ hive/branches/spark/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java Thu Oct 30 16:22:33 2014
@@ -157,7 +157,7 @@ public class TestHCatClient {
assertTrue(testDb.getProperties().size() == 0);
String warehouseDir = System
.getProperty("test.warehouse.dir", "/user/hive/warehouse");
- String expectedDir = fixPath(warehouseDir).replaceFirst("pfile:///", "pfile:/");
+ String expectedDir = warehouseDir.replaceFirst("pfile:///", "pfile:/");
assertEquals(expectedDir + "/" + db + ".db", testDb.getLocation());
ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
cols.add(new HCatFieldSchema("id", Type.INT, "id comment"));
@@ -758,6 +758,11 @@ public class TestHCatClient {
assertEquals("Unexpected number of partitions.", 1, partitions.size());
assertArrayEquals("Mismatched partition.", new String[]{"2011_12_31", "AB"}, partitions.get(0).getValues().toArray());
+ List<HCatFieldSchema> partColumns = partitions.get(0).getPartColumns();
+ assertEquals(2, partColumns.size());
+ assertEquals("dt", partColumns.get(0).getName());
+ assertEquals("grid", partColumns.get(1).getName());
+
client.dropDatabase(dbName, false, HCatClient.DropDBMode.CASCADE);
}
catch (Exception unexpected) {
Modified: hive/branches/spark/hcatalog/webhcat/svr/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/svr/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/svr/pom.xml (original)
+++ hive/branches/spark/hcatalog/webhcat/svr/pom.xml Thu Oct 30 16:22:33 2014
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
@@ -38,7 +38,7 @@
</properties>
<dependencies>
- <!-- dependencies are always listed in sorted order by groupId, artifectId -->
+ <!-- dependencies are always listed in sorted order by groupId, artifactId -->
<!-- intra-project -->
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
@@ -73,6 +73,11 @@
<version>${commons-exec.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
Modified: hive/branches/spark/hcatalog/webhcat/svr/src/main/bin/webhcat_server.sh
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/svr/src/main/bin/webhcat_server.sh?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/svr/src/main/bin/webhcat_server.sh (original)
+++ hive/branches/spark/hcatalog/webhcat/svr/src/main/bin/webhcat_server.sh Thu Oct 30 16:22:33 2014
@@ -138,6 +138,9 @@ function start_webhcat() {
log "starting ..."
log "$start_cmd"
+ if [ ! -d ${WEBHCAT_LOG_DIR} ]; then
+ mkdir ${WEBHCAT_LOG_DIR}
+ fi
nohup $start_cmd >>$CONSOLE_LOG 2>>$ERROR_LOG &
local pid=$!
Modified: hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java (original)
+++ hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java Thu Oct 30 16:22:33 2014
@@ -119,6 +119,7 @@ public class AppConfig extends Configura
public static final String HIVE_PROPS_NAME = "templeton.hive.properties";
public static final String SQOOP_ARCHIVE_NAME = "templeton.sqoop.archive";
public static final String SQOOP_PATH_NAME = "templeton.sqoop.path";
+ public static final String SQOOP_HOME_PATH = "templeton.sqoop.home";
public static final String LIB_JARS_NAME = "templeton.libjars";
public static final String PIG_ARCHIVE_NAME = "templeton.pig.archive";
public static final String PIG_PATH_NAME = "templeton.pig.path";
@@ -297,6 +298,7 @@ public class AppConfig extends Configura
public String hiveArchive() { return get(HIVE_ARCHIVE_NAME); }
public String sqoopPath() { return get(SQOOP_PATH_NAME); }
public String sqoopArchive() { return get(SQOOP_ARCHIVE_NAME); }
+ public String sqoopHome() { return get(SQOOP_HOME_PATH); }
public String streamingJar() { return get(STREAMING_JAR_NAME); }
public String kerberosSecret() { return get(KERBEROS_SECRET); }
public String kerberosPrincipal(){ return get(KERBEROS_PRINCIPAL); }
Modified: hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java (original)
+++ hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java Thu Oct 30 16:22:33 2014
@@ -85,6 +85,9 @@ public class JarDelegator extends Launch
if (TempletonUtils.isset(libjars)) {
String libjarsListAsString =
TempletonUtils.hadoopFsListAsString(libjars, appConf, runAs);
+ //This will work only if the files are local files on webhcat server
+ // (which is not very useful since users might not have access to that file system).
+ //This is likely the HIVE-5188 issue
args.add("-libjars");
args.add(TempletonUtils.quoteForWindows(libjarsListAsString));
}
Modified: hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java (original)
+++ hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java Thu Oct 30 16:22:33 2014
@@ -121,6 +121,8 @@ public class LauncherDelegator extends T
JobType jobType) {
ArrayList<String> args = new ArrayList<String>();
+ //note that in ToolRunner this is expected to be a local FS path
+ //see GenericOptionsParser.getLibJars()
args.add("-libjars");
// Include shim and admin specified libjars
@@ -136,6 +138,7 @@ public class LauncherDelegator extends T
// Internal vars
addDef(args, TempletonControllerJob.STATUSDIR_NAME, statusdir);
+ //Use of ToolRunner "-files" option could be considered here
addDef(args, TempletonControllerJob.COPY_NAME,
TempletonUtils.encodeArray(copyFiles));
addDef(args, TempletonControllerJob.OVERRIDE_CLASSPATH,
Modified: hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java (original)
+++ hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java Thu Oct 30 16:22:33 2014
@@ -778,6 +778,7 @@ public class Server {
* @param optionsFile name of option file which contains Sqoop command to run
* @param otherFiles additional files to be shipped to the launcher, such as option
files which contain part of the Sqoop command
+ * @param libdir dir containing JDBC jars that Sqoop will need to interact with the database
* @param statusdir where the stderr/stdout of templeton controller job goes
* @param callback URL which WebHCat will call when the sqoop job finishes
* @param enablelog whether to collect mapreduce log into statusdir/logs
@@ -787,12 +788,13 @@ public class Server {
@Produces({MediaType.APPLICATION_JSON})
public EnqueueBean sqoop(@FormParam("command") String command,
@FormParam("optionsfile") String optionsFile,
+ @FormParam("libdir") String libdir,
@FormParam("files") String otherFiles,
@FormParam("statusdir") String statusdir,
@FormParam("callback") String callback,
@FormParam("enablelog") boolean enablelog)
throws NotAuthorizedException, BusyException, BadParam, QueueException,
- ExecuteException, IOException, InterruptedException {
+ IOException, InterruptedException {
verifyUser();
if (command == null && optionsFile == null)
throw new BadParam("Must define Sqoop command or a optionsfile contains Sqoop command to run Sqoop job.");
@@ -805,13 +807,14 @@ public class Server {
userArgs.put("user.name", getDoAsUser());
userArgs.put("command", command);
userArgs.put("optionsfile", optionsFile);
+ userArgs.put("libdir", libdir);
userArgs.put("files", otherFiles);
userArgs.put("statusdir", statusdir);
userArgs.put("callback", callback);
userArgs.put("enablelog", Boolean.toString(enablelog));
SqoopDelegator d = new SqoopDelegator(appConf);
return d.run(getDoAsUser(), userArgs, command, optionsFile, otherFiles,
- statusdir, callback, getCompletedUrl(), enablelog);
+ statusdir, callback, getCompletedUrl(), enablelog, libdir);
}
/**
Modified: hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java (original)
+++ hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java Thu Oct 30 16:22:33 2014
@@ -25,7 +25,10 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import org.apache.commons.exec.ExecuteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hive.hcatalog.templeton.tool.JobSubmissionConstants;
import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
import org.apache.hive.hcatalog.templeton.tool.TempletonUtils;
@@ -35,6 +38,7 @@ import org.apache.hive.hcatalog.templeto
* This is the backend of the Sqoop web service.
*/
public class SqoopDelegator extends LauncherDelegator {
+ private static final Log LOG = LogFactory.getLog(SqoopDelegator.class);
public SqoopDelegator(AppConfig appConf) {
super(appConf);
@@ -43,24 +47,29 @@ public class SqoopDelegator extends Laun
public EnqueueBean run(String user,
Map<String, Object> userArgs, String command,
String optionsFile, String otherFiles, String statusdir,
- String callback, String completedUrl, boolean enablelog)
+ String callback, String completedUrl, boolean enablelog, String libdir)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
- ExecuteException, IOException, InterruptedException
+ IOException, InterruptedException
{
+ if(TempletonUtils.isset(appConf.sqoopArchive())) {
+ if(!TempletonUtils.isset(appConf.sqoopPath()) && !TempletonUtils.isset(appConf.sqoopHome())) {
+ throw new IllegalStateException("If '" + AppConfig.SQOOP_ARCHIVE_NAME + "' is defined, '" +
+ AppConfig.SQOOP_PATH_NAME + "' and '" + AppConfig.SQOOP_HOME_PATH + "' must be defined");
+ }
+ }
runAs = user;
List<String> args = makeArgs(command, optionsFile, otherFiles, statusdir,
- completedUrl, enablelog);
+ completedUrl, enablelog, libdir);
return enqueueController(user, userArgs, callback, args);
}
-
- List<String> makeArgs(String command, String optionsFile, String otherFiles,
- String statusdir, String completedUrl, boolean enablelog)
+ private List<String> makeArgs(String command, String optionsFile, String otherFiles,
+ String statusdir, String completedUrl, boolean enablelog, String libdir)
throws BadParam, IOException, InterruptedException
{
ArrayList<String> args = new ArrayList<String>();
try {
- args.addAll(makeBasicArgs(optionsFile, otherFiles, statusdir, completedUrl, enablelog));
+ args.addAll(makeBasicArgs(optionsFile, otherFiles, statusdir, completedUrl, enablelog, libdir));
args.add("--");
TempletonUtils.addCmdForWindows(args);
args.add(appConf.sqoopPath());
@@ -89,7 +98,7 @@ public class SqoopDelegator extends Laun
}
private List<String> makeBasicArgs(String optionsFile, String otherFiles,
- String statusdir, String completedUrl, boolean enablelog)
+ String statusdir, String completedUrl, boolean enablelog, String libdir)
throws URISyntaxException, FileNotFoundException, IOException,
InterruptedException
{
@@ -101,9 +110,34 @@ public class SqoopDelegator extends Laun
String[] ofs = TempletonUtils.hadoopFsListAsArray(otherFiles, appConf, runAs);
allFiles.addAll(Arrays.asList(ofs));
}
+ if(TempletonUtils.isset(libdir) && TempletonUtils.isset(appConf.sqoopArchive())) {
+ /**Sqoop accesses databases via JDBC. This means it needs to have appropriate JDBC
+ drivers available. Normally, the user would install Sqoop and place these jars
+ into SQOOP_HOME/lib. When WebHCat is configured to auto-ship the Sqoop tar file, we
+ need to make sure that relevant JDBC jars are available on target node.
+ The user is expected to place any JDBC jars into an HDFS directory and specify this
+ dir in "libdir" parameter. All the files in this dir will be copied to lib/ of the
+ exploded Sqoop tar ball on target node.
+ See {@link org.apache.hive.hcatalog.templeton.tool.LaunchMapper#handleSqoop(org.apache.hadoop.conf.Configuration, java.util.Map)}
+ */
+ LOG.debug("libdir=" + libdir);
+ List<Path> jarList = TempletonUtils.hadoopFsListChildren(libdir, appConf, runAs);
+ if(TempletonUtils.isset(jarList)) {
+ StringBuilder sb = new StringBuilder();
+ for(Path jar : jarList) {
+ allFiles.add(jar.toString());
+ sb.append(jar.getName()).append(',');
+ }
+ sb.setLength(sb.length() - 1);
+ //we use the same mechanism to copy "files"/"otherFiles" and "libdir", but we only want to put
+ //contents of "libdir" in Sqoop/lib, thus we pass the list of names here
+ addDef(args, JobSubmissionConstants.Sqoop.LIB_JARS, sb.toString());
+ addDef(args, AppConfig.SQOOP_HOME_PATH, appConf.get(AppConfig.SQOOP_HOME_PATH));
+ }
+ }
args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles,
enablelog, JobType.SQOOP));
- if (appConf.sqoopArchive() != null && !appConf.sqoopArchive().equals("")) {
+ if(TempletonUtils.isset(appConf.sqoopArchive())) {
args.add("-archives");
args.add(appConf.sqoopArchive());
}
Modified: hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java (original)
+++ hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java Thu Oct 30 16:22:33 2014
@@ -25,9 +25,7 @@ import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -116,32 +114,6 @@ public class HDFSStorage implements Temp
}
@Override
- public Map<String, String> getFields(Type type, String id) {
- HashMap<String, String> map = new HashMap<String, String>();
- BufferedReader in = null;
- Path p = new Path(getPath(type) + "/" + id);
- try {
- for (FileStatus status : fs.listStatus(p)) {
- in = new BufferedReader(new InputStreamReader(fs.open(status.getPath())));
- String line = null;
- String val = "";
- while ((line = in.readLine()) != null) {
- if (!val.equals("")) {
- val += "\n";
- }
- val += line;
- }
- map.put(status.getPath().getName(), val);
- }
- } catch (IOException e) {
- LOG.trace("Couldn't find " + p);
- } finally {
- close(in);
- }
- return map;
- }
-
- @Override
public boolean delete(Type type, String id) throws NotFoundException {
Path p = new Path(getPath(type) + "/" + id);
try {
@@ -153,14 +125,6 @@ public class HDFSStorage implements Temp
return false;
}
- @Override
- public List<String> getAll() {
- ArrayList<String> allNodes = new ArrayList<String>();
- for (Type type : Type.values()) {
- allNodes.addAll(getAllForType(type));
- }
- return allNodes;
- }
@Override
public List<String> getAllForType(Type type) {
@@ -177,40 +141,6 @@ public class HDFSStorage implements Temp
}
@Override
- public List<String> getAllForKey(String key, String value) {
- ArrayList<String> allNodes = new ArrayList<String>();
- try {
- for (Type type : Type.values()) {
- allNodes.addAll(getAllForTypeAndKey(type, key, value));
- }
- } catch (Exception e) {
- LOG.trace("Couldn't find children for key " + key + ": " +
- e.getMessage());
- }
- return allNodes;
- }
-
- @Override
- public List<String> getAllForTypeAndKey(Type type, String key, String value) {
- ArrayList<String> allNodes = new ArrayList<String>();
- HashMap<String, String> map = new HashMap<String, String>();
- try {
- for (FileStatus status :
- fs.listStatus(new Path(getPath(type)))) {
- map = (HashMap<String, String>)
- getFields(type, status.getPath().getName());
- if (map.get(key).equals(value)) {
- allNodes.add(status.getPath().getName());
- }
- }
- } catch (Exception e) {
- LOG.trace("Couldn't find children for key " + key + ": " +
- e.getMessage());
- }
- return allNodes;
- }
-
- @Override
public void openStorage(Configuration config) throws IOException {
storage_root = config.get(TempletonStorage.STORAGE_ROOT);
if (fs == null) {
Modified: hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java (original)
+++ hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobStateTracker.java Thu Oct 30 16:22:33 2014
@@ -24,19 +24,30 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
+/*
+ * The general idea here is to create
+ * /created/1
+ * /created/2
+ * /created/3 ....
+ * for each job submitted. The node number is generated by ZK (PERSISTENT_SEQUENTIAL) and the
+ * payload is the JobId. Basically this keeps track of the order in which jobs were submitted,
+ * and ZooKeeperCleanup uses this to purge old job info.
+ * Since the /jobs/<id> node has a create/update timestamp
+ * (http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#sc_zkStatStructure) this whole
+ * thing can be removed.
+*/
public class JobStateTracker {
// The path to the tracking root
private String job_trackingroot = null;
// The zookeeper connection to use
- private ZooKeeper zk;
+ private CuratorFramework zk;
// The id of the tracking node -- must be a SEQUENTIAL node
private String trackingnode;
@@ -51,7 +62,7 @@ public class JobStateTracker {
* Constructor for a new node -- takes the jobid of an existing job
*
*/
- public JobStateTracker(String node, ZooKeeper zk, boolean nodeIsTracker,
+ public JobStateTracker(String node, CuratorFramework zk, boolean nodeIsTracker,
String job_trackingpath) {
this.zk = zk;
if (nodeIsTracker) {
@@ -65,30 +76,25 @@ public class JobStateTracker {
/**
* Create the parent znode for this job state.
*/
- public void create()
- throws IOException {
- String[] paths = ZooKeeperStorage.getPaths(job_trackingroot);
- for (String znode : paths) {
- try {
- zk.create(znode, new byte[0],
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException e) {
- } catch (Exception e) {
- throw new IOException("Unable to create parent nodes");
- }
+ public void create() throws IOException {
+ try {
+ zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+ .withACL(Ids.OPEN_ACL_UNSAFE).forPath(job_trackingroot);
+ } catch (KeeperException.NodeExistsException e) {
+ //root must exist already
+ } catch (Exception e) {
+ throw new IOException("Unable to create parent nodes");
}
try {
- trackingnode = zk.create(makeTrackingZnode(), jobid.getBytes(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+ trackingnode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
+ .withACL(Ids.OPEN_ACL_UNSAFE).forPath(makeTrackingZnode(), jobid.getBytes());
} catch (Exception e) {
throw new IOException("Unable to create " + makeTrackingZnode());
}
}
-
- public void delete()
- throws IOException {
+ public void delete() throws IOException {
try {
- zk.delete(makeTrackingJobZnode(trackingnode), -1);
+ zk.delete().forPath(makeTrackingJobZnode(trackingnode));
} catch (Exception e) {
// Might have been deleted already
LOG.info("Couldn't delete " + makeTrackingJobZnode(trackingnode));
@@ -101,13 +107,10 @@ public class JobStateTracker {
*/
public String getJobID() throws IOException {
try {
- return new String(zk.getData(makeTrackingJobZnode(trackingnode),
- false, new Stat()));
- } catch (KeeperException e) {
+ return new String(zk.getData().forPath(makeTrackingJobZnode(trackingnode)));
+ } catch (Exception e) {
// It was deleted during the transaction
- throw new IOException("Node already deleted " + trackingnode);
- } catch (InterruptedException e) {
- throw new IOException("Couldn't read node " + trackingnode);
+ throw new IOException("Node already deleted " + trackingnode, e);
}
}
@@ -129,13 +132,13 @@ public class JobStateTracker {
* Get the list of tracking jobs. These can be used to determine which jobs have
* expired.
*/
- public static List<String> getTrackingJobs(Configuration conf, ZooKeeper zk)
+ public static List<String> getTrackingJobs(Configuration conf, CuratorFramework zk)
throws IOException {
ArrayList<String> jobs = new ArrayList<String>();
try {
- for (String myid : zk.getChildren(
+ for (String myid : zk.getChildren().forPath(
conf.get(TempletonStorage.STORAGE_ROOT)
- + ZooKeeperStorage.TRACKINGDIR, false)) {
+ + ZooKeeperStorage.TRACKINGDIR)) {
jobs.add(myid);
}
} catch (Exception e) {
Modified: hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java (original)
+++ hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java Thu Oct 30 16:22:33 2014
@@ -58,4 +58,10 @@ public interface JobSubmissionConstants
public static final String HCAT_HOME = "HCAT_HOME";
public static final String PIG_OPTS = "PIG_OPTS";
}
+ public static interface Sqoop {
+ /**
+ * comma-separated list of jar names (short name) which are needed for Sqoop JDBC access
+ */
+ public static final String LIB_JARS = "templeton.sqoop.lib.jar";
+ }
}
Modified: hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java (original)
+++ hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java Thu Oct 30 16:22:33 2014
@@ -18,6 +18,7 @@
*/
package org.apache.hive.hcatalog.templeton.tool;
+import com.google.common.io.Files;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -33,6 +34,7 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.hcatalog.templeton.AppConfig;
import org.apache.hive.hcatalog.templeton.BadParam;
import org.apache.hive.hcatalog.templeton.LauncherDelegator;
@@ -89,6 +91,24 @@ public class LaunchMapper extends Mapper
env.put(PigConstants.PIG_OPTS, pigOpts.toString());
}
}
+
+ /**
+ * {@link #copyLocal(String, org.apache.hadoop.conf.Configuration)} should be called before this
+ * See {@link org.apache.hive.hcatalog.templeton.SqoopDelegator#makeBasicArgs(String, String, String, String, boolean, String)}
+ * for more comments
+ */
+ private static void handleSqoop(Configuration conf, Map<String, String> env) throws IOException {
+ if(TempletonUtils.isset(conf.get(Sqoop.LIB_JARS))) {
+ //LIB_JARS should only be set if Sqoop is auto-shipped
+ LOG.debug(Sqoop.LIB_JARS + "=" + conf.get(Sqoop.LIB_JARS));
+ //copy these (which have now been localized) jars to sqoop/lib
+ String destDir = conf.get(AppConfig.SQOOP_HOME_PATH) + File.separator + "lib";
+ String[] files = conf.getStrings(Sqoop.LIB_JARS);
+ for(String f : files) {
+ Files.copy(new File(f), new File(destDir + File.separator + f));
+ }
+ }
+ }
protected Process startJob(Context context, String user, String overrideClasspath)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
@@ -108,6 +128,7 @@ public class LaunchMapper extends Mapper
removeEnv.add("mapredcommand");
Map<String, String> env = TempletonUtils.hadoopUserEnv(user, overrideClasspath);
handlePigEnvVars(conf, env);
+ handleSqoop(conf, env);
List<String> jarArgsList = new LinkedList<String>(Arrays.asList(jarArgs));
handleTokenFile(jarArgsList, JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER, "mapreduce.job.credentials.binary");
handleTokenFile(jarArgsList, JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER_TEZ, "tez.credentials.path");
Modified: hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java (original)
+++ hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonStorage.java Thu Oct 30 16:22:33 2014
@@ -20,7 +20,6 @@ package org.apache.hive.hcatalog.templet
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@@ -45,7 +44,7 @@ import org.apache.hadoop.conf.Configurat
public interface TempletonStorage {
// These are the possible types referenced by 'type' below.
public enum Type {
- UNKNOWN, JOB, JOBTRACKING, TEMPLETONOVERHEAD
+ UNKNOWN, JOB, JOBTRACKING
}
public static final String STORAGE_CLASS = "templeton.storage.class";
@@ -79,20 +78,6 @@ public interface TempletonStorage {
public String getField(Type type, String id, String key);
/**
- * Get all the name/value pairs stored for this id.
- * Be careful using getFields() -- optimistic locking will mean that
- * your odds of a conflict are decreased if you read/write one field
- * at a time. getFields() is intended for read-only usage.
- *
- * If the type is UNKNOWN, search for the id in all types.
- *
- * @param type The data type (as listed above)
- * @param id The String id of this data grouping (jobid, etc.)
- * @return A Map of key/value pairs found for this type/id.
- */
- public Map<String, String> getFields(Type type, String id);
-
- /**
* Delete a data grouping (all data for a jobid, all tracking data
* for a job, etc.). If the type is UNKNOWN, search for the id
* in all types.
@@ -105,13 +90,6 @@ public interface TempletonStorage {
public boolean delete(Type type, String id) throws NotFoundException;
/**
- * Get the id of each data grouping in the storage system.
- *
- * @return An ArrayList<String> of ids.
- */
- public List<String> getAll();
-
- /**
* Get the id of each data grouping of a given type in the storage
* system.
* @param type The data type (as listed above)
@@ -120,26 +98,6 @@ public interface TempletonStorage {
public List<String> getAllForType(Type type);
/**
- * Get the id of each data grouping that has the specific key/value
- * pair.
- * @param key The name of the field to search for
- * @param value The value of the field to search for
- * @return An ArrayList<String> of ids.
- */
- public List<String> getAllForKey(String key, String value);
-
- /**
- * Get the id of each data grouping of a given type that has the
- * specific key/value pair.
- * @param type The data type (as listed above)
- * @param key The name of the field to search for
- * @param value The value of the field to search for
- * @return An ArrayList<String> of ids.
- */
- public List<String> getAllForTypeAndKey(Type type, String key,
- String value);
-
- /**
* For storage methods that require a connection, this is a hint
* that it's time to open a connection.
*/
Modified: hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java (original)
+++ hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java Thu Oct 30 16:22:33 2014
@@ -29,6 +29,7 @@ import java.net.URLDecoder;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
@@ -39,8 +40,10 @@ import java.util.regex.Pattern;
import javax.ws.rs.core.UriBuilder;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
@@ -211,6 +214,28 @@ public class TempletonUtils {
}
/**
+ * Returns all files (non-recursive) in {@code dirName}
+ */
+ public static List<Path> hadoopFsListChildren(String dirName, Configuration conf, String user)
+ throws URISyntaxException, IOException, InterruptedException {
+
+ Path p = hadoopFsPath(dirName, conf, user);
+ FileSystem fs = p.getFileSystem(conf);
+ if(!fs.exists(p)) {
+ return Collections.emptyList();
+ }
+ List<FileStatus> children = ShimLoader.getHadoopShims().listLocatedStatus(fs, p, null);
+ if(!isset(children)) {
+ return Collections.emptyList();
+ }
+ List<Path> files = new ArrayList<Path>();
+ for(FileStatus stat : children) {
+ files.add(stat.getPath());
+ }
+ return files;
+ }
+
+ /**
* @return true iff we are sure the file is not there.
*/
public static boolean hadoopFsIsMissing(FileSystem fs, Path p) {
@@ -239,8 +264,7 @@ public class TempletonUtils {
}
public static Path hadoopFsPath(String fname, final Configuration conf, String user)
- throws URISyntaxException, IOException,
- InterruptedException {
+ throws URISyntaxException, IOException, InterruptedException {
if (fname == null || conf == null) {
return null;
}
Modified: hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java (original)
+++ hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperCleanup.java Thu Oct 30 16:22:33 2014
@@ -24,8 +24,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Date;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.conf.Configuration;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -89,7 +89,7 @@ public class ZooKeeperCleanup extends Th
* @throws IOException
*/
public void run() {
- ZooKeeper zk = null;
+ CuratorFramework zk = null;
List<String> nodes = null;
isRunning = true;
while (!stop) {
@@ -112,13 +112,7 @@ public class ZooKeeperCleanup extends Th
} catch (Exception e) {
LOG.error("Cleanup cycle failed: " + e.getMessage());
} finally {
- if (zk != null) {
- try {
- zk.close();
- } catch (InterruptedException e) {
- // We're trying to exit anyway, just ignore.
- }
- }
+ if (zk != null) zk.close();
}
long sleepMillis = (long) (Math.random() * interval);
@@ -140,7 +134,7 @@ public class ZooKeeperCleanup extends Th
*
* @throws IOException
*/
- public List<String> getChildList(ZooKeeper zk) {
+ public List<String> getChildList(CuratorFramework zk) {
try {
List<String> jobs = JobStateTracker.getTrackingJobs(appConf, zk);
Collections.sort(jobs);
@@ -154,7 +148,7 @@ public class ZooKeeperCleanup extends Th
/**
* Check to see if a job is more than maxage old, and delete it if so.
*/
- public boolean checkAndDelete(String node, ZooKeeper zk) {
+ public boolean checkAndDelete(String node, CuratorFramework zk) {
JobState state = null;
try {
JobStateTracker tracker = new JobStateTracker(node, zk, true,
@@ -167,8 +161,11 @@ public class ZooKeeperCleanup extends Th
// an error in creation, and we want to delete it anyway.
long then = 0;
if (state.getCreated() != null) {
+ //this is set in ZooKeeperStorage.create()
then = state.getCreated();
}
+ //todo: this should check that the job actually completed and likely use completion time
+ //which is not tracked directly but available on /jobs/<id> node via "mtime" in Stat
if (now - then > maxage) {
LOG.info("Deleting " + tracker.getJobID());
state.delete();
@@ -177,7 +174,7 @@ public class ZooKeeperCleanup extends Th
}
return false;
} catch (Exception e) {
- LOG.info("checkAndDelete failed for " + node);
+ LOG.info("checkAndDelete failed for " + node + " due to: " + e.getMessage());
// We don't throw a new exception for this -- just keep going with the
// next one.
return true;
Modified: hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java (original)
+++ hive/branches/spark/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/ZooKeeperStorage.java Thu Oct 30 16:22:33 2014
@@ -19,21 +19,18 @@
package org.apache.hive.hcatalog.templeton.tool;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
/**
* A storage implementation based on storing everything in ZooKeeper.
@@ -60,29 +57,29 @@ public class ZooKeeperStorage implements
private static final Log LOG = LogFactory.getLog(ZooKeeperStorage.class);
- private ZooKeeper zk;
+ private CuratorFramework zk;
/**
* Open a ZooKeeper connection for the JobState.
*/
- public static ZooKeeper zkOpen(String zkHosts, int zkSessionTimeout)
+ public static CuratorFramework zkOpen(String zkHosts, int zkSessionTimeoutMs)
throws IOException {
- return new ZooKeeper(zkHosts,
- zkSessionTimeout,
- new Watcher() {
- @Override
- synchronized public void process(WatchedEvent event) {
- }
- });
+ //do we need to add a connection status listener? What will that do?
+ ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ CuratorFramework zk = CuratorFrameworkFactory.newClient(zkHosts, zkSessionTimeoutMs,
+ CuratorFrameworkFactory.builder().getConnectionTimeoutMs(), retryPolicy);
+ zk.start();
+ return zk;
}
/**
* Open a ZooKeeper connection for the JobState.
*/
- public static ZooKeeper zkOpen(Configuration conf)
- throws IOException {
+ public static CuratorFramework zkOpen(Configuration conf) throws IOException {
+ /*the odd-looking call to builder() below is only there to obtain Curator's default session
+ timeout, which Curator itself exposes as a system property*/
return zkOpen(conf.get(ZK_HOSTS),
- conf.getInt(ZK_SESSION_TIMEOUT, 30000));
+ conf.getInt(ZK_SESSION_TIMEOUT, CuratorFrameworkFactory.builder().getSessionTimeoutMs()));
}
public ZooKeeperStorage() {
@@ -93,15 +90,9 @@ public class ZooKeeperStorage implements
/**
* Close this ZK connection.
*/
- public void close()
- throws IOException {
+ public void close() throws IOException {
if (zk != null) {
- try {
- zk.close();
- zk = null;
- } catch (InterruptedException e) {
- throw new IOException("Closing ZooKeeper connection", e);
- }
+ zk.close();
}
}
@@ -118,48 +109,54 @@ public class ZooKeeperStorage implements
*/
public void create(Type type, String id)
throws IOException {
+ boolean wasCreated = false;
try {
- String[] paths = getPaths(makeZnode(type, id));
- boolean wasCreated = false;
- for (String znode : paths) {
- try {
- zk.create(znode, new byte[0],
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- wasCreated = true;
- } catch (KeeperException.NodeExistsException e) {
+ zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE).forPath(makeZnode(type, id));
+ wasCreated = true;
+ }
+ catch(KeeperException.NodeExistsException ex) {
+ //top level node for this jobId already exists, so wasCreated stays false
+ }
+ catch(Exception ex) {
+ throw new IOException("Error creating " + makeZnode(type, id), ex);
+ }
+ if(wasCreated) {
+ try {
+ // Really not sure if this should go here. Will have
+ // to see how the storage mechanism evolves.
+ if (type.equals(Type.JOB)) {
+ JobStateTracker jt = new JobStateTracker(id, zk, false, job_trackingpath);
+ jt.create();
}
- }
- if (wasCreated) {
+ } catch (Exception e) {
+ LOG.error("Error tracking (jobId=" + id + "): " + e.getMessage());
+ // If we couldn't create the tracker node, don't create the main node.
try {
- // Really not sure if this should go here. Will have
- // to see how the storage mechanism evolves.
- if (type.equals(Type.JOB)) {
- JobStateTracker jt = new JobStateTracker(id, zk, false,
- job_trackingpath);
- jt.create();
- }
- } catch (Exception e) {
- LOG.warn("Error tracking: " + e.getMessage());
- // If we couldn't create the tracker node, don't
- // create the main node.
- zk.delete(makeZnode(type, id), -1);
+ zk.delete().forPath(makeZnode(type, id));//default version is -1
+ }
+ catch(Exception ex) {
+ //EK: it's not obvious that this is the right logic, if we don't record the 'callback'
+ //for example and never notify the client of job completion
+ throw new IOException("Failed to delete " + makeZnode(type, id) + ":" + ex);
}
}
- if (zk.exists(makeZnode(type, id), false) == null)
+ }
+ try {
+ if (zk.checkExists().forPath(makeZnode(type, id)) == null) {
throw new IOException("Unable to create " + makeZnode(type, id));
- if (wasCreated) {
- try {
- saveField(type, id, "created",
- Long.toString(System.currentTimeMillis()));
- } catch (NotFoundException nfe) {
- // Wow, something's really wrong.
- throw new IOException("Couldn't write to node " + id, nfe);
- }
}
- } catch (KeeperException e) {
- throw new IOException("Creating " + id, e);
- } catch (InterruptedException e) {
- throw new IOException("Creating " + id, e);
+ }
+ catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ if (wasCreated) {
+ try {
+ saveField(type, id, "created",
+ Long.toString(System.currentTimeMillis()));
+ } catch (NotFoundException nfe) {
+ // Wow, something's really wrong.
+ throw new IOException("Couldn't write to node " + id, nfe);
+ }
}
}
@@ -198,25 +195,14 @@ public class ZooKeeperStorage implements
/**
* A helper method that sets a field value.
- * @param type
- * @param id
- * @param name
- * @param val
- * @throws KeeperException
- * @throws UnsupportedEncodingException
- * @throws InterruptedException
+ * @throws java.lang.Exception
*/
- private void setFieldData(Type type, String id, String name, String val)
- throws KeeperException, UnsupportedEncodingException, InterruptedException {
+ private void setFieldData(Type type, String id, String name, String val) throws Exception {
try {
- zk.create(makeFieldZnode(type, id, name),
- val.getBytes(ENCODING),
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ zk.create().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE)
+ .forPath(makeFieldZnode(type, id, name), val.getBytes(ENCODING));
} catch (KeeperException.NodeExistsException e) {
- zk.setData(makeFieldZnode(type, id, name),
- val.getBytes(ENCODING),
- -1);
+ zk.setData().forPath(makeFieldZnode(type, id, name), val.getBytes(ENCODING));
}
}
@@ -251,7 +237,7 @@ public class ZooKeeperStorage implements
@Override
public String getField(Type type, String id, String key) {
try {
- byte[] b = zk.getData(makeFieldZnode(type, id, key), false, null);
+ byte[] b = zk.getData().forPath(makeFieldZnode(type, id, key));
return new String(b, ENCODING);
} catch (Exception e) {
return null;
@@ -259,26 +245,12 @@ public class ZooKeeperStorage implements
}
@Override
- public Map<String, String> getFields(Type type, String id) {
- HashMap<String, String> map = new HashMap<String, String>();
- try {
- for (String node : zk.getChildren(makeZnode(type, id), false)) {
- byte[] b = zk.getData(makeFieldZnode(type, id, node),
- false, null);
- map.put(node, new String(b, ENCODING));
- }
- } catch (Exception e) {
- return map;
- }
- return map;
- }
-
- @Override
public boolean delete(Type type, String id) throws NotFoundException {
try {
- for (String child : zk.getChildren(makeZnode(type, id), false)) {
+
+ for (String child : zk.getChildren().forPath(makeZnode(type, id))) {
try {
- zk.delete(makeFieldZnode(type, id, child), -1);
+ zk.delete().forPath(makeFieldZnode(type, id, child));
} catch (Exception e) {
// Other nodes may be trying to delete this at the same time,
// so just log errors and skip them.
@@ -287,7 +259,7 @@ public class ZooKeeperStorage implements
}
}
try {
- zk.delete(makeZnode(type, id), -1);
+ zk.delete().forPath(makeZnode(type, id));
} catch (Exception e) {
// Same thing -- might be deleted by other nodes, so just go on.
throw new NotFoundException("Couldn't delete " +
@@ -302,58 +274,15 @@ public class ZooKeeperStorage implements
}
@Override
- public List<String> getAll() {
- ArrayList<String> allNodes = new ArrayList<String>();
- for (Type type : Type.values()) {
- allNodes.addAll(getAllForType(type));
- }
- return allNodes;
- }
-
- @Override
public List<String> getAllForType(Type type) {
try {
- return zk.getChildren(getPath(type), false);
+ return zk.getChildren().forPath(getPath(type));
} catch (Exception e) {
return new ArrayList<String>();
}
}
@Override
- public List<String> getAllForKey(String key, String value) {
- ArrayList<String> allNodes = new ArrayList<String>();
- try {
- for (Type type : Type.values()) {
- allNodes.addAll(getAllForTypeAndKey(type, key, value));
- }
- } catch (Exception e) {
- LOG.info("Couldn't find children.");
- }
- return allNodes;
- }
-
- @Override
- public List<String> getAllForTypeAndKey(Type type, String key, String value) {
- ArrayList<String> allNodes = new ArrayList<String>();
- try {
- for (String id : zk.getChildren(getPath(type), false)) {
- for (String field : zk.getChildren(id, false)) {
- if (field.endsWith("/" + key)) {
- byte[] b = zk.getData(field, false, null);
- if (new String(b, ENCODING).equals(value)) {
- allNodes.add(id);
- }
- }
- }
- }
- } catch (Exception e) {
- // Log and go to the next type -- this one might not exist
- LOG.info("Couldn't find children of " + getPath(type));
- }
- return allNodes;
- }
-
- @Override
public void openStorage(Configuration config) throws IOException {
storage_root = config.get(STORAGE_ROOT);
job_path = storage_root + "/jobs";
Modified: hive/branches/spark/hwi/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/hwi/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/hwi/pom.xml (original)
+++ hive/branches/spark/hwi/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/itests/custom-serde/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/custom-serde/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/itests/custom-serde/pom.xml (original)
+++ hive/branches/spark/itests/custom-serde/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/itests/custom-serde/src/main/java/org/apache/hadoop/hive/serde2/CustomSerDe1.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/custom-serde/src/main/java/org/apache/hadoop/hive/serde2/CustomSerDe1.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/itests/custom-serde/src/main/java/org/apache/hadoop/hive/serde2/CustomSerDe1.java (original)
+++ hive/branches/spark/itests/custom-serde/src/main/java/org/apache/hadoop/hive/serde2/CustomSerDe1.java Thu Oct 30 16:22:33 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+@SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES})
public class CustomSerDe1 extends AbstractSerDe {
int numColumns;
Modified: hive/branches/spark/itests/custom-serde/src/main/java/org/apache/hadoop/hive/serde2/CustomSerDe2.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/custom-serde/src/main/java/org/apache/hadoop/hive/serde2/CustomSerDe2.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/itests/custom-serde/src/main/java/org/apache/hadoop/hive/serde2/CustomSerDe2.java (original)
+++ hive/branches/spark/itests/custom-serde/src/main/java/org/apache/hadoop/hive/serde2/CustomSerDe2.java Thu Oct 30 16:22:33 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+@SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES})
public class CustomSerDe2 extends AbstractSerDe {
int numColumns;
Modified: hive/branches/spark/itests/hcatalog-unit/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/hcatalog-unit/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/itests/hcatalog-unit/pom.xml (original)
+++ hive/branches/spark/itests/hcatalog-unit/pom.xml Thu Oct 30 16:22:33 2014
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/itests/hive-minikdc/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/hive-minikdc/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/itests/hive-minikdc/pom.xml (original)
+++ hive/branches/spark/itests/hive-minikdc/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -60,6 +60,13 @@
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
+ <artifactId>hive-it-unit</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${project.version}</version>
<scope>test</scope>
Modified: hive/branches/spark/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHs2HooksWithMiniKdc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHs2HooksWithMiniKdc.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHs2HooksWithMiniKdc.java (original)
+++ hive/branches/spark/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestHs2HooksWithMiniKdc.java Thu Oct 30 16:22:33 2014
@@ -28,73 +28,32 @@ import junit.framework.Assert;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
-import org.apache.hadoop.hive.ql.hooks.HookContext;
-import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+import org.apache.hadoop.hive.hooks.TestHs2Hooks.PostExecHook;
+import org.apache.hadoop.hive.hooks.TestHs2Hooks.PreExecHook;
+import org.apache.hadoop.hive.hooks.TestHs2Hooks.SemanticAnalysisHook;
import org.apache.hive.jdbc.miniHS2.MiniHS2;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Tests information retrieved from hooks, in Kerberos mode.
*/
public class TestHs2HooksWithMiniKdc {
- private static final Logger LOG = LoggerFactory.getLogger(TestHs2HooksWithMiniKdc.class);
-
- public static class PostExecHook implements ExecuteWithHookContext {
- private static String userName;
- private static String ipAddress;
- private static String operation;
- private static Throwable error;
-
- public void run(HookContext hookContext) {
- try {
- if (hookContext.getHookType().equals(HookType.POST_EXEC_HOOK)) {
- ipAddress = hookContext.getIpAddress();
- userName = hookContext.getUserName();
- operation = hookContext.getOperationName();
- }
- } catch (Throwable t) {
- LOG.error("Error in PostExecHook: " + t, t);
- error = t;
- }
- }
- }
-
- public static class PreExecHook implements ExecuteWithHookContext {
- private static String userName;
- private static String ipAddress;
- private static String operation;
- private static Throwable error;
-
- public void run(HookContext hookContext) {
- try {
- if (hookContext.getHookType().equals(HookType.PRE_EXEC_HOOK)) {
- ipAddress = hookContext.getIpAddress();
- userName = hookContext.getUserName();
- operation = hookContext.getOperationName();
- }
- } catch (Throwable t) {
- LOG.error("Error in PreExecHook: " + t, t);
- error = t;
- }
- }
- }
private static MiniHS2 miniHS2 = null;
private static MiniHiveKdc miniHiveKdc = null;
private static Map<String, String> confOverlay = new HashMap<String, String>();
private Connection hs2Conn;
@BeforeClass
- public static void beforeTest() throws Exception {
+ public static void setUpBeforeClass() throws Exception {
Class.forName(MiniHS2.getJdbcDriverName());
confOverlay.put(ConfVars.POSTEXECHOOKS.varname, PostExecHook.class.getName());
confOverlay.put(ConfVars.PREEXECHOOKS.varname, PreExecHook.class.getName());
+ confOverlay.put(ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+ SemanticAnalysisHook.class.getName());
HiveConf hiveConf = new HiveConf();
miniHiveKdc = MiniHiveKdc.getMiniHiveKdc(hiveConf);
@@ -102,12 +61,30 @@ public class TestHs2HooksWithMiniKdc {
miniHS2.start(confOverlay);
}
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ miniHS2.stop();
+ }
+
@Before
- public void setUp() throws Exception {
+ public void setUpTest() throws Exception {
+ PreExecHook.userName = null;
+ PreExecHook.ipAddress = null;
+ PreExecHook.operation = null;
+ PreExecHook.error = null;
+ PostExecHook.userName = null;
+ PostExecHook.ipAddress = null;
+ PostExecHook.operation = null;
+ PostExecHook.error = null;
+ SemanticAnalysisHook.userName = null;
+ SemanticAnalysisHook.ipAddress = null;
+ SemanticAnalysisHook.command = null;
+ SemanticAnalysisHook.preAnalyzeError = null;
+ SemanticAnalysisHook.postAnalyzeError = null;
}
@After
- public void tearDown() throws Exception {
+ public void tearDownTest() throws Exception {
if (hs2Conn != null) {
try {
hs2Conn.close();
@@ -117,16 +94,11 @@ public class TestHs2HooksWithMiniKdc {
}
}
- @AfterClass
- public static void afterTest() throws Exception {
- miniHS2.stop();
- }
-
/**
- * Test get IpAddress and username from hook.
+ * Test that hook context properties are correctly set.
*/
@Test
- public void testIpUserName() throws Throwable {
+ public void testHookContexts() throws Throwable {
miniHiveKdc.loginUser(MiniHiveKdc.HIVE_TEST_USER_1);
hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL());
@@ -155,5 +127,24 @@ public class TestHs2HooksWithMiniKdc {
Assert.assertEquals(MiniHiveKdc.HIVE_TEST_USER_1, PreExecHook.userName);
Assert.assertTrue(PreExecHook.ipAddress, PreExecHook.ipAddress.contains("127.0.0.1"));
Assert.assertEquals("SHOWTABLES", PreExecHook.operation);
+
+ error = SemanticAnalysisHook.preAnalyzeError;
+ if (error != null) {
+ throw error;
+ }
+ error = SemanticAnalysisHook.postAnalyzeError;
+ if (error != null) {
+ throw error;
+ }
+
+ Assert.assertNotNull(SemanticAnalysisHook.ipAddress,
+ "semantic hook context ipaddress is null");
+ Assert.assertNotNull(SemanticAnalysisHook.userName,
+ "semantic hook context userName is null");
+ Assert.assertNotNull(SemanticAnalysisHook.command ,
+ "semantic hook context command is null");
+ Assert.assertTrue(SemanticAnalysisHook.ipAddress,
+ SemanticAnalysisHook.ipAddress.contains("127.0.0.1"));
+ Assert.assertEquals("show tables", SemanticAnalysisHook.command);
}
-}
+}
\ No newline at end of file
Modified: hive/branches/spark/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdc.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdc.java (original)
+++ hive/branches/spark/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdc.java Thu Oct 30 16:22:33 2014
@@ -19,7 +19,6 @@
package org.apache.hive.minikdc;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -153,7 +152,7 @@ public class TestJdbcWithMiniKdc {
}
/***
- * Negtive test for token based authentication
+ * Negative test for token based authentication
* Verify that a user can't retrieve a token for user that
* it's not allowed to impersonate
* @throws Exception
@@ -163,13 +162,20 @@ public class TestJdbcWithMiniKdc {
miniHiveKdc.loginUser(MiniHiveKdc.HIVE_TEST_SUPER_USER);
hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL());
- // retrieve token and store in the cache
- String token = ((HiveConnection)hs2Conn).getDelegationToken(
- MiniHiveKdc.HIVE_TEST_USER_2, MiniHiveKdc.HIVE_SERVICE_PRINCIPAL);
- hs2Conn.close();
+ try {
+ // retrieve token and store in the cache
+ String token = ((HiveConnection)hs2Conn).getDelegationToken(
+ MiniHiveKdc.HIVE_TEST_USER_2, MiniHiveKdc.HIVE_SERVICE_PRINCIPAL);
- assertNull(MiniHiveKdc.HIVE_TEST_SUPER_USER + " shouldn't be allowed to create token for " +
- MiniHiveKdc.HIVE_TEST_USER_2, token);
+ fail(MiniHiveKdc.HIVE_TEST_SUPER_USER + " shouldn't be allowed to retrieve token for " +
+ MiniHiveKdc.HIVE_TEST_USER_2);
+ } catch (SQLException e) {
+ // Expected error
+ assertTrue(e.getMessage().contains("Failed to validate proxy privilege"));
+ assertTrue(e.getCause().getCause().getMessage().contains("Failed to validate proxy privilege"));
+ } finally {
+ hs2Conn.close();
+ }
}
/**
@@ -201,7 +207,9 @@ public class TestJdbcWithMiniKdc {
+ MiniHiveKdc.HIVE_TEST_USER_2);
} catch (SQLException e) {
// Expected error
- assertEquals("08S01", e.getSQLState().trim());
+ e.printStackTrace();
+ assertTrue(e.getMessage().contains("Failed to validate proxy privilege"));
+ assertTrue(e.getCause().getCause().getMessage().contains("is not allowed to impersonate"));
}
}
Modified: hive/branches/spark/itests/hive-unit-hadoop2/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/hive-unit-hadoop2/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/itests/hive-unit-hadoop2/pom.xml (original)
+++ hive/branches/spark/itests/hive-unit-hadoop2/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -118,6 +118,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-it-unit</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<!-- hadoop-2 dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
Modified: hive/branches/spark/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProviderWithACL.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProviderWithACL.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProviderWithACL.java (original)
+++ hive/branches/spark/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/ql/security/TestStorageBasedMetastoreAuthorizationProviderWithACL.java Thu Oct 30 16:22:33 2014
@@ -68,6 +68,10 @@ public class TestStorageBasedMetastoreAu
conf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, warehouseDir.toString());
conf.setBoolVar(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS, true);
+ // Set up scratch directory
+ Path scratchDir = new Path(new Path(fs.getUri()), "/scratchdir");
+ conf.setVar(HiveConf.ConfVars.SCRATCHDIR, scratchDir.toString());
+
return conf;
}
Modified: hive/branches/spark/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java?rev=1635536&r1=1630990&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java (original)
+++ hive/branches/spark/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java Thu Oct 30 16:22:33 2014
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
@@ -98,9 +99,9 @@ public class TestHadoop20SAuthBridge ext
}
@Override
- public void startDelegationTokenSecretManager(Configuration conf, Object hms)
+ public void startDelegationTokenSecretManager(Configuration conf, Object hms, ServerMode sm)
throws IOException{
- super.startDelegationTokenSecretManager(conf, hms);
+ super.startDelegationTokenSecretManager(conf, hms, sm);
isMetastoreTokenManagerInited = true;
}
Modified: hive/branches/spark/itests/hive-unit/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/hive-unit/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/itests/hive-unit/pom.xml (original)
+++ hive/branches/spark/itests/hive-unit/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java (original)
+++ hive/branches/spark/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java Thu Oct 30 16:22:33 2014
@@ -176,7 +176,7 @@ public class MiniHS2 extends AbstractHiv
baseDfsDir = new Path(new Path(fs.getUri()), "/base");
} else {
fs = FileSystem.getLocal(hiveConf);
- baseDfsDir = new Path("file://"+ baseDir.getPath());
+ baseDfsDir = new Path("file://"+ baseDir.toURI().getPath());
}
if (useMiniKdc) {
hiveConf.setVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL, serverPrincipal);