Posted to dev@carbondata.apache.org by PKOfficial <gi...@git.apache.org> on 2016/10/19 17:00:43 UTC

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

GitHub user PKOfficial opened a pull request:

    https://github.com/apache/incubator-carbondata/pull/248

    [CARBONDATA-328] Improve Code and Fix Warnings [Spark]

    Removed some compilation warnings.
    Replaced pattern matching on Boolean values with if-else.
    Improved code according to Scala standards.
    
    
    **Please provide details on** 
    **- Whether new unit test cases have been added or why no new tests are required?**
    Not required because no change in functionality.
    **- What manual testing you have done?**
    Ran basic commands in Beeline.
    **- Any additional information to help reviewers in testing this change.**
    No

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/PKOfficial/incubator-carbondata improved-code-spark

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-carbondata/pull/248.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #248
    
----
commit 64f169a324e6210fd156b6b5a6acc0916c201616
Author: Prabhat Kashyap <pr...@knoldus.in>
Date:   2016-10-19T16:54:47Z

    Improved Spark module code.
    * Removed some compilation warnings.
    * Replaced pattern matching on Boolean values with if-else.
    * Improved code according to Scala standards.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84404111
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -505,129 +512,129 @@ object CarbonDataRDDFactory extends Logging {
               )
         }
     
    -      val compactionThread = new Thread {
    -        override def run(): Unit = {
    +    val compactionThread = new Thread {
    +      override def run(): Unit = {
     
    +        try {
    +          // compaction status of the table which is triggered by the user.
    +          var triggeredCompactionStatus = false
    +          var exception: Exception = null
               try {
    -            // compaction status of the table which is triggered by the user.
    -            var triggeredCompactionStatus = false
    -            var exception : Exception = null
    -            try {
    -              executeCompaction(carbonLoadModel: CarbonLoadModel,
    -                hdfsStoreLocation: String,
    -                compactionModel: CompactionModel,
    -                partitioner: Partitioner,
    -                executor, sqlContext, kettleHomePath, storeLocation
    +            executeCompaction(carbonLoadModel: CarbonLoadModel,
    +              hdfsStoreLocation: String,
    +              compactionModel: CompactionModel,
    +              partitioner: Partitioner,
    +              executor, sqlContext, kettleHomePath, storeLocation
    +            )
    +            triggeredCompactionStatus = true
    +          }
    +          catch {
    +            case e: Exception =>
    +              logger.error("Exception in compaction thread " + e.getMessage)
    +              exception = e
    +          }
    +          // continue in case of exception also, check for all the tables.
    +          val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
    +            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
    +              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
    +            ).equalsIgnoreCase("true")
    +
    +          if (!isConcurrentCompactionAllowed) {
    +            logger.info("System level compaction lock is enabled.")
    +            val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
    +            var tableForCompaction = CarbonCompactionUtil
    +              .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
    +                .tablesMeta.toArray, skipCompactionTables.toList.asJava
                   )
    -              triggeredCompactionStatus = true
    -            }
    -            catch {
    -              case e: Exception =>
    -                logger.error("Exception in compaction thread " + e.getMessage)
    -                exception = e
    -            }
    -            // continue in case of exception also, check for all the tables.
    -            val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
    -              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
    -                CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
    -              ).equalsIgnoreCase("true")
    -
    -            if (!isConcurrentCompactionAllowed) {
    -              logger.info("System level compaction lock is enabled.")
    -              val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
    -              var tableForCompaction = CarbonCompactionUtil
    -                .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
    -                  .tablesMeta.toArray, skipCompactionTables.toList.asJava
    +            while (null != tableForCompaction) {
    +              logger
    +                .info("Compaction request has been identified for table " + tableForCompaction
    +                  .carbonTable.getDatabaseName + "." + tableForCompaction.carbonTableIdentifier
    +                        .getTableName
                     )
    -              while (null != tableForCompaction) {
    -                logger
    -                  .info("Compaction request has been identified for table " + tableForCompaction
    -                    .carbonTable.getDatabaseName + "." + tableForCompaction.carbonTableIdentifier
    -                          .getTableName
    -                  )
    -                val table: CarbonTable = tableForCompaction.carbonTable
    -                val metadataPath = table.getMetaDataFilepath
    -                val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
    -
    -                val newCarbonLoadModel = new CarbonLoadModel()
    -                prepareCarbonLoadModel(hdfsStoreLocation, table, newCarbonLoadModel)
    -                val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
    -                  .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
    -                    newCarbonLoadModel.getTableName
    -                  )
    -
    -                val compactionSize = CarbonDataMergerUtil
    -                  .getCompactionSize(CompactionType.MAJOR_COMPACTION)
    -
    -                val newcompactionModel = CompactionModel(compactionSize,
    -                  compactionType,
    -                  table,
    -                  tableCreationTime,
    -                  compactionModel.isDDLTrigger
    +              val table: CarbonTable = tableForCompaction.carbonTable
    +              val metadataPath = table.getMetaDataFilepath
    +              val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
    +
    +              val newCarbonLoadModel = new CarbonLoadModel()
    +              prepareCarbonLoadModel(hdfsStoreLocation, table, newCarbonLoadModel)
    +              val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
    +                .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
    +                  newCarbonLoadModel.getTableName
    +                )
    --- End diff --
    
    move to previous line



[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84400407
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---
    @@ -196,30 +196,29 @@ class DataFileLoaderRDD[K, V](
       sc.setLocalProperty("spark.scheduler.pool", "DDL")
     
       override def getPartitions: Array[Partition] = {
    -    isTableSplitPartition match {
    -      case true =>
    -        // for table split partition
    -        var splits = Array[TableSplit]()
    -        if (carbonLoadModel.isDirectLoad) {
    -          splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
    -            partitioner.nodeList, partitioner.partitionCount)
    -        }
    -        else {
    -          splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
    -            carbonLoadModel.getTableName, null, partitioner)
    -        }
    +    if (isTableSplitPartition) {
    +      // for table split partition
    +      var splits = Array[TableSplit]()
    +      if (carbonLoadModel.isDirectLoad) {
    +        splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
    +          partitioner.nodeList, partitioner.partitionCount)
    +      }
    +      else {
    --- End diff --
    
    Put `else` on the previous line, after the closing brace. Please change this in all places.
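
For reference, the two points raised on this hunk (branching on a Boolean with if-else rather than `match`, and keeping `else` on the same line as the closing brace) can be sketched as follows. This is only an illustration: `BooleanBranchSketch` and the returned strings are made up for the example and are not part of the CarbonData code.

```scala
object BooleanBranchSketch {
  // Before: pattern matching on a Boolean, the style this pull request replaces.
  def describeWithMatch(isTableSplitPartition: Boolean): String =
    isTableSplitPartition match {
      case true => "table split partition"
      case false => "node partition"
    }

  // After: a plain if-else expression, with `else` on the same line
  // as the closing brace of the `if` branch.
  def describeWithIf(isTableSplitPartition: Boolean): String =
    if (isTableSplitPartition) {
      "table split partition"
    } else {
      "node partition"
    }
}
```

Both methods return the same value for the same input; only the layout and the branching construct differ.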



[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84681646
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -210,7 +216,7 @@ object CarbonDataRDDFactory extends Logging {
     
         logger
           .audit(s"Compaction request received for table " +
    --- End diff --
    
    move to previous line



[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84404340
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala ---
    @@ -385,22 +385,22 @@ class CarbonGlobalDictionaryGenerateRDD(
                   distinctValues)
                 sortIndexWriteTask.execute()
               }
    -          val sortIndexWriteTime = (System.currentTimeMillis() - t4)
    -          CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDicShuffleAndWriteTime()
    +          val sortIndexWriteTime = System.currentTimeMillis() - t4
    +          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
               // After sortIndex writing, update dictionaryMeta
               dictWriteTask.updateMetaData()
               // clear the value buffer after writing dictionary data
               valuesBuffer.clear
               org.apache.carbondata.core.util.CarbonUtil
    --- End diff --
    
    remove package name
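
A small sketch of what removing the package name usually means in Scala: import the class once and refer to it by its simple name. The example uses `java.util.concurrent.TimeUnit` from the JDK as a stand-in, because the `CarbonUtil` call in the diff is cut off; `ImportSketch` and its methods are hypothetical.

```scala
import java.util.concurrent.TimeUnit

object ImportSketch {
  // Fully qualified reference, analogous to writing
  // org.apache.carbondata.core.util.CarbonUtil inline as in the diff.
  def secondsQualified(millis: Long): Long =
    java.util.concurrent.TimeUnit.MILLISECONDS.toSeconds(millis)

  // Same call through the import at the top of the file.
  def secondsImported(millis: Long): Long =
    TimeUnit.MILLISECONDS.toSeconds(millis)
}
```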



[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84405181
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---
    @@ -839,10 +841,10 @@ object GlobalDictionaryUtil extends Logging {
               headers = headers.map(headerName => headerName.trim)
               // prune columns according to the CSV file header, dimension columns
               val (requireDimension, requireColumnNames) =
    -            pruneDimensions(dimensions, headers, headers)
    +          pruneDimensions(dimensions, headers, headers)
    --- End diff --
    
    incorrect indentation



[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84404900
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---
    @@ -267,10 +267,10 @@ object GlobalDictionaryUtil extends Logging {
       }
     
       def isHighCardinalityColumn(columnCardinality: Int,
    -                              rowCount: Long,
    -                              model: DictionaryLoadModel): Boolean = {
    +      rowCount: Long,
    +      model: DictionaryLoadModel): Boolean = {
         (columnCardinality > model.highCardThreshold) && (rowCount > 0) &&
    --- End diff --
    
    move `(rowCount > 0)` to next line so that every condition is one line
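
One way to read this suggestion is sketched below, with each condition on its own line. The diff is truncated after the second `&&`, so only the two visible conditions are reproduced; `ModelSketch` is a hypothetical stand-in for `DictionaryLoadModel` that models just the `highCardThreshold` field.

```scala
object HighCardinalitySketch {
  // Hypothetical stand-in for DictionaryLoadModel; only the field
  // that appears in the diff is modeled here.
  final case class ModelSketch(highCardThreshold: Int)

  def isHighCardinalityColumn(columnCardinality: Int,
      rowCount: Long,
      model: ModelSketch): Boolean = {
    // One condition per line; a trailing `&&` continues the expression.
    // The real method has further conditions that the diff cuts off.
    (columnCardinality > model.highCardThreshold) &&
    (rowCount > 0)
  }
}
```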



[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by PKOfficial <gi...@git.apache.org>.
Github user PKOfficial closed the pull request at:

    https://github.com/apache/incubator-carbondata/pull/248



[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84404263
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -1114,36 +1121,36 @@ object CarbonDataRDDFactory extends Logging {
     
           if (isUpdationRequired) {
             try {
    -        // Update load metadate file after cleaning deleted nodes
    -        if (carbonTableStatusLock.lockWithRetries()) {
    -          logger.info("Table status lock has been successfully acquired.")
    +          // Update load metadate file after cleaning deleted nodes
    +          if (carbonTableStatusLock.lockWithRetries()) {
    +            logger.info("Table status lock has been successfully acquired.")
     
    -          // read latest table status again.
    -          val latestMetadata = segmentStatusManager
    -            .readLoadMetadata(loadMetadataFilePath)
    +            // read latest table status again.
    +            val latestMetadata = segmentStatusManager
    +              .readLoadMetadata(loadMetadataFilePath)
     
    -          // update the metadata details from old to new status.
    +            // update the metadata details from old to new status.
     
    -          val latestStatus = CarbonLoaderUtil
    -            .updateLoadMetadataFromOldToNew(details, latestMetadata)
    +            val latestStatus = CarbonLoaderUtil
    --- End diff --
    
    in this case, move `CarbonLoaderUtil` to next line, do not break the line between object and function
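
The layout requested here can be sketched as follows: break the line after `=` so the object and the method it calls stay together. `LoaderUtilSketch` is a hypothetical stand-in for `CarbonLoaderUtil`, and its merge logic is invented purely so the example runs; the argument and return types are assumptions as well.

```scala
object LineBreakSketch {
  // Hypothetical stand-in for CarbonLoaderUtil; the body is invented
  // for illustration and does not reflect the real merge behavior.
  object LoaderUtilSketch {
    def updateLoadMetadataFromOldToNew(details: Seq[String],
        latest: Seq[String]): Seq[String] =
      (details ++ latest).distinct
  }

  def latestStatus(details: Seq[String], latestMetadata: Seq[String]): Seq[String] = {
    // Break after `=` so that the object and the method call share one line.
    val status =
      LoaderUtilSketch.updateLoadMetadataFromOldToNew(details, latestMetadata)
    status
  }
}
```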



[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84400885
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---
    @@ -439,42 +436,42 @@ class DataFileLoaderRDD[K, V](
       }
     
       override def getPreferredLocations(split: Partition): Seq[String] = {
    -    isTableSplitPartition match {
    -      case true =>
    -        // for table split partition
    -        val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
    -        val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
    -        location
    -      case false =>
    -        // for node partition
    -        val theSplit = split.asInstanceOf[CarbonNodePartition]
    -        val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
    -        logInfo("Preferred Location for split : " + firstOptionLocation(0))
    -        val blockMap = new util.LinkedHashMap[String, Integer]()
    -        val tableBlocks = theSplit.blocksDetails
    -        tableBlocks.foreach(tableBlock => tableBlock.getLocations.foreach(
    -          location => {
    -            if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
    -              val currentCount = blockMap.get(location)
    -              if (currentCount == null) {
    -                blockMap.put(location, 1)
    -              } else {
    -                blockMap.put(location, currentCount + 1)
    -              }
    +    if (isTableSplitPartition) {
    +      // for table split partition
    +      val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
    +      val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
    +      location
    +    } else {
    +      // for node partition
    +      val theSplit = split.asInstanceOf[CarbonNodePartition]
    +      val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
    +      logInfo("Preferred Location for split : " + firstOptionLocation.head)
    +      val blockMap = new util.LinkedHashMap[String, Integer]()
    +      val tableBlocks = theSplit.blocksDetails
    +      tableBlocks.foreach(tableBlock => tableBlock.getLocations.foreach(
    +        location => {
    +          if (!firstOptionLocation.exists(location.equalsIgnoreCase)) {
    +            val currentCount = blockMap.get(location)
    +            if (currentCount == null) {
    +              blockMap.put(location, 1)
    +            } else {
    +              blockMap.put(location, currentCount + 1)
                 }
               }
    -        )
    -        )
    -
    -        val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
    -          nodeCount1.getValue > nodeCount2.getValue
             }
    -        )
    +      )
    +      )
    --- End diff --
    
    indentation is not correct



[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84401697
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -505,129 +512,129 @@ object CarbonDataRDDFactory extends Logging {
               )
         }
     
    -      val compactionThread = new Thread {
    -        override def run(): Unit = {
    +    val compactionThread = new Thread {
    +      override def run(): Unit = {
     
    +        try {
    +          // compaction status of the table which is triggered by the user.
    +          var triggeredCompactionStatus = false
    +          var exception: Exception = null
               try {
    -            // compaction status of the table which is triggered by the user.
    -            var triggeredCompactionStatus = false
    -            var exception : Exception = null
    -            try {
    -              executeCompaction(carbonLoadModel: CarbonLoadModel,
    -                hdfsStoreLocation: String,
    -                compactionModel: CompactionModel,
    -                partitioner: Partitioner,
    -                executor, sqlContext, kettleHomePath, storeLocation
    +            executeCompaction(carbonLoadModel: CarbonLoadModel,
    +              hdfsStoreLocation: String,
    +              compactionModel: CompactionModel,
    +              partitioner: Partitioner,
    +              executor, sqlContext, kettleHomePath, storeLocation
    +            )
    +            triggeredCompactionStatus = true
    +          }
    +          catch {
    +            case e: Exception =>
    +              logger.error("Exception in compaction thread " + e.getMessage)
    +              exception = e
    +          }
    +          // continue in case of exception also, check for all the tables.
    +          val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
    +            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
    +              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
    +            ).equalsIgnoreCase("true")
    +
    +          if (!isConcurrentCompactionAllowed) {
    +            logger.info("System level compaction lock is enabled.")
    +            val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
    +            var tableForCompaction = CarbonCompactionUtil
    +              .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
    +                .tablesMeta.toArray, skipCompactionTables.toList.asJava
                   )
    -              triggeredCompactionStatus = true
    -            }
    -            catch {
    -              case e: Exception =>
    -                logger.error("Exception in compaction thread " + e.getMessage)
    -                exception = e
    -            }
    -            // continue in case of exception also, check for all the tables.
    -            val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
    -              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
    -                CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
    -              ).equalsIgnoreCase("true")
    -
    -            if (!isConcurrentCompactionAllowed) {
    -              logger.info("System level compaction lock is enabled.")
    -              val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
    -              var tableForCompaction = CarbonCompactionUtil
    -                .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
    -                  .tablesMeta.toArray, skipCompactionTables.toList.asJava
    +            while (null != tableForCompaction) {
    +              logger
    +                .info("Compaction request has been identified for table " + tableForCompaction
    +                  .carbonTable.getDatabaseName + "." + tableForCompaction.carbonTableIdentifier
    +                        .getTableName
                     )
    -              while (null != tableForCompaction) {
    -                logger
    -                  .info("Compaction request has been identified for table " + tableForCompaction
    -                    .carbonTable.getDatabaseName + "." + tableForCompaction.carbonTableIdentifier
    -                          .getTableName
    -                  )
    -                val table: CarbonTable = tableForCompaction.carbonTable
    -                val metadataPath = table.getMetaDataFilepath
    -                val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
    -
    -                val newCarbonLoadModel = new CarbonLoadModel()
    -                prepareCarbonLoadModel(hdfsStoreLocation, table, newCarbonLoadModel)
    -                val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
    -                  .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
    -                    newCarbonLoadModel.getTableName
    -                  )
    -
    -                val compactionSize = CarbonDataMergerUtil
    -                  .getCompactionSize(CompactionType.MAJOR_COMPACTION)
    -
    -                val newcompactionModel = CompactionModel(compactionSize,
    -                  compactionType,
    -                  table,
    -                  tableCreationTime,
    -                  compactionModel.isDDLTrigger
    +              val table: CarbonTable = tableForCompaction.carbonTable
    +              val metadataPath = table.getMetaDataFilepath
    +              val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
    +
    +              val newCarbonLoadModel = new CarbonLoadModel()
    +              prepareCarbonLoadModel(hdfsStoreLocation, table, newCarbonLoadModel)
    +              val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
    +                .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
    +                  newCarbonLoadModel.getTableName
    +                )
    +
    +              val compactionSize = CarbonDataMergerUtil
    +                .getCompactionSize(CompactionType.MAJOR_COMPACTION)
    +
    +              val newcompactionModel = CompactionModel(compactionSize,
    +                compactionType,
    +                table,
    +                tableCreationTime,
    +                compactionModel.isDDLTrigger
    +              )
    +              // proceed for compaction
    +              try {
    +                executeCompaction(newCarbonLoadModel,
    +                  newCarbonLoadModel.getStorePath,
    +                  newcompactionModel,
    +                  partitioner,
    +                  executor, sqlContext, kettleHomePath, storeLocation
                     )
    -                // proceed for compaction
    -                try {
    -                  executeCompaction(newCarbonLoadModel,
    -                    newCarbonLoadModel.getStorePath,
    -                    newcompactionModel,
    -                    partitioner,
    -                    executor, sqlContext, kettleHomePath, storeLocation
    -                  )
    -                }
    -                catch {
    -                  case e: Exception =>
    -                    logger.error("Exception in compaction thread for table " + tableForCompaction
    -                      .carbonTable.getDatabaseName + "." +
    -                                 tableForCompaction.carbonTableIdentifier
    -                                   .getTableName)
    -                  // not handling the exception. only logging as this is not the table triggered
    -                  // by user.
    -                }
    -                finally {
    -                  // delete the compaction required file in case of failure or success also.
    -                  if (!CarbonCompactionUtil
    -                    .deleteCompactionRequiredFile(metadataPath, compactionType)) {
    -                    // if the compaction request file is not been able to delete then
    -                    // add those tables details to the skip list so that it wont be considered next.
    -                    skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
    -                    logger
    -                      .error("Compaction request file can not be deleted for table " +
    -                             tableForCompaction
    -                               .carbonTable.getDatabaseName + "." + tableForCompaction
    -                               .carbonTableIdentifier
    -                               .getTableName
    -                      )
    -
    -                  }
    -                }
    -                // ********* check again for all the tables.
    -                tableForCompaction = CarbonCompactionUtil
    -                  .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
    -                    .tablesMeta.toArray, skipCompactionTables.asJava
    -                  )
                   }
    -              // giving the user his error for telling in the beeline if his triggered table
    -              // compaction is failed.
    -              if (!triggeredCompactionStatus) {
    -                throw new Exception("Exception in compaction " + exception.getMessage)
    +              catch {
    +                case e: Exception =>
    +                  logger.error("Exception in compaction thread for table " + tableForCompaction
    +                    .carbonTable.getDatabaseName + "." +
    +                               tableForCompaction.carbonTableIdentifier
    +                                 .getTableName)
    +                // not handling the exception. only logging as this is not the table triggered
    +                // by user.
                   }
    +              finally {
    --- End diff --
    
    Move `finally` to the previous line.



[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84401276
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -189,15 +195,15 @@ object CarbonDataRDDFactory extends Logging {
           }
           hadoopConfiguration.set(FileInputFormat.SPLIT_MAXSIZE, newSplitSize.toString)
           logInfo("totalInputSpaceConsumed : " + spaceConsumed +
    -        " , defaultParallelism : " + defaultParallelism)
    +              " , defaultParallelism : " + defaultParallelism)
    --- End diff --
    
    Change to `s"total... $spaceConsumed ... "` (string interpolation) instead of string concatenation.
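
    A minimal sketch of the suggestion, assuming hypothetical values for `spaceConsumed` and
    `defaultParallelism` (the object name `InterpolationSketch` is illustrative only and not part
    of the PR). Both forms should produce the same message:

    ```scala
    object InterpolationSketch {
      // Hypothetical stand-ins for the values logged in CarbonDataRDDFactory.
      val spaceConsumed: Long = 1024L
      val defaultParallelism: Int = 8

      // Concatenation, as in the current diff.
      val concatenated: String = "totalInputSpaceConsumed : " + spaceConsumed +
        " , defaultParallelism : " + defaultParallelism

      // The same message built with the s-interpolator.
      val interpolated: String =
        s"totalInputSpaceConsumed : $spaceConsumed , defaultParallelism : $defaultParallelism"

      def main(args: Array[String]): Unit = {
        println(interpolated)
      }
    }
    ```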


---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84404806
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---
    @@ -49,7 +49,7 @@ object CommonUtil {
               "Column group doesn't support Timestamp datatype:" + x)
           }
           // if invalid column is present
    -      else if (dims.filter { dim => dim.column.equalsIgnoreCase(x) }.isEmpty) {
    +      else if (!dims.exists { dim => dim.column.equalsIgnoreCase(x) }) {
    --- End diff --
    
    Use `!dims.exists(_.column.equalsIgnoreCase(x))` instead of the `{ dim => ... }` block form,
    and modify all such places.
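
    A small sketch comparing the two forms, assuming a hypothetical `Dim` case class in place of
    the real dimension type used in `CommonUtil`. `exists` stops at the first match, whereas
    `filter(...).isEmpty` builds an intermediate collection first:

    ```scala
    object ExistsSketch {
      // Hypothetical stand-in for the dimension type used in CommonUtil.
      final case class Dim(column: String)

      val dims: Seq[Dim] = Seq(Dim("Name"), Dim("Country"))

      // Original form: builds an intermediate collection just to test emptiness.
      def missingViaFilter(x: String): Boolean =
        dims.filter { dim => dim.column.equalsIgnoreCase(x) }.isEmpty

      // Suggested form: short-circuits on the first match, no intermediate collection.
      def missingViaExists(x: String): Boolean =
        !dims.exists(_.column.equalsIgnoreCase(x))
    }
    ```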


---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84401600
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -505,129 +512,129 @@ object CarbonDataRDDFactory extends Logging {
               )
         }
     
    -      val compactionThread = new Thread {
    -        override def run(): Unit = {
    +    val compactionThread = new Thread {
    +      override def run(): Unit = {
     
    --- End diff --
    
    Remove the empty line.


---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84404368
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala ---
    @@ -409,10 +409,10 @@ class CarbonGlobalDictionaryGenerateRDD(
           } finally {
             if (!dictionaryForDistinctValueLookUpCleared) {
               org.apache.carbondata.core.util.CarbonUtil
    --- End diff --
    
    Remove the package name (import `CarbonUtil` instead) and modify all such places.
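
    The same idea sketched with a JDK class, since `CarbonUtil` is not available outside the
    project. `ImportSketch` is a hypothetical name; the point is only that an import keeps the
    call site short:

    ```scala
    import java.util.concurrent.TimeUnit

    object ImportSketch {
      // Fully qualified at the call site, comparable to the current diff.
      val verbose: Long = java.util.concurrent.TimeUnit.SECONDS.toMillis(2)

      // With the import above, the call site stays short.
      val concise: Long = TimeUnit.SECONDS.toMillis(2)
    }
    ```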


---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84400383
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala ---
    @@ -259,29 +259,29 @@ object CarbonFilters {
                 Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
     
             case Not(In(a: Attribute, list))
    -         if !list.exists(!_.isInstanceOf[Literal]) =>
    -         if (list.exists(x => (isNullLiteral(x.asInstanceOf[Literal])))) {
    -          Some(new FalseExpression(transformExpression(a).get))
    -         }
    -        else {
    -          Some(new NotInExpression(transformExpression(a).get,
    +          if !list.exists(!_.isInstanceOf[Literal]) =>
    +          if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
    +            Some(new FalseExpression(transformExpression(a).get))
    +          }
    +          else {
    +            Some(new NotInExpression(transformExpression(a).get,
                   new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
    -            }
    +          }
             case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
               Some(new InExpression(transformExpression(a).get,
                 new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
             case Not(In(Cast(a: Attribute, _), list))
               if !list.exists(!_.isInstanceOf[Literal]) =>
    -        /* if any illogical expression comes in NOT IN Filter like
    -         NOT IN('scala',NULL) this will be treated as false expression and will
    -         always return no result. */
    -          if (list.exists(x => (isNullLiteral(x.asInstanceOf[Literal])))) {
    -          Some(new FalseExpression(transformExpression(a).get))
    -         }
    -        else {
    -          Some(new NotInExpression(transformExpression(a).get, new ListExpression(
    +          /* if any illogical expression comes in NOT IN Filter like
    +           NOT IN('scala',NULL) this will be treated as false expression and will
    +           always return no result. */
    +          if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
    +            Some(new FalseExpression(transformExpression(a).get))
    +          }
    +          else {
    --- End diff --
    
    Put `else {` on the previous line, i.e. `} else {`.


---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84401666
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -505,129 +512,129 @@ object CarbonDataRDDFactory extends Logging {
               )
         }
     
    -      val compactionThread = new Thread {
    -        override def run(): Unit = {
    +    val compactionThread = new Thread {
    +      override def run(): Unit = {
     
    +        try {
    +          // compaction status of the table which is triggered by the user.
    +          var triggeredCompactionStatus = false
    +          var exception: Exception = null
               try {
    -            // compaction status of the table which is triggered by the user.
    -            var triggeredCompactionStatus = false
    -            var exception : Exception = null
    -            try {
    -              executeCompaction(carbonLoadModel: CarbonLoadModel,
    -                hdfsStoreLocation: String,
    -                compactionModel: CompactionModel,
    -                partitioner: Partitioner,
    -                executor, sqlContext, kettleHomePath, storeLocation
    +            executeCompaction(carbonLoadModel: CarbonLoadModel,
    +              hdfsStoreLocation: String,
    +              compactionModel: CompactionModel,
    +              partitioner: Partitioner,
    +              executor, sqlContext, kettleHomePath, storeLocation
    +            )
    +            triggeredCompactionStatus = true
    +          }
    +          catch {
    --- End diff --
    
    Move `catch {` to the previous line, i.e. `} catch {`.
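
    A sketch of the layout being asked for, with `catch` and `finally` on the same line as the
    preceding closing brace. The `parse` helper is hypothetical and unrelated to the compaction
    code; it only illustrates the brace placement:

    ```scala
    object TryLayoutSketch {
      // Hypothetical helper: parses a number and reports whether cleanup ran.
      def parse(input: String): (Option[Int], Boolean) = {
        var cleanedUp = false
        val result = try {
          Some(input.trim.toInt)
        } catch {
          case _: NumberFormatException => None
        } finally {
          cleanedUp = true
        }
        (result, cleanedUp)
      }
    }
    ```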


---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84401743
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -505,129 +512,129 @@ object CarbonDataRDDFactory extends Logging {
               )
         }
     
    -      val compactionThread = new Thread {
    -        override def run(): Unit = {
    +    val compactionThread = new Thread {
    +      override def run(): Unit = {
     
    +        try {
    +          // compaction status of the table which is triggered by the user.
    +          var triggeredCompactionStatus = false
    +          var exception: Exception = null
               try {
    -            // compaction status of the table which is triggered by the user.
    -            var triggeredCompactionStatus = false
    -            var exception : Exception = null
    -            try {
    -              executeCompaction(carbonLoadModel: CarbonLoadModel,
    -                hdfsStoreLocation: String,
    -                compactionModel: CompactionModel,
    -                partitioner: Partitioner,
    -                executor, sqlContext, kettleHomePath, storeLocation
    +            executeCompaction(carbonLoadModel: CarbonLoadModel,
    +              hdfsStoreLocation: String,
    +              compactionModel: CompactionModel,
    +              partitioner: Partitioner,
    +              executor, sqlContext, kettleHomePath, storeLocation
    +            )
    +            triggeredCompactionStatus = true
    +          }
    +          catch {
    +            case e: Exception =>
    +              logger.error("Exception in compaction thread " + e.getMessage)
    +              exception = e
    +          }
    +          // continue in case of exception also, check for all the tables.
    +          val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
    +            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
    +              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
    +            ).equalsIgnoreCase("true")
    +
    +          if (!isConcurrentCompactionAllowed) {
    +            logger.info("System level compaction lock is enabled.")
    +            val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
    +            var tableForCompaction = CarbonCompactionUtil
    +              .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
    +                .tablesMeta.toArray, skipCompactionTables.toList.asJava
                   )
    -              triggeredCompactionStatus = true
    -            }
    -            catch {
    -              case e: Exception =>
    -                logger.error("Exception in compaction thread " + e.getMessage)
    -                exception = e
    -            }
    -            // continue in case of exception also, check for all the tables.
    -            val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
    -              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
    -                CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
    -              ).equalsIgnoreCase("true")
    -
    -            if (!isConcurrentCompactionAllowed) {
    -              logger.info("System level compaction lock is enabled.")
    -              val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
    -              var tableForCompaction = CarbonCompactionUtil
    -                .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
    -                  .tablesMeta.toArray, skipCompactionTables.toList.asJava
    +            while (null != tableForCompaction) {
    +              logger
    +                .info("Compaction request has been identified for table " + tableForCompaction
    +                  .carbonTable.getDatabaseName + "." + tableForCompaction.carbonTableIdentifier
    +                        .getTableName
                     )
    -              while (null != tableForCompaction) {
    -                logger
    -                  .info("Compaction request has been identified for table " + tableForCompaction
    -                    .carbonTable.getDatabaseName + "." + tableForCompaction.carbonTableIdentifier
    -                          .getTableName
    -                  )
    -                val table: CarbonTable = tableForCompaction.carbonTable
    -                val metadataPath = table.getMetaDataFilepath
    -                val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
    -
    -                val newCarbonLoadModel = new CarbonLoadModel()
    -                prepareCarbonLoadModel(hdfsStoreLocation, table, newCarbonLoadModel)
    -                val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
    -                  .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
    -                    newCarbonLoadModel.getTableName
    -                  )
    -
    -                val compactionSize = CarbonDataMergerUtil
    -                  .getCompactionSize(CompactionType.MAJOR_COMPACTION)
    -
    -                val newcompactionModel = CompactionModel(compactionSize,
    -                  compactionType,
    -                  table,
    -                  tableCreationTime,
    -                  compactionModel.isDDLTrigger
    +              val table: CarbonTable = tableForCompaction.carbonTable
    +              val metadataPath = table.getMetaDataFilepath
    +              val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
    +
    +              val newCarbonLoadModel = new CarbonLoadModel()
    +              prepareCarbonLoadModel(hdfsStoreLocation, table, newCarbonLoadModel)
    +              val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
    +                .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
    +                  newCarbonLoadModel.getTableName
    +                )
    +
    +              val compactionSize = CarbonDataMergerUtil
    +                .getCompactionSize(CompactionType.MAJOR_COMPACTION)
    +
    +              val newcompactionModel = CompactionModel(compactionSize,
    +                compactionType,
    +                table,
    +                tableCreationTime,
    +                compactionModel.isDDLTrigger
    +              )
    +              // proceed for compaction
    +              try {
    +                executeCompaction(newCarbonLoadModel,
    +                  newCarbonLoadModel.getStorePath,
    +                  newcompactionModel,
    +                  partitioner,
    +                  executor, sqlContext, kettleHomePath, storeLocation
                     )
    -                // proceed for compaction
    -                try {
    -                  executeCompaction(newCarbonLoadModel,
    -                    newCarbonLoadModel.getStorePath,
    -                    newcompactionModel,
    -                    partitioner,
    -                    executor, sqlContext, kettleHomePath, storeLocation
    -                  )
    -                }
    -                catch {
    -                  case e: Exception =>
    -                    logger.error("Exception in compaction thread for table " + tableForCompaction
    -                      .carbonTable.getDatabaseName + "." +
    -                                 tableForCompaction.carbonTableIdentifier
    -                                   .getTableName)
    -                  // not handling the exception. only logging as this is not the table triggered
    -                  // by user.
    -                }
    -                finally {
    -                  // delete the compaction required file in case of failure or success also.
    -                  if (!CarbonCompactionUtil
    -                    .deleteCompactionRequiredFile(metadataPath, compactionType)) {
    -                    // if the compaction request file is not been able to delete then
    -                    // add those tables details to the skip list so that it wont be considered next.
    -                    skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
    -                    logger
    -                      .error("Compaction request file can not be deleted for table " +
    -                             tableForCompaction
    -                               .carbonTable.getDatabaseName + "." + tableForCompaction
    -                               .carbonTableIdentifier
    -                               .getTableName
    -                      )
    -
    -                  }
    -                }
    -                // ********* check again for all the tables.
    -                tableForCompaction = CarbonCompactionUtil
    -                  .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
    -                    .tablesMeta.toArray, skipCompactionTables.asJava
    -                  )
                   }
    -              // giving the user his error for telling in the beeline if his triggered table
    -              // compaction is failed.
    -              if (!triggeredCompactionStatus) {
    -                throw new Exception("Exception in compaction " + exception.getMessage)
    +              catch {
    +                case e: Exception =>
    +                  logger.error("Exception in compaction thread for table " + tableForCompaction
    +                    .carbonTable.getDatabaseName + "." +
    +                               tableForCompaction.carbonTableIdentifier
    +                                 .getTableName)
    --- End diff --
    
    Use `s" "` (string interpolation) instead of string concatenation.
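
    One possible shape for this message, assuming hypothetical case classes that mimic the
    accessors used in the diff (`carbonTable.getDatabaseName`,
    `carbonTableIdentifier.getTableName`). Pulling the chained calls into local values first
    keeps the interpolated string on one line:

    ```scala
    object TableLogSketch {
      // Hypothetical stand-ins for the CarbonData table classes.
      final case class TableId(getTableName: String)
      final case class Table(getDatabaseName: String)
      final case class TableMeta(carbonTable: Table, carbonTableIdentifier: TableId)

      def message(tableForCompaction: TableMeta): String = {
        val dbName = tableForCompaction.carbonTable.getDatabaseName
        val tableName = tableForCompaction.carbonTableIdentifier.getTableName
        // The '.' between the two placeholders is a literal character.
        s"Exception in compaction thread for table $dbName.$tableName"
      }
    }
    ```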


---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84681004
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---
    @@ -196,30 +195,28 @@ class DataFileLoaderRDD[K, V](
       sc.setLocalProperty("spark.scheduler.pool", "DDL")
     
       override def getPartitions: Array[Partition] = {
    -    isTableSplitPartition match {
    -      case true =>
    -        // for table split partition
    -        var splits = Array[TableSplit]()
    -        if (carbonLoadModel.isDirectLoad) {
    -          splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
    -            partitioner.nodeList, partitioner.partitionCount)
    -        }
    -        else {
    -          splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
    -            carbonLoadModel.getTableName, null, partitioner)
    -        }
    +    if (isTableSplitPartition) {
    +      // for table split partition
    +      var splits = Array[TableSplit]()
    +      if (carbonLoadModel.isDirectLoad) {
    +        splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
    +          partitioner.nodeList, partitioner.partitionCount)
    +      } else {
    +        splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
    +          carbonLoadModel.getTableName, null, partitioner)
    +      }
     
    -        splits.zipWithIndex.map {s =>
    -          // filter the same partition unique id, because only one will match, so get 0 element
    -          val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
    -            p._1 == s._1.getPartition.getUniqueID)(0)._2
    -          new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
    -        }
    -      case false =>
    -        // for node partition
    -        blocksGroupBy.zipWithIndex.map{b =>
    -          new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
    -        }
    +      splits.zipWithIndex.map { s =>
    --- End diff --
    
    Instead of `s`, please use more meaningful names via a pattern match, like `case (split, index)`.
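
    A sketch of the difference, assuming a hypothetical `Split` case class in place of
    `TableSplit`. Both versions should produce the same result; the pattern match only names the
    two halves of each pair:

    ```scala
    object ZipWithIndexSketch {
      // Hypothetical stand-in for TableSplit.
      final case class Split(partitionId: String)

      val splits: Array[Split] = Array(Split("p0"), Split("p1"))

      // Tuple accessors: the reader has to remember what _1 and _2 mean.
      val viaAccessors: Array[String] = splits.zipWithIndex.map { s =>
        s"${s._2}:${s._1.partitionId}"
      }

      // Pattern match: each element of the pair gets a descriptive name.
      val viaPattern: Array[String] = splits.zipWithIndex.map { case (split, index) =>
        s"$index:${split.partitionId}"
      }
    }
    ```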


---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84681517
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -188,16 +194,16 @@ object CarbonDataRDDFactory extends Logging {
             newSplitSize = CarbonCommonConstants.CARBON_16MB
           }
           hadoopConfiguration.set(FileInputFormat.SPLIT_MAXSIZE, newSplitSize.toString)
    -      logInfo("totalInputSpaceConsumed : " + spaceConsumed +
    -        " , defaultParallelism : " + defaultParallelism)
    -      logInfo("mapreduce.input.fileinputformat.split.maxsize : " + newSplitSize.toString)
    +      logInfo(s"totalInputSpaceConsumed : $spaceConsumed ," +
    --- End diff --
    
    Remove the space before `:` and put this log message on one line.


---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84400739
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---
    @@ -439,42 +436,42 @@ class DataFileLoaderRDD[K, V](
       }
     
       override def getPreferredLocations(split: Partition): Seq[String] = {
    -    isTableSplitPartition match {
    -      case true =>
    -        // for table split partition
    -        val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
    -        val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
    -        location
    -      case false =>
    -        // for node partition
    -        val theSplit = split.asInstanceOf[CarbonNodePartition]
    -        val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
    -        logInfo("Preferred Location for split : " + firstOptionLocation(0))
    -        val blockMap = new util.LinkedHashMap[String, Integer]()
    -        val tableBlocks = theSplit.blocksDetails
    -        tableBlocks.foreach(tableBlock => tableBlock.getLocations.foreach(
    -          location => {
    -            if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
    -              val currentCount = blockMap.get(location)
    -              if (currentCount == null) {
    -                blockMap.put(location, 1)
    -              } else {
    -                blockMap.put(location, currentCount + 1)
    -              }
    +    if (isTableSplitPartition) {
    +      // for table split partition
    +      val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
    +      val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
    +      location
    +    } else {
    +      // for node partition
    +      val theSplit = split.asInstanceOf[CarbonNodePartition]
    +      val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
    +      logInfo("Preferred Location for split : " + firstOptionLocation.head)
    +      val blockMap = new util.LinkedHashMap[String, Integer]()
    +      val tableBlocks = theSplit.blocksDetails
    +      tableBlocks.foreach(tableBlock => tableBlock.getLocations.foreach(
    --- End diff --
    
    Move `tableBlock.getLocations.foreach` to the next line and give it proper indentation.
    Change `(` to `{` after `foreach`. The code standard is:
    ```
    xxx.foreach { x => 
      x.yyy
    }
    ```
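
    Applied to a nested loop like the one in the diff, that standard might look as follows. This
    is a sketch only: `Block` and `countLocations` are hypothetical, and a Scala
    `LinkedHashMap` is used here in place of the `java.util.LinkedHashMap` from the original code:

    ```scala
    import scala.collection.mutable

    object ForeachSketch {
      // Hypothetical stand-in for a table block with replica locations.
      final case class Block(getLocations: Array[String])

      // Counts how often each location appears, skipping the preferred ones.
      def countLocations(blocks: Seq[Block], preferred: Seq[String]): Map[String, Int] = {
        val counts = mutable.LinkedHashMap[String, Int]()
        blocks.foreach { block =>
          block.getLocations.foreach { location =>
            if (!preferred.exists(location.equalsIgnoreCase(_))) {
              counts(location) = counts.getOrElse(location, 0) + 1
            }
          }
        }
        counts.toMap
      }
    }
    ```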


---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84681761
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -277,44 +283,43 @@ object CarbonDataRDDFactory extends Logging {
                 compactionModel,
                 lock
               )
    -        }
    -        catch {
    -          case e : Exception =>
    -            logger.error("Exception in start compaction thread. " + e.getMessage)
    +        } catch {
    +          case e: Exception =>
    +            logger.error(s"Exception in start compaction thread. ${ e.getMessage }")
                 lock.unlock()
             }
           }
           else {
             logger
               .audit("Not able to acquire the compaction lock for table " +
    -            s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}"
    +                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }"
               )
             logger
    -          .error("Not able to acquire the compaction lock for table " + carbonLoadModel
    -            .getDatabaseName + "." + carbonLoadModel.getTableName
    +          .error(s"Not able to acquire the compaction lock for table" +
    --- End diff --
    
    Move `.error(` to the previous line, i.e. `logger.error(...)`.


---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84404173
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -1043,32 +1050,32 @@ object CarbonDataRDDFactory extends Logging {
             CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
             logInfo("********clean up done**********")
             logger.audit(s"Data load is failed for " +
    -          s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
    +                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
             logWarning("Cannot write load metadata file as data load failed")
             throw new Exception(errorMessage)
           } else {
    -          val metadataDetails = status(0)._2
    -          if (!isAgg) {
    -            val status = CarbonLoaderUtil
    -              .recordLoadMetadata(currentLoadCount,
    -                metadataDetails,
    -                carbonLoadModel,
    -                loadStatus,
    -                loadStartTime
    -              )
    -            if (!status) {
    -              val errorMessage = "Dataload failed due to failure in table status updation."
    -              logger.audit("Data load is failed for " +
    -                           s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
    -              logger.error("Dataload failed due to failure in table status updation.")
    -              throw new Exception(errorMessage)
    -            }
    -          } else if (!carbonLoadModel.isRetentionRequest) {
    -            // TODO : Handle it
    -            logInfo("********Database updated**********")
    +        val metadataDetails = status(0)._2
    +        if (!isAgg) {
    +          val status = CarbonLoaderUtil
    +            .recordLoadMetadata(currentLoadCount,
    --- End diff --
    
    Move `.recordLoadMetadata(` to the previous line, i.e. `CarbonLoaderUtil.recordLoadMetadata(...)`.


---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84404075
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -505,129 +512,129 @@ object CarbonDataRDDFactory extends Logging {
               )
         }
     
    -      val compactionThread = new Thread {
    -        override def run(): Unit = {
    +    val compactionThread = new Thread {
    +      override def run(): Unit = {
     
    +        try {
    +          // compaction status of the table which is triggered by the user.
    +          var triggeredCompactionStatus = false
    +          var exception: Exception = null
               try {
    -            // compaction status of the table which is triggered by the user.
    -            var triggeredCompactionStatus = false
    -            var exception : Exception = null
    -            try {
    -              executeCompaction(carbonLoadModel: CarbonLoadModel,
    -                hdfsStoreLocation: String,
    -                compactionModel: CompactionModel,
    -                partitioner: Partitioner,
    -                executor, sqlContext, kettleHomePath, storeLocation
    +            executeCompaction(carbonLoadModel: CarbonLoadModel,
    +              hdfsStoreLocation: String,
    +              compactionModel: CompactionModel,
    +              partitioner: Partitioner,
    +              executor, sqlContext, kettleHomePath, storeLocation
    +            )
    +            triggeredCompactionStatus = true
    +          }
    +          catch {
    +            case e: Exception =>
    +              logger.error("Exception in compaction thread " + e.getMessage)
    +              exception = e
    +          }
    +          // continue in case of exception also, check for all the tables.
    +          val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
    +            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
    +              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
    +            ).equalsIgnoreCase("true")
    +
    +          if (!isConcurrentCompactionAllowed) {
    +            logger.info("System level compaction lock is enabled.")
    +            val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
    +            var tableForCompaction = CarbonCompactionUtil
    +              .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
    +                .tablesMeta.toArray, skipCompactionTables.toList.asJava
                   )
    -              triggeredCompactionStatus = true
    -            }
    -            catch {
    -              case e: Exception =>
    -                logger.error("Exception in compaction thread " + e.getMessage)
    -                exception = e
    -            }
    -            // continue in case of exception also, check for all the tables.
    -            val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
    -              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
    -                CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
    -              ).equalsIgnoreCase("true")
    -
    -            if (!isConcurrentCompactionAllowed) {
    -              logger.info("System level compaction lock is enabled.")
    -              val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
    -              var tableForCompaction = CarbonCompactionUtil
    -                .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
    -                  .tablesMeta.toArray, skipCompactionTables.toList.asJava
    +            while (null != tableForCompaction) {
    +              logger
    +                .info("Compaction request has been identified for table " + tableForCompaction
    --- End diff --
    
    Please format the log message properly, and apply the same fix in all places.
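
One possible reading of this comment, sketched below: keep the logger call on one line and build the message with a single interpolated string rather than a concatenation that wraps mid-expression. `TableId` and `compactionRequestMessage` are hypothetical names used only for illustration; they are not part of the CarbonData code base.

```scala
object LogMessageFormat {
  // Hypothetical stand-in for the table identifier used in the diff.
  final case class TableId(databaseName: String, tableName: String)

  // Build the whole message with one interpolated string so the call
  // does not have to be split across several lines.
  def compactionRequestMessage(table: TableId): String =
    s"Compaction request has been identified for table ${table.databaseName}.${table.tableName}"

  def main(args: Array[String]): Unit = {
    println(compactionRequestMessage(TableId("default", "sales")))
  }
}
```

With a real logger, the call would then presumably read `logger.info(compactionRequestMessage(table))` on a single line.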


---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84400353
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala ---
    @@ -259,29 +259,29 @@ object CarbonFilters {
                 Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
     
             case Not(In(a: Attribute, list))
    -         if !list.exists(!_.isInstanceOf[Literal]) =>
    -         if (list.exists(x => (isNullLiteral(x.asInstanceOf[Literal])))) {
    -          Some(new FalseExpression(transformExpression(a).get))
    -         }
    -        else {
    -          Some(new NotInExpression(transformExpression(a).get,
    +          if !list.exists(!_.isInstanceOf[Literal]) =>
    +          if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
    +            Some(new FalseExpression(transformExpression(a).get))
    +          }
    +          else {
    --- End diff --
    
    put to previous line
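
A minimal sketch of the requested layout, assuming the comment refers to placing `else` on the same line as the closing brace of the `if` block. The object, method, and returned strings are hypothetical and only mirror the shape of the branch in the diff.

```scala
object ElsePlacement {
  // Returns the name of the expression the branch would build.
  def expressionFor(hasNullLiteral: Boolean): String = {
    if (hasNullLiteral) {
      "FalseExpression"
    } else {
      // `else` sits on the same line as the closing brace above.
      "NotInExpression"
    }
  }

  def main(args: Array[String]): Unit = {
    println(expressionFor(hasNullLiteral = true))
    println(expressionFor(hasNullLiteral = false))
  }
}
```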


---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84401103
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -175,8 +177,12 @@ object CarbonDataRDDFactory extends Logging {
       }
     
       def configSplitMaxSize(context: SparkContext, filePaths: String,
    -    hadoopConfiguration: Configuration): Unit = {
    -    val defaultParallelism = if (context.defaultParallelism < 1) 1 else context.defaultParallelism
    +      hadoopConfiguration: Configuration): Unit = {
    --- End diff --
    
    In this case, writing it on one line is fine; there is no need to modify it. A better approach is to use a max function (for example `math.max(context.defaultParallelism, 1)`) instead of if/else.
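
The expression `if (x < 1) 1 else x` in the diff above clamps the value to a floor of 1, which corresponds to `math.max(x, 1)` from the Scala standard library. The sketch below shows that replacement; `atLeastOne` is a hypothetical helper name, and a plain `Int` stands in for `context.defaultParallelism`.

```scala
object ParallelismFloor {
  // Clamp a parallelism value so that it is never below 1.
  def atLeastOne(parallelism: Int): Int = math.max(parallelism, 1)

  def main(args: Array[String]): Unit = {
    println(atLeastOne(0))
    println(atLeastOne(8))
  }
}
```

In the method under review, this would presumably become `val defaultParallelism = math.max(context.defaultParallelism, 1)`, which fits on one line.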


---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84404993
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---
    @@ -413,12 +413,12 @@ object GlobalDictionaryUtil extends Logging {
         // update Metadata
         val catalog = CarbonEnv.getInstance(sqlContext).carbonCatalog
         catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
    -        model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
    +      model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
     
         // update CarbonDataLoadSchema
         val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
    -          model.table.getTableName)(sqlContext)
    -        .asInstanceOf[CarbonRelation].tableMeta.carbonTable
    +      model.table.getTableName)(sqlContext)
    +      .asInstanceOf[CarbonRelation].tableMeta.carbonTable
    --- End diff --
    
    Move this to the previous line if the result does not exceed 100 characters.


---

[GitHub] incubator-carbondata pull request #248: [CARBONDATA-328] Improve Code and Fi...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/248#discussion_r84404204
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
    @@ -1114,36 +1121,36 @@ object CarbonDataRDDFactory extends Logging {
     
           if (isUpdationRequired) {
             try {
    -        // Update load metadate file after cleaning deleted nodes
    -        if (carbonTableStatusLock.lockWithRetries()) {
    -          logger.info("Table status lock has been successfully acquired.")
    +          // Update load metadate file after cleaning deleted nodes
    +          if (carbonTableStatusLock.lockWithRetries()) {
    +            logger.info("Table status lock has been successfully acquired.")
     
    -          // read latest table status again.
    -          val latestMetadata = segmentStatusManager
    -            .readLoadMetadata(loadMetadataFilePath)
    +            // read latest table status again.
    +            val latestMetadata = segmentStatusManager
    +              .readLoadMetadata(loadMetadataFilePath)
    --- End diff --
    
    move to previous line
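
A sketch of what the comment seems to ask for, assuming the joined statement stays within the 100-character limit mentioned elsewhere in this review: the receiver and the method call share one line. `StatusManager` here is a hypothetical stand-in, not the real `segmentStatusManager` type.

```scala
object ChainedCallLayout {
  // Hypothetical stand-in for the status manager seen in the diff.
  final class StatusManager {
    def readLoadMetadata(path: String): List[String] = List(s"$path/tablestatus")
  }

  def latestMetadata(manager: StatusManager, loadMetadataFilePath: String): List[String] = {
    // Receiver and call are kept on one line because the statement is short enough.
    val latest = manager.readLoadMetadata(loadMetadataFilePath)
    latest
  }

  def main(args: Array[String]): Unit = {
    println(latestMetadata(new StatusManager, "/store/Metadata"))
  }
}
```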


---