You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/10/30 17:22:48 UTC
svn commit: r1635536 [27/28] - in /hive/branches/spark: ./ accumulo-handler/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/test/org/apache/hado...
Modified: hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerde.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerde.java (original)
+++ hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerde.java Thu Oct 30 16:22:33 2014
@@ -85,77 +85,59 @@ public class TestAvroSerde {
}
@Test
- public void noSchemaProvidedReturnsErrorSchema() throws SerDeException {
+ public void noSchemaProvidedThrowsException() {
Properties props = new Properties();
- verifyErrorSchemaReturned(props);
+ verifyExpectedException(props);
}
@Test
- public void gibberishSchemaProvidedReturnsErrorSchema() throws SerDeException {
+ public void gibberishSchemaProvidedReturnsErrorSchema() {
Properties props = new Properties();
props.put(AvroSerdeUtils.SCHEMA_LITERAL, "blahblahblah");
- verifyErrorSchemaReturned(props);
+ verifyExpectedException(props);
}
@Test
- public void emptySchemaProvidedReturnsErrorSchema() throws SerDeException {
+ public void emptySchemaProvidedThrowsException() {
Properties props = new Properties();
props.put(AvroSerdeUtils.SCHEMA_LITERAL, "");
- verifyErrorSchemaReturned(props);
+ verifyExpectedException(props);
}
@Test
- public void badSchemaURLProvidedReturnsErrorSchema() throws SerDeException {
+ public void badSchemaURLProvidedThrowsException() {
Properties props = new Properties();
props.put(AvroSerdeUtils.SCHEMA_URL, "not://a/url");
- verifyErrorSchemaReturned(props);
+ verifyExpectedException(props);
}
@Test
- public void emptySchemaURLProvidedReturnsErrorSchema() throws SerDeException {
+ public void emptySchemaURLProvidedThrowsException() {
Properties props = new Properties();
props.put(AvroSerdeUtils.SCHEMA_URL, "");
- verifyErrorSchemaReturned(props);
+ verifyExpectedException(props);
}
@Test
- public void bothPropertiesSetToNoneReturnsErrorSchema() throws SerDeException {
+ public void bothPropertiesSetToNoneThrowsException() {
Properties props = new Properties();
props.put(AvroSerdeUtils.SCHEMA_URL, AvroSerdeUtils.SCHEMA_NONE);
props.put(AvroSerdeUtils.SCHEMA_LITERAL, AvroSerdeUtils.SCHEMA_NONE);
- verifyErrorSchemaReturned(props);
+ verifyExpectedException(props);
}
- private void verifyErrorSchemaReturned(Properties props) throws SerDeException {
+ private void verifyExpectedException(Properties props) {
AvroSerDe asd = new AvroSerDe();
- SerDeUtils.initializeSerDe(asd, new Configuration(), props, null);
- assertTrue(asd.getObjectInspector() instanceof StandardStructObjectInspector);
- StandardStructObjectInspector oi = (StandardStructObjectInspector)asd.getObjectInspector();
- List<? extends StructField> allStructFieldRefs = oi.getAllStructFieldRefs();
- assertEquals(SchemaResolutionProblem.SIGNAL_BAD_SCHEMA.getFields().size(), allStructFieldRefs.size());
- StructField firstField = allStructFieldRefs.get(0);
- assertTrue(firstField.toString().contains("error_error_error_error_error_error_error"));
-
- try {
- Writable mock = Mockito.mock(Writable.class);
- asd.deserialize(mock);
- fail("Should have thrown a BadSchemaException");
- } catch (BadSchemaException bse) {
- // good
- }
-
try {
- Object o = Mockito.mock(Object.class);
- ObjectInspector mockOI = Mockito.mock(ObjectInspector.class);
- asd.serialize(o, mockOI);
- fail("Should have thrown a BadSchemaException");
- } catch (BadSchemaException bse) {
+ SerDeUtils.initializeSerDe(asd, new Configuration(), props, null);
+ fail("Expected Exception did not be thrown");
+ } catch (SerDeException e) {
// good
}
}
Modified: hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/io/TestDateWritable.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/io/TestDateWritable.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/io/TestDateWritable.java (original)
+++ hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/io/TestDateWritable.java Thu Oct 30 16:22:33 2014
@@ -10,6 +10,12 @@ import java.sql.Date;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
public class TestDateWritable {
@@ -135,4 +141,61 @@ public class TestDateWritable {
private static String getRandomDateString() {
return dateStrings[(int) (Math.random() * 365)];
}
+
+ public static class DateTestCallable implements Callable<String> {
+ public DateTestCallable() {
+ }
+
+ @Override
+ public String call() throws Exception {
+ // Iterate through each day of the year, make sure Date/DateWritable match
+ Date originalDate = Date.valueOf("2014-01-01");
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(originalDate.getTime());
+ for (int idx = 0; idx < 365; ++idx) {
+ originalDate = new Date(cal.getTimeInMillis());
+ // Make sure originalDate is at midnight in the local time zone,
+ // since DateWritable will generate dates at that time.
+ originalDate = Date.valueOf(originalDate.toString());
+ DateWritable dateWritable = new DateWritable(originalDate);
+ if (!originalDate.equals(dateWritable.get())) {
+ return originalDate.toString();
+ }
+ cal.add(Calendar.DAY_OF_YEAR, 1);
+ }
+ // Success!
+ return null;
+ }
+ }
+
+ @Test
+ public void testDaylightSavingsTime() throws InterruptedException, ExecutionException {
+ String[] timeZones = {
+ "GMT",
+ "UTC",
+ "America/Godthab",
+ "America/Los_Angeles",
+ "Asia/Jerusalem",
+ "Australia/Melbourne",
+ "Europe/London",
+ // time zones with half hour boundaries
+ "America/St_Johns",
+ "Asia/Tehran",
+ };
+
+ for (String timeZone: timeZones) {
+ TimeZone previousDefault = TimeZone.getDefault();
+ TimeZone.setDefault(TimeZone.getTimeZone(timeZone));
+ assertEquals("Default timezone should now be " + timeZone,
+ timeZone, TimeZone.getDefault().getID());
+ ExecutorService threadPool = Executors.newFixedThreadPool(1);
+ try {
+ Future<String> future = threadPool.submit(new DateTestCallable());
+ String result = future.get();
+ assertNull("Failed at timezone " + timeZone + ", date " + result, result);
+ } finally {
+ threadPool.shutdown(); TimeZone.setDefault(previousDefault);
+ }
+ }
+ }
}
Modified: hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyArrayMapStruct.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyArrayMapStruct.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyArrayMapStruct.java (original)
+++ hive/branches/spark/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyArrayMapStruct.java Thu Oct 30 16:22:33 2014
@@ -194,6 +194,113 @@ public class TestLazyArrayMapStruct exte
}
}
+ /*
+ * test LazyMap with bad entries, e.g., empty key or empty entries
+ * where '[' and ']' don't exist, only for notation purpose,
+ * STX with value of 2 as entry separator, ETX with 3 as key/value separator
+ * */
+ public void testLazyMapWithBadEntries() throws Throwable {
+ try {
+ {
+ // Map of String to String
+ Text nullSequence = new Text("");
+ ObjectInspector oi = LazyFactory.createLazyObjectInspector(
+ TypeInfoUtils.getTypeInfosFromTypeString("map<string,string>").get(
+ 0), new byte[] {'\2', '\3'}, 0, nullSequence,
+ false, (byte) 0);
+ LazyMap b = (LazyMap) LazyFactory.createLazyObject(oi);
+
+ //read friendly string: ak[EXT]av[STX]bk[ETX]bv[STX]ck[ETX]cv[STX]dk[ETX]dv
+ byte[] data = new byte[] {
+ 'a', 'k', '\3', 'a', 'v',
+ '\02', 'b', 'k', '\3', 'b', 'v',
+ '\02', 'c', 'k', '\3', 'c', 'v',
+ '\02', 'd', 'k', '\3', 'd', 'v'};
+ TestLazyPrimitive.initLazyObject(b, data, 0, data.length);
+
+ assertEquals(new Text("av"), ((LazyString) b
+ .getMapValueElement(new Text("ak"))).getWritableObject());
+ assertNull(b.getMapValueElement(new Text("-1")));
+ assertEquals(new Text("bv"), ((LazyString) b
+ .getMapValueElement(new Text("bk"))).getWritableObject());
+ assertEquals(new Text("cv"), ((LazyString) b
+ .getMapValueElement(new Text("ck"))).getWritableObject());
+ assertNull(b.getMapValueElement(new Text("-")));
+ assertEquals(new Text("dv"), ((LazyString) b
+ .getMapValueElement(new Text("dk"))).getWritableObject());
+ assertEquals(4, b.getMapSize());
+ }
+
+ {
+ // Map of String to String, LazyMap allows empty-string style key, e.g., {"" : null}
+ // or {"", ""}, but not null style key, e.g., {null:""}
+ Text nullSequence = new Text("");
+ ObjectInspector oi = LazyFactory.createLazyObjectInspector(
+ TypeInfoUtils.getTypeInfosFromTypeString("map<string,string>").get(
+ 0), new byte[] {'\2', '\3'}, 0, nullSequence,
+ false, (byte) 0);
+ LazyMap b = (LazyMap) LazyFactory.createLazyObject(oi);
+
+ //read friendly string: [STX]ak[EXT]av[STX]bk[ETX]bv[STX]ck[ETX]cv[STX]dk[ETX]dv
+ byte[] data = new byte[] {
+ '\02', 'a', 'k', '\3', 'a', 'v',
+ '\02', 'b', 'k', '\3', 'b', 'v',
+ '\02', 'c', 'k', '\3', 'c', 'v',
+ '\02', 'd', 'k', '\3', 'd', 'v'};
+ TestLazyPrimitive.initLazyObject(b, data, 0, data.length);
+
+ assertNull(b.getMapValueElement(new Text(""))); //{"" : null}
+ assertEquals(new Text("av"), ((LazyString) b
+ .getMapValueElement(new Text("ak"))).getWritableObject());
+ assertNull(b.getMapValueElement(new Text("-1")));
+ assertEquals(new Text("bv"), ((LazyString) b
+ .getMapValueElement(new Text("bk"))).getWritableObject());
+ assertEquals(new Text("cv"), ((LazyString) b
+ .getMapValueElement(new Text("ck"))).getWritableObject());
+ assertNull(b.getMapValueElement(new Text("-")));
+ assertEquals(new Text("dv"), ((LazyString) b
+ .getMapValueElement(new Text("dk"))).getWritableObject());
+ assertEquals(4, b.getMapSize());
+ }
+
+ {
+ // Map of String to String, LazyMap allows empty-string style key, e.g., {"" : null}
+ // or {"", ""}, but not null style key, e.g., {null:""}
+ Text nullSequence = new Text("");
+ ObjectInspector oi = LazyFactory.createLazyObjectInspector(
+ TypeInfoUtils.getTypeInfosFromTypeString("map<string,string>").get(
+ 0), new byte[] {'\2', '\3'}, 0, nullSequence,
+ false, (byte) 0);
+ LazyMap b = (LazyMap) LazyFactory.createLazyObject(oi);
+
+ //read friendly string: [ETX][STX]ak[EXT]av[STX]bk[ETX]bv[STX]ck[ETX]cv[STX]dk[ETX]dv
+ byte[] data = new byte[] {
+ '\03',
+ '\02', 'a', 'k', '\3', 'a', 'v',
+ '\02', 'b', 'k', '\3', 'b', 'v',
+ '\02', 'c', 'k', '\3', 'c', 'v',
+ '\02', 'd', 'k', '\3', 'd', 'v'};
+ TestLazyPrimitive.initLazyObject(b, data, 0, data.length);
+
+ assertNull(b.getMapValueElement(new Text(""))); //{"" : null}
+ assertEquals(new Text("av"), ((LazyString) b
+ .getMapValueElement(new Text("ak"))).getWritableObject());
+ assertNull(b.getMapValueElement(new Text("-1")));
+ assertEquals(new Text("bv"), ((LazyString) b
+ .getMapValueElement(new Text("bk"))).getWritableObject());
+ assertEquals(new Text("cv"), ((LazyString) b
+ .getMapValueElement(new Text("ck"))).getWritableObject());
+ assertNull(b.getMapValueElement(new Text("-")));
+ assertEquals(new Text("dv"), ((LazyString) b
+ .getMapValueElement(new Text("dk"))).getWritableObject());
+ assertEquals(4, b.getMapSize());
+ }
+ } catch(Throwable e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
/**
* Test the LazyMap class.
*/
Modified: hive/branches/spark/service/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/service/pom.xml (original)
+++ hive/branches/spark/service/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java Thu Oct 30 16:22:33 2014
@@ -29,10 +29,11 @@ import javax.security.sasl.Sasl;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
import org.apache.thrift.TProcessorFactory;
@@ -98,7 +99,7 @@ public class HiveAuthFactory {
conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL));
// start delegation token manager
try {
- saslServer.startDelegationTokenSecretManager(conf, null);
+ saslServer.startDelegationTokenSecretManager(conf, null, ServerMode.HIVESERVER2);
} catch (IOException e) {
throw new TTransportException("Failed to start token manager", e);
}
@@ -234,65 +235,72 @@ public class HiveAuthFactory {
// retrieve delegation token for the given user
public String getDelegationToken(String owner, String renewer) throws HiveSQLException {
if (saslServer == null) {
- throw new HiveSQLException("Delegation token only supported over kerberos authentication");
+ throw new HiveSQLException(
+ "Delegation token only supported over kerberos authentication", "08S01");
}
try {
String tokenStr = saslServer.getDelegationTokenWithService(owner, renewer, HS2_CLIENT_TOKEN);
if (tokenStr == null || tokenStr.isEmpty()) {
- throw new HiveSQLException("Received empty retrieving delegation token for user " + owner);
+ throw new HiveSQLException(
+ "Received empty retrieving delegation token for user " + owner, "08S01");
}
return tokenStr;
} catch (IOException e) {
- throw new HiveSQLException("Error retrieving delegation token for user " + owner, e);
+ throw new HiveSQLException(
+ "Error retrieving delegation token for user " + owner, "08S01", e);
} catch (InterruptedException e) {
- throw new HiveSQLException("delegation token retrieval interrupted", e);
+ throw new HiveSQLException("delegation token retrieval interrupted", "08S01", e);
}
}
// cancel given delegation token
public void cancelDelegationToken(String delegationToken) throws HiveSQLException {
if (saslServer == null) {
- throw new HiveSQLException("Delegation token only supported over kerberos authentication");
+ throw new HiveSQLException(
+ "Delegation token only supported over kerberos authentication", "08S01");
}
try {
saslServer.cancelDelegationToken(delegationToken);
} catch (IOException e) {
- throw new HiveSQLException("Error canceling delegation token " + delegationToken, e);
+ throw new HiveSQLException(
+ "Error canceling delegation token " + delegationToken, "08S01", e);
}
}
public void renewDelegationToken(String delegationToken) throws HiveSQLException {
if (saslServer == null) {
- throw new HiveSQLException("Delegation token only supported over kerberos authentication");
+ throw new HiveSQLException(
+ "Delegation token only supported over kerberos authentication", "08S01");
}
try {
saslServer.renewDelegationToken(delegationToken);
} catch (IOException e) {
- throw new HiveSQLException("Error renewing delegation token " + delegationToken, e);
+ throw new HiveSQLException(
+ "Error renewing delegation token " + delegationToken, "08S01", e);
}
}
public String getUserFromToken(String delegationToken) throws HiveSQLException {
if (saslServer == null) {
- throw new HiveSQLException("Delegation token only supported over kerberos authentication");
+ throw new HiveSQLException(
+ "Delegation token only supported over kerberos authentication", "08S01");
}
try {
return saslServer.getUserFromToken(delegationToken);
} catch (IOException e) {
- throw new HiveSQLException("Error extracting user from delegation token " + delegationToken,
- e);
+ throw new HiveSQLException(
+ "Error extracting user from delegation token " + delegationToken, "08S01", e);
}
}
public static void verifyProxyAccess(String realUser, String proxyUser, String ipAddress,
HiveConf hiveConf) throws HiveSQLException {
-
try {
UserGroupInformation sessionUgi;
if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
- KerberosName kerbName = new KerberosName(realUser);
- String shortPrincipalName = kerbName.getServiceName();
+ KerberosNameShim kerbName = ShimLoader.getHadoopShims().getKerberosNameShim(realUser);
+ String shortPrincipalName = kerbName.getServiceName();
sessionUgi = ShimLoader.getHadoopShims().createProxyUser(shortPrincipalName);
} else {
sessionUgi = ShimLoader.getHadoopShims().createRemoteUser(realUser, null);
@@ -303,8 +311,8 @@ public class HiveAuthFactory {
}
} catch (IOException e) {
throw new HiveSQLException(
- "Failed to validate proxy privilege of " + realUser + " for " + proxyUser, e);
+ "Failed to validate proxy privilege of " + realUser + " for " + proxyUser, "08S01", e);
}
}
-
+
}
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/CLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/CLIService.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/CLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/CLIService.java Thu Oct 30 16:22:33 2014
@@ -44,6 +44,7 @@ import org.apache.hive.service.auth.Hive
import org.apache.hive.service.cli.operation.Operation;
import org.apache.hive.service.cli.session.SessionManager;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
+import org.apache.hive.service.server.HiveServer2;
/**
* CLIService.
@@ -64,15 +65,18 @@ public class CLIService extends Composit
private SessionManager sessionManager;
private UserGroupInformation serviceUGI;
private UserGroupInformation httpUGI;
+ // The HiveServer2 instance running this service
+ private final HiveServer2 hiveServer2;
- public CLIService() {
+ public CLIService(HiveServer2 hiveServer2) {
super(CLIService.class.getSimpleName());
+ this.hiveServer2 = hiveServer2;
}
@Override
public synchronized void init(HiveConf hiveConf) {
this.hiveConf = hiveConf;
- sessionManager = new SessionManager();
+ sessionManager = new SessionManager(hiveServer2);
addService(sessionManager);
// If the hadoop cluster is secure, do a kerberos login for the service from the keytab
if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
@@ -201,7 +205,8 @@ public class CLIService extends Composit
* @see org.apache.hive.service.cli.ICLIService#closeSession(org.apache.hive.service.cli.SessionHandle)
*/
@Override
- public void closeSession(SessionHandle sessionHandle) throws HiveSQLException {
+ public void closeSession(SessionHandle sessionHandle)
+ throws HiveSQLException {
sessionManager.closeSession(sessionHandle);
LOG.debug(sessionHandle + ": closeSession()");
}
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Thu Oct 30 16:22:33 2014
@@ -35,14 +35,16 @@ import org.apache.hadoop.hive.common.cli
import org.apache.hadoop.hive.common.cli.IHiveFileProcessor;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.exec.FetchFormatter;
import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
import org.apache.hadoop.hive.ql.history.HiveHistory;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.SetProcessor;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.*;
@@ -80,7 +82,6 @@ public class HiveSessionImpl implements
private SessionManager sessionManager;
private OperationManager operationManager;
- private IMetaStoreClient metastoreClient = null;
private final Set<OperationHandle> opHandleSet = new HashSet<OperationHandle>();
private boolean isOperationLogEnabled;
private File sessionLogDir;
@@ -95,6 +96,17 @@ public class HiveSessionImpl implements
this.hiveConf = new HiveConf(serverhiveConf);
this.ipAddress = ipAddress;
+ try {
+ // In non-impersonation mode, map scheduler queue to current user
+ // if fair scheduler is configured.
+ if (! hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
+ hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) {
+ ShimLoader.getHadoopShims().refreshDefaultQueue(hiveConf, username);
+ }
+ } catch (IOException e) {
+ LOG.warn("Error setting scheduler queue: " + e, e);
+ }
+
// Set an explicit session name to control the download directory name
hiveConf.set(ConfVars.HIVESESSIONID.varname,
sessionHandle.getHandleIdentifier().toString());
@@ -315,14 +327,13 @@ public class HiveSessionImpl implements
@Override
public IMetaStoreClient getMetaStoreClient() throws HiveSQLException {
- if (metastoreClient == null) {
- try {
- metastoreClient = new HiveMetaStoreClient(getHiveConf());
- } catch (MetaException e) {
- throw new HiveSQLException(e);
- }
+ try {
+ return Hive.get(getHiveConf()).getMSC();
+ } catch (HiveException e) {
+ throw new HiveSQLException("Failed to get metastore connection", e);
+ } catch (MetaException e) {
+ throw new HiveSQLException("Failed to get metastore connection", e);
}
- return metastoreClient;
}
@Override
@@ -538,14 +549,6 @@ public class HiveSessionImpl implements
public void close() throws HiveSQLException {
try {
acquire(true);
- /**
- * For metadata operations like getTables(), getColumns() etc,
- * the session allocates a private metastore handler which should be
- * closed at the end of the session
- */
- if (metastoreClient != null) {
- metastoreClient.close();
- }
// Iterate through the opHandles and close their operations
for (OperationHandle opHandle : opHandleSet) {
operationManager.closeOperation(opHandle);
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Thu Oct 30 16:22:33 2014
@@ -43,6 +43,7 @@ import org.apache.hive.service.cli.HiveS
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.operation.OperationManager;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
+import org.apache.hive.service.server.HiveServer2;
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
/**
@@ -65,9 +66,12 @@ public class SessionManager extends Comp
private long sessionTimeout;
private volatile boolean shutdown;
+ // The HiveServer2 instance running this service
+ private final HiveServer2 hiveServer2;
- public SessionManager() {
+ public SessionManager(HiveServer2 hiveServer2) {
super(SessionManager.class.getSimpleName());
+ this.hiveServer2 = hiveServer2;
}
@Override
@@ -232,10 +236,10 @@ public class SessionManager extends Comp
/**
* Opens a new session and creates a session handle.
* The username passed to this method is the effective username.
- * If withImpersonation is true (==doAs true) we wrap all the calls in HiveSession
+ * If withImpersonation is true (==doAs true) we wrap all the calls in HiveSession
* within a UGI.doAs, where UGI corresponds to the effective user.
- * @see org.apache.hive.service.cli.thrift.ThriftCLIService#getUserName()
- *
+ * @see org.apache.hive.service.cli.thrift.ThriftCLIService#getUserName()
+ *
* @param protocol
* @param username
* @param password
@@ -288,6 +292,24 @@ public class SessionManager extends Comp
throw new HiveSQLException("Session does not exist!");
}
session.close();
+ // Shutdown HiveServer2 if it has been deregistered from ZooKeeper and has no active sessions
+ if (!(hiveServer2 == null) && (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY))
+ && (!hiveServer2.isRegisteredWithZooKeeper())) {
+ // Asynchronously shutdown this instance of HiveServer2,
+ // if there are no active client sessions
+ if (getOpenSessionCount() == 0) {
+ LOG.info("This instance of HiveServer2 has been removed from the list of server "
+ + "instances available for dynamic service discovery. "
+ + "The last client session has ended - will shutdown now.");
+ Thread shutdownThread = new Thread() {
+ @Override
+ public void run() {
+ hiveServer2.stop();
+ }
+ };
+ shutdownThread.start();
+ }
+ }
}
public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLException {
@@ -376,6 +398,5 @@ public class SessionManager extends Comp
public int getOpenSessionCount() {
return handleToSession.size();
}
-
}
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java Thu Oct 30 16:22:33 2014
@@ -30,7 +30,7 @@ import org.apache.hive.service.cli.ICLIS
public class EmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService {
public EmbeddedThriftBinaryCLIService() {
- super(new CLIService());
+ super(new CLIService(null));
isEmbedded = true;
HiveConf.setLoadHiveServer2Config(true);
cliService.init(new HiveConf());
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Thu Oct 30 16:22:33 2014
@@ -66,7 +66,6 @@ public abstract class ThriftCLIService e
protected int minWorkerThreads;
protected int maxWorkerThreads;
protected long workerKeepAliveTime;
- private HiveServer2 hiveServer2;
public ThriftCLIService(CLIService cliService, String serviceName) {
super(serviceName);
@@ -264,9 +263,9 @@ public abstract class ThriftCLIService e
/**
* Returns the effective username.
- * 1. If hive.server2.allow.user.substitution = false: the username of the connecting user
+ * 1. If hive.server2.allow.user.substitution = false: the username of the connecting user
* 2. If hive.server2.allow.user.substitution = true: the username of the end user,
- * that the connecting user is trying to proxy for.
+ * that the connecting user is trying to proxy for.
* This includes a check whether the connecting user is allowed to proxy for the end user.
* @param req
* @return
@@ -366,24 +365,6 @@ public abstract class ThriftCLIService e
} catch (Exception e) {
LOG.warn("Error closing session: ", e);
resp.setStatus(HiveSQLException.toTStatus(e));
- } finally {
- if (!(isEmbedded) && (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY))
- && (!hiveServer2.isRegisteredWithZooKeeper())) {
- // Asynchronously shutdown this instance of HiveServer2,
- // if there are no active client sessions
- if (cliService.getSessionManager().getOpenSessionCount() == 0) {
- LOG.info("This instance of HiveServer2 has been removed from the list of server "
- + "instances available for dynamic service discovery. "
- + "The last client session has ended - will shutdown now.");
- Thread shutdownThread = new Thread() {
- @Override
- public void run() {
- hiveServer2.stop();
- }
- };
- shutdownThread.start();
- }
- }
}
return resp;
}
@@ -666,10 +647,4 @@ public abstract class ThriftCLIService e
return cliService.getHiveConf().getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION)
.equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString());
}
-
- public void setHiveServer2(HiveServer2 hiveServer2) {
- this.hiveServer2 = hiveServer2;
- }
-
}
-
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Thu Oct 30 16:22:33 2014
@@ -29,12 +29,10 @@ import org.apache.hadoop.hive.shims.Shim
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hive.service.auth.HiveAuthFactory;
-import org.apache.hive.service.auth.HiveAuthFactory.AuthTypes;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.thrift.TCLIService.Iface;
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
import org.apache.thrift.TProcessor;
-import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServlet;
@@ -60,9 +58,6 @@ public class ThriftHttpCLIService extend
@Override
public void run() {
try {
- // Verify config validity
- verifyHttpConfiguration(hiveConf);
-
// HTTP Server
httpServer = new org.eclipse.jetty.server.Server();
@@ -162,32 +157,4 @@ public class ThriftHttpCLIService extend
}
return httpPath;
}
-
- /**
- * Verify that this configuration is supported by transportMode of HTTP
- * @param hiveConf
- */
- private static void verifyHttpConfiguration(HiveConf hiveConf) {
- String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION);
-
- // Error out if KERBEROS auth mode is being used and use SSL is also set to true
- if(authType.equalsIgnoreCase(AuthTypes.KERBEROS.toString()) &&
- hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
- String msg = ConfVars.HIVE_SERVER2_AUTHENTICATION + " setting of " +
- authType + " is not supported with " +
- ConfVars.HIVE_SERVER2_USE_SSL + " set to true";
- LOG.fatal(msg);
- throw new RuntimeException(msg);
- }
-
- // Warn that SASL is not used in http mode
- if(authType.equalsIgnoreCase(AuthTypes.NONE.toString())) {
- // NONE in case of thrift mode uses SASL
- LOG.warn(ConfVars.HIVE_SERVER2_AUTHENTICATION + " setting to " +
- authType + ". SASL is not supported with http transport mode," +
- " so using equivalent of "
- + AuthTypes.NOSASL);
- }
- }
-
}
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java Thu Oct 30 16:22:33 2014
@@ -31,8 +31,9 @@ import org.apache.commons.codec.binary.B
import org.apache.commons.codec.binary.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hive.service.auth.AuthenticationProviderFactory;
import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods;
import org.apache.hive.service.auth.HiveAuthFactory;
@@ -237,19 +238,31 @@ public class ThriftHttpServlet extends T
}
}
- private String getPrincipalWithoutRealm(String fullPrincipal) {
- KerberosName fullKerberosName = new KerberosName(fullPrincipal);
+ private String getPrincipalWithoutRealm(String fullPrincipal)
+ throws HttpAuthenticationException {
+ KerberosNameShim fullKerberosName;
+ try {
+ fullKerberosName = ShimLoader.getHadoopShims().getKerberosNameShim(fullPrincipal);
+ } catch (IOException e) {
+ throw new HttpAuthenticationException(e);
+ }
String serviceName = fullKerberosName.getServiceName();
- String hostName = fullKerberosName.getHostName();
+ String hostName = fullKerberosName.getHostName();
String principalWithoutRealm = serviceName;
if (hostName != null) {
principalWithoutRealm = serviceName + "/" + hostName;
}
return principalWithoutRealm;
}
-
- private String getPrincipalWithoutRealmAndHost(String fullPrincipal) {
- KerberosName fullKerberosName = new KerberosName(fullPrincipal);
+
+ private String getPrincipalWithoutRealmAndHost(String fullPrincipal)
+ throws HttpAuthenticationException {
+ KerberosNameShim fullKerberosName;
+ try {
+ fullKerberosName = ShimLoader.getHadoopShims().getKerberosNameShim(fullPrincipal);
+ } catch (IOException e) {
+ throw new HttpAuthenticationException(e);
+ }
return fullKerberosName.getServiceName();
}
}
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java Thu Oct 30 16:22:33 2014
@@ -18,8 +18,18 @@
package org.apache.hive.service.server;
+import java.io.IOException;
import java.nio.charset.Charset;
-
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.LogUtils;
@@ -29,10 +39,10 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.hive.service.CompositeService;
-import org.apache.hive.service.ServiceException;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
@@ -42,7 +52,9 @@ import org.apache.zookeeper.KeeperExcept
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
/**
* HiveServer2.
@@ -65,7 +77,7 @@ public class HiveServer2 extends Composi
@Override
public synchronized void init(HiveConf hiveConf) {
- cliService = new CLIService();
+ cliService = new CLIService(this);
addService(cliService);
if (isHTTPTransportMode(hiveConf)) {
thriftCLIService = new ThriftHttpCLIService(cliService);
@@ -73,7 +85,6 @@ public class HiveServer2 extends Composi
thriftCLIService = new ThriftBinaryCLIService(cliService);
}
addService(thriftCLIService);
- thriftCLIService.setHiveServer2(this);
super.init(hiveConf);
// Add a shutdown hook for catching SIGTERM & SIGINT
@@ -110,14 +121,19 @@ public class HiveServer2 extends Composi
String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
String instanceURI = getServerInstanceURI(hiveConf);
byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
+ // Znode ACLs
+ List<ACL> nodeAcls = new ArrayList<ACL>();
+ setUpAuthAndAcls(hiveConf, nodeAcls);
+ // Create a ZooKeeper client
zooKeeperClient =
new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
new ZooKeeperHiveHelper.DummyWatcher());
-
- // Create the parent znodes recursively; ignore if the parent already exists
+ // Create the parent znodes recursively; ignore if the parent already exists.
+ // If pre-creating the parent on a kerberized cluster, ensure that you give ACLs,
+ // as explained in {@link #setUpAuthAndAcls(HiveConf, List<ACL>) setUpAuthAndAcls}
try {
- ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace,
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace, nodeAcls,
+ CreateMode.PERSISTENT);
LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
} catch (KeeperException e) {
if (e.code() != KeeperException.Code.NODEEXISTS) {
@@ -126,14 +142,14 @@ public class HiveServer2 extends Composi
}
}
// Create a znode under the rootNamespace parent for this instance of the server
- // Znode name: server-host:port-versionInfo-sequence
+ // Znode name: serverUri=host:port;version=versionInfo;sequence=sequenceNumber
try {
- String znodePath =
+ String pathPrefix =
ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
- + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "server-" + instanceURI + "-"
- + HiveVersionInfo.getVersion() + "-";
+ + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
+ + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
znodePath =
- zooKeeperClient.create(znodePath, znodeDataUTF8, Ids.OPEN_ACL_UNSAFE,
+ zooKeeperClient.create(pathPrefix, znodeDataUTF8, nodeAcls,
CreateMode.EPHEMERAL_SEQUENTIAL);
setRegisteredWithZooKeeper(true);
// Set a watch on the znode
@@ -149,11 +165,50 @@ public class HiveServer2 extends Composi
}
/**
+ * Set up ACLs for znodes based on whether the cluster is secure or not.
+ * On a kerberized cluster, ZooKeeper performs Kerberos-SASL authentication.
+ * We give Read privilege to the world, but Create/Delete/Write/Admin to the authenticated user.
+ * On a non-kerberized cluster, we give Create/Read/Delete/Write/Admin privileges to the world.
+ *
+ * For a kerberized cluster, we also dynamically set up the client's JAAS conf.
+ * @param hiveConf
+ * @param nodeAcls
+ * @return
+ * @throws Exception
+ */
+ private void setUpAuthAndAcls(HiveConf hiveConf, List<ACL> nodeAcls) throws Exception {
+ if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
+ String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
+ if (principal.isEmpty()) {
+ throw new IOException(
+ "HiveServer2 Kerberos principal is empty");
+ }
+ String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
+ if (keyTabFile.isEmpty()) {
+ throw new IOException(
+ "HiveServer2 Kerberos keytab is empty");
+ }
+
+ // Install the JAAS Configuration for the runtime
+ ShimLoader.getHadoopShims().setZookeeperClientKerberosJaasConfig(principal, keyTabFile);
+ // Read all to the world
+ nodeAcls.addAll(Ids.READ_ACL_UNSAFE);
+ // Create/Delete/Write/Admin to the authenticated user
+ nodeAcls.add(new ACL(Perms.ALL, Ids.AUTH_IDS));
+ } else {
+ // ACLs for znodes on a non-kerberized cluster
+ // Create/Read/Delete/Write/Admin to the world
+ nodeAcls.addAll(Ids.OPEN_ACL_UNSAFE);
+ }
+ }
+
+ /**
* The watcher class which sets the de-register flag when the znode corresponding to this server
* instance is deleted. Additionally, it shuts down the server if there are no more active client
* sessions at the time of receiving a 'NodeDeleted' notification from ZooKeeper.
*/
private class DeRegisterWatcher implements Watcher {
+ @Override
public void process(WatchedEvent event) {
if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
HiveServer2.this.setRegisteredWithZooKeeper(false);
@@ -233,6 +288,7 @@ public class HiveServer2 extends Composi
private static void startHiveServer2() throws Throwable {
long attempts = 0, maxAttempts = 1;
while (true) {
+ LOG.info("Starting HiveServer2");
HiveConf hiveConf = new HiveConf();
maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
HiveServer2 server = null;
@@ -280,31 +336,206 @@ public class HiveServer2 extends Composi
}
}
+ /**
+ * Remove all znodes corresponding to the given version number from ZooKeeper
+ *
+ * @param versionNumber
+ * @throws Exception
+ */
+ static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exception {
+ HiveConf hiveConf = new HiveConf();
+ int zooKeeperSessionTimeout =
+ hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
+ String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
+ String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+ ZooKeeper zooKeeperClient =
+ new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
+ new ZooKeeperHiveHelper.DummyWatcher());
+ // Get all znode paths
+ List<String> znodePaths =
+ zooKeeperClient.getChildren(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace,
+ false);
+ // Now for each path that is for the given versionNumber, delete the znode from ZooKeeper
+ for (String znodePath : znodePaths) {
+ if (znodePath.contains("version=" + versionNumber + ";")) {
+ zooKeeperClient.delete(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+ + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath, -1);
+ }
+ }
+ }
+
public static void main(String[] args) {
HiveConf.setLoadHiveServer2Config(true);
try {
ServerOptionsProcessor oproc = new ServerOptionsProcessor("hiveserver2");
- if (!oproc.process(args)) {
- System.err.println("Error starting HiveServer2 with given arguments");
- System.exit(-1);
- }
+ ServerOptionsProcessorResponse oprocResponse = oproc.parse(args);
// NOTE: It is critical to do this here so that log4j is reinitialized
// before any of the other core hive classes are loaded
String initLog4jMessage = LogUtils.initHiveLog4j();
LOG.debug(initLog4jMessage);
-
HiveStringUtils.startupShutdownMessage(HiveServer2.class, args, LOG);
- // log debug message from "oproc" after log4j initialize properly
+
+ // Log debug message from "oproc" after log4j initialize properly
LOG.debug(oproc.getDebugMessage().toString());
- startHiveServer2();
+ // Call the executor which will execute the appropriate command based on the parsed options
+ oprocResponse.getServerOptionsExecutor().execute();
} catch (LogInitializationException e) {
LOG.error("Error initializing log: " + e.getMessage(), e);
System.exit(-1);
- } catch (Throwable t) {
- LOG.fatal("Error starting HiveServer2", t);
- System.exit(-1);
+ }
+ }
+
+ /**
+ * ServerOptionsProcessor.
+ * Process arguments given to HiveServer2 (-hiveconf property=value)
+ * Set properties in System properties
+ * Create an appropriate response object,
+ * which has executor to execute the appropriate command based on the parsed options.
+ */
+ static class ServerOptionsProcessor {
+ private final Options options = new Options();
+ private org.apache.commons.cli.CommandLine commandLine;
+ private final String serverName;
+ private final StringBuilder debugMessage = new StringBuilder();
+
+ @SuppressWarnings("static-access")
+ ServerOptionsProcessor(String serverName) {
+ this.serverName = serverName;
+ // -hiveconf x=y
+ options.addOption(OptionBuilder
+ .withValueSeparator()
+ .hasArgs(2)
+ .withArgName("property=value")
+ .withLongOpt("hiveconf")
+ .withDescription("Use value for given property")
+ .create());
+ // -deregister <versionNumber>
+ options.addOption(OptionBuilder
+ .hasArgs(1)
+ .withArgName("versionNumber")
+ .withLongOpt("deregister")
+ .withDescription("Deregister all instances of given version from dynamic service discovery")
+ .create());
+ options.addOption(new Option("H", "help", false, "Print help information"));
+ }
+
+ ServerOptionsProcessorResponse parse(String[] argv) {
+ try {
+ commandLine = new GnuParser().parse(options, argv);
+ // Process --hiveconf
+ // Get hiveconf param values and set the System property values
+ Properties confProps = commandLine.getOptionProperties("hiveconf");
+ for (String propKey : confProps.stringPropertyNames()) {
+ // save logging message for log4j output latter after log4j initialize properly
+ debugMessage.append("Setting " + propKey + "=" + confProps.getProperty(propKey) + ";\n");
+ System.setProperty(propKey, confProps.getProperty(propKey));
+ }
+
+ // Process --help
+ if (commandLine.hasOption('H')) {
+ return new ServerOptionsProcessorResponse(new HelpOptionExecutor(serverName, options));
+ }
+
+ // Process --deregister
+ if (commandLine.hasOption("deregister")) {
+ return new ServerOptionsProcessorResponse(new DeregisterOptionExecutor(
+ commandLine.getOptionValue("deregister")));
+ }
+ } catch (ParseException e) {
+ // Error out & exit - we were not able to parse the args successfully
+ System.err.println("Error starting HiveServer2 with given arguments: ");
+ System.err.println(e.getMessage());
+ System.exit(-1);
+ }
+ // Default executor, when no option is specified
+ return new ServerOptionsProcessorResponse(new StartOptionExecutor());
+ }
+
+ StringBuilder getDebugMessage() {
+ return debugMessage;
+ }
+ }
+
+ /**
+ * The response sent back from {@link ServerOptionsProcessor#parse(String[])}
+ */
+ static class ServerOptionsProcessorResponse {
+ private final ServerOptionsExecutor serverOptionsExecutor;
+
+ ServerOptionsProcessorResponse(ServerOptionsExecutor serverOptionsExecutor) {
+ this.serverOptionsExecutor = serverOptionsExecutor;
+ }
+
+ ServerOptionsExecutor getServerOptionsExecutor() {
+ return serverOptionsExecutor;
+ }
+ }
+
+ /**
+ * The executor interface for running the appropriate HiveServer2 command based on parsed options
+ */
+ static interface ServerOptionsExecutor {
+ public void execute();
+ }
+
+ /**
+ * HelpOptionExecutor: executes the --help option by printing out the usage
+ */
+ static class HelpOptionExecutor implements ServerOptionsExecutor {
+ private final Options options;
+ private final String serverName;
+
+ HelpOptionExecutor(String serverName, Options options) {
+ this.options = options;
+ this.serverName = serverName;
+ }
+
+ @Override
+ public void execute() {
+ new HelpFormatter().printHelp(serverName, options);
+ System.exit(0);
+ }
+ }
+
+ /**
+ * StartOptionExecutor: starts HiveServer2.
+ * This is the default executor, when no option is specified.
+ */
+ static class StartOptionExecutor implements ServerOptionsExecutor {
+ @Override
+ public void execute() {
+ try {
+ startHiveServer2();
+ } catch (Throwable t) {
+ LOG.fatal("Error starting HiveServer2", t);
+ System.exit(-1);
+ }
+ }
+ }
+
+ /**
+ * DeregisterOptionExecutor: executes the --deregister option by
+ * deregistering all HiveServer2 instances from ZooKeeper of a specific version.
+ */
+ static class DeregisterOptionExecutor implements ServerOptionsExecutor {
+ private final String versionNumber;
+
+ DeregisterOptionExecutor(String versionNumber) {
+ this.versionNumber = versionNumber;
+ }
+
+ @Override
+ public void execute() {
+ try {
+ deleteServerInstancesFromZooKeeper(versionNumber);
+ } catch (Exception e) {
+ LOG.fatal("Error deregistering HiveServer2 instances for version: " + versionNumber
+ + " from ZooKeeper", e);
+ System.exit(-1);
+ }
+ System.exit(0);
}
}
}
Modified: hive/branches/spark/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java (original)
+++ hive/branches/spark/service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java Thu Oct 30 16:22:33 2014
@@ -39,7 +39,7 @@ public class TestPlainSaslHelper extends
hconf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS));
- CLIService cliService = new CLIService();
+ CLIService cliService = new CLIService(null);
cliService.init(hconf);
ThriftCLIService tcliService = new ThriftBinaryCLIService(cliService);
tcliService.init(hconf);
Modified: hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java (original)
+++ hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java Thu Oct 30 16:22:33 2014
@@ -52,7 +52,7 @@ public class TestSessionGlobalInitFile e
*/
private class FakeEmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService {
public FakeEmbeddedThriftBinaryCLIService(HiveConf hiveConf) {
- super(new CLIService());
+ super(new CLIService(null));
isEmbedded = true;
cliService.init(hiveConf);
cliService.start();
Modified: hive/branches/spark/service/src/test/org/apache/hive/service/server/TestServerOptionsProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/test/org/apache/hive/service/server/TestServerOptionsProcessor.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/service/src/test/org/apache/hive/service/server/TestServerOptionsProcessor.java (original)
+++ hive/branches/spark/service/src/test/org/apache/hive/service/server/TestServerOptionsProcessor.java Thu Oct 30 16:22:33 2014
@@ -21,6 +21,8 @@ package org.apache.hive.service.server;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.hive.service.server.HiveServer2.ServerOptionsProcessor;
+
/**
* Test ServerOptionsProcessor
*
@@ -39,17 +41,12 @@ public class TestServerOptionsProcessor
null,
System.getProperty(key));
+ optProcessor.parse(args);
- boolean isSuccess = optProcessor.process(args);
- Assert.assertTrue("options processor result", isSuccess);
Assert.assertEquals(
"checking system property after processing options",
value,
System.getProperty(key));
-
-
-
-
}
}
Modified: hive/branches/spark/shims/0.20/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/0.20/pom.xml (original)
+++ hive/branches/spark/shims/0.20/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Thu Oct 30 16:22:33 2014
@@ -619,6 +619,12 @@ public class Hadoop20Shims implements Ha
}
@Override
+ public String getResolvedPrincipal(String principal) throws IOException {
+ // Not supported
+ return null;
+ }
+
+ @Override
public void reLoginUserFromKeytab() throws IOException{
throwKerberosUnsupportedError();
}
@@ -698,7 +704,7 @@ public class Hadoop20Shims implements Ha
}
public class Hadoop20FileStatus implements HdfsFileStatus {
- private FileStatus fileStatus;
+ private final FileStatus fileStatus;
public Hadoop20FileStatus(FileStatus fileStatus) {
this.fileStatus = fileStatus;
}
@@ -706,6 +712,7 @@ public class Hadoop20Shims implements Ha
public FileStatus getFileStatus() {
return fileStatus;
}
+ @Override
public void debugLog() {
if (fileStatus != null) {
LOG.debug(fileStatus.toString());
@@ -824,6 +831,11 @@ public class Hadoop20Shims implements Ha
}
@Override
+ public void refreshDefaultQueue(Configuration conf, String userName) {
+ // MR1 does not expose API required to set MR queue mapping for user
+ }
+
+ @Override
public String getTokenFileLocEnvName() {
throw new UnsupportedOperationException(
"Kerberos not supported in current hadoop version");
@@ -928,4 +940,15 @@ public class Hadoop20Shims implements Ha
public Path getCurrentTrashPath(Configuration conf, FileSystem fs) {
return null;
}
+
+ @Override
+ public KerberosNameShim getKerberosNameShim(String name) throws IOException {
+ // Not supported
+ return null;
+ }
+
+ @Override
+ public void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile) {
+ // Not supported
+ }
}
Modified: hive/branches/spark/shims/0.20S/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20S/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/0.20S/pom.xml (original)
+++ hive/branches/spark/shims/0.20S/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Thu Oct 30 16:22:33 2014
@@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.security.KerberosName;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.VersionInfo;
@@ -158,6 +159,11 @@ public class Hadoop20SShims extends Hado
}
@Override
+ public void refreshDefaultQueue(Configuration conf, String userName) {
+ // MR1 does not expose API required to set MR queue mapping for user
+ }
+
+ @Override
public void setTotalOrderPartitionFile(JobConf jobConf, Path partitionFile){
TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
}
@@ -546,4 +552,44 @@ public class Hadoop20SShims extends Hado
public Path getCurrentTrashPath(Configuration conf, FileSystem fs) {
return null;
}
+
+ /**
+ * Returns a shim to wrap KerberosName
+ */
+ @Override
+ public KerberosNameShim getKerberosNameShim(String name) throws IOException {
+ return new KerberosNameShim(name);
+ }
+
+ /**
+ * Shim for KerberosName
+ */
+ public class KerberosNameShim implements HadoopShimsSecure.KerberosNameShim {
+
+ private KerberosName kerberosName;
+
+ public KerberosNameShim(String name) {
+ kerberosName = new KerberosName(name);
+ }
+
+ public String getDefaultRealm() {
+ return kerberosName.getDefaultRealm();
+ }
+
+ public String getServiceName() {
+ return kerberosName.getServiceName();
+ }
+
+ public String getHostName() {
+ return kerberosName.getHostName();
+ }
+
+ public String getRealm() {
+ return kerberosName.getRealm();
+ }
+
+ public String getShortName() throws IOException {
+ return kerberosName.getShortName();
+ }
+ }
}
Modified: hive/branches/spark/shims/0.23/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.23/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/0.23/pom.xml (original)
+++ hive/branches/spark/shims/0.23/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
@@ -120,6 +120,11 @@
<optional>true</optional>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+ <version>${hadoop-23.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-tests</artifactId>
<version>${tez.version}</version>
Modified: hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Thu Oct 30 16:22:33 2014
@@ -72,8 +72,13 @@ import org.apache.hadoop.mapreduce.TaskT
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy;
import org.apache.tez.test.MiniTezCluster;
import com.google.common.base.Joiner;
@@ -85,6 +90,7 @@ import com.google.common.collect.Iterabl
* Implemention of shims against Hadoop 0.23.0.
*/
public class Hadoop23Shims extends HadoopShimsSecure {
+ private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename";
HadoopShims.MiniDFSShim cluster = null;
@@ -220,6 +226,30 @@ public class Hadoop23Shims extends Hadoo
}
/**
+ * Load the fair scheduler queue for given user if available.
+ */
+ @Override
+ public void refreshDefaultQueue(Configuration conf, String userName) throws IOException {
+ String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME;
+ if (StringUtils.isNotBlank(userName) && isFairScheduler(conf)) {
+ AllocationConfiguration allocConf = new AllocationConfiguration(conf);
+ QueuePlacementPolicy queuePolicy = allocConf.getPlacementPolicy();
+ if (queuePolicy != null) {
+ requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName);
+ if (StringUtils.isNotBlank(requestedQueue)) {
+ LOG.debug("Setting queue name to " + requestedQueue + " for user " + userName);
+ conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue);
+ }
+ }
+ }
+ }
+
+ private boolean isFairScheduler (Configuration conf) {
+ return FairScheduler.class.getName().
+ equalsIgnoreCase(conf.get(YarnConfiguration.RM_SCHEDULER));
+ }
+
+ /**
* Returns a shim to wrap MiniMrCluster
*/
@Override
@@ -847,4 +877,44 @@ public class Hadoop23Shims extends Hadoo
TrashPolicy tp = TrashPolicy.getInstance(conf, fs, fs.getHomeDirectory());
return tp.getCurrentTrashDir();
}
+
+ /**
+ * Returns a shim to wrap KerberosName
+ */
+ @Override
+ public KerberosNameShim getKerberosNameShim(String name) throws IOException {
+ return new KerberosNameShim(name);
+ }
+
+ /**
+ * Shim for KerberosName
+ */
+ public class KerberosNameShim implements HadoopShimsSecure.KerberosNameShim {
+
+ private KerberosName kerberosName;
+
+ public KerberosNameShim(String name) {
+ kerberosName = new KerberosName(name);
+ }
+
+ public String getDefaultRealm() {
+ return kerberosName.getDefaultRealm();
+ }
+
+ public String getServiceName() {
+ return kerberosName.getServiceName();
+ }
+
+ public String getHostName() {
+ return kerberosName.getHostName();
+ }
+
+ public String getRealm() {
+ return kerberosName.getRealm();
+ }
+
+ public String getShortName() throws IOException {
+ return kerberosName.getShortName();
+ }
+ }
}
Modified: hive/branches/spark/shims/aggregator/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/aggregator/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/aggregator/pom.xml (original)
+++ hive/branches/spark/shims/aggregator/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/shims/common-secure/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/pom.xml (original)
+++ hive/branches/spark/shims/common-secure/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
@@ -74,6 +74,11 @@
<version>${libthrift.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java (original)
+++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java Thu Oct 30 16:22:33 2014
@@ -29,11 +29,14 @@ import java.security.PrivilegedException
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
-import javax.security.auth.login.LoginException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
@@ -66,6 +69,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -73,6 +77,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.tools.HadoopArchives;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
import com.google.common.primitives.Longs;
@@ -88,6 +93,7 @@ public abstract class HadoopShimsSecure
return HtmlQuoting.unquoteHtmlChars(item);
}
+ @Override
public HadoopShims.CombineFileInputFormatShim getCombineFileInputFormat() {
return new CombineFileInputFormatShim() {
@Override
@@ -171,6 +177,7 @@ public abstract class HadoopShimsSecure
protected boolean isShrinked;
protected long shrinkedLength;
+ @Override
public boolean next(K key, V value) throws IOException {
while ((curReader == null)
@@ -183,11 +190,13 @@ public abstract class HadoopShimsSecure
return true;
}
+ @Override
public K createKey() {
K newKey = curReader.createKey();
return (K)(new CombineHiveKey(newKey));
}
+ @Override
public V createValue() {
return curReader.createValue();
}
@@ -195,10 +204,12 @@ public abstract class HadoopShimsSecure
/**
* Return the amount of data processed.
*/
+ @Override
public long getPos() throws IOException {
return progress;
}
+ @Override
public void close() throws IOException {
if (curReader != null) {
curReader.close();
@@ -209,6 +220,7 @@ public abstract class HadoopShimsSecure
/**
* Return progress based on the amount of data processed so far.
*/
+ @Override
public float getProgress() throws IOException {
return Math.min(1.0f, progress / (float) (split.getLength()));
}
@@ -309,6 +321,7 @@ public abstract class HadoopShimsSecure
CombineFileInputFormat<K, V>
implements HadoopShims.CombineFileInputFormatShim<K, V> {
+ @Override
public Path[] getInputPathsShim(JobConf conf) {
try {
return FileInputFormat.getInputPaths(conf);
@@ -339,7 +352,7 @@ public abstract class HadoopShimsSecure
super.setMaxSplitSize(minSize);
}
- InputSplit[] splits = (InputSplit[]) super.getSplits(job, numSplits);
+ InputSplit[] splits = super.getSplits(job, numSplits);
ArrayList<InputSplitShim> inputSplitShims = new ArrayList<InputSplitShim>();
for (int pos = 0; pos < splits.length; pos++) {
@@ -359,10 +372,12 @@ public abstract class HadoopShimsSecure
return inputSplitShims.toArray(new InputSplitShim[inputSplitShims.size()]);
}
+ @Override
public InputSplitShim getInputSplitShim() throws IOException {
return new InputSplitShim();
}
+ @Override
public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim split,
Reporter reporter,
Class<RecordReader<K, V>> rrClass)
@@ -373,6 +388,7 @@ public abstract class HadoopShimsSecure
}
+ @Override
public String getInputFormatClassName() {
return "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat";
}
@@ -401,6 +417,7 @@ public abstract class HadoopShimsSecure
* the archive as compared to the full path in case of earlier versions.
* See this api in Hadoop20Shims for comparison.
*/
+ @Override
public URI getHarUri(URI original, URI base, URI originalBase)
throws URISyntaxException {
URI relative = originalBase.relativize(original);
@@ -431,6 +448,7 @@ public abstract class HadoopShimsSecure
public void abortTask(TaskAttemptContext taskContext) { }
}
+ @Override
public void prepareJobOutput(JobConf conf) {
conf.setOutputCommitter(NullOutputCommitter.class);
@@ -573,6 +591,17 @@ public abstract class HadoopShimsSecure
return UserGroupInformation.loginUserFromKeytabAndReturnUGI(hostPrincipal, keytabFile);
}
+ /**
+ * Convert Kerberos principal name pattern to valid Kerberos principal names.
+ * @param principal (principal name pattern)
+ * @return
+ * @throws IOException
+ */
+ @Override
+ public String getResolvedPrincipal(String principal) throws IOException {
+ return SecurityUtil.getServerPrincipal(principal, "0.0.0.0");
+ }
+
@Override
public String getTokenFileLocEnvName() {
return UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
@@ -675,4 +704,58 @@ public abstract class HadoopShimsSecure
throws IOException, AccessControlException, Exception {
DefaultFileAccess.checkFileAccess(fs, stat, action);
}
+
+ @Override
+ public void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile) throws IOException {
+ // ZooKeeper property name to pick the correct JAAS conf section
+ final String SASL_LOGIN_CONTEXT_NAME = "HiveZooKeeperClient";
+ System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, SASL_LOGIN_CONTEXT_NAME);
+
+ principal = getResolvedPrincipal(principal);
+ JaasConfiguration jaasConf = new JaasConfiguration(SASL_LOGIN_CONTEXT_NAME, principal, keyTabFile);
+
+ // Install the Configuration in the runtime.
+ javax.security.auth.login.Configuration.setConfiguration(jaasConf);
+ }
+
+ /**
+ * A JAAS configuration for ZooKeeper clients intended to use for SASL
+ * Kerberos.
+ */
+ private static class JaasConfiguration extends javax.security.auth.login.Configuration {
+ // Current installed Configuration
+ private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration
+ .getConfiguration();
+ private final String loginContextName;
+ private final String principal;
+ private final String keyTabFile;
+
+ public JaasConfiguration(String hiveLoginContextName, String principal, String keyTabFile) {
+ this.loginContextName = hiveLoginContextName;
+ this.principal = principal;
+ this.keyTabFile = keyTabFile;
+ }
+
+ @Override
+ public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
+ if (loginContextName.equals(appName)) {
+ Map<String, String> krbOptions = new HashMap<String, String>();
+ krbOptions.put("doNotPrompt", "true");
+ krbOptions.put("storeKey", "true");
+ krbOptions.put("useKeyTab", "true");
+ krbOptions.put("principal", principal);
+ krbOptions.put("keyTab", keyTabFile);
+ krbOptions.put("refreshKrb5Config", "true");
+ AppConfigurationEntry hiveZooKeeperClientEntry = new AppConfigurationEntry(
+ KerberosUtil.getKrb5LoginModuleName(), LoginModuleControlFlag.REQUIRED, krbOptions);
+ return new AppConfigurationEntry[] { hiveZooKeeperClientEntry };
+ }
+ // Try the base config
+ if (baseConfig != null) {
+ return baseConfig.getAppConfigurationEntry(appName);
+ }
+ return null;
+ }
+ }
+
}
Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java (original)
+++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java Thu Oct 30 16:22:33 2014
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
@@ -108,18 +109,17 @@ public class DBTokenStore implements Del
return delTokenIdents;
}
- private Object hmsHandler;
+ private Object rawStore;
@Override
- public void setStore(Object hms) throws TokenStoreException {
- hmsHandler = hms;
+ public void init(Object rawStore, ServerMode smode) throws TokenStoreException {
+ this.rawStore = rawStore;
}
private Object invokeOnRawStore(String methName, Object[] params, Class<?> ... paramTypes)
throws TokenStoreException{
try {
- Object rawStore = hmsHandler.getClass().getMethod("getMS").invoke(hmsHandler);
return rawStore.getClass().getMethod(methName, paramTypes).invoke(rawStore, params);
} catch (IllegalArgumentException e) {
throw new TokenStoreException(e);
@@ -149,5 +149,4 @@ public class DBTokenStore implements Del
// No-op.
}
-
}
Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java (original)
+++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java Thu Oct 30 16:22:33 2014
@@ -21,6 +21,7 @@ import java.io.Closeable;
import java.util.List;
import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
/**
@@ -108,6 +109,10 @@ public interface DelegationTokenStore ex
*/
List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() throws TokenStoreException;
- void setStore(Object hmsHandler) throws TokenStoreException;
+ /**
+ * @param hmsHandler ObjectStore used by DBTokenStore
+ * @param smode Indicate whether this is a metastore or hiveserver2 token store
+ */
+ void init(Object hmsHandler, ServerMode smode);
}
Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original)
+++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Thu Oct 30 16:22:33 2014
@@ -308,6 +308,10 @@ public class HadoopThriftAuthBridge20S e
"hive.cluster.delegation.token.store.class";
public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR =
"hive.cluster.delegation.token.store.zookeeper.connectString";
+ // alternate connect string specification configuration
+ public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE =
+ "hive.zookeeper.quorum";
+
public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS =
"hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis";
public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE =
@@ -315,7 +319,7 @@ public class HadoopThriftAuthBridge20S e
public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
"hive.cluster.delegation.token.store.zookeeper.acl";
public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT =
- "/hive/cluster/delegation";
+ "/hivedelegation";
public Server() throws TTransportException {
try {
@@ -417,7 +421,7 @@ public class HadoopThriftAuthBridge20S e
}
@Override
- public void startDelegationTokenSecretManager(Configuration conf, Object hms)
+ public void startDelegationTokenSecretManager(Configuration conf, Object rawStore, ServerMode smode)
throws IOException{
long secretKeyInterval =
conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY,
@@ -430,7 +434,7 @@ public class HadoopThriftAuthBridge20S e
DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
DelegationTokenStore dts = getTokenStore(conf);
- dts.setStore(hms);
+ dts.init(rawStore, smode);
secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime,
tokenRenewInterval,
Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java (original)
+++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java Thu Oct 30 16:22:33 2014
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
/**
@@ -108,8 +109,7 @@ public class MemoryTokenStore implements
}
@Override
- public void setStore(Object hmsHandler) throws TokenStoreException {
+ public void init(Object hmsHandler, ServerMode smode) throws TokenStoreException {
// no-op
}
-
}
Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java (original)
+++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java Thu Oct 30 16:22:33 2014
@@ -265,6 +265,7 @@ public class TokenStoreDelegationTokenSe
/**
* Extension of rollMasterKey to remove expired keys from store.
+ *
* @throws IOException
*/
protected void rollMasterKeyExt() throws IOException {
@@ -273,18 +274,21 @@ public class TokenStoreDelegationTokenSe
HiveDelegationTokenSupport.rollMasterKey(TokenStoreDelegationTokenSecretManager.this);
List<DelegationKey> keysAfterRoll = Arrays.asList(getAllKeys());
for (DelegationKey key : keysAfterRoll) {
- keys.remove(key.getKeyId());
- if (key.getKeyId() == currentKeyId) {
- tokenStore.updateMasterKey(currentKeyId, encodeWritable(key));
- }
+ keys.remove(key.getKeyId());
+ if (key.getKeyId() == currentKeyId) {
+ tokenStore.updateMasterKey(currentKeyId, encodeWritable(key));
+ }
}
for (DelegationKey expiredKey : keys.values()) {
LOGGER.info("Removing expired key id={}", expiredKey.getKeyId());
- tokenStore.removeMasterKey(expiredKey.getKeyId());
+ try {
+ tokenStore.removeMasterKey(expiredKey.getKeyId());
+ } catch (Exception e) {
+ LOGGER.error("Error removing expired key id={}", expiredKey.getKeyId(), e);
+ }
}
}
-
/**
* Cloned from {@link AbstractDelegationTokenSecretManager} to deal with private access
* restriction (there would not be an need to clone the remove thread if the remove logic was