You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@kudu.apache.org by ab...@apache.org on 2021/08/24 08:26:06 UTC

[kudu] branch master updated: [spark] KUDU-1921 Add ability to require authn/encryption

This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 7feceaa  [spark] KUDU-1921 Add ability to require authn/encryption
7feceaa is described below

commit 7feceaa32fa3df5a531ad8f034ee39a9b25e17a2
Author: Attila Bukor <ab...@apache.org>
AuthorDate: Wed Aug 18 16:31:40 2021 +0200

    [spark] KUDU-1921 Add ability to require authn/encryption
    
    Change-Id: Iba1877e13a3218f0c285ded6c0f7047c497ef6aa
    Reviewed-on: http://gerrit.cloudera.org:8080/17786
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 .../org/apache/kudu/client/AsyncKuduClient.java    |  2 +-
 .../org/apache/kudu/spark/kudu/DefaultSource.scala | 37 ++++++++++++-
 .../org/apache/kudu/spark/kudu/KuduContext.scala   | 18 +++++--
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 61 ++++++++++++++++++++++
 4 files changed, 110 insertions(+), 8 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 390bdd3..bb71e86 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -2728,7 +2728,7 @@ public class AsyncKuduClient implements AutoCloseable {
     }
   }
 
-  enum EncryptionPolicy {
+  public enum EncryptionPolicy {
     // Optional, it uses encrypted connection if the server supports it,
     // but it can connect to insecure servers too.
     OPTIONAL,
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index b873736..f621d7d 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -17,9 +17,10 @@
 
 package org.apache.kudu.spark.kudu
 
+import org.apache.kudu.client.AsyncKuduClient.EncryptionPolicy
+
 import java.net.InetAddress
 import java.util.Locale
-
 import scala.collection.JavaConverters._
 import scala.util.Try
 import org.apache.spark.rdd.RDD
@@ -73,6 +74,8 @@ class DefaultSource
   val USE_DRIVER_METADATA = "kudu.useDriverMetadata"
   val SNAPSHOT_TIMESTAMP_MS = "kudu.snapshotTimestampMs"
   val SASL_PROTOCOL_NAME = "kudu.saslProtocolName"
+  val REQUIRE_AUTHENTICATION = "kudu.requireAuthentication"
+  val ENCRYPTION_POLICY = "kudu.encryptionPolicy"
 
   /**
    * A nice alias for the data source so that when specifying the format
@@ -111,6 +114,8 @@ class DefaultSource
     val kuduMaster = getMasterAddrs(parameters)
     val operationType = getOperationType(parameters)
     val saslProtocolName = getSaslProtocolName(parameters)
+    val requireAuthentication = getRequireAuthentication(parameters)
+    val encryptionPolicy = getEncryptionPolicy(parameters)
     val schemaOption = Option(schema)
     val readOptions = getReadOptions(parameters)
     val writeOptions = getWriteOptions(parameters)
@@ -119,6 +124,8 @@ class DefaultSource
       tableName,
       kuduMaster,
       saslProtocolName,
+      requireAuthentication,
+      encryptionPolicy,
       operationType,
       schemaOption,
       readOptions,
@@ -161,6 +168,8 @@ class DefaultSource
     val masterAddrs = getMasterAddrs(parameters)
     val operationType = getOperationType(parameters)
     val saslProtocolName = getSaslProtocolName(parameters)
+    val requireAuthentication = getRequireAuthentication(parameters)
+    val encryptionPolicy = getEncryptionPolicy(parameters)
     val readOptions = getReadOptions(parameters)
     val writeOptions = getWriteOptions(parameters)
 
@@ -168,6 +177,8 @@ class DefaultSource
       tableName,
       masterAddrs,
       saslProtocolName,
+      requireAuthentication,
+      encryptionPolicy,
       operationType,
       readOptions,
       writeOptions
@@ -236,6 +247,18 @@ class DefaultSource
     parameters.getOrElse(SASL_PROTOCOL_NAME, "kudu")
   }
 
+  private def getRequireAuthentication(parameters: Map[String, String]): Boolean = {
+    parameters.get(REQUIRE_AUTHENTICATION).exists(_.toBoolean)
+  }
+
+  private def getEncryptionPolicy(parameters: Map[String, String]): EncryptionPolicy = {
+    parameters.getOrElse(ENCRYPTION_POLICY, "optional").toLowerCase(Locale.ENGLISH) match {
+      case "optional" => EncryptionPolicy.OPTIONAL
+      case "required" => EncryptionPolicy.REQUIRED
+      case "required_remote" => EncryptionPolicy.REQUIRED_REMOTE
+    }
+  }
+
   private def getScanLocalityType(opParam: String): ReplicaSelection = {
     opParam.toLowerCase(Locale.ENGLISH) match {
       case "leader_only" => ReplicaSelection.LEADER_ONLY
@@ -284,6 +307,8 @@ class KuduRelation(
     val tableName: String,
     val masterAddrs: String,
     val saslProtocolName: String,
+    val requireAuthentication: Boolean = false,
+    val encryptionPolicy: EncryptionPolicy = EncryptionPolicy.OPTIONAL,
     val operationType: OperationType,
     val userSchema: Option[StructType],
     val readOptions: KuduReadOptions = new KuduReadOptions,
@@ -292,7 +317,13 @@ class KuduRelation(
   val log: Logger = LoggerFactory.getLogger(getClass)
 
   private val context: KuduContext =
-    new KuduContext(masterAddrs, sqlContext.sparkContext, None, Some(saslProtocolName))
+    new KuduContext(
+      masterAddrs,
+      sqlContext.sparkContext,
+      None,
+      Some(saslProtocolName),
+      requireAuthentication,
+      encryptionPolicy)
 
   private val table: KuduTable = context.syncClient.openTable(tableName)
 
@@ -509,6 +540,8 @@ class KuduSink(
     val tableName: String,
     val masterAddrs: String,
     val saslProtocolName: String,
+    val requireAuthentication: Boolean = false,
+    val encryptionPolicy: EncryptionPolicy = EncryptionPolicy.OPTIONAL,
     val operationType: OperationType,
     val readOptions: KuduReadOptions = new KuduReadOptions,
     val writeOptions: KuduWriteOptions)(val sqlContext: SQLContext)
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index c364dc8..a5163f3 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -19,12 +19,10 @@ package org.apache.kudu.spark.kudu
 
 import java.security.AccessController
 import java.security.PrivilegedAction
-
 import javax.security.auth.Subject
 import javax.security.auth.login.AppConfigurationEntry
 import javax.security.auth.login.Configuration
 import javax.security.auth.login.LoginContext
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import org.apache.hadoop.util.ShutdownHookManager
@@ -48,6 +46,7 @@ import org.apache.kudu.spark.kudu.SparkUtil.kuduSchema
 import org.apache.kudu.spark.kudu.SparkUtil._
 import org.apache.kudu.Schema
 import org.apache.kudu.Type
+import org.apache.kudu.client.AsyncKuduClient.EncryptionPolicy
 
 /**
  * KuduContext is a serializable container for Kudu client connections.
@@ -63,7 +62,9 @@ class KuduContext(
     val kuduMaster: String,
     sc: SparkContext,
     val socketReadTimeoutMs: Option[Long],
-    val saslProtocolName: Option[String] = None)
+    val saslProtocolName: Option[String] = None,
+    val requireAuthentication: Boolean = false,
+    val encryptionPolicy: EncryptionPolicy = EncryptionPolicy.OPTIONAL)
     extends Serializable {
   val log: Logger = LoggerFactory.getLogger(getClass)
 
@@ -153,7 +154,8 @@ class KuduContext(
   @transient lazy val syncClient: KuduClient = asyncClient.syncClient()
 
   @transient lazy val asyncClient: AsyncKuduClient = {
-    val c = KuduClientCache.getAsyncClient(kuduMaster, saslProtocolName)
+    val c = KuduClientCache
+      .getAsyncClient(kuduMaster, saslProtocolName, requireAuthentication, encryptionPolicy)
     if (authnCredentials != null) {
       c.importAuthenticationCredentials(authnCredentials)
     }
@@ -611,13 +613,19 @@ private object KuduClientCache {
     clientCache.clear()
   }
 
-  def getAsyncClient(kuduMaster: String, saslProtocolName: Option[String]): AsyncKuduClient = {
+  def getAsyncClient(
+      kuduMaster: String,
+      saslProtocolName: Option[String],
+      requireAuthentication: Boolean = false,
+      encryptionPolicy: EncryptionPolicy = EncryptionPolicy.OPTIONAL): AsyncKuduClient = {
     clientCache.synchronized {
       if (!clientCache.contains(kuduMaster)) {
         val builder = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster)
         if (saslProtocolName.nonEmpty) {
           builder.saslProtocolName(saslProtocolName.get)
         }
+        builder.requireAuthentication(requireAuthentication)
+        builder.encryptionPolicy(encryptionPolicy)
         val asyncClient = builder.build()
         val hookHandle = new Runnable {
           override def run(): Unit = asyncClient.close()
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 327ba4c..9ea1818 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -36,6 +36,7 @@ import org.apache.kudu.test.RandomUtils
 import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
 import org.apache.kudu.test.KuduTestHarness.EnableKerberos
 import org.apache.kudu.test.KuduTestHarness.MasterServerConfig
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
 import org.junit.Before
 import org.junit.Test
 import org.scalatest.matchers.should.Matchers
@@ -897,4 +898,64 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     val df = sqlContext.read.options(kuduOptions).format("kudu").load
     assertEquals(rowCount, df.count())
   }
+
+  @Test
+  def testKuduRequireAuthenticationInsecureCluster(): Unit = {
+    KuduClientCache.clearCacheForTests()
+    kuduOptions = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> harness.getMasterAddressesAsString,
+      "kudu.requireAuthentication" -> "true"
+    )
+    val exception = intercept[Exception] {
+      val df = sqlContext.read.options(kuduOptions).format("kudu").load
+      df.count
+    }
+    assertTrue(
+      exception.getCause.getMessage
+        .contains("client requires authentication, but server does not have Kerberos enabled"))
+  }
+
+  @Test
+  @MasterServerConfig(flags = Array("--rpc_encryption=disabled", "--rpc_authentication=disabled"))
+  @TabletServerConfig(flags = Array("--rpc_encryption=disabled", "--rpc_authentication=disabled"))
+  def testKuduRequireEncryptionInsecureCluster(): Unit = {
+    KuduClientCache.clearCacheForTests()
+    kuduOptions = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> harness.getMasterAddressesAsString,
+      "kudu.encryptionPolicy" -> "required_remote"
+    )
+    val exception = intercept[Exception] {
+      val df = sqlContext.read.options(kuduOptions).format("kudu").load
+      df.count
+    }
+    assertTrue(
+      exception.getCause.getMessage.contains("server does not support required TLS encryption"))
+  }
+
+  @Test
+  @EnableKerberos
+  def testKuduRequireAuthenticationAndEncryptionSecureCluster(): Unit = {
+    KuduClientCache.clearCacheForTests()
+    kuduOptions = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> harness.getMasterAddressesAsString,
+      "kudu.encryptionPolicy" -> "required",
+      "kudu.requireAuthentication" -> "true"
+    )
+
+    val df = sqlContext.read.options(kuduOptions).format("kudu").load
+    assertEquals(rowCount, df.count)
+  }
+
+  @Test
+  @MasterServerConfig(flags = Array("--rpc_encryption=disabled", "--rpc_authentication=disabled"))
+  @TabletServerConfig(flags = Array("--rpc_encryption=disabled", "--rpc_authentication=disabled"))
+  def testKuduInsecureCluster(): Unit = {
+    KuduClientCache.clearCacheForTests()
+
+    val df = sqlContext.read.options(kuduOptions).format("kudu").load
+    assertEquals(rowCount, df.count)
+  }
 }