You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/12/07 06:16:56 UTC

[GitHub] [druid] jihoonson commented on a change in pull request #11823: Add Spark connector reader support.

jihoonson commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r763665571



##########
File path: spark/src/main/scala/org/apache/druid/spark/utils/DeepStorageConstructorHelpers.scala
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.utils
+
+import com.fasterxml.jackson.databind.MapperFeature
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.druid.common.aws.{AWSClientConfig, AWSCredentialsConfig, AWSEndpointConfig,
+  AWSModule, AWSProxyConfig}
+import org.apache.druid.common.gcp.GcpModule
+import org.apache.druid.java.util.common.StringUtils
+import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.TryWithResources
+import org.apache.druid.storage.azure.{AzureAccountConfig, AzureDataSegmentConfig,
+  AzureInputDataConfig, AzureStorage, AzureStorageDruidModule}
+import org.apache.druid.storage.google.{GoogleAccountConfig, GoogleInputDataConfig, GoogleStorage,
+  GoogleStorageDruidModule}
+import org.apache.druid.storage.hdfs.HdfsDataSegmentPusherConfig
+import org.apache.druid.storage.s3.{NoopServerSideEncryption, S3DataSegmentPusherConfig,
+  S3InputDataConfig, S3SSECustomConfig, S3SSEKmsConfig, S3StorageConfig, S3StorageDruidModule,
+  ServerSideEncryptingAmazonS3, ServerSideEncryption}
+import org.apache.hadoop.conf.{Configuration => HConf}
+
+import java.io.{ByteArrayInputStream, DataInputStream}
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+
+object DeepStorageConstructorHelpers extends TryWithResources {
+  /*
+   * Spark DataSourceOption property maps are case insensitive, by which they mean they lower-case all keys. Since all
+   * our user-provided property keys will come to us via a DataSourceOption, we need to use a case-insensisitive jackson
+   * mapper to deserialize property maps into objects. We want to be case-aware in the rest of our code, so we create a
+   * private, case-insensitive copy of our mapper here.
+   */
+  private val caseInsensitiveMapper = MAPPER.copy()
+    .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+    .registerModule(DefaultScalaModule)
+
+  // Local Storage Helpers
+
+  def createLocalDataSegmentPusherConfig(conf: Configuration): LocalDataSegmentPusherConfig = {
+    convertConfToInstance(conf, classOf[LocalDataSegmentPusherConfig])
+  }
+
+  // HDFS Storage Helpers
+
+  def createHdfsDataSegmentPusherConfig(conf: Configuration): HdfsDataSegmentPusherConfig = {
+    convertConfToInstance(conf, classOf[HdfsDataSegmentPusherConfig])
+  }
+
+  def createHadoopConfiguration(conf: Configuration): HConf = {
+    val hadoopConf = new HConf()
+    val confByteStream = new ByteArrayInputStream(
+      StringUtils.decodeBase64String(conf.getString(DruidConfigurationKeys.hdfsHadoopConfKey))
+    )
+    tryWithResources(confByteStream, new DataInputStream(confByteStream)){
+      case (_, inputStream: DataInputStream) => hadoopConf.readFields(inputStream)
+    }
+    hadoopConf
+  }
+
+  // S3 Storage Helpers
+
+  /**
+    * Create an S3DataSegmentPusherConfig from the relevant properties in CONF.
+    *
+    * *** Note that we explicitly override the default for `useS3aSchema`! ***
+    * Almost all users will want to use s3a, not s3n, and we have no backwards-compatibility to maintain.
+    *
+    * @param conf The Configuration object specifying the S3DataSegmentPusherConfig to create.
+    * @return An S3DataSegmentPusherConfig derived from the properties specified in CONF.
+    */
+  def createS3DataSegmentPusherConfig(conf: Configuration): S3DataSegmentPusherConfig = {
+    if (!conf.isPresent(DruidConfigurationKeys.s3UseS3ASchemaKey)) {
+      convertConfToInstance(conf.merge(
+        Configuration.fromKeyValue(DruidConfigurationKeys.s3UseS3ASchemaKey, "true")
+      ), classOf[S3DataSegmentPusherConfig])
+    } else {
+      convertConfToInstance(conf, classOf[S3DataSegmentPusherConfig])
+    }
+  }
+
+  def createS3InputDataConfig(conf: Configuration): S3InputDataConfig = {
+    convertConfToInstance(conf, classOf[S3InputDataConfig])
+  }
+
+  def createServerSideEncryptingAmazonS3(conf: Configuration): ServerSideEncryptingAmazonS3 = {
+    val (credentialsConfig, proxyConfig, endpointConfig, clientConfig, s3StorageConfig) =
+      createConfigsForServerSideEncryptingAmazonS3(conf)
+
+    val awsModule = new AWSModule
+    val s3Module = new S3StorageDruidModule
+    val credentialsProvider = awsModule.getAWSCredentialsProvider(credentialsConfig)
+    s3Module.getAmazonS3Client(
+      s3Module.getServerSideEncryptingAmazonS3Builder(
+        credentialsProvider,
+        proxyConfig,
+        endpointConfig,
+        clientConfig,
+        s3StorageConfig
+      )
+    )
+  }
+
+  def createConfigsForServerSideEncryptingAmazonS3(conf: Configuration):
+  (AWSCredentialsConfig, AWSProxyConfig, AWSEndpointConfig, AWSClientConfig, S3StorageConfig) = {
+    val credentialsConfig = convertConfToInstance(conf, classOf[AWSCredentialsConfig])
+
+    val proxyConfig = convertConfToInstance(conf.dive("proxy"), classOf[AWSProxyConfig])
+
+    val endpointConfig = convertConfToInstance(conf.dive("endpoint"), classOf[AWSEndpointConfig])
+
+    val clientConfig = convertConfToInstance(conf.dive("client"), classOf[AWSClientConfig])
+
+    val s3StorageConfig = createS3StorageConfig(conf.dive(DruidConfigurationKeys.s3ServerSideEncryptionPrefix))
+    (credentialsConfig, proxyConfig, endpointConfig, clientConfig, s3StorageConfig)
+  }
+
+  /**
+    * A helper method for creating instances of S3StorageConfigs from a Configuration. While I'm sure there's a simple
+    * solution I'm missing, I would have thought that something like the following would have worked:
+    *
+    * ```
+    * val kmsConfig = convertConfToInstance(conf.dive("kms"), classOf[S3SSEKmsConfig])
+    * caseInsensitiveMapper.setInjectableValues(new InjectableValues.Std().addValue(classOf[S3SSEKmsConfig], kmsConfig))
+    * val ser = caseInsensitiveMapper.writeValueAsString(Map[String, String]("type" -> "kms"))
+    * caseInsensitiveMapper.readValue[ServerSideEncryption](ser, new TypeReference[ServerSideEncryption] {})
+    * ```
+    *
+    * However, the code above throws an com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Invalid
+    * definition for property `config` (of type `org.apache.druid.storage.s3.KmsServerSideEncryption`): Could not find
+    * creator property with name 'config' (known Creator properties: [])
+    *
+    * I _think_ that the root cause is that ServerSideEncryption is abstract, but the error message above isn't
+    * what I would expect. Nevertheless, the simple solution would be to serialize to a KmsServerSideEncryption
+    * instance and then cast to the base ServerSideEncryption to assign. Unfortunately, KmsServerSideEncryption
+    * is package-private, so we can't access the class here. Since we already have the config object and we
+    * need to muck about with field visibility, we take the shortcut and just make the constructor accessible. This
+    * solution generalizes to the CustomServerSideEncyption case as well.
+    */
+  def createS3StorageConfig(conf: Configuration): S3StorageConfig = {
+    // There's probably a more elegant way to do this that would allow us to transparently support new sse types, but
+    // this will work for now.
+    val sseType = conf.get(DruidConfigurationKeys.s3ServerSideEncryptionTypeKey)
+
+    // Getting the list of subtypes since we'll need to use it to grab references to the package-private implementations
+    val config = caseInsensitiveMapper.getDeserializationConfig
+    val ac = AnnotatedClass.constructWithoutSuperTypes(classOf[ServerSideEncryption], config)
+    val subtypes = caseInsensitiveMapper.getSubtypeResolver.collectAndResolveSubtypesByClass(config, ac)
+
+    val serverSideEncryption: ServerSideEncryption = sseType match {
+      case Some("s3") =>
+        val clazz = subtypes.asScala.filter(_.getName == "s3").head.getType
+        val constructor = clazz.getDeclaredConstructor()
+        constructor.setAccessible(true)
+        constructor.newInstance().asInstanceOf[ServerSideEncryption]
+      case Some("kms") =>
+        val kmsConfig = convertConfToInstance(conf.dive("kms"), classOf[S3SSEKmsConfig])
+        val clazz = subtypes.asScala.filter(_.getName == "kms").head.getType
+        val constructor = clazz.getDeclaredConstructor(classOf[S3SSEKmsConfig])
+        constructor.setAccessible(true)
+        constructor.newInstance(kmsConfig).asInstanceOf[ServerSideEncryption]
+      case Some("custom") =>
+        val customConfig = convertConfToInstance(conf.dive("custom"), classOf[S3SSECustomConfig])
+        val clazz = subtypes.asScala.filter(_.getName == "custom").head.getType
+        val constructor = clazz.getDeclaredConstructor(classOf[S3SSECustomConfig])
+        constructor.setAccessible(true)
+        constructor.newInstance(customConfig).asInstanceOf[ServerSideEncryption]
+      case _ => new NoopServerSideEncryption
+    }
+    new S3StorageConfig(serverSideEncryption)
+  }
+
+  // GCS Storage Helpers
+
+  def createGoogleAcountConfig(conf: Configuration): GoogleAccountConfig = {
+    convertConfToInstance(conf, classOf[GoogleAccountConfig])
+  }
+
+  def createGoogleInputDataConfig(conf: Configuration): GoogleInputDataConfig = {
+    convertConfToInstance(conf, classOf[GoogleInputDataConfig])
+  }
+
+  def createGoogleStorage(): GoogleStorage = {
+    val gcpModule = new GcpModule
+    val gcpStorageModule = new GoogleStorageDruidModule
+
+    val httpTransport = gcpModule.getHttpTransport
+    val jsonFactory = gcpModule.getJsonFactory
+    val requestInitializer = gcpModule.getHttpRequestInitializer(httpTransport, jsonFactory)
+    gcpStorageModule.getGoogleStorage(httpTransport, jsonFactory, requestInitializer)
+  }
+
+  // Azure Storage Helpers
+
+  def createAzureDataSegmentConfig(conf: Configuration): AzureDataSegmentConfig = {
+    convertConfToInstance(conf, classOf[AzureDataSegmentConfig])
+  }
+
+  def createAzureInputDataConfig(conf: Configuration): AzureInputDataConfig = {
+    convertConfToInstance(conf, classOf[AzureInputDataConfig])
+  }
+
+  def createAzureAccountConfig(conf: Configuration): AzureAccountConfig = {
+    convertConfToInstance(conf, classOf[AzureAccountConfig])
+  }
+
+  def createAzureStorage(conf: Configuration): AzureStorage = {

Review comment:
       I was mostly wondering what people should do if they want to register their custom deep storage type to the spark connector. But now looking at the code again, I think I was misunderstood the code probably. I see that users can register their own custom type in the registry. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org