Posted to commits@hive.apache.org by kg...@apache.org on 2020/06/11 19:09:01 UTC

[hive] branch master updated: HIVE-21894: Hadoop credential password storage for the Kafka Storage handler when security is SSL (#839)

This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ead9d3  HIVE-21894: Hadoop credential password storage for the Kafka Storage handler when security is SSL (#839)
4ead9d3 is described below

commit 4ead9d35eadc997b65ceeb64f1fa33c71e47070d
Author: Justin Leet <ju...@gmail.com>
AuthorDate: Thu Jun 11 15:08:53 2020 -0400

    HIVE-21894: Hadoop credential password storage for the Kafka Storage handler when security is SSL (#839)
---
 kafka-handler/README.md                            | 71 +++++++++++++++++---
 .../hadoop/hive/kafka/KafkaTableProperties.java    | 38 ++++++++++-
 .../org/apache/hadoop/hive/kafka/KafkaUtils.java   | 78 +++++++++++++++++++++-
 .../apache/hadoop/hive/kafka/KafkaUtilsTest.java   | 23 +++++++
 .../kafka/kafka_storage_handler.q.out              | 36 ++++++++++
 5 files changed, 235 insertions(+), 11 deletions(-)

diff --git a/kafka-handler/README.md b/kafka-handler/README.md
index e7761e3..e02b5e9 100644
--- a/kafka-handler/README.md
+++ b/kafka-handler/README.md
@@ -216,15 +216,68 @@ GROUP BY
 
 ## Table Properties
 
-| Property                            | Description                                                                                                                        | Mandatory | Default                                 |
-|-------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------------------------------------|
-| kafka.topic                         | Kafka topic name to map the table to.                                                                                              | Yes       | null                                    |
-| kafka.bootstrap.servers             | Table property indicating Kafka broker(s) connection string.                                                                       | Yes       | null                                    |
-| kafka.serde.class                   | Serializer and Deserializer class implementation.                                                                                  | No        | org.apache.hadoop.hive.serde2.JsonSerDe |
-| hive.kafka.poll.timeout.ms          | Parameter indicating Kafka Consumer poll timeout period in millis.  FYI this is independent from internal Kafka consumer timeouts. | No        | 5000 (5 Seconds)                        |
-| hive.kafka.max.retries              | Number of retries for Kafka metadata fetch operations.                                                                             | No        | 6                                       |
-| hive.kafka.metadata.poll.timeout.ms | Number of milliseconds before consumer timeout on fetching Kafka metadata.                                                         | No        | 30000 (30 Seconds)                      |
-| kafka.write.semantic                | Writer semantics, allowed values (AT_LEAST_ONCE, EXACTLY_ONCE)                                                         | No        | AT_LEAST_ONCE                           |
+| Property                               | Description                                                                                                                        | Mandatory | Default                                 |
+|----------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------------------------------------|
+| kafka.topic                            | Kafka topic name to map the table to.                                                                                              | Yes       | null                                    |
+| kafka.bootstrap.servers                | Table property indicating Kafka broker(s) connection string.                                                                       | Yes       | null                                    |
+| kafka.serde.class                      | Serializer and Deserializer class implementation.                                                                                  | No        | org.apache.hadoop.hive.serde2.JsonSerDe |
+| hive.kafka.poll.timeout.ms             | Parameter indicating Kafka Consumer poll timeout period in millis.  FYI this is independent from internal Kafka consumer timeouts. | No        | 5000 (5 Seconds)                        |
+| hive.kafka.max.retries                 | Number of retries for Kafka metadata fetch operations.                                                                             | No        | 6                                       |
+| hive.kafka.metadata.poll.timeout.ms    | Number of milliseconds before consumer timeout on fetching Kafka metadata.                                                         | No        | 30000 (30 Seconds)                      |
+| kafka.write.semantic                   | Writer semantics, allowed values (AT_LEAST_ONCE, EXACTLY_ONCE)                                                                     | No        | AT_LEAST_ONCE                           |
+| hive.kafka.ssl.credential.keystore     | Location of the credential store holding the SSL passwords; used to avoid plaintext passwords in table properties.                | No        |                                         |
+| hive.kafka.ssl.truststore.password     | The key in the credential store used to retrieve the truststore password. This is NOT the password itself.                         | No        |                                         |
+| hive.kafka.ssl.keystore.password       | The key in the credential store used to retrieve the keystore password. This is NOT the password itself. Used for 2-way auth.      | No        |                                         |
+| hive.kafka.ssl.key.password            | The key in the credential store used to retrieve the key password. This is NOT the password itself.                                | No        |                                         |
+| hive.kafka.ssl.truststore.location     | The location of the SSL truststore. Must be in HDFS for queries that launch jobs; Kafka needs a local file, so it is pulled down.  | No        |                                         |
+| hive.kafka.ssl.keystore.location       | The location of the SSL keystore. Must be in HDFS for queries that launch jobs; Kafka needs a local file, so it is pulled down.    | No        |                                         |
+
+### SSL
+SSL connections to Kafka can be created via the properties described in the table above.
+These properties retrieve the passwords from a credential store, so that no password appears in plaintext in the table properties.
+To keep this secure, the credential store should have appropriate permissions applied. Queries against the table fail for any
+client that cannot read the credential store.
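+
+As a sketch (the paths and aliases below are illustrative, chosen to match the example later in this section), such a store can be created with Hadoop's `hadoop credential` CLI:
+
+```
+# Create the password aliases in an HDFS-backed JCEKS store; each command prompts for the password.
+hadoop credential create truststore.password -provider jceks://hdfs/tmp/test.jceks
+hadoop credential create keystore.password -provider jceks://hdfs/tmp/test.jceks
+
+# Restrict read access so only trusted users can resolve the passwords.
+hdfs dfs -chmod 640 /tmp/test.jceks
+```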
+
+Normally, Kafka's `<consumer/producer>.ssl.truststore.location` and `<consumer/producer>.ssl.keystore.location` would have to be local files. Any query that
+launches a job can instead reference an HDFS location, and the stores will be pulled down from HDFS to the local machine for this purpose.
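+
+For example, the stores can be uploaded to HDFS ahead of time (illustrative paths):
+
+```
+hdfs dfs -put truststore.jks /tmp/truststore.jks
+hdfs dfs -put keystore.jks /tmp/keystore.jks
+```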
+
+The producer and consumer stores are both sourced from the same property, e.g. `hive.kafka.ssl.truststore.location`, rather than from prefixed properties such as `kafka.consumer.ssl.truststore.location`.
+
+#### SSL Example
+Table creation is straightforward: create the table as normal and supply the appropriate Kafka
+configs (e.g. `kafka.consumer.security.protocol`) along with the credential store configs (e.g. `hive.kafka.ssl.credential.keystore`).
+
+```
+CREATE EXTERNAL TABLE kafka_ssl (
+  `data` STRING
+)
+STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES (
+  'kafka.topic' = 'test-topic',
+  'kafka.bootstrap.servers' = 'localhost:9093',
+  'kafka.consumer.security.protocol' = 'SSL',
+  'hive.kafka.ssl.credential.keystore' = 'jceks://hdfs/tmp/test.jceks',
+  'hive.kafka.ssl.keystore.password' = 'keystore.password',
+  'hive.kafka.ssl.truststore.password' = 'truststore.password',
+  'hive.kafka.ssl.keystore.location' = 'hdfs://cluster/tmp/keystore.jks',
+  'hive.kafka.ssl.truststore.location' = 'hdfs://cluster/tmp/truststore.jks'
+);
+```
+
+Now we can query the table as normal.
+```
+SELECT * FROM kafka_ssl LIMIT 10;
+```
+
+Because our truststore and keystore are located in HDFS, we can also run more complex queries that result in jobs.
+These will still connect to Kafka as expected.
+```
+SELECT `data` FROM kafka_ssl WHERE `__offset` > 0 AND `__offset` < 10000 GROUP BY `data`;
+```
+
 
 
 ### Setting Extra Consumer/Producer properties.
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
index a4ad01a..3e308e6 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
@@ -53,7 +53,43 @@ enum KafkaTableProperties {
   /**
    * Table property that indicates if we should commit within the task or delay it to the Metadata Hook Commit call.
    */
-  HIVE_KAFKA_OPTIMISTIC_COMMIT("hive.kafka.optimistic.commit", "false");
+  HIVE_KAFKA_OPTIMISTIC_COMMIT("hive.kafka.optimistic.commit", "false"),
+
+  /**
+   * Table property indicating the location of the credential store containing passwords that would otherwise be
+   * exposed in Kafka's SSL parameters.
+   */
+  HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE("hive.kafka.ssl.credential.keystore", ""),
+
+  /**
+   * Table property indicating the key in the credential keystore for the truststore password. This is NOT
+   * the actual password.
+   */
+  HIVE_KAFKA_SSL_TRUSTSTORE_PASSWORD("hive.kafka.ssl.truststore.password", ""),
+
+  /**
+   * Table property indicating the key in the credential keystore for the keystore password. This is NOT
+   * the actual password. Only needed for two way authentication.
+   */
+  HIVE_KAFKA_SSL_KEYSTORE_PASSWORD("hive.kafka.ssl.keystore.password", ""),
+
+  /**
+   * Table property indicating the key in the credential keystore for the key password. This is NOT
+   * the actual password. Only needed for two way authentication.
+   */
+  HIVE_KAFKA_SSL_KEY_PASSWORD("hive.kafka.ssl.key.password", ""),
+
+  /**
+   * Table property indicating the location of the SSL truststore. Kafka cannot normally use an HDFS-based location,
+   * but we'll pull it down locally for each consumer/producer.
+   */
+  HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG("hive.kafka.ssl.truststore.location", ""),
+
+  /**
+   * Table property indicating the location of the SSL keystore. Kafka cannot normally use an HDFS-based location,
+   * but we'll pull it down locally for each consumer/producer.
+   */
+  HIVE_SSL_KEYSTORE_LOCATION_CONFIG("hive.kafka.ssl.keystore.location", "");
 
   /**
    * Kafka storage handler table properties constructor.
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
index 81252c5..be7c28c 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
@@ -35,6 +35,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.InvalidTopicException;
@@ -46,7 +47,10 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
@@ -115,11 +119,81 @@ final class KafkaUtils {
     if (UserGroupInformation.isSecurityEnabled()) {
       addKerberosJaasConf(configuration, props);
     }
-    // user can always override stuff
+
+    // user can always override stuff, but SSL properties are derived from the configuration, because they require
+    //   local files. These need to be modified afterwards. This works because these properties use the standard consumer prefix.
     props.putAll(extractExtraProperties(configuration, CONSUMER_CONFIGURATION_PREFIX));
+    setupKafkaSslProperties(configuration, props);
+
     return props;
   }
 
+  static void setupKafkaSslProperties(Configuration configuration, Properties props) {
+    // Setup SSL via credentials keystore if necessary
+    final String credKeystore = configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName());
+    if (credKeystore != null && !credKeystore.isEmpty()) {
+      final String truststorePasswdConfig =
+          configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_TRUSTSTORE_PASSWORD.getName());
+      final String keystorePasswdConfig =
+          configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_KEYSTORE_PASSWORD.getName());
+      final String keyPasswdConfig = configuration.get(KafkaTableProperties.HIVE_KAFKA_SSL_KEY_PASSWORD.getName());
+
+      String resourcesDir = HiveConf.getVar(configuration, HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR);
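+      // Stores referenced below are staged under Hive's local "downloaded resources" directory.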
+      try {
+        String truststoreLoc = configuration.get(KafkaTableProperties.HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG.getName());
+        Path truststorePath = new Path(truststoreLoc);
+        props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
+            new File(resourcesDir + "/" + truststorePath.getName()).getAbsolutePath());
+        writeStoreToLocal(configuration, truststoreLoc, new File(resourcesDir).getAbsolutePath());
+
+        final String truststorePasswd = Utilities.getPasswdFromKeystore(credKeystore, truststorePasswdConfig);
+        props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePasswd);
+
+        // ssl.keystore.password is only needed if two-way authentication is configured.
+        if (!keystorePasswdConfig.isEmpty()) {
+          log.info("Kafka keystore configured, configuring local keystore");
+          String keystoreLoc = configuration.get(KafkaTableProperties.HIVE_SSL_KEYSTORE_LOCATION_CONFIG.getName());
+          Path keystorePath = new Path(keystoreLoc);
+          props.setProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
+              new File(resourcesDir + "/" + keystorePath.getName()).getAbsolutePath());
+          writeStoreToLocal(configuration, keystoreLoc, new File(resourcesDir).getAbsolutePath());
+
+          final String keystorePasswd = Utilities.getPasswdFromKeystore(credKeystore, keystorePasswdConfig);
+          props.setProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePasswd);
+        }
+
+        // ssl.key.password is optional for clients.
+        if (!keyPasswdConfig.isEmpty()) {
+          final String keyPasswd = Utilities.getPasswdFromKeystore(credKeystore, keyPasswdConfig);
+          props.setProperty(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPasswd);
+        }
+      } catch (IOException | URISyntaxException e) {
+        throw new IllegalStateException("Unable to retrieve password from the credential keystore", e);
+      }
+    }
+  }
+
+  private static void writeStoreToLocal(Configuration configuration, String hdfsLoc, String localDest)
+      throws IOException, URISyntaxException {
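+    // Only hdfs:// store locations are supported; reject anything else up front.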
+    if(!"hdfs".equals(new URI(hdfsLoc).getScheme())) {
+      throw new IllegalArgumentException("Kafka stores must be located in HDFS, but received: " + hdfsLoc);
+    }
+    try {
+      // Make sure the local resources directory is created
+      File localDir = new File(localDest);
+      if (!localDir.exists() && !localDir.mkdirs()) {
+        throw new IOException("Unable to create local directory, " + localDest);
+      }
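+      // Copy the store from HDFS into the local resources directory, keeping its file name.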
+      URI uri = new URI(hdfsLoc);
+      FileSystem fs = FileSystem.get(uri, configuration);
+      fs.copyToLocalFile(new Path(uri.toString()), new Path(localDest));
+    } catch (URISyntaxException e) {
+      throw new IOException("Unable to download store", e);
+    }
+  }
+
   private static Map<String, String> extractExtraProperties(final Configuration configuration, String prefix) {
     ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
     final Map<String, String> kafkaProperties = configuration.getValByRegex("^" + prefix + "\\..*");
@@ -150,6 +224,8 @@ final class KafkaUtils {
 
     // user can always override stuff
     properties.putAll(extractExtraProperties(configuration, PRODUCER_CONFIGURATION_PREFIX));
+    setupKafkaSslProperties(configuration, properties);
+
     String taskId = configuration.get("mapred.task.id", null);
     properties.setProperty(CommonClientConfigs.CLIENT_ID_CONFIG,
         taskId == null ? "random_" + UUID.randomUUID().toString() : taskId);
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
index 640b24e..a19adef 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
@@ -64,6 +64,29 @@ public class KafkaUtilsTest {
     KafkaUtils.consumerProperties(configuration);
   }
 
+  @Test public void testSetupKafkaSslPropertiesNoSslIsUnchanged() {
+    Configuration config = new Configuration();
+    Properties props = new Properties();
+    KafkaUtils.setupKafkaSslProperties(config, props);
+    Assert.assertEquals(new Properties(), props);
+  }
+
+  @Test(expected = IllegalArgumentException.class) public void testSetupKafkaSslPropertiesSslNotInHdfs() {
+    Configuration config = new Configuration();
+    config.set(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName(), "nonexistentfile");
+    config.set(KafkaTableProperties.HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG.getName(), "madeup");
+    Properties props = new Properties();
+    KafkaUtils.setupKafkaSslProperties(config, props);
+  }
+
+  @Test(expected = IllegalStateException.class) public void testSetupKafkaSslPropertiesCantRetrieveStore() {
+    Configuration config = new Configuration();
+    config.set(KafkaTableProperties.HIVE_KAFKA_SSL_CREDENTIAL_KEYSTORE.getName(), "nonexistentfile");
+    config.set(KafkaTableProperties.HIVE_SSL_TRUSTSTORE_LOCATION_CONFIG.getName(), "hdfs://localhost/tmp/madeup");
+    Properties props = new Properties();
+    KafkaUtils.setupKafkaSslProperties(config, props);
+  }
+
   @Test public void testMetadataEnumLookupMapper() {
     int partition = 1;
     long offset = 5L;
diff --git a/ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out b/ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out
index 75fb823..a9a3f5b 100644
--- a/ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out
+++ b/ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out
@@ -1201,6 +1201,12 @@ STAGE PLANS:
                     hive.kafka.metadata.poll.timeout.ms 30000
                     hive.kafka.optimistic.commit false
                     hive.kafka.poll.timeout.ms 5000
+                    hive.kafka.ssl.credential.keystore 
+                    hive.kafka.ssl.key.password 
+                    hive.kafka.ssl.keystore.location 
+                    hive.kafka.ssl.keystore.password 
+                    hive.kafka.ssl.truststore.location 
+                    hive.kafka.ssl.truststore.password 
                     kafka.bootstrap.servers localhost:9093
                     kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe
                     kafka.topic wiki_kafka_avro_table
@@ -1219,6 +1225,12 @@ STAGE PLANS:
                       hive.kafka.metadata.poll.timeout.ms 30000
                       hive.kafka.optimistic.commit false
                       hive.kafka.poll.timeout.ms 5000
+                      hive.kafka.ssl.credential.keystore 
+                      hive.kafka.ssl.key.password 
+                      hive.kafka.ssl.keystore.location 
+                      hive.kafka.ssl.keystore.password 
+                      hive.kafka.ssl.truststore.location 
+                      hive.kafka.ssl.truststore.password 
                       kafka.bootstrap.servers localhost:9093
                       kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe
                       kafka.topic wiki_kafka_avro_table
@@ -1297,6 +1309,12 @@ STAGE PLANS:
                       hive.kafka.metadata.poll.timeout.ms 30000
                       hive.kafka.optimistic.commit false
                       hive.kafka.poll.timeout.ms 5000
+                      hive.kafka.ssl.credential.keystore 
+                      hive.kafka.ssl.key.password 
+                      hive.kafka.ssl.keystore.location 
+                      hive.kafka.ssl.keystore.password 
+                      hive.kafka.ssl.truststore.location 
+                      hive.kafka.ssl.truststore.password 
                       kafka.bootstrap.servers localhost:9093
                       kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe
                       kafka.topic wiki_kafka_avro_table
@@ -1514,6 +1532,12 @@ STAGE PLANS:
                     hive.kafka.metadata.poll.timeout.ms 30000
                     hive.kafka.optimistic.commit false
                     hive.kafka.poll.timeout.ms 5000
+                    hive.kafka.ssl.credential.keystore 
+                    hive.kafka.ssl.key.password 
+                    hive.kafka.ssl.keystore.location 
+                    hive.kafka.ssl.keystore.password 
+                    hive.kafka.ssl.truststore.location 
+                    hive.kafka.ssl.truststore.password 
                     kafka.bootstrap.servers localhost:9093
                     kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe
                     kafka.topic wiki_kafka_avro_table
@@ -1532,6 +1556,12 @@ STAGE PLANS:
                       hive.kafka.metadata.poll.timeout.ms 30000
                       hive.kafka.optimistic.commit false
                       hive.kafka.poll.timeout.ms 5000
+                      hive.kafka.ssl.credential.keystore 
+                      hive.kafka.ssl.key.password 
+                      hive.kafka.ssl.keystore.location 
+                      hive.kafka.ssl.keystore.password 
+                      hive.kafka.ssl.truststore.location 
+                      hive.kafka.ssl.truststore.password 
                       kafka.bootstrap.servers localhost:9093
                       kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe
                       kafka.topic wiki_kafka_avro_table
@@ -1610,6 +1640,12 @@ STAGE PLANS:
                       hive.kafka.metadata.poll.timeout.ms 30000
                       hive.kafka.optimistic.commit false
                       hive.kafka.poll.timeout.ms 5000
+                      hive.kafka.ssl.credential.keystore 
+                      hive.kafka.ssl.key.password 
+                      hive.kafka.ssl.keystore.location 
+                      hive.kafka.ssl.keystore.password 
+                      hive.kafka.ssl.truststore.location 
+                      hive.kafka.ssl.truststore.password 
                       kafka.bootstrap.servers localhost:9093
                       kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe
                       kafka.topic wiki_kafka_avro_table