You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by xuchuanyin <gi...@git.apache.org> on 2019/01/05 06:47:55 UTC
[GitHub] carbondata pull request #3023: [CARBONDATA-3197][BloomDataMap] Merge bloom i...
Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/3023#discussion_r245469143
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala ---
@@ -24,59 +24,96 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap}
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD
import org.apache.carbondata.events._
+import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
class MergeBloomIndexEventListener extends OperationEventListener with Logging {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+ val sparkSession = SparkSession.getActiveSession.get
event match {
+ case loadPreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =>
+ LOGGER.info("LoadTablePreStatusUpdateEvent called for bloom index merging")
+ // For loading process, segment can not be accessed at this time
+ val loadModel = loadPreStatusUpdateEvent.getCarbonLoadModel
+ val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
+ val segmentId = loadModel.getSegmentId
+
+ // filter out bloom datamap, skip lazy datamap
+ val bloomDatamaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
+ .filter(_.getDataMapSchema.getProviderName.equalsIgnoreCase(
+ DataMapClassProvider.BLOOMFILTER.getShortName))
+ .filter(!_.getDataMapSchema.isLazy).toList
+
+ mergeBloomIndex(sparkSession, carbonTable, bloomDatamaps, Seq(segmentId))
+
+ case compactPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =>
+ LOGGER.info("AlterTableCompactionPreStatusUpdateEvent called for bloom index merging")
+ // For compact process, segment can not be accessed at this time
+ val carbonTable = compactPreStatusUpdateEvent.carbonTable
--- End diff --
It seems the following code block duplicates the code at lines #44~#54; please consider refactoring it to remove the duplication.
---