Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/11/15 09:19:42 UTC

[GitHub] [druid] samarthjain commented on a change in pull request #11823: Add Spark connector reader support.

samarthjain commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r749089887



##########
File path: spark/src/main/scala/org/apache/druid/spark/clients/DruidMetadataClient.scala
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.clients
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.google.common.base.Suppliers
+import org.apache.druid.indexer.SQLMetadataStorageUpdaterJobHandler
+import org.apache.druid.java.util.common.{DateTimes, Intervals, JodaUtils, StringUtils}
+import org.apache.druid.metadata.{DynamicConfigProvider, MetadataStorageConnectorConfig,
+  MetadataStorageTablesConfig, SQLMetadataConnector}
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.registries.SQLConnectorRegistry
+import org.apache.druid.timeline.{DataSegment, Partitions, VersionedIntervalTimeline}
+import org.skife.jdbi.v2.{DBI, Handle}
+
+import java.util.Properties
+import scala.collection.JavaConverters.{asJavaIterableConverter, asScalaBufferConverter,
+  asScalaSetConverter, mapAsJavaMapConverter}
+
+class DruidMetadataClient(
+                           metadataDbType: String,
+                           host: String,
+                           port: Int,
+                           connectUri: String,
+                           user: String,
+                           passwordProviderSer: String,
+                           dbcpMap: Properties,
+                           base: String = "druid"
+                         ) extends Logging {
+  private lazy val druidMetadataTableConfig = MetadataStorageTablesConfig.fromBase(base)
+  private lazy val dbcpProperties = new Properties(dbcpMap)
+  private lazy val password = if (passwordProviderSer == "") {
+    // Jackson doesn't like deserializing empty strings
+    passwordProviderSer
+  } else {
+    MAPPER.readValue[DynamicConfigProvider[String]](
+      passwordProviderSer, new TypeReference[DynamicConfigProvider[String]] {}
+    ).getConfig.getOrDefault("password", "")
+  }
+
+  private lazy val connectorConfig: MetadataStorageConnectorConfig =
+    new MetadataStorageConnectorConfig
+    {
+      override def isCreateTables: Boolean = false
+      override def getHost: String = host
+      override def getPort: Int = port
+      override def getConnectURI: String = connectUri
+      override def getUser: String = user
+      override def getPassword: String = password
+      override def getDbcpProperties: Properties = dbcpProperties
+    }
+  private lazy val connectorConfigSupplier = Suppliers.ofInstance(connectorConfig)
+  private lazy val metadataTableConfigSupplier = Suppliers.ofInstance(druidMetadataTableConfig)
+  private lazy val connector = buildSQLConnector()
+
+  /**
+    * Get the non-overshadowed used segments for DATASOURCE between INTERVALSTART and INTERVALEND. If either interval
+    * endpoint is None, JodaUtils.MIN_INSTANT/MAX_INSTANT is used instead. By default, only segments for complete
+    * partitions are returned. This behavior can be overridden by setting ALLOWINCOMPLETEPARTITIONS, in which case all
+    * non-overshadowed segments in the interval will be returned, regardless of completeness.
+    *
+    * @param datasource The Druid data source to get segment payloads for.
+    * @param intervalStart The start of the interval to fetch segment payloads for. If None, MIN_INSTANT is used.

Review comment:
       You may want to mention that the interval start and end are inclusive. 
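       For reference, a sketch of how open-ended bounds presumably resolve (mirroring the reader code later in this PR, which falls back to `JodaUtils.MIN_INSTANT`/`MAX_INSTANT`); `toInterval` is a hypothetical helper name, and whether the resulting endpoints are treated as inclusive is exactly what the doc should spell out:

```scala
import org.apache.druid.java.util.common.{Intervals, JodaUtils}
import org.joda.time.Interval

// Sketch only: None on either side widens the query to all time, matching how
// the reader builds its schema-discovery interval elsewhere in this PR.
def toInterval(intervalStart: Option[Long], intervalEnd: Option[Long]): Interval =
  Intervals.utc(
    intervalStart.getOrElse(JodaUtils.MIN_INSTANT),
    intervalEnd.getOrElse(JodaUtils.MAX_INSTANT)
  )
```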

##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidDataSourceReader.scala
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import org.apache.druid.java.util.common.{Intervals, JodaUtils}
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.clients.{DruidClient, DruidMetadataClient}
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.utils.{FilterUtils, SchemaUtils}
+import org.apache.druid.timeline.DataSegment
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition,
+  SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsScanColumnarBatch}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.joda.time.Interval
+
+import java.util.{List => JList}
+import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}
+
+/**
+  * A DruidDataSourceReader handles the actual work of reading data from Druid. It does this by querying to determine
+  * where Druid segments live in deep storage and then reading those segments into memory in order to avoid straining
+  * the Druid cluster. In general, users should not directly instantiate instances of this class but instead use
+  * sparkSession.read.format("druid").options(Map(...)).load(). If the schema of the data in Druid is known, overhead
+  * can be further reduced by providing it directly (e.g. sparkSession.read.format("druid").schema(schema).options...)
+  *
+  * To aid comprehensibility, some idiomatic Scala has been somewhat java-fied.
+  */
+class DruidDataSourceReader(
+                             var schema: Option[StructType] = None,
+                             conf: Configuration
+                           ) extends DataSourceReader
+  with SupportsPushDownRequiredColumns with SupportsPushDownFilters with SupportsScanColumnarBatch with Logging {
+  private lazy val metadataClient =
+    DruidDataSourceReader.createDruidMetaDataClient(conf)
+  private lazy val druidClient = DruidDataSourceReader.createDruidClient(conf)
+
+  private var filters: Array[Filter] = Array.empty
+  private var druidColumnTypes: Option[Set[String]] = Option.empty
+
+  override def readSchema(): StructType = {
+    if (schema.isDefined) {
+      schema.get
+    } else {
+      require(conf.isPresent(DruidConfigurationKeys.tableKey),
+        s"Must set ${DruidConfigurationKeys.tableKey}!")
+      // TODO: Optionally accept a granularity so that if lowerBound to upperBound spans more than
+      //  twice the granularity duration, we can send a list with two disjoint intervals and
+      //  minimize the load on the broker from having to merge large numbers of segments
+      val (lowerBound, upperBound) = FilterUtils.getTimeFilterBounds(filters)
+      val columnMap = druidClient.getSchema(
+        conf.getString(DruidConfigurationKeys.tableKey),
+        Some(List[Interval](Intervals.utc(
+          lowerBound.getOrElse(JodaUtils.MIN_INSTANT),
+          upperBound.getOrElse(JodaUtils.MAX_INSTANT)
+        )))
+      )
+      schema = Option(SchemaUtils.convertDruidSchemaToSparkSchema(columnMap))
+      druidColumnTypes = Option(columnMap.map(_._2._1).toSet)
+      schema.get
+    }
+  }
+
+  override def planInputPartitions(): JList[InputPartition[InternalRow]] = {
+    // For now, one partition for each Druid segment partition
+    // Future improvements can use information from SegmentAnalyzer results to do smart things
+    if (schema.isEmpty) {
+      readSchema()
+    }
+    val readerConf = conf.dive(DruidConfigurationKeys.readerPrefix)
+    val filter = FilterUtils.mapFilters(filters, schema.get)
+    val useSparkConfForDeepStorage = readerConf.getBoolean(DruidConfigurationKeys.useSparkConfForDeepStorageDefaultKey)
+    val useCompactSketches = readerConf.isPresent(DruidConfigurationKeys.useCompactSketchesKey)
+    val useDefaultNullHandling = readerConf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
+
+    // Allow passing hard-coded list of segments to load
+    if (readerConf.isPresent(DruidConfigurationKeys.segmentsKey)) {
+      val segments: JList[DataSegment] = MAPPER.readValue(
+        readerConf.getString(DruidConfigurationKeys.segmentsKey),
+        new TypeReference[JList[DataSegment]]() {}
+      )
+      segments.asScala
+        .map(segment =>
+          new DruidInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling
+          ): InputPartition[InternalRow]
+        ).asJava
+    } else {
+      getSegments
+        .map(segment =>
+          new DruidInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling
+          ): InputPartition[InternalRow]
+        ).asJava
+    }
+  }
+
+  override def pruneColumns(structType: StructType): Unit = {
+    schema = Option(structType)
+  }
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    readSchema()
+    filters.partition(FilterUtils.isSupportedFilter(_, schema.get)) match {
+      case (supported, unsupported) =>
+        this.filters = supported
+        unsupported
+    }
+  }
+
+  override def pushedFilters(): Array[Filter] = filters
+
+  private[v2] def getSegments: Seq[DataSegment] = {
+    require(conf.isPresent(DruidConfigurationKeys.tableKey),
+      s"Must set ${DruidConfigurationKeys.tableKey}!")
+
+    // Check filters for any bounds on __time
+    // Otherwise, we'd need to full scan the segments table
+    val (lowerTimeBound, upperTimeBound) = FilterUtils.getTimeFilterBounds(filters)
+
+    metadataClient.getSegmentPayloads(
+      conf.getString(DruidConfigurationKeys.tableKey),
+      lowerTimeBound,
+      upperTimeBound,
+      conf.getBoolean(DruidConfigurationKeys.allowIncompletePartitionsDefaultKey)
+    )
+  }
+
+  override def planBatchInputPartitions(): JList[InputPartition[ColumnarBatch]] = {
+    if (schema.isEmpty) {
+      readSchema()
+    }
+    val readerConf = conf.dive(DruidConfigurationKeys.readerPrefix)
+    val filter = FilterUtils.mapFilters(filters, schema.get)
+    val useSparkConfForDeepStorage = readerConf.getBoolean(DruidConfigurationKeys.useSparkConfForDeepStorageDefaultKey)
+    val useCompactSketches = readerConf.isPresent(DruidConfigurationKeys.useCompactSketchesKey)
+    val useDefaultNullHandling = readerConf.getBoolean(DruidConfigurationKeys.useDefaultValueForNullDefaultKey)
+    val batchSize = readerConf.getInt(DruidConfigurationKeys.batchSizeDefaultKey)
+
+    // Allow passing hard-coded list of segments to load
+    if (readerConf.isPresent(DruidConfigurationKeys.segmentsKey)) {
+      val segments: JList[DataSegment] = MAPPER.readValue(
+        readerConf.getString(DruidConfigurationKeys.segmentsKey),
+        new TypeReference[JList[DataSegment]]() {}
+      )
+      segments.asScala
+        .map(segment =>
+          new DruidColumnarInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling,
+            batchSize
+          ): InputPartition[ColumnarBatch]
+        ).asJava
+    } else {
+      getSegments
+        .map(segment =>
+          new DruidColumnarInputPartition(
+            segment,
+            schema.get,
+            filter,
+            druidColumnTypes,
+            conf,
+            useSparkConfForDeepStorage,
+            useCompactSketches,
+            useDefaultNullHandling,
+            batchSize
+          ): InputPartition[ColumnarBatch]
+        ).asJava
+    }
+  }
+
+  override def enableBatchRead(): Boolean = {
+    // Fail fast
+    if (!conf.dive(DruidConfigurationKeys.readerPrefix).getBoolean(DruidConfigurationKeys.vectorizeDefaultKey)) {
+      false
+    } else {
+      if (schema.isEmpty) {
+        readSchema()
+      }
+      val filterOpt = FilterUtils.mapFilters(filters, schema.get)
+      filterOpt.fold(true) { filter =>
+        val rowSignature = SchemaUtils.generateRowSignatureFromSparkSchema(schema.get)
+        val canVectorize = filter.toOptimizedFilter.canVectorizeMatcher(rowSignature)

Review comment:
       What about the case where Druid doesn't support vectorized reads for, say, a complex column type? Does that mean we should also be looking at the schema to decide the value of `canVectorize`? We should probably disable vectorization in that case. 
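       A rough sketch of the extra guard being suggested; `supportsVectorizedReads` is a hypothetical helper on `ComplexMetricRegistry`, not an existing API, while `druidColumnTypes`, `filterOpt`, `schema`, and `SchemaUtils` are the values already in scope above:

```scala
// Sketch: treat Druid's primitive column types as vectorizable and require every
// remaining (complex) type to be explicitly known to support vectorized reads
// before enabling batch reads.
val rowSignature = SchemaUtils.generateRowSignatureFromSparkSchema(schema.get)
val primitiveTypes = Set("string", "long", "float", "double")
val complexTypes = druidColumnTypes.getOrElse(Set.empty[String])
  .filterNot(t => primitiveTypes.contains(t.toLowerCase))
val schemaCanVectorize =
  complexTypes.forall(ComplexMetricRegistry.supportsVectorizedReads) // hypothetical helper
val canVectorize = schemaCanVectorize &&
  filterOpt.forall(_.toOptimizedFilter.canVectorizeMatcher(rowSignature))
```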

##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartitionReader.scala
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import org.apache.druid.java.util.common.{FileUtils, IAE, ISE, StringUtils}
+import org.apache.druid.query.dimension.DefaultDimensionSpec
+import org.apache.druid.query.filter.DimFilter
+import org.apache.druid.segment.column.ValueType
+import org.apache.druid.segment.vector.{VectorColumnSelectorFactory, VectorCursor}
+import org.apache.druid.segment.{QueryableIndexStorageAdapter, VirtualColumns}
+import org.apache.druid.spark.configuration.{Configuration, SerializableHadoopConfiguration}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.registries.ComplexMetricRegistry
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
+import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, LongType, StringType,
+  StructField, StructType, TimestampType}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+
+class DruidColumnarInputPartitionReader(
+                                         segmentStr: String,
+                                         schema: StructType,
+                                         filter: Option[DimFilter],
+                                         columnTypes: Option[Set[String]],
+                                         broadcastedHadoopConf: Broadcast[SerializableHadoopConfiguration],
+                                         conf: Configuration,
+                                         useSparkConfForDeepStorage: Boolean,
+                                         useCompactSketches: Boolean,
+                                         useDefaultNullHandling: Boolean,
+                                         batchSize: Int
+                                       )
+  extends DruidBaseInputPartitionReader(
+    segmentStr,
+    columnTypes,
+    broadcastedHadoopConf,
+    conf,
+    useSparkConfForDeepStorage,
+    useCompactSketches,
+    useDefaultNullHandling
+  ) with InputPartitionReader[ColumnarBatch] with Logging {
+
+  private val adapter = new QueryableIndexStorageAdapter(queryableIndex)
+
+  private val cursor: VectorCursor = adapter.makeVectorCursor(
+    filter.map(_.toOptimizedFilter).orNull,
+    adapter.getInterval,
+    VirtualColumns.EMPTY,
+    false,
+    batchSize,
+    null) // scalastyle:ignore null
+
+  private val columnVectors: Array[OnHeapColumnVector] = OnHeapColumnVector.allocateColumns(batchSize, schema)
+  private val resultBatch: ColumnarBatch = new ColumnarBatch(columnVectors.map(_.asInstanceOf[ColumnVector]))
+
+  override def next(): Boolean = {
+    if (!cursor.isDone) {
+      fillVectors()
+      true
+    } else {
+      false
+    }
+  }
+
+  override def get(): ColumnarBatch = {
+    resultBatch
+  }
+
+  override def close(): Unit = {
+    resultBatch.close()

Review comment:
       Can any of these `close()` calls throw an `Exception`? We probably need to guard against that. 
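       For illustration, one shape the guarded cleanup could take (a sketch only: `logWarn` is assumed to exist on the `Logging` mixin, and the reader likely holds more resources than the two shown in this hunk):

```scala
import scala.util.Try

override def close(): Unit = {
  // Close each resource independently so an exception from one close() does not
  // prevent the others from being released.
  val failures = Seq(
    Try(resultBatch.close()),
    Try(cursor.close())
  ).flatMap(_.failed.toOption)
  // How failures are surfaced is a judgment call; logging and continuing is shown here.
  failures.foreach(t => logWarn(s"Suppressed exception while closing reader resources: ${t.getMessage}"))
}
```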

##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/reader/DruidColumnarInputPartitionReader.scala
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.v2.reader
+
+import org.apache.druid.java.util.common.{FileUtils, IAE, ISE, StringUtils}
+import org.apache.druid.query.dimension.DefaultDimensionSpec
+import org.apache.druid.query.filter.DimFilter
+import org.apache.druid.segment.column.ValueType
+import org.apache.druid.segment.vector.{VectorColumnSelectorFactory, VectorCursor}
+import org.apache.druid.segment.{QueryableIndexStorageAdapter, VirtualColumns}
+import org.apache.druid.spark.configuration.{Configuration, SerializableHadoopConfiguration}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.registries.ComplexMetricRegistry
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
+import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, LongType, StringType,
+  StructField, StructType, TimestampType}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+
+class DruidColumnarInputPartitionReader(

Review comment:
       Would a better name be `DruidVectorizedInputPartitionReader`? I am not sure `Columnar` conveys that this reader reads column values in batches. 

##########
File path: spark/src/main/scala/org/apache/druid/spark/v2/v2.scala
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark
+
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory
+import org.apache.druid.segment.{IndexIO, IndexMergerV9}
+
+package object v2 { // scalastyle:ignore package.object.name

Review comment:
       What is the purpose of this package object? 
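       For what it's worth, a Scala package object makes its members visible throughout `org.apache.druid.spark.v2` without an import, and the imports above suggest this one hosts shared segment read/write machinery. A purely illustrative shape (member names and wiring are guesses, not the PR's actual contents):

```scala
package object v2 { // scalastyle:ignore package.object.name
  // Illustrative only: shared, lazily-built instances that readers and writers
  // in this package can reference without importing anything extra.
  lazy val INDEX_IO: IndexIO = ???              // presumably built from the shared ObjectMapper
  lazy val INDEX_MERGER_V9: IndexMergerV9 = ??? // presumably built from INDEX_IO and
                                                // OnHeapMemorySegmentWriteOutMediumFactory
}
```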

##########
File path: spark/src/main/scala/org/apache/druid/spark/utils/NullHandlingUtils.scala
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.utils
+
+import org.apache.druid.common.config.{NullHandling, NullValueHandlingConfig}
+
+/**
+  * Utility class for initializing a Druid NullValueHandlingConfig. In a Druid cluster, this is handled via injection
+  * and in unit tests defautl value null handling is initialized via NullHandling.initializeForTests(), but we need

Review comment:
       typo: default

##########
File path: spark/src/main/scala/org/apache/druid/spark/utils/FilterUtils.scala
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.utils
+
+import org.apache.druid.java.util.common.{ISE, JodaUtils}
+import org.apache.druid.query.filter.{AndDimFilter, BoundDimFilter, DimFilter, InDimFilter,
+  NotDimFilter, OrDimFilter, RegexDimFilter, SelectorDimFilter}
+import org.apache.druid.query.ordering.{StringComparator, StringComparators}
+import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, GreaterThan,
+  GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains,
+  StringEndsWith, StringStartsWith}
+import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, FloatType, IntegerType,
+  LongType, StringType, StructType, TimestampType}
+
+import scala.collection.JavaConverters.{seqAsJavaListConverter, setAsJavaSetConverter}
+
+/**
+  * Converters and utilities for working with Spark and Druid Filters.
+  */
+object FilterUtils {
+  /**
+    * Map an array of Spark filters FILTERS to a Druid dim filter or None if filters is empty.
+    *
+    * We return a DimFilter instead of a Filter and force callers to call .toFilter
+    * or .toOptimizedFilter to get a filter because callers can't convert back to a DimFilter from a
+    * Filter.
+    *
+    * @param filters The spark filters to map to a Druid filter.
+    * @return A Druid filter corresponding to the conjunction of the filter conditions enumerated in FILTERS.
+    */
+  def mapFilters(filters: Array[Filter], schema: StructType): Option[DimFilter] = {
+    if (filters.isEmpty) {
+      Option.empty[DimFilter]
+    } else {
+      Some(new AndDimFilter(filters.map(mapFilter(_, schema)).toList.asJava).optimize())
+    }
+  }
+
+  /**
+    * Convert a Spark-style filter FILTER to a Druid-style filter.
+    *
+    * @param filter The Spark filter to map to a Druid filter.
+    * @return The Druid filter corresponding to the filter condition described by FILTER.
+    */
+  def mapFilter(filter: Filter, schema: StructType): DimFilter = { // scalastyle:ignore method.length

Review comment:
       Druid supports a few other filter types, like `Search`, `Extraction`, `Expression`, `Javascript`, etc. Are there any Spark counterparts for those? 
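       For reference, the pushdown surface is fixed by `org.apache.spark.sql.sources` (equality, bounds, `In`, null checks, the `String*` predicates, and `And`/`Or`/`Not`), so there is no direct counterpart to Druid's search, extraction, expression, or JavaScript filters; those would only be reachable through a connector-specific option. The closest analogue is the string predicates, roughly (illustrative only; the helper name and regex-escaping choice are mine, not the PR's handling):

```scala
import java.util.regex.Pattern

import org.apache.druid.query.filter.{DimFilter, RegexDimFilter}
import org.apache.spark.sql.sources.StringContains

// Hypothetical helper mirroring what a mapFilter case might do: approximate a
// "contains" predicate with a RegexDimFilter, quoting the literal value so regex
// metacharacters match literally.
def containsToDimFilter(contains: StringContains): DimFilter =
  new RegexDimFilter(contains.attribute, s".*${Pattern.quote(contains.value)}.*", null) // scalastyle:ignore null
```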




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


