You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ab...@apache.org on 2021/08/24 08:26:06 UTC
[kudu] branch master updated: [spark] KUDU-1921 Add ability to
require authn/encryption
This is an automated email from the ASF dual-hosted git repository.
abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 7feceaa [spark] KUDU-1921 Add ability to require authn/encryption
7feceaa is described below
commit 7feceaa32fa3df5a531ad8f034ee39a9b25e17a2
Author: Attila Bukor <ab...@apache.org>
AuthorDate: Wed Aug 18 16:31:40 2021 +0200
[spark] KUDU-1921 Add ability to require authn/encryption
Change-Id: Iba1877e13a3218f0c285ded6c0f7047c497ef6aa
Reviewed-on: http://gerrit.cloudera.org:8080/17786
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins
---
.../org/apache/kudu/client/AsyncKuduClient.java | 2 +-
.../org/apache/kudu/spark/kudu/DefaultSource.scala | 37 ++++++++++++-
.../org/apache/kudu/spark/kudu/KuduContext.scala | 18 +++++--
.../apache/kudu/spark/kudu/DefaultSourceTest.scala | 61 ++++++++++++++++++++++
4 files changed, 110 insertions(+), 8 deletions(-)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 390bdd3..bb71e86 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -2728,7 +2728,7 @@ public class AsyncKuduClient implements AutoCloseable {
}
}
- enum EncryptionPolicy {
+ public enum EncryptionPolicy {
// Optional, it uses encrypted connection if the server supports it,
// but it can connect to insecure servers too.
OPTIONAL,
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index b873736..f621d7d 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -17,9 +17,10 @@
package org.apache.kudu.spark.kudu
+import org.apache.kudu.client.AsyncKuduClient.EncryptionPolicy
+
import java.net.InetAddress
import java.util.Locale
-
import scala.collection.JavaConverters._
import scala.util.Try
import org.apache.spark.rdd.RDD
@@ -73,6 +74,8 @@ class DefaultSource
val USE_DRIVER_METADATA = "kudu.useDriverMetadata"
val SNAPSHOT_TIMESTAMP_MS = "kudu.snapshotTimestampMs"
val SASL_PROTOCOL_NAME = "kudu.saslProtocolName"
+ val REQUIRE_AUTHENTICATION = "kudu.requireAuthentication"
+ val ENCRYPTION_POLICY = "kudu.encryptionPolicy"
/**
* A nice alias for the data source so that when specifying the format
@@ -111,6 +114,8 @@ class DefaultSource
val kuduMaster = getMasterAddrs(parameters)
val operationType = getOperationType(parameters)
val saslProtocolName = getSaslProtocolName(parameters)
+ val requireAuthentication = getRequireAuthentication(parameters)
+ val encryptionPolicy = getEncryptionPolicy(parameters)
val schemaOption = Option(schema)
val readOptions = getReadOptions(parameters)
val writeOptions = getWriteOptions(parameters)
@@ -119,6 +124,8 @@ class DefaultSource
tableName,
kuduMaster,
saslProtocolName,
+ requireAuthentication,
+ encryptionPolicy,
operationType,
schemaOption,
readOptions,
@@ -161,6 +168,8 @@ class DefaultSource
val masterAddrs = getMasterAddrs(parameters)
val operationType = getOperationType(parameters)
val saslProtocolName = getSaslProtocolName(parameters)
+ val requireAuthentication = getRequireAuthentication(parameters)
+ val encryptionPolicy = getEncryptionPolicy(parameters)
val readOptions = getReadOptions(parameters)
val writeOptions = getWriteOptions(parameters)
@@ -168,6 +177,8 @@ class DefaultSource
tableName,
masterAddrs,
saslProtocolName,
+ requireAuthentication,
+ encryptionPolicy,
operationType,
readOptions,
writeOptions
@@ -236,6 +247,18 @@ class DefaultSource
parameters.getOrElse(SASL_PROTOCOL_NAME, "kudu")
}
+ /**
+ * Reads the `kudu.requireAuthentication` option, yielding `false` when it is absent.
+ * A present value is parsed with `String.toBoolean` (case-insensitive "true"/"false");
+ * anything else throws an IllegalArgumentException.
+ */
+ private def getRequireAuthentication(parameters: Map[String, String]): Boolean = {
+ parameters.get(REQUIRE_AUTHENTICATION).map(_.toBoolean).getOrElse(false)
+ }
+
+ /**
+ * Parses the `kudu.encryptionPolicy` option into an [[EncryptionPolicy]].
+ *
+ * Matching is case-insensitive. When the option is absent the policy is `optional`.
+ * Accepted values are `optional`, `required` and `required_remote`.
+ *
+ * @param parameters the data source options
+ * @return the encryption policy to configure on the Kudu client
+ * @throws IllegalArgumentException if the option holds an unrecognized value
+ */
+ private def getEncryptionPolicy(parameters: Map[String, String]): EncryptionPolicy = {
+ parameters.getOrElse(ENCRYPTION_POLICY, "optional").toLowerCase(Locale.ENGLISH) match {
+ case "optional" => EncryptionPolicy.OPTIONAL
+ case "required" => EncryptionPolicy.REQUIRED
+ case "required_remote" => EncryptionPolicy.REQUIRED_REMOTE
+ // Without this case an invalid value surfaces as an opaque scala.MatchError.
+ case other =>
+ throw new IllegalArgumentException(
+ s"Invalid value for $ENCRYPTION_POLICY: '$other'. " +
+ "Expected one of: optional, required, required_remote")
+ }
+ }
+
private def getScanLocalityType(opParam: String): ReplicaSelection = {
opParam.toLowerCase(Locale.ENGLISH) match {
case "leader_only" => ReplicaSelection.LEADER_ONLY
@@ -284,6 +307,8 @@ class KuduRelation(
val tableName: String,
val masterAddrs: String,
val saslProtocolName: String,
+ val requireAuthentication: Boolean = false,
+ val encryptionPolicy: EncryptionPolicy = EncryptionPolicy.OPTIONAL,
val operationType: OperationType,
val userSchema: Option[StructType],
val readOptions: KuduReadOptions = new KuduReadOptions,
@@ -292,7 +317,13 @@ class KuduRelation(
val log: Logger = LoggerFactory.getLogger(getClass)
private val context: KuduContext =
- new KuduContext(masterAddrs, sqlContext.sparkContext, None, Some(saslProtocolName))
+ new KuduContext(
+ masterAddrs,
+ sqlContext.sparkContext,
+ None,
+ Some(saslProtocolName),
+ requireAuthentication,
+ encryptionPolicy)
private val table: KuduTable = context.syncClient.openTable(tableName)
@@ -509,6 +540,8 @@ class KuduSink(
val tableName: String,
val masterAddrs: String,
val saslProtocolName: String,
+ val requireAuthentication: Boolean = false,
+ val encryptionPolicy: EncryptionPolicy = EncryptionPolicy.OPTIONAL,
val operationType: OperationType,
val readOptions: KuduReadOptions = new KuduReadOptions,
val writeOptions: KuduWriteOptions)(val sqlContext: SQLContext)
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index c364dc8..a5163f3 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -19,12 +19,10 @@ package org.apache.kudu.spark.kudu
import java.security.AccessController
import java.security.PrivilegedAction
-
import javax.security.auth.Subject
import javax.security.auth.login.AppConfigurationEntry
import javax.security.auth.login.Configuration
import javax.security.auth.login.LoginContext
-
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.hadoop.util.ShutdownHookManager
@@ -48,6 +46,7 @@ import org.apache.kudu.spark.kudu.SparkUtil.kuduSchema
import org.apache.kudu.spark.kudu.SparkUtil._
import org.apache.kudu.Schema
import org.apache.kudu.Type
+import org.apache.kudu.client.AsyncKuduClient.EncryptionPolicy
/**
* KuduContext is a serializable container for Kudu client connections.
@@ -63,7 +62,9 @@ class KuduContext(
val kuduMaster: String,
sc: SparkContext,
val socketReadTimeoutMs: Option[Long],
- val saslProtocolName: Option[String] = None)
+ val saslProtocolName: Option[String] = None,
+ val requireAuthentication: Boolean = false,
+ val encryptionPolicy: EncryptionPolicy = EncryptionPolicy.OPTIONAL)
extends Serializable {
val log: Logger = LoggerFactory.getLogger(getClass)
@@ -153,7 +154,8 @@ class KuduContext(
@transient lazy val syncClient: KuduClient = asyncClient.syncClient()
@transient lazy val asyncClient: AsyncKuduClient = {
- val c = KuduClientCache.getAsyncClient(kuduMaster, saslProtocolName)
+ val c = KuduClientCache
+ .getAsyncClient(kuduMaster, saslProtocolName, requireAuthentication, encryptionPolicy)
if (authnCredentials != null) {
c.importAuthenticationCredentials(authnCredentials)
}
@@ -611,13 +613,19 @@ private object KuduClientCache {
clientCache.clear()
}
- def getAsyncClient(kuduMaster: String, saslProtocolName: Option[String]): AsyncKuduClient = {
+ def getAsyncClient(
+ kuduMaster: String,
+ saslProtocolName: Option[String],
+ requireAuthentication: Boolean = false,
+ encryptionPolicy: EncryptionPolicy = EncryptionPolicy.OPTIONAL): AsyncKuduClient = {
clientCache.synchronized {
if (!clientCache.contains(kuduMaster)) {
val builder = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster)
if (saslProtocolName.nonEmpty) {
builder.saslProtocolName(saslProtocolName.get)
}
+ builder.requireAuthentication(requireAuthentication)
+ builder.encryptionPolicy(encryptionPolicy)
val asyncClient = builder.build()
val hookHandle = new Runnable {
override def run(): Unit = asyncClient.close()
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 327ba4c..9ea1818 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -36,6 +36,7 @@ import org.apache.kudu.test.RandomUtils
import org.apache.kudu.spark.kudu.SparkListenerUtil.withJobTaskCounter
import org.apache.kudu.test.KuduTestHarness.EnableKerberos
import org.apache.kudu.test.KuduTestHarness.MasterServerConfig
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
import org.junit.Before
import org.junit.Test
import org.scalatest.matchers.should.Matchers
@@ -897,4 +898,64 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
val df = sqlContext.read.options(kuduOptions).format("kudu").load
assertEquals(rowCount, df.count())
}
+
+ /**
+ * Requiring authentication must fail against the default test cluster, which does
+ * not have Kerberos enabled.
+ */
+ @Test
+ def testKuduRequireAuthenticationInsecureCluster(): Unit = {
+ // The client cache appears to be keyed by master addresses only, so clear it to
+ // make sure a client with this test's security settings is actually built.
+ KuduClientCache.clearCacheForTests()
+ kuduOptions = Map(
+ "kudu.table" -> tableName,
+ "kudu.master" -> harness.getMasterAddressesAsString,
+ "kudu.requireAuthentication" -> "true"
+ )
+ val exception = intercept[Exception] {
+ val df = sqlContext.read.options(kuduOptions).format("kudu").load
+ df.count
+ }
+ // Check the cause explicitly so a missing cause fails clearly instead of with an NPE.
+ // The matcher reports the actual message on a mismatch.
+ exception.getCause should not be (null)
+ exception.getCause.getMessage should include(
+ "client requires authentication, but server does not have Kerberos enabled")
+ }
+
+ /**
+ * With RPC encryption and authentication disabled on every server, a client that
+ * requires encryption for remote connections must fail.
+ */
+ @Test
+ @MasterServerConfig(flags = Array("--rpc_encryption=disabled", "--rpc_authentication=disabled"))
+ @TabletServerConfig(flags = Array("--rpc_encryption=disabled", "--rpc_authentication=disabled"))
+ def testKuduRequireEncryptionInsecureCluster(): Unit = {
+ // The client cache appears to be keyed by master addresses only, so clear it to
+ // make sure a client with this test's security settings is actually built.
+ KuduClientCache.clearCacheForTests()
+ kuduOptions = Map(
+ "kudu.table" -> tableName,
+ "kudu.master" -> harness.getMasterAddressesAsString,
+ "kudu.encryptionPolicy" -> "required_remote"
+ )
+ val exception = intercept[Exception] {
+ val df = sqlContext.read.options(kuduOptions).format("kudu").load
+ df.count
+ }
+ // Check the cause explicitly so a missing cause fails clearly instead of with an NPE.
+ // The matcher reports the actual message on a mismatch.
+ exception.getCause should not be (null)
+ exception.getCause.getMessage should include(
+ "server does not support required TLS encryption")
+ }
+
+ /**
+ * On a Kerberized cluster, requiring both authentication and encryption still lets
+ * the scan read every row.
+ */
+ @Test
+ @EnableKerberos
+ def testKuduRequireAuthenticationAndEncryptionSecureCluster(): Unit = {
+ KuduClientCache.clearCacheForTests()
+ val secureOptions = Map(
+ "kudu.table" -> tableName,
+ "kudu.master" -> harness.getMasterAddressesAsString,
+ "kudu.encryptionPolicy" -> "required",
+ "kudu.requireAuthentication" -> "true"
+ )
+ kuduOptions = secureOptions
+
+ val loaded = sqlContext.read.options(kuduOptions).format("kudu").load
+ assertEquals(rowCount, loaded.count)
+ }
+
+ /**
+ * With RPC encryption and authentication disabled on every server, a read using the
+ * suite's default `kuduOptions` still returns every row. Presumably those options set
+ * neither security option; confirm against the suite setup.
+ */
+ @Test
+ @MasterServerConfig(flags = Array("--rpc_encryption=disabled", "--rpc_authentication=disabled"))
+ @TabletServerConfig(flags = Array("--rpc_encryption=disabled", "--rpc_authentication=disabled"))
+ def testKuduInsecureCluster(): Unit = {
+ KuduClientCache.clearCacheForTests()
+ val loaded = sqlContext.read.options(kuduOptions).format("kudu").load
+ assertEquals(rowCount, loaded.count)
+ }
}