You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/12/10 09:39:26 UTC

[GitHub] [druid] JulianJaffePinterest opened a new pull request #11823: Add Spark connector reader support.

JulianJaffePinterest opened a new pull request #11823:
URL: https://github.com/apache/druid/pull/11823


   Add code, utilities, tests, and documentation for reading data from Druid using Spark.
   
   (See #10920  and #11474 for context)
   
   This PR splits out the reader logic from #10920 into a standalone PR as discussed. This PR also reworks how the connector handles segment loading to allow for more extensibility.
   
   ### Description
   
   
   At a high level, this connector reads data from Druid into Spark by querying the Druid metadata SQL server for the segment load specs for the specified data source and time period and then fetching and loading those segments from deep storage. On the Spark side, querying the Druid cluster and determining which segments to load is done on the driver side, while the actual fetching and loading of segment files is done on the executors.
   
   A more granular walk-through follows:
   
   #### Determining which segments to load
   
   When a user calls `.load()` to read in a Druid data source into a Spark DataFrame, the first step for the connector is to determine which segments need to be read. This is done by querying a Druid cluster's backing metadata database for the specified data source and time range via the `DruidMetadataClient` (we need to query the backing SQL database directly because segment load specs are pruned when served from the broker). Additionally, if the user does not provide a schema for the data source, we need to construct the correct schema ourselves. We do this via SegmentMetadata queries sent to a Druid broker.
   
   ##### Key Classes
   `DruidDataSourceReader`
   `DruidMetadataClient`
   `DruidClient` (already reviewed in #11474)
   
   #### Distributing the segments
   
   Once we've determined which Druid segments we need to load, we need to assign the segments to Spark partitions that will actually do the reading. For now, we simply assign each Druid segment file to a Spark partition, although in the future this could be extended to do smarter things (for instance, we could map multiple Druid segments to a single Spark partition and thus allow the user to specify how many Spark partitions they wanted their DataFrame to have, regardless of the number of underlying Druid segment files).
   
   ##### Key Classes
   `DruidDataSourceReader` (the `PlanInputPartition` methods)
   `DruidInputPartition`
   
   #### Reading the data
   
   Once we've assigned a Druid segment file to a Spark partition, we need to actually fetch the segment file from deep storage and read its data. This is handled by an `InputPartitionReader` (`DruidInputPartitionReader` for row-based reads; `DruidColumnarInputPartitionReader` for vectorized reads). Using either default or custom, user-provided logic the input partition reader pulls the segment file from deep storage and opens it as a Druid QueryableIndex. The reader then applies any pushed-down filters, projects the specified columns (if the user provided a schema explicitly), and fills Spark rows or vectors with the segment's data.
   
   A key piece here (and an enhancement from #10920) is the logic used to pull the segment file from deep storage. There are two supported approaches: by default, the partition reader will attempt to deserialize the load spec for it assigned segment and delegate to a Druid DataSegmentPuller to handle fetching. This requires a user to configure the reader with all necessary properties to connect and authenticate to their deep storage, but all Druid "core" deep storages (local, HDFS, S3, GCS, and Azure) are supported. Alternatively, users can defer to their Spark application's configuration. In this scenario, the reader extracts a URI from it's assigned segment's load spec and then constructs a FileSystem from the Spark application's configuration. The reader uses the file system to pull the extracted URI, meaning that users are not responsible for handling connection and authentication to deep storage. This second case is useful for users running on clusters that rely on GCS ADCs or A
 WS IAM roles for machine authorization to GCS/S3, or for clusters that manage access keys for their users. Only local, HDFS, S3, and GCS deep storage implementation are supported out of the box for this approach (Azure users will need to use the first approach or register a custom load function via the `SegmentReaderRegistry`).
   
   ##### Key Classes
   `DruidBaseInputPartitionReader`
   `DruidColumnarInputPartitionReader`
   `DruidInputPartitionReader`
   `SegmentReaderRegistry`
   
   #### User interface
   
   Users use the connector like any other Spark reader: they call `.read` on their Spark session, set the format (in this case, `"druid"`), specify the properties to use, and then call `load()`. For example:
   
   ```scala
   sparkSession
     .read
     .format("druid")
     .options(Map[String, String](
       "metadata.dbType" -> "mysql",
       "metadata.connectUri" -> "jdbc:mysql://druid.metadata.server:3306/druid",
       "metadata.user" -> "druid",
       "metadata.password" -> "diurd",
       "broker.host" -> "localhost",
       "broker.port" -> 8082,
       "table" -> "dataSource",
       "reader.deepStorageType" -> "local",
       "local.storageDirectory" -> "/mnt/druid/druid-segments/"
   ))
     .load()
   ```
   
   For convenience, a more strongly typed way to apply configure the reader is also provided:
   
   ```
   import org.apache.druid.spark.DruidDataFrameReader
   
   val deepStorageConfig = new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")
   
   sparkSession
     .read
     .brokerHost("localhost")
     .brokerPort(8082)
     .metadataDbType("mysql")
     .metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
     .metadataUser("druid")
     .metadataPassword("diurd")
     .dataSource("dataSource")
     .deepStorage(deepStorageConfig)
     .druid()
   ```
   
   User documentation with examples and configuration option descriptions is provided in [spark.md](docs/operations/spark.md).
   
   Additionally, because the connector does not run in a Druid cluster, we can't use Druid's dependency injection to transparently handle custom extensions and behavior. In order to support users who use unsupported or custom complex metrics, deep storage implementations, or metadata databases, the connector uses a plugin-based architecture. All Druid core extensions are supported out of the box. If users need to use their own custom logic, they can register the appropriate functions with the corresponding registry (`ComplexMetricRegistry` for complex metrics, `SQLConnectorRegistry` for metadata databases, and `SegmentReaderRegistry` for loading segments from deep storage).
   
   ##### Key Classes
   `ComplexMetricRegistry`
   `SQLConnectorRegistry`
   `SegmentReaderRegistry`
   `DruidConfigurationKeys`
   `spark.md`
   
   <hr>
   
   <!-- Check the items by putting "x" in the brackets for the done things. Not all of these items apply to every PR. Remove the items which are not done or not relevant to the PR. None of the items from the checklist below are strictly necessary, but it would be very helpful if you at least self-review the PR. -->
   
   This PR has:
   - [x] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [x] added documentation for new or modified features or behaviors.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [x] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [x] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-952640178


   Ah I see #11814. Let me see if the solution to that issue also solves this issue


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r749914212



##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidDataSourceReader.scala
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import org.apache.druid.java.util.common.{Intervals, JodaUtils}
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.clients.{DruidClient, DruidMetadataClient}
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.utils.{FilterUtils, SchemaUtils}
+import org.apache.druid.timeline.DataSegment
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition,
+  SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsScanColumnarBatch}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.joda.time.Interval
+
+import java.util.{List => JList}
+import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}
+
+/**
+  * A DruidDataSourceReader handles the actual work of reading data from Druid. It does this by querying to determine
+  * where Druid segments live in deep storage and then reading those segments into memory in order to avoid straining
+  * the Druid cluster. In general, users should not directly instantiate instances of this class but instead use
+  * sparkSession.read.format("druid").options(Map(...)).load(). If the schema of the data in Druid is known, overhead
+  * can be further reduced by providing it directly (e.g. sparkSession.read.format("druid").schema(schema).options...)
+  *
+  * To aid comprehensibility, some idiomatic Scala has been somewhat java-fied.
+  */
+class DruidDataSourceReader(
+                             var schema: Option[StructType] = None,
+                             conf: Configuration
+                           ) extends DataSourceReader
+  with SupportsPushDownRequiredColumns with SupportsPushDownFilters with SupportsScanColumnarBatch with Logging {
+  private lazy val metadataClient =
+    DruidDataSourceReader.createDruidMetaDataClient(conf)
+  private lazy val druidClient = DruidDataSourceReader.createDruidClient(conf)
+
+  private var filters: Array[Filter] = Array.empty
+  private var druidColumnTypes: Option[Set[String]] = Option.empty
+
+  override def readSchema(): StructType = {
+    if (schema.isDefined) {
+      schema.get
+    } else {
+      require(conf.isPresent(DruidConfigurationKeys.tableKey),
+        s"Must set ${DruidConfigurationKeys.tableKey}!")
+      // TODO: Optionally accept a granularity so that if lowerBound to upperBound spans more than
+      //  twice the granularity duration, we can send a list with two disjoint intervals and
+      //  minimize the load on the broker from having to merge large numbers of segments
+      val (lowerBound, upperBound) = FilterUtils.getTimeFilterBounds(filters)
+      val columnMap = druidClient.getSchema(
+        conf.getString(DruidConfigurationKeys.tableKey),
+        Some(List[Interval](Intervals.utc(
+          lowerBound.getOrElse(JodaUtils.MIN_INSTANT),
+          upperBound.getOrElse(JodaUtils.MAX_INSTANT)
+        )))
+      )
+      schema = Option(SchemaUtils.convertDruidSchemaToSparkSchema(columnMap))
+      druidColumnTypes = Option(columnMap.map(_._2._1).toSet)
+      schema.get
+    }
+  }
+
+  override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
+    // For now, one partition for each Druid segment partition
+    // Future improvements can use information from SegmentAnalyzer results to do smart things
+    if (schema.isEmpty) {
+      readSchema()
+    }
+    val readerConf = conf.dive(DruidConfigurationKeys.readerPrefix)
+    val filter = FilterUtils.mapFilters(filters, schema.get)
+    val useSparkConfForDeepStorage = readerConf.getBoolean(DruidConfigurationKeys.useSparkConfForDeepStorageDefaultKey)
+    val useCompactSketches = readerConf.isPresent(DruidConfigurationKeys.useCompactSketchesKey)
+    val useDefaultNullHandling = readerConf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
+
+    // Allow passing hard-coded list of segments to load
+    if (readerConf.isPresent(DruidConfigurationKeys.segmentsKey)) {
+      val segments: JList[DataSegment] = MAPPER.readValue(
+        readerConf.getString(DruidConfigurationKeys.segmentsKey),
+        new TypeReference[JList[DataSegment]]() {}
+      )
+      segments.asScala
+        .map(segment =>
+          new DruidInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling
+          ): InputPartition[InternalRow]
+        ).asJava
+    } else {
+      getSegments
+        .map(segment =>
+          new DruidInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling
+          ): InputPartition[InternalRow]
+        ).asJava
+    }
+  }
+
+  override def pruneColumns(structType: StructType): Unit = {
+    schema = Option(structType)
+  }
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    readSchema()
+    filters.partition(FilterUtils.isSupportedFilter(_, schema.get)) match {
+      case (supported, unsupported) =>
+        this.filters = supported
+        unsupported
+    }
+  }
+
+  override def pushedFilters(): Array[Filter] = filters
+
+  private[v2] def getSegments: Seq[DataSegment] = {
+    require(conf.isPresent(DruidConfigurationKeys.tableKey),
+      s"Must set ${DruidConfigurationKeys.tableKey}!")
+
+    // Check filters for any bounds on __time
+    // Otherwise, we'd need to full scan the segments table
+    val (lowerTimeBound, upperTimeBound) = FilterUtils.getTimeFilterBounds(filters)
+
+    metadataClient.getSegmentPayloads(
+      conf.getString(DruidConfigurationKeys.tableKey),
+      lowerTimeBound,
+      upperTimeBound,
+      conf.getBoolean(DruidConfigurationKeys.allowIncompletePartitionsDefaultKey)
+    )
+  }
+
+  override def planBatchInputPartitions(): JList[InputPartition[ColumnarBatch]] = {
+    if (schema.isEmpty) {
+      readSchema()
+    }
+    val readerConf = conf.dive(DruidConfigurationKeys.readerPrefix)
+    val filter = FilterUtils.mapFilters(filters, schema.get)
+    val useSparkConfForDeepStorage = readerConf.getBoolean(DruidConfigurationKeys.useSparkConfForDeepStorageDefaultKey)
+    val useCompactSketches = readerConf.isPresent(DruidConfigurationKeys.useCompactSketchesKey)
+    val useDefaultNullHandling = readerConf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
+    val batchSize = readerConf.getInt(DruidConfigurationKeys.batchSizeDefaultKey)
+
+    // Allow passing hard-coded list of segments to load
+    if (readerConf.isPresent(DruidConfigurationKeys.segmentsKey)) {
+      val segments: JList[DataSegment] = MAPPER.readValue(
+        readerConf.getString(DruidConfigurationKeys.segmentsKey),
+        new TypeReference[JList[DataSegment]]() {}
+      )
+      segments.asScala
+        .map(segment =>
+          new DruidColumnarInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling,
+            batchSize
+          ): InputPartition[ColumnarBatch]
+        ).asJava
+    } else {
+      getSegments
+        .map(segment =>
+          new DruidColumnarInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling,
+            batchSize
+          ): InputPartition[ColumnarBatch]
+        ).asJava
+    }
+  }
+
+  override def enableBatchRead(): Boolean = {
+    // Fail fast
+    if (!conf.dive(DruidConfigurationKeys.readerPrefix).getBoolean(DruidConfigurationKeys.vectorizeDefaultKey)) {
+      false
+    } else {
+      if (schema.isEmpty) {
+        readSchema()
+      }
+      val filterOpt = FilterUtils.mapFilters(filters, schema.get)
+      filterOpt.fold(true) { filter =>
+        val rowSignature = SchemaUtils.generateRowSignatureFromSparkSchema(schema.get)
+        val canVectorize = filter.toOptimizedFilter.canVectorizeMatcher(rowSignature)

Review comment:
       We are looking at the schema to decide the value of `canVectorize` (we generate the Druid `RowSignature` and pass it to the `canVectorizeMatcher` call). Are you saying we should be more conservative and disable vectorization in additional cases?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r755721373



##########
File path: docs/operations/spark.md
##########
@@ -0,0 +1,279 @@
+---
+id: spark
+title: "Apache Spark Reader and Writer"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+# Apache Spark Reader and Writer for Druid
+
+## Reader
+The reader reads Druid segments from deep storage into Spark. It locates the segments to read and determines their
+schema if not provided by querying the brokers for the relevant metadata but otherwise does not interact with a running
+Druid cluster.
+
+Sample Code:
+```scala
+import org.apache.druid.spark.DruidDataFrameReader
+
+val deepStorageConfig = new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")
+
+sparkSession
+  .read
+  .brokerHost("localhost")
+  .brokerPort(8082)
+  .metadataDbType("mysql")
+  .metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
+  .metadataUser("druid")
+  .metadataPassword("diurd")
+  .dataSource("dataSource")
+  .deepStorage(deepStorageConfig)
+  .druid()
+```
+
+Alternatively, the reader can be configured via a properties map with no additional import needed:
+```scala
+val properties = Map[String, String](
+  "metadata.dbType" -> "mysql",
+  "metadata.connectUri" -> "jdbc:mysql://druid.metadata.server:3306/druid",
+  "metadata.user" -> "druid",
+  "metadata.password" -> "diurd",
+  "broker.host" -> "localhost",
+  "broker.port" -> 8082,
+  "table" -> "dataSource",
+  "reader.deepStorageType" -> "local",
+  "local.storageDirectory" -> "/mnt/druid/druid-segments/"
+)
+
+sparkSession
+  .read
+  .format("druid")
+  .options(properties)
+  .load()
+```
+
+If you know the schema of the Druid data source you're reading from, you can save needing to determine the schema via
+calls to the broker with
+```scala
+sparkSession
+  .read
+  .format("druid")
+  .schema(schema)
+  .options(properties)
+  .load()
+```
+
+Filters should be applied to the read-in data frame before any [Spark actions](http://spark.apache.org/docs/2.4.7/api/scala/index.html#org.apache.spark.sql.Dataset)
+are triggered, to allow predicates to be pushed down to the reader and avoid full scans of the underlying Druid data.
+
+## Plugin Registries and Druid Extension Support
+One of Druid's strengths is its extensibility. Since these Spark readers and writers will not execute on a Druid cluster
+and won't have the ability to dynamically load classes or integrate with Druid's Guice injectors, Druid extensions can't
+be used directly. Instead, these connectors use a plugin registry architecture, including default plugins that support
+most functionality in `extensions-core`. Custom plugins consisting of a string name and one or more serializable
+generator functions must be registered before the first Spark action which would depend on them is called.
+
+### ComplexMetricRegistry
+The `ComplexMetricRegistry` provides support for serializing and deserializing complex metric types between Spark and
+Druid. Support for complex metric types in Druid core extensions is provided out of the box.
+
+Users wishing to override the default behavior or who need to add support for additional complex metric types can
+use the `ComplexMetricRegistry.register` functions to associate serde functions with a given complex metric type. The
+name used to register custom behavior must match the complex metric type name reported by Druid.
+**Note that custom plugins must be registered with both the executors and the Spark driver.**
+
+### SegmentReaderRegistry
+The `SegmentReaderRegistry` provides support for reading segments from deep storage. Local, HDFS, GCS, S3, and Azure
+Storage deep storage implementations are supported by default.
+
+Users wishing to override the default behavior or who need to add support for additional deep storage implementations
+can use either `SegmentReaderRegistry.registerInitializer` (to provide any necessary Jackson configuration for
+deserializing a `LoadSpec` object from a segment load spec) or `SegmentReaderRegistry.registerLoadFunction` (to register
+a function for creating a URI from a segment load spec). These two functions correspond to the first and second approach
+[outlined below](#deep-storage). **Note that custom plugins must be registered on the executors, not the Spark driver.**
+
+### SQLConnectorRegistry
+The `SQLConnectorRegistry` provides support for configuring connectors to Druid metadata databases. Support for MySQL,
+PostgreSQL, and Derby databases are provided out of the box.
+
+Users wishing to override the default behavior or who need to add support for additional metadata database
+implementations can use the `SQLConnectorRegistry.register` function. Custom connectors should be registered on the
+driver.
+
+## Deploying to a Spark cluster
+This extension can be run on a Spark cluster in one of two ways: bundled as part of an application jar or uploaded as
+a library jar to a Spark cluster and included in the classpath provided to Spark applications by the application
+manager. If the second approach is used, this extension should be built with
+`mvn clean package -pl spark` and the resulting jar `druid-spark-<VERSION>.jar`
+uploaded to the Spark cluster. Application jars should then be built with a compile-time dependency on
+`org.apache.druid:druid-spark` (e.g. marked as `provided` in Maven or with `compileOnly` in Gradle).
+
+## Configuration Reference
+
+### Metadata Client Configs
+The properties used to configure the client that interacts with the Druid metadata server directly. Used by both reader
+and the writer. The `metadataPassword` property can either be provided as a string that will be used as-is or can be
+provided as a serialized DynamicConfigProvider that will be resolved when the metadata client is first instantiated. If
+a  custom DynamicConfigProvider is used, be sure to register the provider with the DynamicConfigProviderRegistry before use.
+
+|Key|Description|Required|Default|
+|---|-----------|--------|-------|
+|`metadata.dbType`|The metadata server's database type (e.g. `mysql`)|Yes||
+|`metadata.host`|The metadata server's host name|If using derby|`localhost`|
+|`metadata.port`|The metadata server's port|If using derby|1527|
+|`metadata.connectUri`|The URI to use to connect to the metadata server|If not using derby||
+|`metadata.user`|The user to use when connecting to the metadata server|If required by the metadata database||
+|`metadata.password`|The password to use when connecting to the metadata server. This can optionally be a serialized instance of a Druid DynamicConfigProvider or a plain string|If required by the metadata database||
+|`metadata.dbcpProperties`|The connection pooling properties to use when connecting to the metadata server|No||
+|`metadata.baseName`|The base name used when creating Druid metadata tables|No|`druid`|
+
+### Druid Client Configs
+The configuration properties used to query the Druid cluster for segment metadata. Only used in the reader.
+
+|Key|Description|Required|Default|
+|---|-----------|--------|-------|
+|`broker.host`|The hostname of a broker in the Druid cluster to read from|No|`localhost`|
+|`broker.port`|The port of the broker in the Druid cluster to read from|No|8082|
+|`broker.numRetries`|The number of times to retry a timed-out segment metadata request|No|5|
+|`broker.retryWaitSeconds`|How long (in seconds) to wait before retrying a timed-out segment metadata request|No|5|
+|`broker.timeoutMilliseconds`|How long (in milliseconds) to wait before timing out a segment metadata request|No|300000|
+
+### Reader Configs
+The properties used to configure the DataSourceReader when reading data from Druid in Spark.
+
+|Key|Description|Required|Default|
+|---|-----------|--------|-------|
+|`table`|The Druid data source to read from|Yes||
+|`reader.deepStorageType`|The type of deep storage used to back the target Druid cluster|No|`local`|
+|`reader.segments`|A hard-coded list of Druid segments to read. If set, the table and druid client configurations are ignored and the specified segments are read directly. Must be deserializable into Druid DataSegment instances|No|
+|`reader.useCompactSketches`|Controls whether or not compact representations of complex metrics are used (only for metrics that support compact forms)|No|False|

Review comment:
       Then, perhaps better rephrasing this to something like: 
   
   ```suggestion
   |`reader.useCompactSketches`|Controls whether or not compact representations of sketch metrics are used|No|False|
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-952607565


   @jihoonson added the branch to `.travis.yml` and fixed spellcheck 🙂 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r749915739



##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/v2.scala
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark
+
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory
+import org.apache.druid.segment.{IndexIO, IndexMergerV9}
+
+package object v2 { // scalastyle:ignore package.object.name

Review comment:
       This is a [Scala package object](https://docs.scala-lang.org/tour/package-objects.html). Although the `INDEX_IO` object is only used one place in the reader, that object and the `INDEX_MERGER_V9` object will also be used multiple places in the writer, which is why I've put them in the package object.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-952630338


   The packaging check is failing with
   ```Traceback (most recent call last):
     File "/home/travis/build/apache/druid/distribution/bin/generate-binary-license.py", line 181, in <module>
       generate_license(apache_license_v2, license_yaml)
     File "/home/travis/build/apache/druid/distribution/bin/generate-binary-license.py", line 140, in generate_license
       licenses_list = list(yaml.load_all(registry_file))
   
   TypeError: load_all() missing 1 required positional argument: 'Loader'
   
   [ERROR] Command execution failed.
   
   org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
       at org.apache.commons.exec.DefaultExecutor.executeInternal (DefaultExecutor.java:404)
       at org.apache.commons.exec.DefaultExecutor.execute (DefaultExecutor.java:166)
       at org.codehaus.mojo.exec.ExecMojo.executeCommandLine (ExecMojo.java:804)
       at org.codehaus.mojo.exec.ExecMojo.executeCommandLine (ExecMojo.java:751)
       at org.codehaus.mojo.exec.ExecMojo.execute (ExecMojo.java:313)
       at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:137)
       at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:210)
       at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:156)
       at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:148)
       at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:117)
       at org.apache.maven.lifecycle.internal.builder.multithreaded.MultiThreadedBuilder$1.call (MultiThreadedBuilder.java:190)
       at org.apache.maven.lifecycle.internal.builder.multithreaded.MultiThreadedBuilder$1.call (MultiThreadedBuilder.java:186)
       at java.util.concurrent.FutureTask.run (FutureTask.java:266)
       at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
       at java.util.concurrent.FutureTask.run (FutureTask.java:266)
       at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
       at java.lang.Thread.run (Thread.java:748)```
   
   I had thought this was an non-obvious error due to not adding licenses yet but the check is still failing after I add the licenses. Is this something people regularly encounter?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-992136216


   @jihoonson `git diff d41a3f82 2b97037 --name-only` has empty output ([`d41a3f82`](https://github.com/apache/druid/pull/11823/commits/d41a3f825d79de7491761c2b5c0296431d6392ec) is the [last commit to successfully build](https://github.com/apache/druid/pull/11823/checks?check_run_id=4222019329) kafka-extraction-namespace, [`2b97037`](https://github.com/apache/druid/pull/11823/checks?check_run_id=4222019329) is the revert commit back after my recent changes that errors while building kafka-extraction-namespace). Based on this, I'm going to reintroduce my changes since they aren't the cause of the failure. Do you know if anything changed in the Travis environment between November 16th and December 1st, or if any other PRs have encountered this issue?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] cdmikechen commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
cdmikechen commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-1013578612


   Happy to hear that this PR has been merged


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] samarthjain commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r749089887



##########
File path: spark/src/main/scala/org/apache/druid/spark/clients/DruidMetadataClient.scala
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.clients
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.google.common.base.Suppliers
+import org.apache.druid.indexer.SQLMetadataStorageUpdaterJobHandler
+import org.apache.druid.java.util.common.{DateTimes, Intervals, JodaUtils, StringUtils}
+import org.apache.druid.metadata.{DynamicConfigProvider, MetadataStorageConnectorConfig,
+  MetadataStorageTablesConfig, SQLMetadataConnector}
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.registries.SQLConnectorRegistry
+import org.apache.druid.timeline.{DataSegment, Partitions, VersionedIntervalTimeline}
+import org.skife.jdbi.v2.{DBI, Handle}
+
+import java.util.Properties
+import scala.collection.JavaConverters.{asJavaIterableConverter, asScalaBufferConverter,
+  asScalaSetConverter, mapAsJavaMapConverter}
+
+class DruidMetadataClient(
+                           metadataDbType: String,
+                           host: String,
+                           port: Int,
+                           connectUri: String,
+                           user: String,
+                           passwordProviderSer: String,
+                           dbcpMap: Properties,
+                           base: String = "druid"
+                         ) extends Logging {
+  private lazy val druidMetadataTableConfig = MetadataStorageTablesConfig.fromBase(base)
+  private lazy val dbcpProperties = new Properties(dbcpMap)
+  private lazy val password = if (passwordProviderSer == "") {
+    // Jackson doesn't like deserializing empty strings
+    passwordProviderSer
+  } else {
+    MAPPER.readValue[DynamicConfigProvider[String]](
+      passwordProviderSer, new TypeReference[DynamicConfigProvider[String]] {}
+    ).getConfig.getOrDefault("password", "")
+  }
+
+  private lazy val connectorConfig: MetadataStorageConnectorConfig =
+    new MetadataStorageConnectorConfig
+    {
+      override def isCreateTables: Boolean = false
+      override def getHost: String = host
+      override def getPort: Int = port
+      override def getConnectURI: String = connectUri
+      override def getUser: String = user
+      override def getPassword: String = password
+      override def getDbcpProperties: Properties = dbcpProperties
+    }
+  private lazy val connectorConfigSupplier = Suppliers.ofInstance(connectorConfig)
+  private lazy val metadataTableConfigSupplier = Suppliers.ofInstance(druidMetadataTableConfig)
+  private lazy val connector = buildSQLConnector()
+
+  /**
+    * Get the non-overshadowed used segments for DATASOURCE between INTERVALSTART and INTERVALEND. If either interval
+    * endpoint is None, JodaUtils.MIN_INSTANCE/MAX_INSTANCE is used instead. By default, only segments for complete
+    * partitions are returned. This behavior can be overriden by setting ALLOWINCOMPLETEPARTITIONS, in which case all
+    * non-overshadowed segments in the interval will be returned, regardless of completesness.
+    *
+    * @param datasource The Druid data source to get segment payloads for.
+    * @param intervalStart The start of the interval to fetch segment payloads for. If None, MIN_INSTANT is used.

Review comment:
       You may want to mention that start and end intervals are inclusive. 

##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidDataSourceReader.scala
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import org.apache.druid.java.util.common.{Intervals, JodaUtils}
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.clients.{DruidClient, DruidMetadataClient}
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.utils.{FilterUtils, SchemaUtils}
+import org.apache.druid.timeline.DataSegment
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition,
+  SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsScanColumnarBatch}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.joda.time.Interval
+
+import java.util.{List => JList}
+import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}
+
+/**
+  * A DruidDataSourceReader handles the actual work of reading data from Druid. It does this by querying to determine
+  * where Druid segments live in deep storage and then reading those segments into memory in order to avoid straining
+  * the Druid cluster. In general, users should not directly instantiate instances of this class but instead use
+  * sparkSession.read.format("druid").options(Map(...)).load(). If the schema of the data in Druid is known, overhead
+  * can be further reduced by providing it directly (e.g. sparkSession.read.format("druid").schema(schema).options...)
+  *
+  * To aid comprehensibility, some idiomatic Scala has been somewhat java-fied.
+  */
+class DruidDataSourceReader(
+                             var schema: Option[StructType] = None,
+                             conf: Configuration
+                           ) extends DataSourceReader
+  with SupportsPushDownRequiredColumns with SupportsPushDownFilters with SupportsScanColumnarBatch with Logging {
+  private lazy val metadataClient =
+    DruidDataSourceReader.createDruidMetaDataClient(conf)
+  private lazy val druidClient = DruidDataSourceReader.createDruidClient(conf)
+
+  private var filters: Array[Filter] = Array.empty
+  private var druidColumnTypes: Option[Set[String]] = Option.empty
+
+  override def readSchema(): StructType = {
+    if (schema.isDefined) {
+      schema.get
+    } else {
+      require(conf.isPresent(DruidConfigurationKeys.tableKey),
+        s"Must set ${DruidConfigurationKeys.tableKey}!")
+      // TODO: Optionally accept a granularity so that if lowerBound to upperBound spans more than
+      //  twice the granularity duration, we can send a list with two disjoint intervals and
+      //  minimize the load on the broker from having to merge large numbers of segments
+      val (lowerBound, upperBound) = FilterUtils.getTimeFilterBounds(filters)
+      val columnMap = druidClient.getSchema(
+        conf.getString(DruidConfigurationKeys.tableKey),
+        Some(List[Interval](Intervals.utc(
+          lowerBound.getOrElse(JodaUtils.MIN_INSTANT),
+          upperBound.getOrElse(JodaUtils.MAX_INSTANT)
+        )))
+      )
+      schema = Option(SchemaUtils.convertDruidSchemaToSparkSchema(columnMap))
+      druidColumnTypes = Option(columnMap.map(_._2._1).toSet)
+      schema.get
+    }
+  }
+
+  override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
+    // For now, one partition for each Druid segment partition
+    // Future improvements can use information from SegmentAnalyzer results to do smart things
+    if (schema.isEmpty) {
+      readSchema()
+    }
+    val readerConf = conf.dive(DruidConfigurationKeys.readerPrefix)
+    val filter = FilterUtils.mapFilters(filters, schema.get)
+    val useSparkConfForDeepStorage = readerConf.getBoolean(DruidConfigurationKeys.useSparkConfForDeepStorageDefaultKey)
+    val useCompactSketches = readerConf.isPresent(DruidConfigurationKeys.useCompactSketchesKey)
+    val useDefaultNullHandling = readerConf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
+
+    // Allow passing hard-coded list of segments to load
+    if (readerConf.isPresent(DruidConfigurationKeys.segmentsKey)) {
+      val segments: JList[DataSegment] = MAPPER.readValue(
+        readerConf.getString(DruidConfigurationKeys.segmentsKey),
+        new TypeReference[JList[DataSegment]]() {}
+      )
+      segments.asScala
+        .map(segment =>
+          new DruidInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling
+          ): InputPartition[InternalRow]
+        ).asJava
+    } else {
+      getSegments
+        .map(segment =>
+          new DruidInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling
+          ): InputPartition[InternalRow]
+        ).asJava
+    }
+  }
+
+  override def pruneColumns(structType: StructType): Unit = {
+    schema = Option(structType)
+  }
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    readSchema()
+    filters.partition(FilterUtils.isSupportedFilter(_, schema.get)) match {
+      case (supported, unsupported) =>
+        this.filters = supported
+        unsupported
+    }
+  }
+
+  override def pushedFilters(): Array[Filter] = filters
+
+  private[v2] def getSegments: Seq[DataSegment] = {
+    require(conf.isPresent(DruidConfigurationKeys.tableKey),
+      s"Must set ${DruidConfigurationKeys.tableKey}!")
+
+    // Check filters for any bounds on __time
+    // Otherwise, we'd need to full scan the segments table
+    val (lowerTimeBound, upperTimeBound) = FilterUtils.getTimeFilterBounds(filters)
+
+    metadataClient.getSegmentPayloads(
+      conf.getString(DruidConfigurationKeys.tableKey),
+      lowerTimeBound,
+      upperTimeBound,
+      conf.getBoolean(DruidConfigurationKeys.allowIncompletePartitionsDefaultKey)
+    )
+  }
+
+  override def planBatchInputPartitions(): JList[InputPartition[ColumnarBatch]] = {
+    if (schema.isEmpty) {
+      readSchema()
+    }
+    val readerConf = conf.dive(DruidConfigurationKeys.readerPrefix)
+    val filter = FilterUtils.mapFilters(filters, schema.get)
+    val useSparkConfForDeepStorage = readerConf.getBoolean(DruidConfigurationKeys.useSparkConfForDeepStorageDefaultKey)
+    val useCompactSketches = readerConf.isPresent(DruidConfigurationKeys.useCompactSketchesKey)
+    val useDefaultNullHandling = readerConf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
+    val batchSize = readerConf.getInt(DruidConfigurationKeys.batchSizeDefaultKey)
+
+    // Allow passing hard-coded list of segments to load
+    if (readerConf.isPresent(DruidConfigurationKeys.segmentsKey)) {
+      val segments: JList[DataSegment] = MAPPER.readValue(
+        readerConf.getString(DruidConfigurationKeys.segmentsKey),
+        new TypeReference[JList[DataSegment]]() {}
+      )
+      segments.asScala
+        .map(segment =>
+          new DruidColumnarInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling,
+            batchSize
+          ): InputPartition[ColumnarBatch]
+        ).asJava
+    } else {
+      getSegments
+        .map(segment =>
+          new DruidColumnarInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling,
+            batchSize
+          ): InputPartition[ColumnarBatch]
+        ).asJava
+    }
+  }
+
+  override def enableBatchRead(): Boolean = {
+    // Fail fast
+    if (!conf.dive(DruidConfigurationKeys.readerPrefix).getBoolean(DruidConfigurationKeys.vectorizeDefaultKey)) {
+      false
+    } else {
+      if (schema.isEmpty) {
+        readSchema()
+      }
+      val filterOpt = FilterUtils.mapFilters(filters, schema.get)
+      filterOpt.fold(true) { filter =>
+        val rowSignature = SchemaUtils.generateRowSignatureFromSparkSchema(schema.get)
+        val canVectorize = filter.toOptimizedFilter.canVectorizeMatcher(rowSignature)

Review comment:
       What about the case when Druid doesn't support vectorized reads for say a complex column type? Does that mean we should be looking at the schema also to decide value of `canVectorize`? We should probably disable vectorization then. 

##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartitionReader.scala
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import org.apache.druid.java.util.common.{FileUtils, IAE, ISE, StringUtils}
+import org.apache.druid.query.dimension.DefaultDimensionSpec
+import org.apache.druid.query.filter.DimFilter
+import org.apache.druid.segment.column.ValueType
+import org.apache.druid.segment.vector.{VectorColumnSelectorFactory, VectorCursor}
+import org.apache.druid.segment.{QueryableIndexStorageAdapter, VirtualColumns}
+import org.apache.druid.spark.configuration.{Configuration, SerializableHadoopConfiguration}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.registries.ComplexMetricRegistry
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
+import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, LongType, StringType,
+  StructField, StructType, TimestampType}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+
+class DruidColumnarInputPartitionReader(
+                                         segmentStr: String,
+                                         schema: StructType,
+                                         filter: Option[DimFilter],
+                                         columnTypes: Option[Set[String]],
+                                         broadcastedHadoopConf: Broadcast[SerializableHadoopConfiguration],
+                                         conf: Configuration,
+                                         useSparkConfForDeepStorage: Boolean,
+                                         useCompactSketches: Boolean,
+                                         useDefaultNullHandling: Boolean,
+                                         batchSize: Int
+                                       )
+  extends DruidBaseInputPartitionReader(
+    segmentStr,
+    columnTypes,
+    broadcastedHadoopConf,
+    conf,
+    useSparkConfForDeepStorage,
+    useCompactSketches,
+    useDefaultNullHandling
+  ) with InputPartitionReader[ColumnarBatch] with Logging {
+
+  private val adapter = new QueryableIndexStorageAdapter(queryableIndex)
+
+  private val cursor: VectorCursor = adapter.makeVectorCursor(
+    filter.map(_.toOptimizedFilter).orNull,
+    adapter.getInterval,
+    VirtualColumns.EMPTY,
+    false,
+    batchSize,
+    null) // scalastyle:ignore null
+
+  private val columnVectors: Array[OnHeapColumnVector] = OnHeapColumnVector.allocateColumns(batchSize, schema)
+  private val resultBatch: ColumnarBatch = new ColumnarBatch(columnVectors.map(_.asInstanceOf[ColumnVector]))
+
+  override def next(): Boolean = {
+    if (!cursor.isDone) {
+      fillVectors()
+      true
+    } else {
+      false
+    }
+  }
+
+  override def get(): ColumnarBatch = {
+    resultBatch
+  }
+
+  override def close(): Unit = {
+    resultBatch.close()

Review comment:
       Can any of these `close()` calls throw an `Exception`? We probably need to guard against that. 

##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartitionReader.scala
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import org.apache.druid.java.util.common.{FileUtils, IAE, ISE, StringUtils}
+import org.apache.druid.query.dimension.DefaultDimensionSpec
+import org.apache.druid.query.filter.DimFilter
+import org.apache.druid.segment.column.ValueType
+import org.apache.druid.segment.vector.{VectorColumnSelectorFactory, VectorCursor}
+import org.apache.druid.segment.{QueryableIndexStorageAdapter, VirtualColumns}
+import org.apache.druid.spark.configuration.{Configuration, SerializableHadoopConfiguration}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.registries.ComplexMetricRegistry
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
+import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, LongType, StringType,
+  StructField, StructType, TimestampType}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+
+class DruidColumnarInputPartitionReader(

Review comment:
       Would a better name instead be `DruidVectorizedInputPartitionReader`. I am not sure if `Columnar` is representing the fact that this reader is reading column values in batches. 

##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/v2.scala
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark
+
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory
+import org.apache.druid.segment.{IndexIO, IndexMergerV9}
+
+package object v2 { // scalastyle:ignore package.object.name

Review comment:
       What is the purpose of this class? 

##########
File path: spark/src/main/scala/org/apache/druid/spark/utils/NullHandlingUtils.scala
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.utils
+
+import org.apache.druid.common.config.{NullHandling, NullValueHandlingConfig}
+
+/**
+  * Utility class for initializing a Druid NullValueHandlingConfig. In a Druid cluster, this is handled via injection
+  * and in unit tests defautl value null handling is initialized via NullHandling.initializeForTests(), but we need

Review comment:
       typo: default

##########
File path: spark/src/main/scala/org/apache/druid/spark/utils/FilterUtils.scala
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.utils
+
+import org.apache.druid.java.util.common.{ISE, JodaUtils}
+import org.apache.druid.query.filter.{AndDimFilter, BoundDimFilter, DimFilter, InDimFilter,
+  NotDimFilter, OrDimFilter, RegexDimFilter, SelectorDimFilter}
+import org.apache.druid.query.ordering.{StringComparator, StringComparators}
+import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, GreaterThan,
+  GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains,
+  StringEndsWith, StringStartsWith}
+import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, FloatType, IntegerType,
+  LongType, StringType, StructType, TimestampType}
+
+import scala.collection.JavaConverters.{seqAsJavaListConverter, setAsJavaSetConverter}
+
+/**
+  * Converters and utilities for working with Spark and Druid Filters.
+  */
+object FilterUtils {
+  /**
+    * Map an array of Spark filters FILTERS to a Druid dim filter or None if filters is empty.
+    *
+    * We return a DimFilter instead of a Filter and force callers to call .toFilter
+    * or .toOptimizedFilter to get a filter because callers can't covert back to a DimFilter from a
+    * Filter.
+    *
+    * @param filters The spark filters to map to a Druid filter.
+    * @return A Druid filter corresponding to the union of filter conditions enumerated in FILTERS.
+    */
+  def mapFilters(filters: Array[Filter], schema: StructType): Option[DimFilter] = {
+    if (filters.isEmpty) {
+      Option.empty[DimFilter]
+    } else {
+      Some(new AndDimFilter(filters.map(mapFilter(_, schema)).toList.asJava).optimize())
+    }
+  }
+
+  /**
+    * Convert a Spark-style filter FILTER to a Druid-style filter.
+    *
+    * @param filter The Spark filter to map to a Druid filter.
+    * @return The Druid filter corresponding to the filter condition described by FILTER.
+    */
+  def mapFilter(filter: Filter, schema: StructType): DimFilter = { // scalastyle:ignore method.length

Review comment:
       Druid supports a few other filter types like `Search`, `Extraction`, `Expression`, `Javascript` etc. Are there any spark counterparts for those? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest closed pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest closed pull request #11823:
URL: https://github.com/apache/druid/pull/11823


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest edited a comment on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest edited a comment on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-953674869


   @jihoonson thanks for starting your review. The end-to-end tests I was referring to in our earlier discussion haven't been added yet (since this PR doesn't include the writing logic), but when they are added no changes will be necessary - they're lightweight enough run as part of `mvn test`. This PR does include the unit tests for the reader logic, which also run via `mvn test`, as expected. If you'd like, I can create an explicit Travis job for the spark tests instead of letting them be bundled into the `other modules test` jobs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r749909190



##########
File path: spark/src/main/scala/org/apache/druid/spark/utils/FilterUtils.scala
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.utils
+
+import org.apache.druid.java.util.common.{ISE, JodaUtils}
+import org.apache.druid.query.filter.{AndDimFilter, BoundDimFilter, DimFilter, InDimFilter,
+  NotDimFilter, OrDimFilter, RegexDimFilter, SelectorDimFilter}
+import org.apache.druid.query.ordering.{StringComparator, StringComparators}
+import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, GreaterThan,
+  GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains,
+  StringEndsWith, StringStartsWith}
+import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, FloatType, IntegerType,
+  LongType, StringType, StructType, TimestampType}
+
+import scala.collection.JavaConverters.{seqAsJavaListConverter, setAsJavaSetConverter}
+
+/**
+  * Converters and utilities for working with Spark and Druid Filters.
+  */
+object FilterUtils {
+  /**
+    * Map an array of Spark filters FILTERS to a Druid dim filter or None if filters is empty.
+    *
+    * We return a DimFilter instead of a Filter and force callers to call .toFilter
+    * or .toOptimizedFilter to get a filter because callers can't covert back to a DimFilter from a
+    * Filter.
+    *
+    * @param filters The spark filters to map to a Druid filter.
+    * @return A Druid filter corresponding to the union of filter conditions enumerated in FILTERS.
+    */
+  def mapFilters(filters: Array[Filter], schema: StructType): Option[DimFilter] = {
+    if (filters.isEmpty) {
+      Option.empty[DimFilter]
+    } else {
+      Some(new AndDimFilter(filters.map(mapFilter(_, schema)).toList.asJava).optimize())
+    }
+  }
+
+  /**
+    * Convert a Spark-style filter FILTER to a Druid-style filter.
+    *
+    * @param filter The Spark filter to map to a Druid filter.
+    * @return The Druid filter corresponding to the filter condition described by FILTER.
+    */
+  def mapFilter(filter: Filter, schema: StructType): DimFilter = { // scalastyle:ignore method.length

Review comment:
       There aren't. The main reason to push down some Spark filters to the readers is to be able to parse out bounds on `__time` that we could use to further reduce which segments we open. As a happy benefit, in some cases we can push down filters to the readers and filter out rows before returning them to Spark, but we're not aiming to run the Druid query execution engine in a Spark executor. Any operation that would be evaluated in a Druid `Expression` or `Javascript` filter should instead be handled by Spark. (As an aside, I wouldn't expect Spark to even attempt to push down any predicates that needed UDFs or custom code to be executed. You can see the set of Spark Filters that can be pushed down in the `isSupportedFilter` function.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r749904700



##########
File path: spark/src/main/scala/org/apache/druid/spark/clients/DruidMetadataClient.scala
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.clients
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.google.common.base.Suppliers
+import org.apache.druid.indexer.SQLMetadataStorageUpdaterJobHandler
+import org.apache.druid.java.util.common.{DateTimes, Intervals, JodaUtils, StringUtils}
+import org.apache.druid.metadata.{DynamicConfigProvider, MetadataStorageConnectorConfig,
+  MetadataStorageTablesConfig, SQLMetadataConnector}
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.registries.SQLConnectorRegistry
+import org.apache.druid.timeline.{DataSegment, Partitions, VersionedIntervalTimeline}
+import org.skife.jdbi.v2.{DBI, Handle}
+
+import java.util.Properties
+import scala.collection.JavaConverters.{asJavaIterableConverter, asScalaBufferConverter,
+  asScalaSetConverter, mapAsJavaMapConverter}
+
+class DruidMetadataClient(
+                           metadataDbType: String,
+                           host: String,
+                           port: Int,
+                           connectUri: String,
+                           user: String,
+                           passwordProviderSer: String,
+                           dbcpMap: Properties,
+                           base: String = "druid"
+                         ) extends Logging {
+  private lazy val druidMetadataTableConfig = MetadataStorageTablesConfig.fromBase(base)
+  private lazy val dbcpProperties = new Properties(dbcpMap)
+  private lazy val password = if (passwordProviderSer == "") {
+    // Jackson doesn't like deserializing empty strings
+    passwordProviderSer
+  } else {
+    MAPPER.readValue[DynamicConfigProvider[String]](
+      passwordProviderSer, new TypeReference[DynamicConfigProvider[String]] {}
+    ).getConfig.getOrDefault("password", "")
+  }
+
+  private lazy val connectorConfig: MetadataStorageConnectorConfig =
+    new MetadataStorageConnectorConfig
+    {
+      override def isCreateTables: Boolean = false
+      override def getHost: String = host
+      override def getPort: Int = port
+      override def getConnectURI: String = connectUri
+      override def getUser: String = user
+      override def getPassword: String = password
+      override def getDbcpProperties: Properties = dbcpProperties
+    }
+  private lazy val connectorConfigSupplier = Suppliers.ofInstance(connectorConfig)
+  private lazy val metadataTableConfigSupplier = Suppliers.ofInstance(druidMetadataTableConfig)
+  private lazy val connector = buildSQLConnector()
+
+  /**
+    * Get the non-overshadowed used segments for DATASOURCE between INTERVALSTART and INTERVALEND. If either interval
+    * endpoint is None, JodaUtils.MIN_INSTANCE/MAX_INSTANCE is used instead. By default, only segments for complete
+    * partitions are returned. This behavior can be overriden by setting ALLOWINCOMPLETEPARTITIONS, in which case all
+    * non-overshadowed segments in the interval will be returned, regardless of completesness.
+    *
+    * @param datasource The Druid data source to get segment payloads for.
+    * @param intervalStart The start of the interval to fetch segment payloads for. If None, MIN_INSTANT is used.

Review comment:
       I'm not 100% on how to word this: the SQL query being executed uses `>=` and `<=` for its bounds, but it's effectively querying against JodaTime Interval endpoints, where the start time is inclusive and the end time is exclusive. The net result is fetching segments within `[startTime, endTime)`. If you were encountering this method while working in an unfamiliar code base, how would you want this communicated?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r755737499



##########
File path: spark/src/main/scala/org/apache/druid/spark/registries/ComplexMetricRegistry.scala
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.registries
+
+import org.apache.datasketches.hll.HllSketch
+import org.apache.datasketches.quantiles.DoublesSketch
+import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch
+import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule
+import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule
+import org.apache.druid.query.aggregation.datasketches.theta.{SketchHolder, SketchModule}
+import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchModule
+import org.apache.druid.query.aggregation.histogram.{ApproximateHistogram,
+  ApproximateHistogramDruidModule, FixedBucketsHistogram, FixedBucketsHistogramAggregator}
+import org.apache.druid.query.aggregation.variance.{VarianceAggregatorCollector, VarianceSerde}
+import org.apache.druid.segment.serde.ComplexMetrics
+import org.apache.druid.spark.mixins.Logging
+
+import scala.collection.mutable
+
+/**
+ * A registry for plugging in support for Druid complex metric types. Provides definitions for supporting complex types
+ * in extensions-core out of the box.
+ */
+object ComplexMetricRegistry extends Logging {

Review comment:
       nit: `ComplexTypeRegistry` could be a better name as we are trying to support complexTypes for dimensions.

##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidInputPartitionReader.scala
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import org.apache.druid.java.util.common.FileUtils
+import org.apache.druid.query.filter.DimFilter
+import org.apache.druid.segment.realtime.firehose.{IngestSegmentFirehose, WindowedStorageAdapter}
+import org.apache.druid.segment.transform.TransformSpec
+import org.apache.druid.segment.QueryableIndexStorageAdapter
+import org.apache.druid.spark.configuration.{Configuration, SerializableHadoopConfiguration}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.utils.SchemaUtils
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters.{iterableAsScalaIterableConverter, seqAsJavaListConverter}
+
+class DruidInputPartitionReader(
+                                 segmentStr: String,
+                                 schema: StructType,
+                                 filter: Option[DimFilter],
+                                 columnTypes: Option[Set[String]],
+                                 broadcastedHadoopConf: Broadcast[SerializableHadoopConfiguration],
+                                 conf: Configuration,
+                                 useSparkConfForDeepStorage: Boolean,
+                                 useCompactSketches: Boolean,
+                                 useDefaultNullHandling: Boolean
+                               )
+  extends DruidBaseInputPartitionReader(
+    segmentStr,
+    columnTypes,
+    broadcastedHadoopConf,
+    conf,
+    useSparkConfForDeepStorage,
+    useCompactSketches,
+    useDefaultNullHandling
+  ) with InputPartitionReader[InternalRow] with Logging {
+
+  private val firehose: IngestSegmentFirehose = DruidInputPartitionReader.makeFirehose(
+    new WindowedStorageAdapter(
+      new QueryableIndexStorageAdapter(queryableIndex), segment.getInterval
+    ),
+    filter.orNull,
+    schema.fieldNames.toList
+  )
+
+  override def next(): Boolean = {
+    firehose.hasMore
+  }
+
+  override def get(): InternalRow = {
+    SchemaUtils.convertInputRowToSparkRow(firehose.nextRow(), schema, useDefaultNullHandling)
+  }
+
+  override def close(): Unit = {
+    try {
+      if (Option(firehose).nonEmpty) {
+        firehose.close()
+      }
+      if (Option(queryableIndex).nonEmpty) {
+        queryableIndex.close()
+      }
+      if (Option(tmpDir).nonEmpty) {
+        FileUtils.deleteDirectory(tmpDir)
+      }
+    } catch {
+      case e: Exception =>
+        // Since we're just going to rethrow e and tearing down the JVM will clean up the firehose and queryable index
+        // even if we can't, the only leak we have to worry about is the temp file. Spark should clean up temp files as
+        // well, but rather than rely on that we'll try to take care of it ourselves.
+        logWarn("Encountered exception attempting to close a DruidInputPartitionReader!")
+        if (Option(tmpDir).nonEmpty && tmpDir.exists()) {
+          FileUtils.deleteDirectory(tmpDir)
+        }
+        throw e
+    }
+  }
+}
+
+private[v2] object DruidInputPartitionReader {
+  private def makeFirehose(
+                            adapter: WindowedStorageAdapter,
+                            filter: DimFilter,
+                            columns: List[String]): IngestSegmentFirehose = {
+    // This could be in-lined into the return, but this is more legible
+    val availableDimensions = adapter.getAdapter.getAvailableDimensions.asScala.toSet
+    val availableMetrics = adapter.getAdapter.getAvailableMetrics.asScala.toSet
+    val dimensions = columns.filter(availableDimensions.contains).asJava
+    val metrics = columns.filter(availableMetrics.contains).asJava
+
+    new IngestSegmentFirehose(List(adapter).asJava, TransformSpec.NONE, dimensions, metrics, filter)

Review comment:
       Firehose is deprecated and will be removed sooner or later. Suggest to use InputFormat instead.

##########
File path: spark/src/main/scala/org/apache/druid/spark/utils/DeepStorageConstructorHelpers.scala
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.utils
+
+import com.fasterxml.jackson.databind.MapperFeature
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.druid.common.aws.{AWSClientConfig, AWSCredentialsConfig, AWSEndpointConfig,
+  AWSModule, AWSProxyConfig}
+import org.apache.druid.common.gcp.GcpModule
+import org.apache.druid.java.util.common.StringUtils
+import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.TryWithResources
+import org.apache.druid.storage.azure.{AzureAccountConfig, AzureDataSegmentConfig,
+  AzureInputDataConfig, AzureStorage, AzureStorageDruidModule}
+import org.apache.druid.storage.google.{GoogleAccountConfig, GoogleInputDataConfig, GoogleStorage,
+  GoogleStorageDruidModule}
+import org.apache.druid.storage.hdfs.HdfsDataSegmentPusherConfig
+import org.apache.druid.storage.s3.{NoopServerSideEncryption, S3DataSegmentPusherConfig,
+  S3InputDataConfig, S3SSECustomConfig, S3SSEKmsConfig, S3StorageConfig, S3StorageDruidModule,
+  ServerSideEncryptingAmazonS3, ServerSideEncryption}
+import org.apache.hadoop.conf.{Configuration => HConf}
+
+import java.io.{ByteArrayInputStream, DataInputStream}
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+
+object DeepStorageConstructorHelpers extends TryWithResources {
+  /*
+   * Spark DataSourceOption property maps are case insensitive, by which they mean they lower-case all keys. Since all
+   * our user-provided property keys will come to us via a DataSourceOption, we need to use a case-insensisitive jackson
+   * mapper to deserialize property maps into objects. We want to be case-aware in the rest of our code, so we create a
+   * private, case-insensitive copy of our mapper here.
+   */
+  private val caseInsensitiveMapper = MAPPER.copy()
+    .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+    .registerModule(DefaultScalaModule)
+
+  // Local Storage Helpers
+
+  def createLocalDataSegmentPusherConfig(conf: Configuration): LocalDataSegmentPusherConfig = {
+    convertConfToInstance(conf, classOf[LocalDataSegmentPusherConfig])
+  }
+
+  // HDFS Storage Helpers
+
+  def createHdfsDataSegmentPusherConfig(conf: Configuration): HdfsDataSegmentPusherConfig = {
+    convertConfToInstance(conf, classOf[HdfsDataSegmentPusherConfig])
+  }
+
+  def createHadoopConfiguration(conf: Configuration): HConf = {
+    val hadoopConf = new HConf()
+    val confByteStream = new ByteArrayInputStream(
+      StringUtils.decodeBase64String(conf.getString(DruidConfigurationKeys.hdfsHadoopConfKey))
+    )
+    tryWithResources(confByteStream, new DataInputStream(confByteStream)){
+      case (_, inputStream: DataInputStream) => hadoopConf.readFields(inputStream)
+    }
+    hadoopConf
+  }
+
+  // S3 Storage Helpers
+
+  /**
+    * Create an S3DataSegmentPusherConfig from the relevant properties in CONF.
+    *
+    * *** Note that we explicitly override the default for `useS3aSchema`! ***
+    * Almost all users will want to use s3a, not s3n, and we have no backwards-compatibility to maintain.
+    *
+    * @param conf The Configuration object specifying the S3DataSegmentPusherConfig to create.
+    * @return An S3DataSegmentPusherConfig derived from the properties specified in CONF.
+    */
+  def createS3DataSegmentPusherConfig(conf: Configuration): S3DataSegmentPusherConfig = {
+    if (!conf.isPresent(DruidConfigurationKeys.s3UseS3ASchemaKey)) {
+      convertConfToInstance(conf.merge(
+        Configuration.fromKeyValue(DruidConfigurationKeys.s3UseS3ASchemaKey, "true")
+      ), classOf[S3DataSegmentPusherConfig])
+    } else {
+      convertConfToInstance(conf, classOf[S3DataSegmentPusherConfig])
+    }
+  }
+
+  def createS3InputDataConfig(conf: Configuration): S3InputDataConfig = {
+    convertConfToInstance(conf, classOf[S3InputDataConfig])
+  }
+
+  def createServerSideEncryptingAmazonS3(conf: Configuration): ServerSideEncryptingAmazonS3 = {
+    val (credentialsConfig, proxyConfig, endpointConfig, clientConfig, s3StorageConfig) =
+      createConfigsForServerSideEncryptingAmazonS3(conf)
+
+    val awsModule = new AWSModule
+    val s3Module = new S3StorageDruidModule
+    val credentialsProvider = awsModule.getAWSCredentialsProvider(credentialsConfig)
+    s3Module.getAmazonS3Client(
+      s3Module.getServerSideEncryptingAmazonS3Builder(
+        credentialsProvider,
+        proxyConfig,
+        endpointConfig,
+        clientConfig,
+        s3StorageConfig
+      )
+    )
+  }
+
+  def createConfigsForServerSideEncryptingAmazonS3(conf: Configuration):
+  (AWSCredentialsConfig, AWSProxyConfig, AWSEndpointConfig, AWSClientConfig, S3StorageConfig) = {
+    val credentialsConfig = convertConfToInstance(conf, classOf[AWSCredentialsConfig])
+
+    val proxyConfig = convertConfToInstance(conf.dive("proxy"), classOf[AWSProxyConfig])
+
+    val endpointConfig = convertConfToInstance(conf.dive("endpoint"), classOf[AWSEndpointConfig])
+
+    val clientConfig = convertConfToInstance(conf.dive("client"), classOf[AWSClientConfig])
+
+    val s3StorageConfig = createS3StorageConfig(conf.dive(DruidConfigurationKeys.s3ServerSideEncryptionPrefix))
+    (credentialsConfig, proxyConfig, endpointConfig, clientConfig, s3StorageConfig)
+  }
+
+  /**
+    * A helper method for creating instances of S3StorageConfigs from a Configuration. While I'm sure there's a simple
+    * solution I'm missing, I would have thought that something like the following would have worked:
+    *
+    * ```
+    * val kmsConfig = convertConfToInstance(conf.dive("kms"), classOf[S3SSEKmsConfig])
+    * caseInsensitiveMapper.setInjectableValues(new InjectableValues.Std().addValue(classOf[S3SSEKmsConfig], kmsConfig))
+    * val ser = caseInsensitiveMapper.writeValueAsString(Map[String, String]("type" -> "kms"))
+    * caseInsensitiveMapper.readValue[ServerSideEncryption](ser, new TypeReference[ServerSideEncryption] {})
+    * ```
+    *
+    * However, the code above throws an com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Invalid
+    * definition for property `config` (of type `org.apache.druid.storage.s3.KmsServerSideEncryption`): Could not find
+    * creator property with name 'config' (known Creator properties: [])
+    *
+    * I _think_ that the root cause is that ServerSideEncryption is abstract, but the error message above isn't
+    * what I would expect. Nevertheless, the simple solution would be to serialize to a KmsServerSideEncryption
+    * instance and then cast to the base ServerSideEncryption to assign. Unfortunately, KmsServerSideEncryption
+    * is package-private, so we can't access the class here. Since we already have the config object and we
+    * need to muck about with field visibility, we take the shortcut and just make the constructor accessible. This
+    * solution generalizes to the CustomServerSideEncyption case as well.
+    */
+  def createS3StorageConfig(conf: Configuration): S3StorageConfig = {
+    // There's probably a more elegant way to do this that would allow us to transparently support new sse types, but
+    // this will work for now.
+    val sseType = conf.get(DruidConfigurationKeys.s3ServerSideEncryptionTypeKey)
+
+    // Getting the list of subtypes since we'll need to use it to grab references to the package-private implementations
+    val config = caseInsensitiveMapper.getDeserializationConfig
+    val ac = AnnotatedClass.constructWithoutSuperTypes(classOf[ServerSideEncryption], config)
+    val subtypes = caseInsensitiveMapper.getSubtypeResolver.collectAndResolveSubtypesByClass(config, ac)
+
+    val serverSideEncryption: ServerSideEncryption = sseType match {
+      case Some("s3") =>
+        val clazz = subtypes.asScala.filter(_.getName == "s3").head.getType
+        val constructor = clazz.getDeclaredConstructor()
+        constructor.setAccessible(true)
+        constructor.newInstance().asInstanceOf[ServerSideEncryption]
+      case Some("kms") =>
+        val kmsConfig = convertConfToInstance(conf.dive("kms"), classOf[S3SSEKmsConfig])
+        val clazz = subtypes.asScala.filter(_.getName == "kms").head.getType
+        val constructor = clazz.getDeclaredConstructor(classOf[S3SSEKmsConfig])
+        constructor.setAccessible(true)
+        constructor.newInstance(kmsConfig).asInstanceOf[ServerSideEncryption]
+      case Some("custom") =>
+        val customConfig = convertConfToInstance(conf.dive("custom"), classOf[S3SSECustomConfig])
+        val clazz = subtypes.asScala.filter(_.getName == "custom").head.getType
+        val constructor = clazz.getDeclaredConstructor(classOf[S3SSECustomConfig])
+        constructor.setAccessible(true)
+        constructor.newInstance(customConfig).asInstanceOf[ServerSideEncryption]
+      case _ => new NoopServerSideEncryption
+    }
+    new S3StorageConfig(serverSideEncryption)
+  }
+
+  // GCS Storage Helpers
+
+  def createGoogleAcountConfig(conf: Configuration): GoogleAccountConfig = {
+    convertConfToInstance(conf, classOf[GoogleAccountConfig])
+  }
+
+  def createGoogleInputDataConfig(conf: Configuration): GoogleInputDataConfig = {
+    convertConfToInstance(conf, classOf[GoogleInputDataConfig])
+  }
+
+  def createGoogleStorage(): GoogleStorage = {
+    val gcpModule = new GcpModule
+    val gcpStorageModule = new GoogleStorageDruidModule
+
+    val httpTransport = gcpModule.getHttpTransport
+    val jsonFactory = gcpModule.getJsonFactory
+    val requestInitializer = gcpModule.getHttpRequestInitializer(httpTransport, jsonFactory)
+    gcpStorageModule.getGoogleStorage(httpTransport, jsonFactory, requestInitializer)
+  }
+
+  // Azure Storage Helpers
+
+  def createAzureDataSegmentConfig(conf: Configuration): AzureDataSegmentConfig = {
+    convertConfToInstance(conf, classOf[AzureDataSegmentConfig])
+  }
+
+  def createAzureInputDataConfig(conf: Configuration): AzureInputDataConfig = {
+    convertConfToInstance(conf, classOf[AzureInputDataConfig])
+  }
+
+  def createAzureAccountConfig(conf: Configuration): AzureAccountConfig = {
+    convertConfToInstance(conf, classOf[AzureAccountConfig])
+  }
+
+  def createAzureStorage(conf: Configuration): AzureStorage = {

Review comment:
       If we ever want to support a new storage type, then should we modify this file? Could there be a more extendible way to support new types without recompiling the spark connector? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-1012002051


   @jihoonson picking this back up now that I'm back from vacation. Because this PR isn't against master, would it be possible to land it despite the seemingly unrelated failing test? This would allow me to put the last PR for this feature up (the writing component) and begin getting reviews while working on the kafka lookup extraction test in parallel.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson merged pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
jihoonson merged pull request #11823:
URL: https://github.com/apache/druid/pull/11823


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-987623235


   @JulianJaffePinterest thanks for addressing comments. Can you check the travis failure? One is the doc spelling check and another is the "other module test". The other module test failure seems strange as the forked vm exploded while testing `druid-kafka-extraction-namespace` but seems related to this PR because the same job failed at the same module after restart. Perhaps it's something about splitting the "spark module test" out of the "other module test"? The PR looks good to go once the CI is fixed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r760100319



##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidInputPartitionReader.scala
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import org.apache.druid.java.util.common.FileUtils
+import org.apache.druid.query.filter.DimFilter
+import org.apache.druid.segment.realtime.firehose.{IngestSegmentFirehose, WindowedStorageAdapter}
+import org.apache.druid.segment.transform.TransformSpec
+import org.apache.druid.segment.QueryableIndexStorageAdapter
+import org.apache.druid.spark.configuration.{Configuration, SerializableHadoopConfiguration}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.utils.SchemaUtils
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters.{iterableAsScalaIterableConverter, seqAsJavaListConverter}
+
+class DruidInputPartitionReader(
+                                 segmentStr: String,
+                                 schema: StructType,
+                                 filter: Option[DimFilter],
+                                 columnTypes: Option[Set[String]],
+                                 broadcastedHadoopConf: Broadcast[SerializableHadoopConfiguration],
+                                 conf: Configuration,
+                                 useSparkConfForDeepStorage: Boolean,
+                                 useCompactSketches: Boolean,
+                                 useDefaultNullHandling: Boolean
+                               )
+  extends DruidBaseInputPartitionReader(
+    segmentStr,
+    columnTypes,
+    broadcastedHadoopConf,
+    conf,
+    useSparkConfForDeepStorage,
+    useCompactSketches,
+    useDefaultNullHandling
+  ) with InputPartitionReader[InternalRow] with Logging {
+
+  private val firehose: IngestSegmentFirehose = DruidInputPartitionReader.makeFirehose(
+    new WindowedStorageAdapter(
+      new QueryableIndexStorageAdapter(queryableIndex), segment.getInterval
+    ),
+    filter.orNull,
+    schema.fieldNames.toList
+  )
+
+  override def next(): Boolean = {
+    firehose.hasMore
+  }
+
+  override def get(): InternalRow = {
+    SchemaUtils.convertInputRowToSparkRow(firehose.nextRow(), schema, useDefaultNullHandling)
+  }
+
+  override def close(): Unit = {
+    try {
+      if (Option(firehose).nonEmpty) {
+        firehose.close()
+      }
+      if (Option(queryableIndex).nonEmpty) {
+        queryableIndex.close()
+      }
+      if (Option(tmpDir).nonEmpty) {
+        FileUtils.deleteDirectory(tmpDir)
+      }
+    } catch {
+      case e: Exception =>
+        // Since we're just going to rethrow e and tearing down the JVM will clean up the firehose and queryable index
+        // even if we can't, the only leak we have to worry about is the temp file. Spark should clean up temp files as
+        // well, but rather than rely on that we'll try to take care of it ourselves.
+        logWarn("Encountered exception attempting to close a DruidInputPartitionReader!")
+        if (Option(tmpDir).nonEmpty && tmpDir.exists()) {
+          FileUtils.deleteDirectory(tmpDir)
+        }
+        throw e
+    }
+  }
+}
+
+private[v2] object DruidInputPartitionReader {
+  private def makeFirehose(
+                            adapter: WindowedStorageAdapter,
+                            filter: DimFilter,
+                            columns: List[String]): IngestSegmentFirehose = {
+    // This could be in-lined into the return, but this is more legible
+    val availableDimensions = adapter.getAdapter.getAvailableDimensions.asScala.toSet
+    val availableMetrics = adapter.getAdapter.getAvailableMetrics.asScala.toSet
+    val dimensions = columns.filter(availableDimensions.contains).asJava
+    val metrics = columns.filter(availableMetrics.contains).asJava
+
+    new IngestSegmentFirehose(List(adapter).asJava, TransformSpec.NONE, dimensions, metrics, filter)

Review comment:
       Refactored 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r738208127



##########
File path: docs/operations/spark.md
##########
@@ -0,0 +1,279 @@
+---
+id: spark
+title: "Apache Spark Reader and Writer"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+# Apache Spark Reader and Writer for Druid
+
+## Reader
+The reader reads Druid segments from deep storage into Spark. It locates the segments to read and determines their
+schema if not provided by querying the brokers for the relevant metadata but otherwise does not interact with a running
+Druid cluster.
+
+Sample Code:
+```scala
+import org.apache.druid.spark.DruidDataFrameReader
+
+val deepStorageConfig = new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")
+
+sparkSession
+  .read
+  .brokerHost("localhost")
+  .brokerPort(8082)
+  .metadataDbType("mysql")
+  .metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
+  .metadataUser("druid")
+  .metadataPassword("diurd")
+  .dataSource("dataSource")
+  .deepStorage(deepStorageConfig)
+  .druid()
+```
+
+Alternatively, the reader can be configured via a properties map with no additional import needed:
+```scala
+val properties = Map[String, String](
+  "metadata.dbType" -> "mysql",
+  "metadata.connectUri" -> "jdbc:mysql://druid.metadata.server:3306/druid",
+  "metadata.user" -> "druid",
+  "metadata.password" -> "diurd",
+  "broker.host" -> "localhost",
+  "broker.port" -> 8082,
+  "table" -> "dataSource",
+  "reader.deepStorageType" -> "local",
+  "local.storageDirectory" -> "/mnt/druid/druid-segments/"
+)
+
+sparkSession
+  .read
+  .format("druid")
+  .options(properties)
+  .load()
+```
+
+If you know the schema of the Druid data source you're reading from, you can save needing to determine the schema via
+calls to the broker with
+```scala
+sparkSession
+  .read
+  .format("druid")
+  .schema(schema)
+  .options(properties)
+  .load()
+```
+
+Filters should be applied to the read-in data frame before any [Spark actions](http://spark.apache.org/docs/2.4.7/api/scala/index.html#org.apache.spark.sql.Dataset)
+are triggered, to allow predicates to be pushed down to the reader and avoid full scans of the underlying Druid data.
+
+## Plugin Registries and Druid Extension Support
+One of Druid's strengths is its extensibility. Since these Spark readers and writers will not execute on a Druid cluster
+and won't have the ability to dynamically load classes or integrate with Druid's Guice injectors, Druid extensions can't
+be used directly. Instead, these connectors use a plugin registry architecture, including default plugins that support
+most functionality in `extensions-core`. Custom plugins consisting of a string name and one or more serializable
+generator functions must be registered before the first Spark action which would depend on them is called.
+
+### ComplexMetricRegistry
+The `ComplexMetricRegistry` provides support for serializing and deserializing complex metric types between Spark and
+Druid. Support for complex metric types in Druid core extensions is provided out of the box.
+
+Users wishing to override the default behavior or who need to add support for additional complex metric types can
+use the `ComplexMetricRegistry.register` functions to associate serde functions with a given complex metric type. The
+name used to register custom behavior must match the complex metric type name reported by Druid.
+**Note that custom plugins must be registered with both the executors and the Spark driver.**
+
+### SegmentReaderRegistry
+The `SegmentReaderRegistry` provides support for reading segments from deep storage. Local, HDFS, GCS, S3, and Azure
+Storage deep storage implementations are supported by default.
+
+Users wishing to override the default behavior or who need to add support for additional deep storage implementations
+can use either `SegmentReaderRegistry.registerInitializer` (to provide any necessary Jackson configuration for
+deserializing a `LoadSpec` object from a segment load spec) or `SegmentReaderRegistry.registerLoadFunction` (to register
+a function for creating a URI from a segment load spec). These two functions correspond to the first and second approach
+[outlined below](#deep-storage). **Note that custom plugins must be registered on the executors, not the Spark driver.**
+
+### SQLConnectorRegistry
+The `SQLConnectorRegistry` provides support for configuring connectors to Druid metadata databases. Support for MySQL,
+PostgreSQL, and Derby databases are provided out of the box.
+
+Users wishing to override the default behavior or who need to add support for additional metadata database
+implementations can use the `SQLConnectorRegistry.register` function. Custom connectors should be registered on the
+driver.
+
+## Deploying to a Spark cluster
+This extension can be run on a Spark cluster in one of two ways: bundled as part of an application jar or uploaded as
+a library jar to a Spark cluster and included in the classpath provided to Spark applications by the application
+manager. If the second approach is used, this extension should be built with
+`mvn clean package -pl spark` and the resulting jar `druid-spark-<VERSION>.jar`
+uploaded to the Spark cluster. Application jars should then be built with a compile-time dependency on
+`org.apache.druid:druid-spark` (e.g. marked as `provided` in Maven or with `compileOnly` in Gradle).
+
+## Configuration Reference
+
+### Metadata Client Configs
+The properties used to configure the client that interacts with the Druid metadata server directly. Used by both reader
+and the writer. The `metadataPassword` property can either be provided as a string that will be used as-is or can be
+provided as a serialized DynamicConfigProvider that will be resolved when the metadata client is first instantiated. If
+a  custom DynamicConfigProvider is used, be sure to register the provider with the DynamicConfigProviderRegistry before use.
+
+|Key|Description|Required|Default|
+|---|-----------|--------|-------|
+|`metadata.dbType`|The metadata server's database type (e.g. `mysql`)|Yes||
+|`metadata.host`|The metadata server's host name|If using derby|`localhost`|
+|`metadata.port`|The metadata server's port|If using derby|1527|
+|`metadata.connectUri`|The URI to use to connect to the metadata server|If not using derby||
+|`metadata.user`|The user to use when connecting to the metadata server|If required by the metadata database||
+|`metadata.password`|The password to use when connecting to the metadata server. This can optionally be a serialized instance of a Druid DynamicConfigProvider or a plain string|If required by the metadata database||
+|`metadata.dbcpProperties`|The connection pooling properties to use when connecting to the metadata server|No||
+|`metadata.baseName`|The base name used when creating Druid metadata tables|No|`druid`|
+
+### Druid Client Configs
+The configuration properties used to query the Druid cluster for segment metadata. Only used in the reader.
+
+|Key|Description|Required|Default|
+|---|-----------|--------|-------|
+|`broker.host`|The hostname of a broker in the Druid cluster to read from|No|`localhost`|
+|`broker.port`|The port of the broker in the Druid cluster to read from|No|8082|
+|`broker.numRetries`|The number of times to retry a timed-out segment metadata request|No|5|
+|`broker.retryWaitSeconds`|How long (in seconds) to wait before retrying a timed-out segment metadata request|No|5|
+|`broker.timeoutMilliseconds`|How long (in milliseconds) to wait before timing out a segment metadata request|No|300000|
+
+### Reader Configs
+The properties used to configure the DataSourceReader when reading data from Druid in Spark.
+
+|Key|Description|Required|Default|
+|---|-----------|--------|-------|
+|`table`|The Druid data source to read from|Yes||
+|`reader.deepStorageType`|The type of deep storage used to back the target Druid cluster|No|`local`|
+|`reader.segments`|A hard-coded list of Druid segments to read. If set, the table and druid client configurations are ignored and the specified segments are read directly. Must be deserializable into Druid DataSegment instances|No|
+|`reader.useCompactSketches`|Controls whether or not compact representations of complex metrics are used (only for metrics that support compact forms)|No|False|

Review comment:
       Are there complex metrics that have compact forms and _aren't_ sketches? Either way, at the moment this property is only applied to sketches (see `ComplexMetricRegistry`)

##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidDataSourceReader.scala
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import org.apache.druid.java.util.common.{DateTimes, Intervals, JodaUtils}
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.clients.{DruidClient, DruidMetadataClient}
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.utils.{FilterUtils, SchemaUtils}
+import org.apache.druid.timeline.DataSegment
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition,
+  SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsScanColumnarBatch}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.joda.time.Interval
+
+import java.util.{List => JList}
+import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}
+
+/**
+  * A DruidDataSourceReader handles the actual work of reading data from Druid. It does this by querying to determine
+  * where Druid segments live in deep storage and then reading those segments into memory in order to avoid straining
+  * the Druid cluster. In general, users should not directly instantiate instances of this class but instead use
+  * sparkSession.read.format("druid").options(Map(...)).load(). If the schema of the data in Druid is known, overhead
+  * can be further reduced by providing it directly (e.g. sparkSession.read.format("druid").schema(schema).options...)
+  *
+  * To aid comprehensibility, some idiomatic Scala has been somewhat java-fied.
+  *
+  * @param schema
+  * @param conf
+  */
+class DruidDataSourceReader(
+                             var schema: Option[StructType] = None,
+                             conf: Configuration
+                           ) extends DataSourceReader
+  with SupportsPushDownRequiredColumns with SupportsPushDownFilters with SupportsScanColumnarBatch with Logging {
+  private lazy val metadataClient =
+    DruidDataSourceReader.createDruidMetaDataClient(conf)
+  private lazy val druidClient = DruidDataSourceReader.createDruidClient(conf)
+
+  private var filters: Array[Filter] = Array.empty
+  private var druidColumnTypes: Option[Set[String]] = Option.empty
+
+  override def readSchema(): StructType = {
+    if (schema.isDefined) {
+      schema.get
+    } else {
+      require(conf.isPresent(DruidConfigurationKeys.tableKey),
+        s"Must set ${DruidConfigurationKeys.tableKey}!")
+      // TODO: Optionally accept a granularity so that if lowerBound to upperBound spans more than
+      //  twice the granularity duration, we can send a list with two disjoint intervals and
+      //  minimize the load on the broker from having to merge large numbers of segments
+      val (lowerBound, upperBound) = FilterUtils.getTimeFilterBounds(filters)
+      val columnMap = druidClient.getSchema(
+        conf.getString(DruidConfigurationKeys.tableKey),
+        Some(List[Interval](Intervals.utc(
+          lowerBound.getOrElse(JodaUtils.MIN_INSTANT),
+          upperBound.getOrElse(JodaUtils.MAX_INSTANT)
+        )))
+      )
+      schema = Option(SchemaUtils.convertDruidSchemaToSparkSchema(columnMap))
+      druidColumnTypes = Option(columnMap.map(_._2._1).toSet)
+      schema.get
+    }
+  }
+
+  override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
+    // For now, one partition for each Druid segment partition
+    // Future improvements can use information from SegmentAnalyzer results to do smart things
+    if (schema.isEmpty) {
+      readSchema()
+    }
+    val readerConf = conf.dive(DruidConfigurationKeys.readerPrefix)
+    val filter = FilterUtils.mapFilters(filters, schema.get)
+    val useSparkConfForDeepStorage = readerConf.getBoolean(DruidConfigurationKeys.useSparkConfForDeepStorageDefaultKey)
+    val useCompactSketches = readerConf.isPresent(DruidConfigurationKeys.useCompactSketchesKey)
+    val useDefaultNullHandling = readerConf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
+
+    // Allow passing hard-coded list of segments to load
+    if (readerConf.isPresent(DruidConfigurationKeys.segmentsKey)) {
+      val segments: JList[DataSegment] = MAPPER.readValue(
+        readerConf.getString(DruidConfigurationKeys.segmentsKey),
+        new TypeReference[JList[DataSegment]]() {}
+      )
+      segments.asScala
+        .map(segment =>
+          new DruidInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling
+          ): InputPartition[InternalRow]
+        ).asJava
+    } else {
+      getSegments
+        .map(segment =>
+          new DruidInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling
+          ): InputPartition[InternalRow]
+        ).asJava
+    }
+  }
+
+  override def pruneColumns(structType: StructType): Unit = {
+    schema = Option(structType)
+  }
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    readSchema()
+    filters.partition(FilterUtils.isSupportedFilter(_, schema.get)) match {
+      case (supported, unsupported) =>
+        this.filters = supported
+        unsupported
+    }
+  }
+
+  override def pushedFilters(): Array[Filter] = filters
+
+  private[v2] def getSegments: Seq[DataSegment] = {
+    require(conf.isPresent(DruidConfigurationKeys.tableKey),
+      s"Must set ${DruidConfigurationKeys.tableKey}!")
+
+    // Check filters for any bounds on __time
+    // Otherwise, we'd need to full scan the segments table
+    val (lowerTimeBound, upperTimeBound) = FilterUtils.getTimeFilterBounds(filters)
+
+    metadataClient.getSegmentPayloads(conf.getString(DruidConfigurationKeys.tableKey),

Review comment:
       Ah, I dropped the overshadow checking logic while chunking up PRs. Thanks for catching this. I've moved the logic to the metadata client because I think it makes more sense there now and addressed a TODO and added support for querying incomplete partitions if desired.

##########
File path: docs/operations/spark.md
##########
@@ -0,0 +1,279 @@
+---
+id: spark
+title: "Apache Spark Reader and Writer"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+# Apache Spark Reader and Writer for Druid
+
+## Reader
+The reader reads Druid segments from deep storage into Spark. It locates the segments to read and determines their
+schema if not provided by querying the brokers for the relevant metadata but otherwise does not interact with a running
+Druid cluster.
+
+Sample Code:
+```scala
+import org.apache.druid.spark.DruidDataFrameReader
+
+val deepStorageConfig = new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")
+
+sparkSession
+  .read
+  .brokerHost("localhost")
+  .brokerPort(8082)
+  .metadataDbType("mysql")
+  .metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
+  .metadataUser("druid")
+  .metadataPassword("diurd")
+  .dataSource("dataSource")
+  .deepStorage(deepStorageConfig)
+  .druid()
+```
+
+Alternatively, the reader can be configured via a properties map with no additional import needed:
+```scala
+val properties = Map[String, String](
+  "metadata.dbType" -> "mysql",
+  "metadata.connectUri" -> "jdbc:mysql://druid.metadata.server:3306/druid",
+  "metadata.user" -> "druid",
+  "metadata.password" -> "diurd",
+  "broker.host" -> "localhost",
+  "broker.port" -> 8082,
+  "table" -> "dataSource",
+  "reader.deepStorageType" -> "local",
+  "local.storageDirectory" -> "/mnt/druid/druid-segments/"
+)
+
+sparkSession
+  .read
+  .format("druid")
+  .options(properties)
+  .load()
+```
+
+If you know the schema of the Druid data source you're reading from, you can save needing to determine the schema via
+calls to the broker with
+```scala
+sparkSession
+  .read
+  .format("druid")
+  .schema(schema)
+  .options(properties)
+  .load()
+```
+
+Filters should be applied to the read-in data frame before any [Spark actions](http://spark.apache.org/docs/2.4.7/api/scala/index.html#org.apache.spark.sql.Dataset)
+are triggered, to allow predicates to be pushed down to the reader and avoid full scans of the underlying Druid data.
+
+## Plugin Registries and Druid Extension Support
+One of Druid's strengths is its extensibility. Since these Spark readers and writers will not execute on a Druid cluster
+and won't have the ability to dynamically load classes or integrate with Druid's Guice injectors, Druid extensions can't
+be used directly. Instead, these connectors use a plugin registry architecture, including default plugins that support
+most functionality in `extensions-core`. Custom plugins consisting of a string name and one or more serializable
+generator functions must be registered before the first Spark action which would depend on them is called.
+
+### ComplexMetricRegistry
+The `ComplexMetricRegistry` provides support for serializing and deserializing complex metric types between Spark and
+Druid. Support for complex metric types in Druid core extensions is provided out of the box.
+
+Users wishing to override the default behavior or who need to add support for additional complex metric types can
+use the `ComplexMetricRegistry.register` functions to associate serde functions with a given complex metric type. The
+name used to register custom behavior must match the complex metric type name reported by Druid.
+**Note that custom plugins must be registered with both the executors and the Spark driver.**
+
+### SegmentReaderRegistry
+The `SegmentReaderRegistry` provides support for reading segments from deep storage. Local, HDFS, GCS, S3, and Azure
+Storage deep storage implementations are supported by default.
+
+Users wishing to override the default behavior or who need to add support for additional deep storage implementations
+can use either `SegmentReaderRegistry.registerInitializer` (to provide any necessary Jackson configuration for
+deserializing a `LoadSpec` object from a segment load spec) or `SegmentReaderRegistry.registerLoadFunction` (to register
+a function for creating a URI from a segment load spec). These two functions correspond to the first and second approach
+[outlined below](#deep-storage). **Note that custom plugins must be registered on the executors, not the Spark driver.**
+
+### SQLConnectorRegistry
+The `SQLConnectorRegistry` provides support for configuring connectors to Druid metadata databases. Support for MySQL,
+PostgreSQL, and Derby databases are provided out of the box.
+
+Users wishing to override the default behavior or who need to add support for additional metadata database
+implementations can use the `SQLConnectorRegistry.register` function. Custom connectors should be registered on the
+driver.
+
+## Deploying to a Spark cluster
+This extension can be run on a Spark cluster in one of two ways: bundled as part of an application jar or uploaded as
+a library jar to a Spark cluster and included in the classpath provided to Spark applications by the application
+manager. If the second approach is used, this extension should be built with
+`mvn clean package -pl spark` and the resulting jar `druid-spark-<VERSION>.jar`
+uploaded to the Spark cluster. Application jars should then be built with a compile-time dependency on
+`org.apache.druid:druid-spark` (e.g. marked as `provided` in Maven or with `compileOnly` in Gradle).
+
+## Configuration Reference
+
+### Metadata Client Configs
+The properties used to configure the client that interacts with the Druid metadata server directly. Used by both reader
+and the writer. The `metadataPassword` property can either be provided as a string that will be used as-is or can be

Review comment:
       Ah nice catch, thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-951594933


   @JulianJaffePinterest I just noticed that Travis is not running for this branch since it's not included in the [travis configuration](https://github.com/apache/druid/blob/spark_druid_connector/.travis.yml#L16-L19). Can you please add the `spark_druid_connector` branch there? I think we can remove it later when we merge this branch to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r749911738



##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartitionReader.scala
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import org.apache.druid.java.util.common.{FileUtils, IAE, ISE, StringUtils}
+import org.apache.druid.query.dimension.DefaultDimensionSpec
+import org.apache.druid.query.filter.DimFilter
+import org.apache.druid.segment.column.ValueType
+import org.apache.druid.segment.vector.{VectorColumnSelectorFactory, VectorCursor}
+import org.apache.druid.segment.{QueryableIndexStorageAdapter, VirtualColumns}
+import org.apache.druid.spark.configuration.{Configuration, SerializableHadoopConfiguration}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.registries.ComplexMetricRegistry
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
+import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, LongType, StringType,
+  StructField, StructType, TimestampType}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+
+class DruidColumnarInputPartitionReader(

Review comment:
       `Columnar` is the Spark convention for this (e.g. the DataSourceReader mixes in `SupportsScanColumnarBatch` and this InputPartitionReader returns `ColumnarBatch`es of `ColumnVector`s). Since this will primarily be read by Druid developers I'm happy to ditch the Spark naming convention and name this `DruidVectorizedInputPartitionReader`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-966076783


   @jihoonson gentle reminder 🙂 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r763665571



##########
File path: spark/src/main/scala/org/apache/druid/spark/utils/DeepStorageConstructorHelpers.scala
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.utils
+
+import com.fasterxml.jackson.databind.MapperFeature
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.druid.common.aws.{AWSClientConfig, AWSCredentialsConfig, AWSEndpointConfig,
+  AWSModule, AWSProxyConfig}
+import org.apache.druid.common.gcp.GcpModule
+import org.apache.druid.java.util.common.StringUtils
+import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.TryWithResources
+import org.apache.druid.storage.azure.{AzureAccountConfig, AzureDataSegmentConfig,
+  AzureInputDataConfig, AzureStorage, AzureStorageDruidModule}
+import org.apache.druid.storage.google.{GoogleAccountConfig, GoogleInputDataConfig, GoogleStorage,
+  GoogleStorageDruidModule}
+import org.apache.druid.storage.hdfs.HdfsDataSegmentPusherConfig
+import org.apache.druid.storage.s3.{NoopServerSideEncryption, S3DataSegmentPusherConfig,
+  S3InputDataConfig, S3SSECustomConfig, S3SSEKmsConfig, S3StorageConfig, S3StorageDruidModule,
+  ServerSideEncryptingAmazonS3, ServerSideEncryption}
+import org.apache.hadoop.conf.{Configuration => HConf}
+
+import java.io.{ByteArrayInputStream, DataInputStream}
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+
+object DeepStorageConstructorHelpers extends TryWithResources {
+  /*
+   * Spark DataSourceOption property maps are case insensitive, by which they mean they lower-case all keys. Since all
+   * our user-provided property keys will come to us via a DataSourceOption, we need to use a case-insensisitive jackson
+   * mapper to deserialize property maps into objects. We want to be case-aware in the rest of our code, so we create a
+   * private, case-insensitive copy of our mapper here.
+   */
+  private val caseInsensitiveMapper = MAPPER.copy()
+    .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+    .registerModule(DefaultScalaModule)
+
+  // Local Storage Helpers
+
+  def createLocalDataSegmentPusherConfig(conf: Configuration): LocalDataSegmentPusherConfig = {
+    convertConfToInstance(conf, classOf[LocalDataSegmentPusherConfig])
+  }
+
+  // HDFS Storage Helpers
+
+  def createHdfsDataSegmentPusherConfig(conf: Configuration): HdfsDataSegmentPusherConfig = {
+    convertConfToInstance(conf, classOf[HdfsDataSegmentPusherConfig])
+  }
+
+  def createHadoopConfiguration(conf: Configuration): HConf = {
+    val hadoopConf = new HConf()
+    val confByteStream = new ByteArrayInputStream(
+      StringUtils.decodeBase64String(conf.getString(DruidConfigurationKeys.hdfsHadoopConfKey))
+    )
+    tryWithResources(confByteStream, new DataInputStream(confByteStream)){
+      case (_, inputStream: DataInputStream) => hadoopConf.readFields(inputStream)
+    }
+    hadoopConf
+  }
+
+  // S3 Storage Helpers
+
+  /**
+    * Create an S3DataSegmentPusherConfig from the relevant properties in CONF.
+    *
+    * *** Note that we explicitly override the default for `useS3aSchema`! ***
+    * Almost all users will want to use s3a, not s3n, and we have no backwards-compatibility to maintain.
+    *
+    * @param conf The Configuration object specifying the S3DataSegmentPusherConfig to create.
+    * @return An S3DataSegmentPusherConfig derived from the properties specified in CONF.
+    */
+  def createS3DataSegmentPusherConfig(conf: Configuration): S3DataSegmentPusherConfig = {
+    if (!conf.isPresent(DruidConfigurationKeys.s3UseS3ASchemaKey)) {
+      convertConfToInstance(conf.merge(
+        Configuration.fromKeyValue(DruidConfigurationKeys.s3UseS3ASchemaKey, "true")
+      ), classOf[S3DataSegmentPusherConfig])
+    } else {
+      convertConfToInstance(conf, classOf[S3DataSegmentPusherConfig])
+    }
+  }
+
+  def createS3InputDataConfig(conf: Configuration): S3InputDataConfig = {
+    convertConfToInstance(conf, classOf[S3InputDataConfig])
+  }
+
+  def createServerSideEncryptingAmazonS3(conf: Configuration): ServerSideEncryptingAmazonS3 = {
+    val (credentialsConfig, proxyConfig, endpointConfig, clientConfig, s3StorageConfig) =
+      createConfigsForServerSideEncryptingAmazonS3(conf)
+
+    val awsModule = new AWSModule
+    val s3Module = new S3StorageDruidModule
+    val credentialsProvider = awsModule.getAWSCredentialsProvider(credentialsConfig)
+    s3Module.getAmazonS3Client(
+      s3Module.getServerSideEncryptingAmazonS3Builder(
+        credentialsProvider,
+        proxyConfig,
+        endpointConfig,
+        clientConfig,
+        s3StorageConfig
+      )
+    )
+  }
+
+  def createConfigsForServerSideEncryptingAmazonS3(conf: Configuration):
+  (AWSCredentialsConfig, AWSProxyConfig, AWSEndpointConfig, AWSClientConfig, S3StorageConfig) = {
+    val credentialsConfig = convertConfToInstance(conf, classOf[AWSCredentialsConfig])
+
+    val proxyConfig = convertConfToInstance(conf.dive("proxy"), classOf[AWSProxyConfig])
+
+    val endpointConfig = convertConfToInstance(conf.dive("endpoint"), classOf[AWSEndpointConfig])
+
+    val clientConfig = convertConfToInstance(conf.dive("client"), classOf[AWSClientConfig])
+
+    val s3StorageConfig = createS3StorageConfig(conf.dive(DruidConfigurationKeys.s3ServerSideEncryptionPrefix))
+    (credentialsConfig, proxyConfig, endpointConfig, clientConfig, s3StorageConfig)
+  }
+
+  /**
+    * A helper method for creating instances of S3StorageConfigs from a Configuration. While I'm sure there's a simple
+    * solution I'm missing, I would have thought that something like the following would have worked:
+    *
+    * ```
+    * val kmsConfig = convertConfToInstance(conf.dive("kms"), classOf[S3SSEKmsConfig])
+    * caseInsensitiveMapper.setInjectableValues(new InjectableValues.Std().addValue(classOf[S3SSEKmsConfig], kmsConfig))
+    * val ser = caseInsensitiveMapper.writeValueAsString(Map[String, String]("type" -> "kms"))
+    * caseInsensitiveMapper.readValue[ServerSideEncryption](ser, new TypeReference[ServerSideEncryption] {})
+    * ```
+    *
+    * However, the code above throws an com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Invalid
+    * definition for property `config` (of type `org.apache.druid.storage.s3.KmsServerSideEncryption`): Could not find
+    * creator property with name 'config' (known Creator properties: [])
+    *
+    * I _think_ that the root cause is that ServerSideEncryption is abstract, but the error message above isn't
+    * what I would expect. Nevertheless, the simple solution would be to serialize to a KmsServerSideEncryption
+    * instance and then cast to the base ServerSideEncryption to assign. Unfortunately, KmsServerSideEncryption
+    * is package-private, so we can't access the class here. Since we already have the config object and we
+    * need to muck about with field visibility, we take the shortcut and just make the constructor accessible. This
+    * solution generalizes to the CustomServerSideEncyption case as well.
+    */
+  def createS3StorageConfig(conf: Configuration): S3StorageConfig = {
+    // There's probably a more elegant way to do this that would allow us to transparently support new sse types, but
+    // this will work for now.
+    val sseType = conf.get(DruidConfigurationKeys.s3ServerSideEncryptionTypeKey)
+
+    // Getting the list of subtypes since we'll need to use it to grab references to the package-private implementations
+    val config = caseInsensitiveMapper.getDeserializationConfig
+    val ac = AnnotatedClass.constructWithoutSuperTypes(classOf[ServerSideEncryption], config)
+    val subtypes = caseInsensitiveMapper.getSubtypeResolver.collectAndResolveSubtypesByClass(config, ac)
+
+    val serverSideEncryption: ServerSideEncryption = sseType match {
+      case Some("s3") =>
+        val clazz = subtypes.asScala.filter(_.getName == "s3").head.getType
+        val constructor = clazz.getDeclaredConstructor()
+        constructor.setAccessible(true)
+        constructor.newInstance().asInstanceOf[ServerSideEncryption]
+      case Some("kms") =>
+        val kmsConfig = convertConfToInstance(conf.dive("kms"), classOf[S3SSEKmsConfig])
+        val clazz = subtypes.asScala.filter(_.getName == "kms").head.getType
+        val constructor = clazz.getDeclaredConstructor(classOf[S3SSEKmsConfig])
+        constructor.setAccessible(true)
+        constructor.newInstance(kmsConfig).asInstanceOf[ServerSideEncryption]
+      case Some("custom") =>
+        val customConfig = convertConfToInstance(conf.dive("custom"), classOf[S3SSECustomConfig])
+        val clazz = subtypes.asScala.filter(_.getName == "custom").head.getType
+        val constructor = clazz.getDeclaredConstructor(classOf[S3SSECustomConfig])
+        constructor.setAccessible(true)
+        constructor.newInstance(customConfig).asInstanceOf[ServerSideEncryption]
+      case _ => new NoopServerSideEncryption
+    }
+    new S3StorageConfig(serverSideEncryption)
+  }
+
+  // GCS Storage Helpers
+
+  def createGoogleAcountConfig(conf: Configuration): GoogleAccountConfig = {
+    convertConfToInstance(conf, classOf[GoogleAccountConfig])
+  }
+
+  def createGoogleInputDataConfig(conf: Configuration): GoogleInputDataConfig = {
+    convertConfToInstance(conf, classOf[GoogleInputDataConfig])
+  }
+
+  def createGoogleStorage(): GoogleStorage = {
+    val gcpModule = new GcpModule
+    val gcpStorageModule = new GoogleStorageDruidModule
+
+    val httpTransport = gcpModule.getHttpTransport
+    val jsonFactory = gcpModule.getJsonFactory
+    val requestInitializer = gcpModule.getHttpRequestInitializer(httpTransport, jsonFactory)
+    gcpStorageModule.getGoogleStorage(httpTransport, jsonFactory, requestInitializer)
+  }
+
+  // Azure Storage Helpers
+
+  def createAzureDataSegmentConfig(conf: Configuration): AzureDataSegmentConfig = {
+    convertConfToInstance(conf, classOf[AzureDataSegmentConfig])
+  }
+
+  def createAzureInputDataConfig(conf: Configuration): AzureInputDataConfig = {
+    convertConfToInstance(conf, classOf[AzureInputDataConfig])
+  }
+
+  def createAzureAccountConfig(conf: Configuration): AzureAccountConfig = {
+    convertConfToInstance(conf, classOf[AzureAccountConfig])
+  }
+
+  def createAzureStorage(conf: Configuration): AzureStorage = {

Review comment:
       I was mostly wondering what people should do if they want to register their custom deep storage type to the spark connector. But now looking at the code again, I think I was misunderstood the code probably. I see that users can register their own custom type in the registry. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r760100147



##########
File path: spark/src/main/scala/org/apache/druid/spark/utils/DeepStorageConstructorHelpers.scala
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.utils
+
+import com.fasterxml.jackson.databind.MapperFeature
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.druid.common.aws.{AWSClientConfig, AWSCredentialsConfig, AWSEndpointConfig,
+  AWSModule, AWSProxyConfig}
+import org.apache.druid.common.gcp.GcpModule
+import org.apache.druid.java.util.common.StringUtils
+import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.TryWithResources
+import org.apache.druid.storage.azure.{AzureAccountConfig, AzureDataSegmentConfig,
+  AzureInputDataConfig, AzureStorage, AzureStorageDruidModule}
+import org.apache.druid.storage.google.{GoogleAccountConfig, GoogleInputDataConfig, GoogleStorage,
+  GoogleStorageDruidModule}
+import org.apache.druid.storage.hdfs.HdfsDataSegmentPusherConfig
+import org.apache.druid.storage.s3.{NoopServerSideEncryption, S3DataSegmentPusherConfig,
+  S3InputDataConfig, S3SSECustomConfig, S3SSEKmsConfig, S3StorageConfig, S3StorageDruidModule,
+  ServerSideEncryptingAmazonS3, ServerSideEncryption}
+import org.apache.hadoop.conf.{Configuration => HConf}
+
+import java.io.{ByteArrayInputStream, DataInputStream}
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+
+object DeepStorageConstructorHelpers extends TryWithResources {
+  /*
+   * Spark DataSourceOption property maps are case insensitive, by which they mean they lower-case all keys. Since all
+   * our user-provided property keys will come to us via a DataSourceOption, we need to use a case-insensisitive jackson
+   * mapper to deserialize property maps into objects. We want to be case-aware in the rest of our code, so we create a
+   * private, case-insensitive copy of our mapper here.
+   */
+  private val caseInsensitiveMapper = MAPPER.copy()
+    .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
+    .registerModule(DefaultScalaModule)
+
+  // Local Storage Helpers
+
+  def createLocalDataSegmentPusherConfig(conf: Configuration): LocalDataSegmentPusherConfig = {
+    convertConfToInstance(conf, classOf[LocalDataSegmentPusherConfig])
+  }
+
+  // HDFS Storage Helpers
+
+  def createHdfsDataSegmentPusherConfig(conf: Configuration): HdfsDataSegmentPusherConfig = {
+    convertConfToInstance(conf, classOf[HdfsDataSegmentPusherConfig])
+  }
+
+  def createHadoopConfiguration(conf: Configuration): HConf = {
+    val hadoopConf = new HConf()
+    val confByteStream = new ByteArrayInputStream(
+      StringUtils.decodeBase64String(conf.getString(DruidConfigurationKeys.hdfsHadoopConfKey))
+    )
+    tryWithResources(confByteStream, new DataInputStream(confByteStream)){
+      case (_, inputStream: DataInputStream) => hadoopConf.readFields(inputStream)
+    }
+    hadoopConf
+  }
+
+  // S3 Storage Helpers
+
+  /**
+    * Create an S3DataSegmentPusherConfig from the relevant properties in CONF.
+    *
+    * *** Note that we explicitly override the default for `useS3aSchema`! ***
+    * Almost all users will want to use s3a, not s3n, and we have no backwards-compatibility to maintain.
+    *
+    * @param conf The Configuration object specifying the S3DataSegmentPusherConfig to create.
+    * @return An S3DataSegmentPusherConfig derived from the properties specified in CONF.
+    */
+  def createS3DataSegmentPusherConfig(conf: Configuration): S3DataSegmentPusherConfig = {
+    if (!conf.isPresent(DruidConfigurationKeys.s3UseS3ASchemaKey)) {
+      convertConfToInstance(conf.merge(
+        Configuration.fromKeyValue(DruidConfigurationKeys.s3UseS3ASchemaKey, "true")
+      ), classOf[S3DataSegmentPusherConfig])
+    } else {
+      convertConfToInstance(conf, classOf[S3DataSegmentPusherConfig])
+    }
+  }
+
+  def createS3InputDataConfig(conf: Configuration): S3InputDataConfig = {
+    convertConfToInstance(conf, classOf[S3InputDataConfig])
+  }
+
+  def createServerSideEncryptingAmazonS3(conf: Configuration): ServerSideEncryptingAmazonS3 = {
+    val (credentialsConfig, proxyConfig, endpointConfig, clientConfig, s3StorageConfig) =
+      createConfigsForServerSideEncryptingAmazonS3(conf)
+
+    val awsModule = new AWSModule
+    val s3Module = new S3StorageDruidModule
+    val credentialsProvider = awsModule.getAWSCredentialsProvider(credentialsConfig)
+    s3Module.getAmazonS3Client(
+      s3Module.getServerSideEncryptingAmazonS3Builder(
+        credentialsProvider,
+        proxyConfig,
+        endpointConfig,
+        clientConfig,
+        s3StorageConfig
+      )
+    )
+  }
+
+  def createConfigsForServerSideEncryptingAmazonS3(conf: Configuration):
+  (AWSCredentialsConfig, AWSProxyConfig, AWSEndpointConfig, AWSClientConfig, S3StorageConfig) = {
+    val credentialsConfig = convertConfToInstance(conf, classOf[AWSCredentialsConfig])
+
+    val proxyConfig = convertConfToInstance(conf.dive("proxy"), classOf[AWSProxyConfig])
+
+    val endpointConfig = convertConfToInstance(conf.dive("endpoint"), classOf[AWSEndpointConfig])
+
+    val clientConfig = convertConfToInstance(conf.dive("client"), classOf[AWSClientConfig])
+
+    val s3StorageConfig = createS3StorageConfig(conf.dive(DruidConfigurationKeys.s3ServerSideEncryptionPrefix))
+    (credentialsConfig, proxyConfig, endpointConfig, clientConfig, s3StorageConfig)
+  }
+
+  /**
+    * A helper method for creating instances of S3StorageConfigs from a Configuration. While I'm sure there's a simple
+    * solution I'm missing, I would have thought that something like the following would have worked:
+    *
+    * ```
+    * val kmsConfig = convertConfToInstance(conf.dive("kms"), classOf[S3SSEKmsConfig])
+    * caseInsensitiveMapper.setInjectableValues(new InjectableValues.Std().addValue(classOf[S3SSEKmsConfig], kmsConfig))
+    * val ser = caseInsensitiveMapper.writeValueAsString(Map[String, String]("type" -> "kms"))
+    * caseInsensitiveMapper.readValue[ServerSideEncryption](ser, new TypeReference[ServerSideEncryption] {})
+    * ```
+    *
+    * However, the code above throws an com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Invalid
+    * definition for property `config` (of type `org.apache.druid.storage.s3.KmsServerSideEncryption`): Could not find
+    * creator property with name 'config' (known Creator properties: [])
+    *
+    * I _think_ that the root cause is that ServerSideEncryption is abstract, but the error message above isn't
+    * what I would expect. Nevertheless, the simple solution would be to serialize to a KmsServerSideEncryption
+    * instance and then cast to the base ServerSideEncryption to assign. Unfortunately, KmsServerSideEncryption
+    * is package-private, so we can't access the class here. Since we already have the config object and we
+    * need to muck about with field visibility, we take the shortcut and just make the constructor accessible. This
+    * solution generalizes to the CustomServerSideEncyption case as well.
+    */
+  def createS3StorageConfig(conf: Configuration): S3StorageConfig = {
+    // There's probably a more elegant way to do this that would allow us to transparently support new sse types, but
+    // this will work for now.
+    val sseType = conf.get(DruidConfigurationKeys.s3ServerSideEncryptionTypeKey)
+
+    // Getting the list of subtypes since we'll need to use it to grab references to the package-private implementations
+    val config = caseInsensitiveMapper.getDeserializationConfig
+    val ac = AnnotatedClass.constructWithoutSuperTypes(classOf[ServerSideEncryption], config)
+    val subtypes = caseInsensitiveMapper.getSubtypeResolver.collectAndResolveSubtypesByClass(config, ac)
+
+    val serverSideEncryption: ServerSideEncryption = sseType match {
+      case Some("s3") =>
+        val clazz = subtypes.asScala.filter(_.getName == "s3").head.getType
+        val constructor = clazz.getDeclaredConstructor()
+        constructor.setAccessible(true)
+        constructor.newInstance().asInstanceOf[ServerSideEncryption]
+      case Some("kms") =>
+        val kmsConfig = convertConfToInstance(conf.dive("kms"), classOf[S3SSEKmsConfig])
+        val clazz = subtypes.asScala.filter(_.getName == "kms").head.getType
+        val constructor = clazz.getDeclaredConstructor(classOf[S3SSEKmsConfig])
+        constructor.setAccessible(true)
+        constructor.newInstance(kmsConfig).asInstanceOf[ServerSideEncryption]
+      case Some("custom") =>
+        val customConfig = convertConfToInstance(conf.dive("custom"), classOf[S3SSECustomConfig])
+        val clazz = subtypes.asScala.filter(_.getName == "custom").head.getType
+        val constructor = clazz.getDeclaredConstructor(classOf[S3SSECustomConfig])
+        constructor.setAccessible(true)
+        constructor.newInstance(customConfig).asInstanceOf[ServerSideEncryption]
+      case _ => new NoopServerSideEncryption
+    }
+    new S3StorageConfig(serverSideEncryption)
+  }
+
+  // GCS Storage Helpers
+
+  def createGoogleAcountConfig(conf: Configuration): GoogleAccountConfig = {
+    convertConfToInstance(conf, classOf[GoogleAccountConfig])
+  }
+
+  def createGoogleInputDataConfig(conf: Configuration): GoogleInputDataConfig = {
+    convertConfToInstance(conf, classOf[GoogleInputDataConfig])
+  }
+
+  def createGoogleStorage(): GoogleStorage = {
+    val gcpModule = new GcpModule
+    val gcpStorageModule = new GoogleStorageDruidModule
+
+    val httpTransport = gcpModule.getHttpTransport
+    val jsonFactory = gcpModule.getJsonFactory
+    val requestInitializer = gcpModule.getHttpRequestInitializer(httpTransport, jsonFactory)
+    gcpStorageModule.getGoogleStorage(httpTransport, jsonFactory, requestInitializer)
+  }
+
+  // Azure Storage Helpers
+
+  def createAzureDataSegmentConfig(conf: Configuration): AzureDataSegmentConfig = {
+    convertConfToInstance(conf, classOf[AzureDataSegmentConfig])
+  }
+
+  def createAzureInputDataConfig(conf: Configuration): AzureInputDataConfig = {
+    convertConfToInstance(conf, classOf[AzureInputDataConfig])
+  }
+
+  def createAzureAccountConfig(conf: Configuration): AzureAccountConfig = {
+    convertConfToInstance(conf, classOf[AzureAccountConfig])
+  }
+
+  def createAzureStorage(conf: Configuration): AzureStorage = {

Review comment:
       Users can add new deep storage handlers (or override the default ones) using the various plugin registries with no need to recompile the spark connector. If you're asking what happens when Druid adds support for new deep storage types, then someone needs to tell spark how to access the new deep storage type. This can be done either in the spark module or theoretically in the new druid extension, although users would then need to register the new functions themselves. I'm not sure I follow the concern about needing to recompile the spark connector - presumably Druid itself will only add support for new deep storage types when new versions are released, which will mean compiling from source anyway.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-983553686


   @jihoonson no worries I know how it goes 🙂. I've split out the spark module tests into separate Travis jobs as you requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] suneet-s commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
suneet-s commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-990783074


   @JulianJaffePinterest Sorry about that. I canceled your travis run because I was trying to make space for the tests to run on 0.22.1 first. If I cancel again I'll restart your travis job - no need to close and re-open.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-990782132


   Cancel dance again


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest edited a comment on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest edited a comment on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-952640178


   Ah I see #11814. Let me see if the solution to that issue also solves this issue
   
   Edit: #11799 is the fix for the root issue. I'll pull these changes in and then we can ignore my versions when we eventually rebase off master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-953674869


   @jihoonson thanks for starting your review. The end-to-end tests I was referring to in our earlier discussion haven't been added yet (since this PR doesn't include the writing logic), but when they are added no changes will be necessary - they're lightweight enough run as part of `mvn test`. This PR does include the unit tests for the reader logic, which also run via `mvn test`, as expected.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-988617992


   @jihoonson do you know if it's possible to get the logs mentioned in the error message (`Error occurred in starting fork, check output in log`)? I'm having trouble replicating this issue locally so any more information would be helpful.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r749910270



##########
File path: spark/src/main/scala/org/apache/druid/spark/utils/FilterUtils.scala
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.utils
+
+import org.apache.druid.java.util.common.{ISE, JodaUtils}
+import org.apache.druid.query.filter.{AndDimFilter, BoundDimFilter, DimFilter, InDimFilter,
+  NotDimFilter, OrDimFilter, RegexDimFilter, SelectorDimFilter}
+import org.apache.druid.query.ordering.{StringComparator, StringComparators}
+import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, GreaterThan,
+  GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains,
+  StringEndsWith, StringStartsWith}
+import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, FloatType, IntegerType,
+  LongType, StringType, StructType, TimestampType}
+
+import scala.collection.JavaConverters.{seqAsJavaListConverter, setAsJavaSetConverter}
+
+/**
+  * Converters and utilities for working with Spark and Druid Filters.
+  */
+object FilterUtils {
+  /**
+    * Map an array of Spark filters FILTERS to a Druid dim filter or None if filters is empty.
+    *
+    * We return a DimFilter instead of a Filter and force callers to call .toFilter
+    * or .toOptimizedFilter to get a filter because callers can't covert back to a DimFilter from a
+    * Filter.
+    *
+    * @param filters The spark filters to map to a Druid filter.
+    * @return A Druid filter corresponding to the union of filter conditions enumerated in FILTERS.
+    */
+  def mapFilters(filters: Array[Filter], schema: StructType): Option[DimFilter] = {
+    if (filters.isEmpty) {
+      Option.empty[DimFilter]
+    } else {
+      Some(new AndDimFilter(filters.map(mapFilter(_, schema)).toList.asJava).optimize())
+    }
+  }
+
+  /**
+    * Convert a Spark-style filter FILTER to a Druid-style filter.
+    *
+    * @param filter The Spark filter to map to a Druid filter.
+    * @return The Druid filter corresponding to the filter condition described by FILTER.
+    */
+  def mapFilter(filter: Filter, schema: StructType): DimFilter = { // scalastyle:ignore method.length

Review comment:
       Looking at this code again though, there's probably room in the future to better tie it to the two null handling schemes and increase the number of filters we're able to push down. This is just an efficiency improvement though - if we can't handle a given set of filters, Spark will execute them itself.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-951574781


   Closing and reopening to trigger Travis.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson closed pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
jihoonson closed pull request #11823:
URL: https://github.com/apache/druid/pull/11823


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r736176457



##########
File path: docs/operations/spark.md
##########
@@ -0,0 +1,279 @@
+---
+id: spark
+title: "Apache Spark Reader and Writer"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+# Apache Spark Reader and Writer for Druid
+
+## Reader
+The reader reads Druid segments from deep storage into Spark. It locates the segments to read and determines their
+schema if not provided by querying the brokers for the relevant metadata but otherwise does not interact with a running
+Druid cluster.
+
+Sample Code:
+```scala
+import org.apache.druid.spark.DruidDataFrameReader
+
+val deepStorageConfig = new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")
+
+sparkSession
+  .read
+  .brokerHost("localhost")
+  .brokerPort(8082)
+  .metadataDbType("mysql")
+  .metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
+  .metadataUser("druid")
+  .metadataPassword("diurd")
+  .dataSource("dataSource")
+  .deepStorage(deepStorageConfig)
+  .druid()
+```
+
+Alternatively, the reader can be configured via a properties map with no additional import needed:
+```scala
+val properties = Map[String, String](
+  "metadata.dbType" -> "mysql",
+  "metadata.connectUri" -> "jdbc:mysql://druid.metadata.server:3306/druid",
+  "metadata.user" -> "druid",
+  "metadata.password" -> "diurd",
+  "broker.host" -> "localhost",
+  "broker.port" -> 8082,
+  "table" -> "dataSource",
+  "reader.deepStorageType" -> "local",
+  "local.storageDirectory" -> "/mnt/druid/druid-segments/"
+)
+
+sparkSession
+  .read
+  .format("druid")
+  .options(properties)
+  .load()
+```
+
+If you know the schema of the Druid data source you're reading from, you can save needing to determine the schema via
+calls to the broker with
+```scala
+sparkSession
+  .read
+  .format("druid")
+  .schema(schema)
+  .options(properties)
+  .load()
+```
+
+Filters should be applied to the read-in data frame before any [Spark actions](http://spark.apache.org/docs/2.4.7/api/scala/index.html#org.apache.spark.sql.Dataset)
+are triggered, to allow predicates to be pushed down to the reader and avoid full scans of the underlying Druid data.
+
+## Plugin Registries and Druid Extension Support
+One of Druid's strengths is its extensibility. Since these Spark readers and writers will not execute on a Druid cluster
+and won't have the ability to dynamically load classes or integrate with Druid's Guice injectors, Druid extensions can't
+be used directly. Instead, these connectors use a plugin registry architecture, including default plugins that support
+most functionality in `extensions-core`. Custom plugins consisting of a string name and one or more serializable
+generator functions must be registered before the first Spark action which would depend on them is called.
+
+### ComplexMetricRegistry
+The `ComplexMetricRegistry` provides support for serializing and deserializing complex metric types between Spark and
+Druid. Support for complex metric types in Druid core extensions is provided out of the box.
+
+Users wishing to override the default behavior or who need to add support for additional complex metric types can
+use the `ComplexMetricRegistry.register` functions to associate serde functions with a given complex metric type. The
+name used to register custom behavior must match the complex metric type name reported by Druid.
+**Note that custom plugins must be registered with both the executors and the Spark driver.**
+
+### SegmentReaderRegistry
+The `SegmentReaderRegistry` provides support for reading segments from deep storage. Local, HDFS, GCS, S3, and Azure
+Storage deep storage implementations are supported by default.
+
+Users wishing to override the default behavior or who need to add support for additional deep storage implementations
+can use either `SegmentReaderRegistry.registerInitializer` (to provide any necessary Jackson configuration for
+deserializing a `LoadSpec` object from a segment load spec) or `SegmentReaderRegistry.registerLoadFunction` (to register
+a function for creating a URI from a segment load spec). These two functions correspond to the first and second approach
+[outlined below](#deep-storage). **Note that custom plugins must be registered on the executors, not the Spark driver.**
+
+### SQLConnectorRegistry
+The `SQLConnectorRegistry` provides support for configuring connectors to Druid metadata databases. Support for MySQL,
+PostgreSQL, and Derby databases are provided out of the box.
+
+Users wishing to override the default behavior or who need to add support for additional metadata database
+implementations can use the `SQLConnectorRegistry.register` function. Custom connectors should be registered on the
+driver.
+
+## Deploying to a Spark cluster
+This extension can be run on a Spark cluster in one of two ways: bundled as part of an application jar or uploaded as
+a library jar to a Spark cluster and included in the classpath provided to Spark applications by the application
+manager. If the second approach is used, this extension should be built with
+`mvn clean package -pl spark` and the resulting jar `druid-spark-<VERSION>.jar`
+uploaded to the Spark cluster. Application jars should then be built with a compile-time dependency on
+`org.apache.druid:druid-spark` (e.g. marked as `provided` in Maven or with `compileOnly` in Gradle).
+
+## Configuration Reference
+
+### Metadata Client Configs
+The properties used to configure the client that interacts with the Druid metadata server directly. Used by both reader
+and the writer. The `metadataPassword` property can either be provided as a string that will be used as-is or can be

Review comment:
       Did you mean this?
   
   ```suggestion
   and the writer. The `metadata.password` property can either be provided as a string that will be used as-is or can be
   ```

##########
File path: docs/operations/spark.md
##########
@@ -0,0 +1,279 @@
+---
+id: spark
+title: "Apache Spark Reader and Writer"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+# Apache Spark Reader and Writer for Druid
+
+## Reader
+The reader reads Druid segments from deep storage into Spark. It locates the segments to read and determines their
+schema if not provided by querying the brokers for the relevant metadata but otherwise does not interact with a running
+Druid cluster.
+
+Sample Code:
+```scala
+import org.apache.druid.spark.DruidDataFrameReader
+
+val deepStorageConfig = new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")
+
+sparkSession
+  .read
+  .brokerHost("localhost")
+  .brokerPort(8082)
+  .metadataDbType("mysql")
+  .metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
+  .metadataUser("druid")
+  .metadataPassword("diurd")
+  .dataSource("dataSource")
+  .deepStorage(deepStorageConfig)
+  .druid()
+```
+
+Alternatively, the reader can be configured via a properties map with no additional import needed:
+```scala
+val properties = Map[String, String](
+  "metadata.dbType" -> "mysql",
+  "metadata.connectUri" -> "jdbc:mysql://druid.metadata.server:3306/druid",
+  "metadata.user" -> "druid",
+  "metadata.password" -> "diurd",
+  "broker.host" -> "localhost",
+  "broker.port" -> 8082,
+  "table" -> "dataSource",
+  "reader.deepStorageType" -> "local",
+  "local.storageDirectory" -> "/mnt/druid/druid-segments/"
+)
+
+sparkSession
+  .read
+  .format("druid")
+  .options(properties)
+  .load()
+```
+
+If you know the schema of the Druid data source you're reading from, you can save needing to determine the schema via
+calls to the broker with
+```scala
+sparkSession
+  .read
+  .format("druid")
+  .schema(schema)
+  .options(properties)
+  .load()
+```
+
+Filters should be applied to the read-in data frame before any [Spark actions](http://spark.apache.org/docs/2.4.7/api/scala/index.html#org.apache.spark.sql.Dataset)
+are triggered, to allow predicates to be pushed down to the reader and avoid full scans of the underlying Druid data.
+
+## Plugin Registries and Druid Extension Support
+One of Druid's strengths is its extensibility. Since these Spark readers and writers will not execute on a Druid cluster
+and won't have the ability to dynamically load classes or integrate with Druid's Guice injectors, Druid extensions can't
+be used directly. Instead, these connectors use a plugin registry architecture, including default plugins that support
+most functionality in `extensions-core`. Custom plugins consisting of a string name and one or more serializable
+generator functions must be registered before the first Spark action which would depend on them is called.
+
+### ComplexMetricRegistry
+The `ComplexMetricRegistry` provides support for serializing and deserializing complex metric types between Spark and
+Druid. Support for complex metric types in Druid core extensions is provided out of the box.
+
+Users wishing to override the default behavior or who need to add support for additional complex metric types can
+use the `ComplexMetricRegistry.register` functions to associate serde functions with a given complex metric type. The
+name used to register custom behavior must match the complex metric type name reported by Druid.
+**Note that custom plugins must be registered with both the executors and the Spark driver.**
+
+### SegmentReaderRegistry
+The `SegmentReaderRegistry` provides support for reading segments from deep storage. Local, HDFS, GCS, S3, and Azure
+Storage deep storage implementations are supported by default.
+
+Users wishing to override the default behavior or who need to add support for additional deep storage implementations
+can use either `SegmentReaderRegistry.registerInitializer` (to provide any necessary Jackson configuration for
+deserializing a `LoadSpec` object from a segment load spec) or `SegmentReaderRegistry.registerLoadFunction` (to register
+a function for creating a URI from a segment load spec). These two functions correspond to the first and second approach
+[outlined below](#deep-storage). **Note that custom plugins must be registered on the executors, not the Spark driver.**
+
+### SQLConnectorRegistry
+The `SQLConnectorRegistry` provides support for configuring connectors to Druid metadata databases. Support for MySQL,
+PostgreSQL, and Derby databases are provided out of the box.
+
+Users wishing to override the default behavior or who need to add support for additional metadata database
+implementations can use the `SQLConnectorRegistry.register` function. Custom connectors should be registered on the
+driver.
+
+## Deploying to a Spark cluster
+This extension can be run on a Spark cluster in one of two ways: bundled as part of an application jar or uploaded as
+a library jar to a Spark cluster and included in the classpath provided to Spark applications by the application
+manager. If the second approach is used, this extension should be built with
+`mvn clean package -pl spark` and the resulting jar `druid-spark-<VERSION>.jar`
+uploaded to the Spark cluster. Application jars should then be built with a compile-time dependency on
+`org.apache.druid:druid-spark` (e.g. marked as `provided` in Maven or with `compileOnly` in Gradle).
+
+## Configuration Reference
+
+### Metadata Client Configs
+The properties used to configure the client that interacts with the Druid metadata server directly. Used by both reader
+and the writer. The `metadataPassword` property can either be provided as a string that will be used as-is or can be
+provided as a serialized DynamicConfigProvider that will be resolved when the metadata client is first instantiated. If
+a  custom DynamicConfigProvider is used, be sure to register the provider with the DynamicConfigProviderRegistry before use.
+
+|Key|Description|Required|Default|
+|---|-----------|--------|-------|
+|`metadata.dbType`|The metadata server's database type (e.g. `mysql`)|Yes||
+|`metadata.host`|The metadata server's host name|If using derby|`localhost`|
+|`metadata.port`|The metadata server's port|If using derby|1527|
+|`metadata.connectUri`|The URI to use to connect to the metadata server|If not using derby||
+|`metadata.user`|The user to use when connecting to the metadata server|If required by the metadata database||
+|`metadata.password`|The password to use when connecting to the metadata server. This can optionally be a serialized instance of a Druid DynamicConfigProvider or a plain string|If required by the metadata database||
+|`metadata.dbcpProperties`|The connection pooling properties to use when connecting to the metadata server|No||
+|`metadata.baseName`|The base name used when creating Druid metadata tables|No|`druid`|
+
+### Druid Client Configs
+The configuration properties used to query the Druid cluster for segment metadata. Only used in the reader.
+
+|Key|Description|Required|Default|
+|---|-----------|--------|-------|
+|`broker.host`|The hostname of a broker in the Druid cluster to read from|No|`localhost`|
+|`broker.port`|The port of the broker in the Druid cluster to read from|No|8082|
+|`broker.numRetries`|The number of times to retry a timed-out segment metadata request|No|5|
+|`broker.retryWaitSeconds`|How long (in seconds) to wait before retrying a timed-out segment metadata request|No|5|
+|`broker.timeoutMilliseconds`|How long (in milliseconds) to wait before timing out a segment metadata request|No|300000|
+
+### Reader Configs
+The properties used to configure the DataSourceReader when reading data from Druid in Spark.
+
+|Key|Description|Required|Default|
+|---|-----------|--------|-------|
+|`table`|The Druid data source to read from|Yes||
+|`reader.deepStorageType`|The type of deep storage used to back the target Druid cluster|No|`local`|
+|`reader.segments`|A hard-coded list of Druid segments to read. If set, the table and druid client configurations are ignored and the specified segments are read directly. Must be deserializable into Druid DataSegment instances|No|
+|`reader.useCompactSketches`|Controls whether or not compact representations of complex metrics are used (only for metrics that support compact forms)|No|False|

Review comment:
       Should it be `reader.useCompactMetrics` if it applies to all metrics that support compact forms? Or does it apply to only sketches?

##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidDataSourceReader.scala
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import org.apache.druid.java.util.common.{DateTimes, Intervals, JodaUtils}
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.clients.{DruidClient, DruidMetadataClient}
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.utils.{FilterUtils, SchemaUtils}
+import org.apache.druid.timeline.DataSegment
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition,
+  SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsScanColumnarBatch}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.joda.time.Interval
+
+import java.util.{List => JList}
+import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}
+
+/**
+  * A DruidDataSourceReader handles the actual work of reading data from Druid. It does this by querying to determine
+  * where Druid segments live in deep storage and then reading those segments into memory in order to avoid straining
+  * the Druid cluster. In general, users should not directly instantiate instances of this class but instead use
+  * sparkSession.read.format("druid").options(Map(...)).load(). If the schema of the data in Druid is known, overhead
+  * can be further reduced by providing it directly (e.g. sparkSession.read.format("druid").schema(schema).options...)
+  *
+  * To aid comprehensibility, some idiomatic Scala has been somewhat java-fied.
+  *
+  * @param schema
+  * @param conf
+  */
+class DruidDataSourceReader(
+                             var schema: Option[StructType] = None,
+                             conf: Configuration
+                           ) extends DataSourceReader
+  with SupportsPushDownRequiredColumns with SupportsPushDownFilters with SupportsScanColumnarBatch with Logging {
+  private lazy val metadataClient =
+    DruidDataSourceReader.createDruidMetaDataClient(conf)
+  private lazy val druidClient = DruidDataSourceReader.createDruidClient(conf)
+
+  private var filters: Array[Filter] = Array.empty
+  private var druidColumnTypes: Option[Set[String]] = Option.empty
+
+  override def readSchema(): StructType = {
+    if (schema.isDefined) {
+      schema.get
+    } else {
+      require(conf.isPresent(DruidConfigurationKeys.tableKey),
+        s"Must set ${DruidConfigurationKeys.tableKey}!")
+      // TODO: Optionally accept a granularity so that if lowerBound to upperBound spans more than
+      //  twice the granularity duration, we can send a list with two disjoint intervals and
+      //  minimize the load on the broker from having to merge large numbers of segments
+      val (lowerBound, upperBound) = FilterUtils.getTimeFilterBounds(filters)
+      val columnMap = druidClient.getSchema(
+        conf.getString(DruidConfigurationKeys.tableKey),
+        Some(List[Interval](Intervals.utc(
+          lowerBound.getOrElse(JodaUtils.MIN_INSTANT),
+          upperBound.getOrElse(JodaUtils.MAX_INSTANT)
+        )))
+      )
+      schema = Option(SchemaUtils.convertDruidSchemaToSparkSchema(columnMap))
+      druidColumnTypes = Option(columnMap.map(_._2._1).toSet)
+      schema.get
+    }
+  }
+
+  override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
+    // For now, one partition for each Druid segment partition
+    // Future improvements can use information from SegmentAnalyzer results to do smart things
+    if (schema.isEmpty) {
+      readSchema()
+    }
+    val readerConf = conf.dive(DruidConfigurationKeys.readerPrefix)
+    val filter = FilterUtils.mapFilters(filters, schema.get)
+    val useSparkConfForDeepStorage = readerConf.getBoolean(DruidConfigurationKeys.useSparkConfForDeepStorageDefaultKey)
+    val useCompactSketches = readerConf.isPresent(DruidConfigurationKeys.useCompactSketchesKey)
+    val useDefaultNullHandling = readerConf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
+
+    // Allow passing hard-coded list of segments to load
+    if (readerConf.isPresent(DruidConfigurationKeys.segmentsKey)) {
+      val segments: JList[DataSegment] = MAPPER.readValue(
+        readerConf.getString(DruidConfigurationKeys.segmentsKey),
+        new TypeReference[JList[DataSegment]]() {}
+      )
+      segments.asScala
+        .map(segment =>
+          new DruidInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling
+          ): InputPartition[InternalRow]
+        ).asJava
+    } else {
+      getSegments
+        .map(segment =>
+          new DruidInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling
+          ): InputPartition[InternalRow]
+        ).asJava
+    }
+  }
+
+  override def pruneColumns(structType: StructType): Unit = {
+    schema = Option(structType)
+  }
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    readSchema()
+    filters.partition(FilterUtils.isSupportedFilter(_, schema.get)) match {
+      case (supported, unsupported) =>
+        this.filters = supported
+        unsupported
+    }
+  }
+
+  override def pushedFilters(): Array[Filter] = filters
+
+  private[v2] def getSegments: Seq[DataSegment] = {
+    require(conf.isPresent(DruidConfigurationKeys.tableKey),
+      s"Must set ${DruidConfigurationKeys.tableKey}!")
+
+    // Check filters for any bounds on __time
+    // Otherwise, we'd need to full scan the segments table
+    val (lowerTimeBound, upperTimeBound) = FilterUtils.getTimeFilterBounds(filters)
+
+    metadataClient.getSegmentPayloads(conf.getString(DruidConfigurationKeys.tableKey),

Review comment:
       Hmm, it seems that `metadataClient.getSegmentPayloads` simply returns used segments in a given interval. In that case, the returned segments can include some segments overshadowed by others if the coordinator hasn't marked those overshadowed segments unused yet. To handle this problem, Druid uses `VersionedIntervalTimeline` to find the most recent set of segments. Is there some code somewhere to find the most recent set of segments from the returned segments?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r749913511



##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartitionReader.scala
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import org.apache.druid.java.util.common.{FileUtils, IAE, ISE, StringUtils}
+import org.apache.druid.query.dimension.DefaultDimensionSpec
+import org.apache.druid.query.filter.DimFilter
+import org.apache.druid.segment.column.ValueType
+import org.apache.druid.segment.vector.{VectorColumnSelectorFactory, VectorCursor}
+import org.apache.druid.segment.{QueryableIndexStorageAdapter, VirtualColumns}
+import org.apache.druid.spark.configuration.{Configuration, SerializableHadoopConfiguration}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.registries.ComplexMetricRegistry
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
+import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, LongType, StringType,
+  StructField, StructType, TimestampType}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+
+class DruidColumnarInputPartitionReader(
+                                         segmentStr: String,
+                                         schema: StructType,
+                                         filter: Option[DimFilter],
+                                         columnTypes: Option[Set[String]],
+                                         broadcastedHadoopConf: Broadcast[SerializableHadoopConfiguration],
+                                         conf: Configuration,
+                                         useSparkConfForDeepStorage: Boolean,
+                                         useCompactSketches: Boolean,
+                                         useDefaultNullHandling: Boolean,
+                                         batchSize: Int
+                                       )
+  extends DruidBaseInputPartitionReader(
+    segmentStr,
+    columnTypes,
+    broadcastedHadoopConf,
+    conf,
+    useSparkConfForDeepStorage,
+    useCompactSketches,
+    useDefaultNullHandling
+  ) with InputPartitionReader[ColumnarBatch] with Logging {
+
+  private val adapter = new QueryableIndexStorageAdapter(queryableIndex)
+
+  private val cursor: VectorCursor = adapter.makeVectorCursor(
+    filter.map(_.toOptimizedFilter).orNull,
+    adapter.getInterval,
+    VirtualColumns.EMPTY,
+    false,
+    batchSize,
+    null) // scalastyle:ignore null
+
+  private val columnVectors: Array[OnHeapColumnVector] = OnHeapColumnVector.allocateColumns(batchSize, schema)
+  private val resultBatch: ColumnarBatch = new ColumnarBatch(columnVectors.map(_.asInstanceOf[ColumnVector]))
+
+  override def next(): Boolean = {
+    if (!cursor.isDone) {
+      fillVectors()
+      true
+    } else {
+      false
+    }
+  }
+
+  override def get(): ColumnarBatch = {
+    resultBatch
+  }
+
+  override def close(): Unit = {
+    resultBatch.close()

Review comment:
       The Spark elements (`resultBatch`, columnVectors`) shouldn't. The Druid elements can throw RuntimeExceptions, but I'm not sure what recovery actions we would take if the call failed - log and rethrow? Either way, any unhandled or raised exception here is going to result in the loss of the task and the destruction of the JVM, so the only leak I think we'd face is in the temporary directory. Spark should clear out the temporary directories created by a run when it handles a failing task, but adding explicit guards and a finally block is a good idea 👍.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest edited a comment on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest edited a comment on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-952630338


   The packaging check is failing with
   ```Traceback (most recent call last):
     File "/home/travis/build/apache/druid/distribution/bin/generate-binary-license.py", line 181, in <module>
       generate_license(apache_license_v2, license_yaml)
     File "/home/travis/build/apache/druid/distribution/bin/generate-binary-license.py", line 140, in generate_license
       licenses_list = list(yaml.load_all(registry_file))
   
   TypeError: load_all() missing 1 required positional argument: 'Loader'
   
   [ERROR] Command execution failed.
   
   org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
       at org.apache.commons.exec.DefaultExecutor.executeInternal (DefaultExecutor.java:404)
       at org.apache.commons.exec.DefaultExecutor.execute (DefaultExecutor.java:166)
       at org.codehaus.mojo.exec.ExecMojo.executeCommandLine (ExecMojo.java:804)
       at org.codehaus.mojo.exec.ExecMojo.executeCommandLine (ExecMojo.java:751)
       at org.codehaus.mojo.exec.ExecMojo.execute (ExecMojo.java:313)
       at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:137)
       at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:210)
       at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:156)
       at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:148)
       at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:117)
       at org.apache.maven.lifecycle.internal.builder.multithreaded.MultiThreadedBuilder$1.call (MultiThreadedBuilder.java:190)
       at org.apache.maven.lifecycle.internal.builder.multithreaded.MultiThreadedBuilder$1.call (MultiThreadedBuilder.java:186)
       at java.util.concurrent.FutureTask.run (FutureTask.java:266)
       at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
       at java.util.concurrent.FutureTask.run (FutureTask.java:266)
       at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
       at java.lang.Thread.run (Thread.java:748)
   ```
   
   I had thought this was an non-obvious error due to not adding licenses yet but the check is still failing after I add the licenses. Is this something people regularly encounter?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] samarthjain commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r755731078



##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartitionReader.scala
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import org.apache.druid.java.util.common.{FileUtils, IAE, ISE, StringUtils}
+import org.apache.druid.query.dimension.DefaultDimensionSpec
+import org.apache.druid.query.filter.DimFilter
+import org.apache.druid.segment.column.ValueType
+import org.apache.druid.segment.vector.{VectorColumnSelectorFactory, VectorCursor}
+import org.apache.druid.segment.{QueryableIndexStorageAdapter, VirtualColumns}
+import org.apache.druid.spark.configuration.{Configuration, SerializableHadoopConfiguration}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.registries.ComplexMetricRegistry
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
+import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, LongType, StringType,
+  StructField, StructType, TimestampType}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+
+class DruidColumnarInputPartitionReader(

Review comment:
       Ah, yes. Makes sense. I am ok with `DruidColumnarInputPartitionReader` 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-1012822159


   @JulianJaffePinterest thank you for picking this up again. It makes sense to me. I will merge this PR shortly. Happy new year! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #11823:
URL: https://github.com/apache/druid/pull/11823#issuecomment-989431304


   @JulianJaffePinterest I'm not sure. Maybe we can see what is in the surefire reports after they fail in Travis?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] JulianJaffePinterest commented on a change in pull request #11823: Add Spark connector reader support.

Posted by GitBox <gi...@apache.org>.
JulianJaffePinterest commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r760093726



##########
File path: spark/src/main/scala/org/apache/druid/spark/registries/ComplexMetricRegistry.scala
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.registries
+
+import org.apache.datasketches.hll.HllSketch
+import org.apache.datasketches.quantiles.DoublesSketch
+import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch
+import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule
+import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule
+import org.apache.druid.query.aggregation.datasketches.theta.{SketchHolder, SketchModule}
+import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchModule
+import org.apache.druid.query.aggregation.histogram.{ApproximateHistogram,
+  ApproximateHistogramDruidModule, FixedBucketsHistogram, FixedBucketsHistogramAggregator}
+import org.apache.druid.query.aggregation.variance.{VarianceAggregatorCollector, VarianceSerde}
+import org.apache.druid.segment.serde.ComplexMetrics
+import org.apache.druid.spark.mixins.Logging
+
+import scala.collection.mutable
+
+/**
+ * A registry for plugging in support for Druid complex metric types. Provides definitions for supporting complex types
+ * in extensions-core out of the box.
+ */
+object ComplexMetricRegistry extends Logging {

Review comment:
       Done 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org