Posted to commits@hive.apache.org by kg...@apache.org on 2020/06/11 19:09:01 UTC
[hive] branch master updated: HIVE-21894: Hadoop credential password storage for the Kafka Storage handler when security is SSL (#839)
This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 4ead9d3 HIVE-21894: Hadoop credential password storage for the Kafka Storage handler when security is SSL (#839)
4ead9d3 is described below
commit 4ead9d35eadc997b65ceeb64f1fa33c71e47070d
Author: Justin Leet <ju...@gmail.com>
AuthorDate: Thu Jun 11 15:08:53 2020 -0400
HIVE-21894: Hadoop credential password storage for the Kafka Storage handler when security is SSL (#839)
---
kafka-handler/README.md | 71 +++++++++++++++++---
.../hadoop/hive/kafka/KafkaTableProperties.java | 38 ++++++++++-
.../org/apache/hadoop/hive/kafka/KafkaUtils.java | 78 +++++++++++++++++++++-
.../apache/hadoop/hive/kafka/KafkaUtilsTest.java | 23 +++++++
.../kafka/kafka_storage_handler.q.out | 36 ++++++++++
5 files changed, 235 insertions(+), 11 deletions(-)
diff --git a/kafka-handler/README.md b/kafka-handler/README.md
index e7761e3..e02b5e9 100644
--- a/kafka-handler/README.md
+++ b/kafka-handler/README.md
@@ -216,15 +216,68 @@ GROUP BY
## Table Properties
-| Property | Description | Mandatory | Default |
-|-------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------------------------------------|
-| kafka.topic | Kafka topic name to map the table to. | Yes | null |
-| kafka.bootstrap.servers | Table property indicating Kafka broker(s) connection string. | Yes | null |
-| kafka.serde.class | Serializer and Deserializer class implementation. | No | org.apache.hadoop.hive.serde2.JsonSerDe |
-| hive.kafka.poll.timeout.ms | Parameter indicating Kafka Consumer poll timeout period in millis. FYI this is independent from internal Kafka consumer timeouts. | No | 5000 (5 Seconds) |
-| hive.kafka.max.retries | Number of retries for Kafka metadata fetch operations. | No | 6 |
-| hive.kafka.metadata.poll.timeout.ms | Number of milliseconds before consumer timeout on fetching Kafka metadata. | No | 30000 (30 Seconds) |
-| kafka.write.semantic | Writer semantics, allowed values (AT_LEAST_ONCE, EXACTLY_ONCE) | No | AT_LEAST_ONCE |
+| Property | Description | Mandatory | Default |
+|--------------------------------------- |------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------------------------------------|
+| kafka.topic | Kafka topic name to map the table to. | Yes | null |
+| kafka.bootstrap.servers | Table property indicating Kafka broker(s) connection string. | Yes | null |
+| kafka.serde.class | Serializer and Deserializer class implementation. | No | org.apache.hadoop.hive.serde2.JsonSerDe |
+| hive.kafka.poll.timeout.ms | Parameter indicating Kafka Consumer poll timeout period in millis. FYI this is independent from internal Kafka consumer timeouts. | No | 5000 (5 Seconds) |
+| hive.kafka.max.retries | Number of retries for Kafka metadata fetch operations. | No | 6 |
+| hive.kafka.metadata.poll.timeout.ms | Number of milliseconds before consumer timeout on fetching Kafka metadata. | No | 30000 (30 Seconds) |
+| kafka.write.semantic | Writer semantics, allowed values (AT_LEAST_ONCE, EXACTLY_ONCE) | No | AT_LEAST_ONCE |
+| hive.kafka.ssl.credential.keystore | Location of credential store that holds SSL credentials. Used to avoid plaintext passwords in table properties | No | |
+| hive.kafka.ssl.truststore.password | The key in the credential store used to retrieve the truststore password. This is NOT the password itself. | No | |
+| hive.kafka.ssl.keystore.password | The key in the credential store used to retrieve the keystore password. This is NOT the password itself. Used for 2-way auth. | No | |
+| hive.kafka.ssl.key.password | The key in the credential store used to retrieve the key password. This is NOT the password itself. | No | |
+| hive.kafka.ssl.truststore.location | The location of the SSL truststore. Must be in HDFS; the handler copies it to a local file because Kafka requires a local path. | No | |
+| hive.kafka.ssl.keystore.location | The location of the SSL keystore. Must be in HDFS; the handler copies it to a local file because Kafka requires a local path. | No | |
+
+### SSL
+SSL connections to Kafka can be configured via the properties described in the Table Properties section above.
+These properties are used to retrieve passwords from a credential store, so that the passwords do not appear in plaintext in the table properties.
+To ensure security, the credential store should have appropriate permissions applied. Queries from clients that cannot
+read the credential store will fail.
+
+Normally, the `<consumer/producer>.ssl.truststore.location` and `<consumer/producer>.ssl.keystore.location` would have to be local files. Any query that requires a
+job can instead reference these stores in HDFS; they are copied from HDFS to a local directory for this purpose.
+
+The producer and consumer stores are both sourced from the same property, e.g. `hive.kafka.ssl.truststore.location`, rather than `kafka.consumer.ssl.truststore.location`.
+
+#### SSL Example
+Table creation is simple: create a table as normal and supply the appropriate Kafka
+configs (e.g. `kafka.consumer.security.protocol`), along with the credential store configs (e.g. `hive.kafka.ssl.credential.keystore`).
+
+```
+CREATE EXTERNAL TABLE
+ kafka_ssl (
+ `data` STRING
+)
+STORED BY
+ 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES (
+ "kafka.topic" = "test-topic",
+ "kafka.bootstrap.servers" = 'localhost:9093',
+ 'hive.kafka.ssl.credential.keystore'='jceks://hdfs/tmp/test.jceks',
+ 'hive.kafka.ssl.keystore.password'='keystore.password',
+ 'hive.kafka.ssl.truststore.password'='truststore.password',
+ 'kafka.consumer.security.protocol'='SSL',
+ 'hive.kafka.ssl.keystore.location'='hdfs://cluster/tmp/keystore.jks',
+ 'hive.kafka.ssl.truststore.location'='hdfs://cluster/tmp/keystore.jks'
+);
+```
+
+Now we can query the table as normal.
+```
+SELECT * FROM kafka_ssl LIMIT 10;
+```
+
+Our truststore and keystore are located in HDFS, which means we can also run more complex queries that result in jobs.
+These will still connect to Kafka as expected.
+```
+SELECT `data` FROM kafka_ssl where `__offset` > 0 AND `__offset` < 10000 group by `data`;
+
+```
+
### Setting Extra Consumer/Producer properties.
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
index a4ad01a..3e308e6 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
@@ -53,7 +53,43 @@ enum KafkaTableProperties {
/**
* Table property that indicates if we should commit within the task or delay it to the Metadata Hook Commit call.
*/
- HIVE_KAFKA_OPTIMISTIC_COMMIT("hive.kafka.optimistic.commit", "false");
+ HIVE_KAFKA_OPTIMISTIC_COMMIT("hive.kafka.optimistic.commit", "false"),
+
+ /**
+ * Table property indicating the location of the credential store containing passwords that would otherwise be
+ * exposed in Kafka's SSL parameters.
+ */
+ HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE("hive.kafka.ssl.credential.keystore", ""),
+
+ /**
+ * Table property indicating the key in the credential keystore for the truststore password. This is NOT
+ * the actual password.
+ */
+ HIVE_KAFKA_SSL_TRUSTSTORE_PASSWORD("hive.kafka.ssl.truststore.password", ""),
+
+ /**
+ * Table property indicating the key in the credential keystore for the keystore password. This is NOT
+ * the actual password. Only needed for two way authentication.
+ */
+ HIVE_KAFKA_SSL_KEYSTORE_PASSWORD("hive.kafka.ssl.keystore.password", ""),
+
+ /**
+ * Table property indicating the key in the credential keystore for the key password. This is NOT
+ * the actual password. Only needed for two way authentication.
+ */
+ HIVE_KAFKA_SSL_KEY_PASSWORD("hive.kafka.ssl.key.password", ""),
+
+ /**
+ * Table property indicating the location of the SSL truststore. Kafka cannot normally use an HDFS-based location,
+ * but we'll pull it down locally for each consumer/producer.
+ */
+ HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG("hive.kafka.ssl.truststore.location", ""),
+
+ /**
+ * Table property indicating the location of the SSL keystore. Kafka cannot normally use an HDFS-based location,
+ * but we'll pull it down locally for each consumer/producer.
+ */
+ HIVE_SSL_KEYSTORE_LOCATION_CONFIG("hive.kafka.ssl.keystore.location", "");
/**
* Kafka storage handler table properties constructor.
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
index 81252c5..be7c28c 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
@@ -35,6 +35,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InvalidTopicException;
@@ -46,7 +47,10 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
@@ -115,11 +119,81 @@ final class KafkaUtils {
if (UserGroupInformation.isSecurityEnabled()) {
addKerberosJaasConf(configuration, props);
}
- // user can always override stuff
+
+ // The user can always override properties, but SSL properties are derived from configuration because they require local
+ // files, so they need to be modified afterwards. This works because these properties use the standard consumer prefix.
props.putAll(extractExtraProperties(configuration, CONSUMER_CONFIGURATION_PREFIX));
+ setupKafkaSslProperties(configuration, props);
+
return props;
}
+ static void setupKafkaSslProperties(Configuration configuration, Properties props) {
+ // Setup SSL via credentials keystore if necessary
+ final String credKeystore = configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName());
+ if (credKeystore != null && !credKeystore.isEmpty()) {
+ final String truststorePasswdConfig =
+ configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_TRUSTSTORE_PASSWORD.getName());
+ final String keystorePasswdConfig =
+ configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_KEYSTORE_PASSWORD.getName());
+ final String keyPasswdConfig = configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_KEY_PASSWORD.getName());
+
+ String resourcesDir = HiveConf.getVar(configuration, HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR);
+ try {
+ String truststoreLoc = configuration.get(KafkaTableProperties.HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG.getName());
+ Path truststorePath = new Path(truststoreLoc);
+ props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
+ new File(resourcesDir + "/" + truststorePath.getName()).getAbsolutePath());
+ writeStoreToLocal(configuration, truststoreLoc, new File(resourcesDir).getAbsolutePath());
+
+ final String truststorePasswd = Utilities.getPasswdFromKeystore(credKeystore, truststorePasswdConfig);
+ props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePasswd);
+
+ // ssl.keystore.password is only needed if two-way authentication is configured.
+ if(!keystorePasswdConfig.isEmpty()) {
+ log.info("Kafka keystore configured, configuring local keystore");
+ String keystoreLoc = configuration.get(KafkaTableProperties.HIVE_SSL_KEYSTORE_LOCATION_CONFIG.getName());
+ Path keystorePath = new Path(keystoreLoc);
+ props.setProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
+ new File(resourcesDir + "/" + keystorePath.getName()).getAbsolutePath());
+ writeStoreToLocal(configuration, keystoreLoc, new File(resourcesDir).getAbsolutePath());
+
+ final String keystorePasswd = Utilities.getPasswdFromKeystore(credKeystore, keystorePasswdConfig);
+ props.setProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePasswd);
+ }
+
+ // ssl.key.password is optional for clients.
+ if(!keyPasswdConfig.isEmpty()) {
+ final String keyPasswd = Utilities.getPasswdFromKeystore(credKeystore, keyPasswdConfig);
+ props.setProperty(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPasswd);
+ }
+ } catch (IOException | URISyntaxException e) {
+ throw new IllegalStateException("Unable to retrieve password from the credential keystore", e);
+ }
+ }
+ }
+
+ private static void writeStoreToLocal(Configuration configuration, String hdfsLoc, String localDest)
+ throws IOException, URISyntaxException {
+ if(!"hdfs".equals(new URI(hdfsLoc).getScheme())) {
+ throw new IllegalArgumentException("Kafka stores must be located in HDFS, but received: " + hdfsLoc);
+ }
+ try {
+ // Make sure the local resources directory is created
+ File localDir = new File(localDest);
+ if(!localDir.exists()) {
+ if(!localDir.mkdirs()) {
+ throw new IOException("Unable to create local directory, " + localDest);
+ }
+ }
+ URI uri = new URI(hdfsLoc);
+ FileSystem fs = FileSystem.get(new URI(hdfsLoc), configuration);
+ fs.copyToLocalFile(new Path(uri.toString()), new Path(localDest));
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to download store", e);
+ }
+ }
+
private static Map<String, String> extractExtraProperties(final Configuration configuration, String prefix) {
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
final Map<String, String> kafkaProperties = configuration.getValByRegex("^" + prefix + "\\..*");
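
The scheme check in `writeStoreToLocal` and the way the local store path is derived in `setupKafkaSslProperties` can be sketched with the JDK alone. This is a simplified illustration, not the committed code: the class and method names (`StoreLocationSketch`, `requireHdfs`, `localPathFor`) are hypothetical, `java.io.File#getName` stands in for Hadoop's `Path#getName`, `URI.create` replaces `new URI(...)` so that no checked exception is needed, and nothing is actually copied from HDFS.

```java
import java.io.File;
import java.net.URI;

/** Hypothetical, JDK-only stand-in for the store handling in KafkaUtils; not the committed code. */
final class StoreLocationSketch {

  /** Rejects any location whose URI scheme is not "hdfs", mirroring the check in writeStoreToLocal. */
  static URI requireHdfs(String storeLoc) {
    // URI.create reports malformed input as IllegalArgumentException (an assumption of this sketch;
    // the commit uses new URI(...) and handles URISyntaxException instead).
    URI uri = URI.create(storeLoc);
    if (!"hdfs".equals(uri.getScheme())) {
      throw new IllegalArgumentException("Kafka stores must be located in HDFS, but received: " + storeLoc);
    }
    return uri;
  }

  /** Local path handed to Kafka: the resources directory plus the file name of the remote store. */
  static String localPathFor(String resourcesDir, String storeLoc) {
    // File#getName stands in for Hadoop's Path#getName here.
    String fileName = new File(requireHdfs(storeLoc).getPath()).getName();
    return new File(resourcesDir, fileName).getAbsolutePath();
  }

  public static void main(String[] args) {
    System.out.println(localPathFor("/tmp/resources", "hdfs://cluster/tmp/truststore.jks"));
    try {
      requireHdfs("madeup"); // no scheme at all, so it is rejected
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

The second call corresponds to the `testSetupKafkaSslPropertiesSslNotInHdfs` case in this commit, where a location without the `hdfs` scheme is expected to raise `IllegalArgumentException`.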
@@ -150,6 +224,8 @@ final class KafkaUtils {
// user can always override stuff
properties.putAll(extractExtraProperties(configuration, PRODUCER_CONFIGURATION_PREFIX));
+ setupKafkaSslProperties(configuration, properties);
+
String taskId = configuration.get("mapred.task.id", null);
properties.setProperty(CommonClientConfigs.CLIENT_ID_CONFIG,
taskId == null ? "random_" + UUID.randomUUID().toString() : taskId);
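
Both the consumer and producer paths call `setupKafkaSslProperties` after `putAll(extractExtraProperties(...))`, so any SSL store location set by the handler replaces a user-supplied value for the same key. The sketch below only illustrates that ordering with `java.util.Properties`; the class name `SslOverrideOrderSketch` and the `build` helper are hypothetical, and it assumes the SSL branch is active (that is, `hive.kafka.ssl.credential.keystore` is set) and that the extras have already had their prefix stripped.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical illustration of the ordering used when building Kafka client properties. */
final class SslOverrideOrderSketch {

  static Properties build(Map<String, String> userExtras, String localTruststorePath) {
    Properties props = new Properties();
    // Step 1: user-supplied extras are applied first.
    props.putAll(userExtras);
    // Step 2: the SSL setup runs afterwards, so its value replaces any user-supplied one.
    props.setProperty("ssl.truststore.location", localTruststorePath);
    return props;
  }

  public static void main(String[] args) {
    Map<String, String> extras = new HashMap<>();
    extras.put("security.protocol", "SSL");
    extras.put("ssl.truststore.location", "hdfs://cluster/tmp/truststore.jks");
    Properties props = build(extras, "/tmp/resources/truststore.jks");
    System.out.println(props.getProperty("ssl.truststore.location")); // prints /tmp/resources/truststore.jks
    System.out.println(props.getProperty("security.protocol"));       // prints SSL
  }
}
```

Unrelated extras such as `security.protocol` pass through unchanged; only keys that the SSL setup writes are replaced.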
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
index 640b24e..a19adef 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
@@ -64,6 +64,29 @@ public class KafkaUtilsTest {
KafkaUtils.consumerProperties(configuration);
}
+ @Test public void testSetupKafkaSslPropertiesNoSslIsUnchanged() {
+ Configuration config = new Configuration();
+ Properties props = new Properties();
+ KafkaUtils.setupKafkaSslProperties(config, props);
+ Assert.assertEquals(new Properties(), props);
+ }
+
+ @Test(expected = IllegalArgumentException.class) public void testSetupKafkaSslPropertiesSslNotInHdfs() {
+ Configuration config = new Configuration();
+ config.set(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName(), "nonexistentfile");
+ config.set(KafkaTableProperties.HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG.getName(), "madeup");
+ Properties props = new Properties();
+ KafkaUtils.setupKafkaSslProperties(config, props);
+ }
+
+ @Test(expected = IllegalStateException.class) public void testSetupKafkaSslPropertiesCantRetrieveStore() {
+ Configuration config = new Configuration();
+ config.set(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName(), "nonexistentfile");
+ config.set(KafkaTableProperties.HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG.getName(), "hdfs://localhost/tmp/madeup");
+ Properties props = new Properties();
+ KafkaUtils.setupKafkaSslProperties(config, props);
+ }
+
@Test public void testMetadataEnumLookupMapper() {
int partition = 1;
long offset = 5L;
diff --git a/ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out b/ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out
index 75fb823..a9a3f5b 100644
--- a/ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out
+++ b/ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out
@@ -1201,6 +1201,12 @@ STAGE PLANS:
hive.kafka.metadata.poll.timeout.ms 30000
hive.kafka.optimistic.commit false
hive.kafka.poll.timeout.ms 5000
+ hive.kafka.ssl.credential.keystore
+ hive.kafka.ssl.key.password
+ hive.kafka.ssl.keystore.location
+ hive.kafka.ssl.keystore.password
+ hive.kafka.ssl.truststore.location
+ hive.kafka.ssl.truststore.password
kafka.bootstrap.servers localhost:9093
kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe
kafka.topic wiki_kafka_avro_table
@@ -1219,6 +1225,12 @@ STAGE PLANS:
hive.kafka.metadata.poll.timeout.ms 30000
hive.kafka.optimistic.commit false
hive.kafka.poll.timeout.ms 5000
+ hive.kafka.ssl.credential.keystore
+ hive.kafka.ssl.key.password
+ hive.kafka.ssl.keystore.location
+ hive.kafka.ssl.keystore.password
+ hive.kafka.ssl.truststore.location
+ hive.kafka.ssl.truststore.password
kafka.bootstrap.servers localhost:9093
kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe
kafka.topic wiki_kafka_avro_table
@@ -1297,6 +1309,12 @@ STAGE PLANS:
hive.kafka.metadata.poll.timeout.ms 30000
hive.kafka.optimistic.commit false
hive.kafka.poll.timeout.ms 5000
+ hive.kafka.ssl.credential.keystore
+ hive.kafka.ssl.key.password
+ hive.kafka.ssl.keystore.location
+ hive.kafka.ssl.keystore.password
+ hive.kafka.ssl.truststore.location
+ hive.kafka.ssl.truststore.password
kafka.bootstrap.servers localhost:9093
kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe
kafka.topic wiki_kafka_avro_table
@@ -1514,6 +1532,12 @@ STAGE PLANS:
hive.kafka.metadata.poll.timeout.ms 30000
hive.kafka.optimistic.commit false
hive.kafka.poll.timeout.ms 5000
+ hive.kafka.ssl.credential.keystore
+ hive.kafka.ssl.key.password
+ hive.kafka.ssl.keystore.location
+ hive.kafka.ssl.keystore.password
+ hive.kafka.ssl.truststore.location
+ hive.kafka.ssl.truststore.password
kafka.bootstrap.servers localhost:9093
kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe
kafka.topic wiki_kafka_avro_table
@@ -1532,6 +1556,12 @@ STAGE PLANS:
hive.kafka.metadata.poll.timeout.ms 30000
hive.kafka.optimistic.commit false
hive.kafka.poll.timeout.ms 5000
+ hive.kafka.ssl.credential.keystore
+ hive.kafka.ssl.key.password
+ hive.kafka.ssl.keystore.location
+ hive.kafka.ssl.keystore.password
+ hive.kafka.ssl.truststore.location
+ hive.kafka.ssl.truststore.password
kafka.bootstrap.servers localhost:9093
kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe
kafka.topic wiki_kafka_avro_table
@@ -1610,6 +1640,12 @@ STAGE PLANS:
hive.kafka.metadata.poll.timeout.ms 30000
hive.kafka.optimistic.commit false
hive.kafka.poll.timeout.ms 5000
+ hive.kafka.ssl.credential.keystore
+ hive.kafka.ssl.key.password
+ hive.kafka.ssl.keystore.location
+ hive.kafka.ssl.keystore.password
+ hive.kafka.ssl.truststore.location
+ hive.kafka.ssl.truststore.password
kafka.bootstrap.servers localhost:9093
kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe
kafka.topic wiki_kafka_avro_table