Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/11/16 05:13:13 UTC

[GitHub] [druid] JulianJaffePinterest commented on a change in pull request #11823: Add Spark connector reader support.

JulianJaffePinterest commented on a change in pull request #11823:
URL: https://github.com/apache/druid/pull/11823#discussion_r749904700



##########
File path: spark/src/main/scala/org/apache/druid/spark/clients/DruidMetadataClient.scala
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spark.clients
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.google.common.base.Suppliers
+import org.apache.druid.indexer.SQLMetadataStorageUpdaterJobHandler
+import org.apache.druid.java.util.common.{DateTimes, Intervals, JodaUtils, StringUtils}
+import org.apache.druid.metadata.{DynamicConfigProvider, MetadataStorageConnectorConfig,
+  MetadataStorageTablesConfig, SQLMetadataConnector}
+import org.apache.druid.spark.MAPPER
+import org.apache.druid.spark.configuration.{Configuration, DruidConfigurationKeys}
+import org.apache.druid.spark.mixins.Logging
+import org.apache.druid.spark.registries.SQLConnectorRegistry
+import org.apache.druid.timeline.{DataSegment, Partitions, VersionedIntervalTimeline}
+import org.skife.jdbi.v2.{DBI, Handle}
+
+import java.util.Properties
+import scala.collection.JavaConverters.{asJavaIterableConverter, asScalaBufferConverter,
+  asScalaSetConverter, mapAsJavaMapConverter}
+
+class DruidMetadataClient(
+                           metadataDbType: String,
+                           host: String,
+                           port: Int,
+                           connectUri: String,
+                           user: String,
+                           passwordProviderSer: String,
+                           dbcpMap: Properties,
+                           base: String = "druid"
+                         ) extends Logging {
+  private lazy val druidMetadataTableConfig = MetadataStorageTablesConfig.fromBase(base)
+  private lazy val dbcpProperties = new Properties(dbcpMap)
+  private lazy val password = if (passwordProviderSer == "") {
+    // Jackson doesn't like deserializing empty strings
+    passwordProviderSer
+  } else {
+    MAPPER.readValue[DynamicConfigProvider[String]](
+      passwordProviderSer, new TypeReference[DynamicConfigProvider[String]] {}
+    ).getConfig.getOrDefault("password", "")
+  }
+
+  private lazy val connectorConfig: MetadataStorageConnectorConfig =
+    new MetadataStorageConnectorConfig
+    {
+      override def isCreateTables: Boolean = false
+      override def getHost: String = host
+      override def getPort: Int = port
+      override def getConnectURI: String = connectUri
+      override def getUser: String = user
+      override def getPassword: String = password
+      override def getDbcpProperties: Properties = dbcpProperties
+    }
+  private lazy val connectorConfigSupplier = Suppliers.ofInstance(connectorConfig)
+  private lazy val metadataTableConfigSupplier = Suppliers.ofInstance(druidMetadataTableConfig)
+  private lazy val connector = buildSQLConnector()
+
+  /**
+    * Get the non-overshadowed used segments for DATASOURCE between INTERVALSTART and INTERVALEND. If either interval
+    * endpoint is None, JodaUtils.MIN_INSTANT/MAX_INSTANT is used instead. By default, only segments for complete
+    * partitions are returned. This behavior can be overridden by setting ALLOWINCOMPLETEPARTITIONS, in which case all
+    * non-overshadowed segments in the interval will be returned, regardless of completeness.
+    *
+    * @param datasource The Druid data source to get segment payloads for.
+    * @param intervalStart The start of the interval to fetch segment payloads for. If None, MIN_INSTANT is used.

Review comment:
       I'm not 100% sure how to word this: the SQL query being executed uses `>=` and `<=` for its bounds, but it's effectively querying against Joda-Time Interval endpoints, where the start time is inclusive and the end time is exclusive. The net result is fetching segments within `[startTime, endTime)`. If you encountered this method while working in an unfamiliar code base, how would you want this communicated?
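
       As a minimal illustration (not part of the PR, and assuming the segments table's start/end columns hold the segment interval's endpoints), the `>=`/`<=` SQL bounds line up with Joda-Time's start-inclusive/end-exclusive semantics like this:

```scala
import org.joda.time.Interval

// Sketch only: a segment interval [s, e) that satisfies the SQL bounds
//   s >= queryStart AND e <= queryEnd
// is wholly contained in the half-open query interval [queryStart, queryEnd).
val query   = Interval.parse("2021-01-01/2021-02-01") // [Jan 1, Feb 1)
val inside  = Interval.parse("2021-01-31/2021-02-01") // ends exactly at the exclusive bound
val outside = Interval.parse("2021-02-01/2021-02-02") // starts at the exclusive bound

assert(query.contains(inside))   // included: fully within [Jan 1, Feb 1)
assert(!query.contains(outside)) // excluded: Feb 1 is not part of the query interval
```

       One possible wording for the scaladoc: segments are returned for the half-open interval `[intervalStart, intervalEnd)`, i.e. a segment is included only if its interval is wholly contained in that range.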




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


