Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/10/18 09:08:43 UTC

carbondata git commit: [CARBONDATA-3009] Move method for mergeIndex to correct the place

Repository: carbondata
Updated Branches:
  refs/heads/master 4a090ce27 -> 0e8588744


[CARBONDATA-3009] Move method for mergeIndex to correct the place

Currently the entry point of the mergeIndex function is in CommonUtil,
which is not the proper place. This commit moves it to a companion
object of CarbonMergeFilesRDD, alongside the RDD that performs the merge.

This closes #2817
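
For illustration, a minimal sketch of the relocated entry point as callers
now invoke it (this mirrors the listener change in the diff below;
sparkSession, carbonTable, loadModel and segmentFileNameMap are assumed to
be in scope at the call site):

    // Hedged sketch: merge the index files of a freshly loaded segment.
    // mergeIndexProperty = false defers to the CARBON_MERGE_INDEX_IN_SEGMENT property.
    CarbonMergeFilesRDD.mergeIndexFiles(
      sparkSession = sparkSession,
      segmentIds = Seq(loadModel.getSegmentId),
      segmentFileNameToSegmentIdMap = segmentFileNameMap,
      tablePath = carbonTable.getTablePath,
      carbonTable = carbonTable,
      mergeIndexProperty = false)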


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0e858874
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0e858874
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0e858874

Branch: refs/heads/master
Commit: 0e8588744fa0d4d0ee0c02d52de09231492f56d8
Parents: 4a090ce
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Tue Oct 16 14:57:16 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Oct 18 17:08:08 2018 +0800

----------------------------------------------------------------------
 .../carbondata/spark/util/CommonUtil.scala      | 68 ++------------------
 .../apache/spark/rdd/CarbonMergeFilesRDD.scala  | 63 ++++++++++++++++++
 .../sql/events/MergeIndexEventListener.scala    |  8 ++-
 3 files changed, 72 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e858874/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index f6e2b94..82a2f9d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -29,13 +29,11 @@ import scala.util.Random
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.{SparkContext, SparkEnv}
-import org.apache.spark.rdd.CarbonMergeFilesRDD
-import org.apache.spark.sql.{Row, RowFactory, SparkSession}
+import org.apache.spark.sql.{Row, RowFactory}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
 import org.apache.spark.sql.types.{MetadataBuilder, StringType}
-import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.FileUtils
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -43,14 +41,13 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.memory.{UnsafeMemoryManager, UnsafeSortMemoryManager}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.partition.PartitionUtil
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
 import org.apache.carbondata.core.util.comparator.Comparator
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
@@ -826,61 +823,4 @@ object CommonUtil {
       })
     }
   }
-
-  /**
-   * Merge the carbonindex files with in the segment to carbonindexmerge file inside same segment
-   *
-   * @param sparkContext
-   * @param segmentIds
-   * @param tablePath
-   * @param carbonTable
-   * @param mergeIndexProperty
-   * @param readFileFooterFromCarbonDataFile flag to read file footer information from carbondata
-   *                                         file. This will used in case of upgrade from version
-   *                                         which do not store the blocklet info to current
-   *                                         version
-   */
-  def mergeIndexFiles(sparkSession: SparkSession,
-    segmentIds: Seq[String],
-    segmentFileNameToSegmentIdMap: java.util.Map[String, String],
-    tablePath: String,
-    carbonTable: CarbonTable,
-    mergeIndexProperty: Boolean,
-    readFileFooterFromCarbonDataFile: Boolean = false): Unit = {
-    if (mergeIndexProperty) {
-      new CarbonMergeFilesRDD(
-        sparkSession,
-        carbonTable,
-        segmentIds,
-        segmentFileNameToSegmentIdMap,
-        carbonTable.isHivePartitionTable,
-        readFileFooterFromCarbonDataFile).collect()
-    } else {
-      try {
-        if (CarbonProperties.getInstance().getProperty(
-          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
-          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
-          new CarbonMergeFilesRDD(
-            sparkSession,
-            carbonTable,
-            segmentIds,
-            segmentFileNameToSegmentIdMap,
-            carbonTable.isHivePartitionTable,
-            readFileFooterFromCarbonDataFile).collect()
-        }
-      } catch {
-        case _: Exception =>
-          if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
-            new CarbonMergeFilesRDD(
-              sparkSession,
-              carbonTable,
-              segmentIds,
-              segmentFileNameToSegmentIdMap,
-              carbonTable.isHivePartitionTable,
-              readFileFooterFromCarbonDataFile).collect()
-          }
-      }
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e858874/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
index e29a658..3605dde 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
@@ -20,7 +20,9 @@ package org.apache.spark.rdd
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.sql.SparkSession
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -34,6 +36,67 @@ case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentId: String)
   override def hashCode(): Int = 41 * (41 + rddId) + idx
 }
 
+object CarbonMergeFilesRDD {
+
+  /**
+   * Merge the carbonindex files within a segment into a carbonindexmerge file in the same segment
+   *
+   * @param sparkSession the active SparkSession
+   * @param segmentIds the segments to process
+   * @param segmentFileNameToSegmentIdMap a map from segment file name to segment id
+   * @param tablePath table path
+   * @param carbonTable carbon table
+   * @param mergeIndexProperty flag to force the merge regardless of the
+   *                           CARBON_MERGE_INDEX_IN_SEGMENT property setting
+   * @param readFileFooterFromCarbonDataFile flag to read file footer information from the
+   *                                         carbondata file; used when upgrading from a
+   *                                         version which does not store the blocklet info
+   *                                         to the current version
+   */
+  def mergeIndexFiles(sparkSession: SparkSession,
+      segmentIds: Seq[String],
+      segmentFileNameToSegmentIdMap: java.util.Map[String, String],
+      tablePath: String,
+      carbonTable: CarbonTable,
+      mergeIndexProperty: Boolean,
+      readFileFooterFromCarbonDataFile: Boolean = false): Unit = {
+    if (mergeIndexProperty) {
+      new CarbonMergeFilesRDD(
+        sparkSession,
+        carbonTable,
+        segmentIds,
+        segmentFileNameToSegmentIdMap,
+        carbonTable.isHivePartitionTable,
+        readFileFooterFromCarbonDataFile).collect()
+    } else {
+      try {
+        if (CarbonProperties.getInstance().getProperty(
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
+          new CarbonMergeFilesRDD(
+            sparkSession,
+            carbonTable,
+            segmentIds,
+            segmentFileNameToSegmentIdMap,
+            carbonTable.isHivePartitionTable,
+            readFileFooterFromCarbonDataFile).collect()
+        }
+      } catch {
+        case _: Exception =>
+          if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
+            new CarbonMergeFilesRDD(
+              sparkSession,
+              carbonTable,
+              segmentIds,
+              segmentFileNameToSegmentIdMap,
+              carbonTable.isHivePartitionTable,
+              readFileFooterFromCarbonDataFile).collect()
+          }
+      }
+    }
+  }
+}
+
 /**
  * RDD to merge all carbonindex files of each segment to carbonindex file into the same segment.
  * @param ss

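A note on the control flow of the relocated method above: when mergeIndexProperty is
true the merge always runs; otherwise the CARBON_MERGE_INDEX_IN_SEGMENT property
decides, and since the whole merge runs inside the try block, a failed merge is
retried once when the compile-time default is true. A condensed sketch of that
decision (simplified, not the verbatim method; the real try also wraps the merge):

    // Simplified sketch of the decision in mergeIndexFiles (see the diff above).
    val shouldMerge = mergeIndexProperty ||
      (try {
        CarbonProperties.getInstance().getProperty(
          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean
      } catch {
        // fall back to the compile-time default if the configured value is malformed
        case _: Exception =>
          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean
      })
    if (shouldMerge) {
      new CarbonMergeFilesRDD(sparkSession, carbonTable, segmentIds,
        segmentFileNameToSegmentIdMap, carbonTable.isHivePartitionTable,
        readFileFooterFromCarbonDataFile).collect()
    }
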
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e858874/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 639a0e3..c8c9a47 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -23,6 +23,8 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.CarbonMergeFilesRDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.util.CarbonException
 
@@ -61,7 +63,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
 
             segmentFileNameMap
               .put(loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp))
-            CommonUtil.mergeIndexFiles(sparkSession,
+            CarbonMergeFilesRDD.mergeIndexFiles(sparkSession,
               Seq(loadModel.getSegmentId),
               segmentFileNameMap,
               carbonTable.getTablePath,
@@ -116,7 +118,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
              // readFileFooterFromCarbonDataFile flag should be true. This flag is checked for the
              // legacy store (store <= 1.1 version) to create the merge index file as per the new
              // store so that the old store is also upgraded to the new store
-              CommonUtil.mergeIndexFiles(
+              CarbonMergeFilesRDD.mergeIndexFiles(
                 sparkSession = sparkSession,
                 segmentIds = validSegmentIds,
                 segmentFileNameToSegmentIdMap = segmentFileNameMap,
@@ -176,7 +178,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
     val validMergedSegIds = validSegments
       .filter { seg => mergedSegmentIds.contains(seg.getSegmentNo) }.map(_.getSegmentNo)
     if (null != validMergedSegIds && validMergedSegIds.nonEmpty) {
-      CommonUtil.mergeIndexFiles(sparkSession,
+      CarbonMergeFilesRDD.mergeIndexFiles(sparkSession,
           validMergedSegIds,
           segmentFileNameMap,
           carbonTable.getTablePath,
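
Usage note: for the call sites above that do not force the merge
(mergeIndexProperty = false), the CARBON_MERGE_INDEX_IN_SEGMENT property still
governs whether the merge happens. A hedged sketch of toggling it
programmatically (addProperty is the standard CarbonProperties setter; verify
the behavior against your CarbonData version):

    // Assumption: disable the per-segment index merge before a load.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")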