You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/10/18 09:08:43 UTC
carbondata git commit: [CARBONDATA-3009] Move method for mergeIndex
to correct the place
Repository: carbondata
Updated Branches:
refs/heads/master 4a090ce27 -> 0e8588744
[CARBONDATA-3009] Move method for mergeIndex to correct the place
Currently the entry point of the mergeIndex function is in CommonUtil,
which is not the proper place. In this commit, we move it to the right
place.
This closes #2817
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0e858874
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0e858874
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0e858874
Branch: refs/heads/master
Commit: 0e8588744fa0d4d0ee0c02d52de09231492f56d8
Parents: 4a090ce
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Tue Oct 16 14:57:16 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Oct 18 17:08:08 2018 +0800
----------------------------------------------------------------------
.../carbondata/spark/util/CommonUtil.scala | 68 ++------------------
.../apache/spark/rdd/CarbonMergeFilesRDD.scala | 63 ++++++++++++++++++
.../sql/events/MergeIndexEventListener.scala | 8 ++-
3 files changed, 72 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e858874/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index f6e2b94..82a2f9d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -29,13 +29,11 @@ import scala.util.Random
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.{SparkContext, SparkEnv}
-import org.apache.spark.rdd.CarbonMergeFilesRDD
-import org.apache.spark.sql.{Row, RowFactory, SparkSession}
+import org.apache.spark.sql.{Row, RowFactory}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
import org.apache.spark.sql.types.{MetadataBuilder, StringType}
-import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.FileUtils
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -43,14 +41,13 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.memory.{UnsafeMemoryManager, UnsafeSortMemoryManager}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
import org.apache.carbondata.core.metadata.schema.PartitionInfo
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.scan.partition.PartitionUtil
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
import org.apache.carbondata.core.util.comparator.Comparator
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
@@ -826,61 +823,4 @@ object CommonUtil {
})
}
}
-
- /**
- * Merge the carbonindex files with in the segment to carbonindexmerge file inside same segment
- *
- * @param sparkContext
- * @param segmentIds
- * @param tablePath
- * @param carbonTable
- * @param mergeIndexProperty
- * @param readFileFooterFromCarbonDataFile flag to read file footer information from carbondata
- * file. This will used in case of upgrade from version
- * which do not store the blocklet info to current
- * version
- */
- def mergeIndexFiles(sparkSession: SparkSession,
- segmentIds: Seq[String],
- segmentFileNameToSegmentIdMap: java.util.Map[String, String],
- tablePath: String,
- carbonTable: CarbonTable,
- mergeIndexProperty: Boolean,
- readFileFooterFromCarbonDataFile: Boolean = false): Unit = {
- if (mergeIndexProperty) {
- new CarbonMergeFilesRDD(
- sparkSession,
- carbonTable,
- segmentIds,
- segmentFileNameToSegmentIdMap,
- carbonTable.isHivePartitionTable,
- readFileFooterFromCarbonDataFile).collect()
- } else {
- try {
- if (CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
- CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
- new CarbonMergeFilesRDD(
- sparkSession,
- carbonTable,
- segmentIds,
- segmentFileNameToSegmentIdMap,
- carbonTable.isHivePartitionTable,
- readFileFooterFromCarbonDataFile).collect()
- }
- } catch {
- case _: Exception =>
- if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
- new CarbonMergeFilesRDD(
- sparkSession,
- carbonTable,
- segmentIds,
- segmentFileNameToSegmentIdMap,
- carbonTable.isHivePartitionTable,
- readFileFooterFromCarbonDataFile).collect()
- }
- }
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e858874/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
index e29a658..3605dde 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
@@ -20,7 +20,9 @@ package org.apache.spark.rdd
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.sql.SparkSession
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -34,6 +36,67 @@ case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentId: String)
override def hashCode(): Int = 41 * (41 + rddId) + idx
}
+object CarbonMergeFilesRDD {
+
+ /**
+ * Merge the carbonindex files with in the segment to carbonindexmerge file inside same segment
+ *
+ * @param sparkSession carbon session
+ * @param segmentIds the segments to process
+ * @param segmentFileNameToSegmentIdMap a map that map the segmentFileName to segmentId
+ * @param tablePath table path
+ * @param carbonTable carbon table
+ * @param mergeIndexProperty whether to merge the property of the carbon index, the usage
+ * scenario is the same as that of `readFileFooterFromCarbonDataFile`
+ * @param readFileFooterFromCarbonDataFile flag to read file footer information from carbondata
+ * file. This will used in case of upgrade from version
+ * which do not store the blocklet info to current
+ * version
+ */
+ def mergeIndexFiles(sparkSession: SparkSession,
+ segmentIds: Seq[String],
+ segmentFileNameToSegmentIdMap: java.util.Map[String, String],
+ tablePath: String,
+ carbonTable: CarbonTable,
+ mergeIndexProperty: Boolean,
+ readFileFooterFromCarbonDataFile: Boolean = false): Unit = {
+ if (mergeIndexProperty) {
+ new CarbonMergeFilesRDD(
+ sparkSession,
+ carbonTable,
+ segmentIds,
+ segmentFileNameToSegmentIdMap,
+ carbonTable.isHivePartitionTable,
+ readFileFooterFromCarbonDataFile).collect()
+ } else {
+ try {
+ if (CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+ CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
+ new CarbonMergeFilesRDD(
+ sparkSession,
+ carbonTable,
+ segmentIds,
+ segmentFileNameToSegmentIdMap,
+ carbonTable.isHivePartitionTable,
+ readFileFooterFromCarbonDataFile).collect()
+ }
+ } catch {
+ case _: Exception =>
+ if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
+ new CarbonMergeFilesRDD(
+ sparkSession,
+ carbonTable,
+ segmentIds,
+ segmentFileNameToSegmentIdMap,
+ carbonTable.isHivePartitionTable,
+ readFileFooterFromCarbonDataFile).collect()
+ }
+ }
+ }
+ }
+}
+
/**
* RDD to merge all carbonindex files of each segment to carbonindex file into the same segment.
* @param ss
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e858874/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 639a0e3..c8c9a47 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -23,6 +23,8 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.internal.Logging
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.CarbonMergeFilesRDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.util.CarbonException
@@ -61,7 +63,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
segmentFileNameMap
.put(loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp))
- CommonUtil.mergeIndexFiles(sparkSession,
+ CarbonMergeFilesRDD.mergeIndexFiles(sparkSession,
Seq(loadModel.getSegmentId),
segmentFileNameMap,
carbonTable.getTablePath,
@@ -116,7 +118,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
// readFileFooterFromCarbonDataFile flag should be true. This flag is check for legacy
// store (store <= 1.1 version) and create merge Index file as per new store so that
// old store is also upgraded to new store
- CommonUtil.mergeIndexFiles(
+ CarbonMergeFilesRDD.mergeIndexFiles(
sparkSession = sparkSession,
segmentIds = validSegmentIds,
segmentFileNameToSegmentIdMap = segmentFileNameMap,
@@ -176,7 +178,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
val validMergedSegIds = validSegments
.filter { seg => mergedSegmentIds.contains(seg.getSegmentNo) }.map(_.getSegmentNo)
if (null != validMergedSegIds && validMergedSegIds.nonEmpty) {
- CommonUtil.mergeIndexFiles(sparkSession,
+ CarbonMergeFilesRDD.mergeIndexFiles(sparkSession,
validMergedSegIds,
segmentFileNameMap,
carbonTable.getTablePath,