You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/11/19 02:16:50 UTC

[1/4] incubator-carbondata git commit: Improved spark module code. * Removed some compliation warnings. * Replace pattern matching for boolean to IF-ELSE. * Improved code according to scala standards. * Removed unnecessary new lines. * Added string inter

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master c5176f31e -> 0a8e782ff


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index 0a35b21..89e1aa9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -40,20 +40,18 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
 
   override def evaluate(carbonRowInstance: RowIntf): ExpressionResult = {
 
-    val values = carbonRowInstance.getValues().toSeq.map { value =>
-      value match {
-        case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s)
-        case d: java.math.BigDecimal =>
-          val javaDecVal = new java.math.BigDecimal(d.toString())
-          val scalaDecVal = new scala.math.BigDecimal(javaDecVal)
-          val decConverter = new org.apache.spark.sql.types.Decimal()
-          decConverter.set(scalaDecVal)
-        case _ => value
-      }
+    val values = carbonRowInstance.getValues.toSeq.map {
+      case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s)
+      case d: java.math.BigDecimal =>
+        val javaDecVal = new java.math.BigDecimal(d.toString)
+        val scalaDecVal = new scala.math.BigDecimal(javaDecVal)
+        val decConverter = new org.apache.spark.sql.types.Decimal()
+        decConverter.set(scalaDecVal)
+      case value => value
     }
     try {
       val result = evaluateExpression(
-          new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray))
+        new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray))
       val sparkRes = if (isExecutor) {
         result.asInstanceOf[InternalRow].get(0, sparkExp.dataType)
       } else {
@@ -62,17 +60,16 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
       new ExpressionResult(CarbonScalaUtil.convertSparkToCarbonDataType(sparkExp.dataType),
         sparkRes
       )
-    }
-    catch {
-      case e: Exception => throw new FilterUnsupportedException(e.getMessage())
+    } catch {
+      case e: Exception => throw new FilterUnsupportedException(e.getMessage)
     }
   }
 
-  override def getFilterExpressionType(): ExpressionType = {
+  override def getFilterExpressionType: ExpressionType = {
     ExpressionType.UNKNOWN
   }
 
-  override def getString(): String = {
+  override def getString: String = {
     sparkExp.toString()
   }
 
@@ -81,46 +78,45 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
     isExecutor = true
   }
 
-  def getColumnList(): java.util.List[ColumnExpression] = {
+  def getColumnList: java.util.List[ColumnExpression] = {
 
     val lst = new java.util.ArrayList[ColumnExpression]()
     getColumnListFromExpressionTree(sparkExp, lst)
     lst
   }
-    def getLiterals(): java.util.List[ExpressionResult] = {
+  def getLiterals: java.util.List[ExpressionResult] = {
 
     val lst = new java.util.ArrayList[ExpressionResult]()
     lst
   }
 
-  def getAllColumnList(): java.util.List[ColumnExpression] = {
+  def getAllColumnList: java.util.List[ColumnExpression] = {
     val lst = new java.util.ArrayList[ColumnExpression]()
     getAllColumnListFromExpressionTree(sparkExp, lst)
     lst
   }
 
-  def isSingleDimension(): Boolean = {
+  def isSingleDimension: Boolean = {
     val lst = new java.util.ArrayList[ColumnExpression]()
     getAllColumnListFromExpressionTree(sparkExp, lst)
     if (lst.size == 1 && lst.get(0).isDimension) {
       true
-    }
-    else {
+    } else {
       false
     }
   }
 
   def getColumnListFromExpressionTree(sparkCurrentExp: SparkExpression,
-    list: java.util.List[ColumnExpression]): Unit = {
+      list: java.util.List[ColumnExpression]): Unit = {
     sparkCurrentExp match {
       case carbonBoundRef: CarbonBoundReference =>
         val foundExp = list.asScala
-          .find(p => p.getColumnName() == carbonBoundRef.colExp.getColumnName())
+          .find(p => p.getColumnName == carbonBoundRef.colExp.getColumnName)
         if (foundExp.isEmpty) {
           carbonBoundRef.colExp.setColIndex(list.size)
           list.add(carbonBoundRef.colExp)
         } else {
-          carbonBoundRef.colExp.setColIndex(foundExp.get.getColIndex())
+          carbonBoundRef.colExp.setColIndex(foundExp.get.getColIndex)
         }
       case _ => sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list))
     }
@@ -128,7 +124,7 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
 
 
   def getAllColumnListFromExpressionTree(sparkCurrentExp: SparkExpression,
-    list: List[ColumnExpression]): List[ColumnExpression] = {
+      list: List[ColumnExpression]): List[ColumnExpression] = {
     sparkCurrentExp match {
       case carbonBoundRef: CarbonBoundReference => list.add(carbonBoundRef.colExp)
       case _ => sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list))
@@ -136,13 +132,12 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
     list
   }
 
-  def isDirectDictionaryColumns(): Boolean = {
+  def isDirectDictionaryColumns: Boolean = {
     val lst = new ArrayList[ColumnExpression]()
     getAllColumnListFromExpressionTree(sparkExp, lst)
     if (lst.get(0).getCarbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
       true
-    }
-    else {
+    } else {
       false
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index ed757e3..a6b4ec5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -45,7 +45,8 @@ import org.apache.carbondata.core.carbon.metadata.datatype.DataType
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
 import org.apache.carbondata.core.carbon.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
 import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension,
+ColumnSchema}
 import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
@@ -61,7 +62,8 @@ import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil,
+GlobalDictionaryUtil}
 
 case class tableModel(
     ifNotExistsSet: Boolean,
@@ -166,18 +168,24 @@ case class NodeInfo(TaskId: String, noOfBlocks: Int)
 
 
 case class AlterTableModel(dbName: Option[String], tableName: String,
-  compactionType: String, alterSql: String)
+    compactionType: String, alterSql: String)
 
 case class CompactionModel(compactionSize: Long,
-  compactionType: CompactionType,
-  carbonTable: CarbonTable,
-  tableCreationTime: Long,
-  isDDLTrigger: Boolean)
+    compactionType: CompactionType,
+    carbonTable: CarbonTable,
+    tableCreationTime: Long,
+    isDDLTrigger: Boolean)
 
-case class CompactionCallableModel(storePath: String, carbonLoadModel: CarbonLoadModel,
-  partitioner: Partitioner, storeLocation: String, carbonTable: CarbonTable, kettleHomePath: String,
-  cubeCreationTime: Long, loadsToMerge: util.List[LoadMetadataDetails], sqlContext: SQLContext,
-  compactionType: CompactionType)
+case class CompactionCallableModel(storePath: String,
+    carbonLoadModel: CarbonLoadModel,
+    partitioner: Partitioner,
+    storeLocation: String,
+    carbonTable: CarbonTable,
+    kettleHomePath: String,
+    cubeCreationTime: Long,
+    loadsToMerge: util.List[LoadMetadataDetails],
+    sqlContext: SQLContext,
+    compactionType: CompactionType)
 
 object TableNewProcessor {
   def apply(cm: tableModel, sqlContext: SQLContext): TableInfo = {
@@ -189,6 +197,7 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
 
   var index = 0
   var rowGroup = 0
+
   def getAllChildren(fieldChildren: Option[List[Field]]): Seq[ColumnSchema] = {
     var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]()
     fieldChildren.foreach(fields => {
@@ -294,12 +303,12 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
     // Its based on the dimension name and measure name
     allColumns.groupBy(_.getColumnName).foreach(f => if (f._2.size > 1) {
       val name = f._1
-      LOGGER.error(s"Duplicate column found with name : $name")
+      LOGGER.error(s"Duplicate column found with name: $name")
       LOGGER.audit(
         s"Validation failed for Create/Alter Table Operation " +
-        s"for ${cm.databaseName}.${cm.tableName}" +
-        s"Duplicate column found with name : $name")
-      sys.error(s"Duplicate dimensions found with name : $name")
+        s"for ${ cm.databaseName }.${ cm.tableName }" +
+        s"Duplicate column found with name: $name")
+      sys.error(s"Duplicate dimensions found with name: $name")
     })
 
     val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
@@ -314,14 +323,11 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
     for (column <- allColumns) {
       if (highCardinalityDims.contains(column.getColumnName)) {
         newOrderedDims += column
-      }
-      else if (column.isComplex) {
+      } else if (column.isComplex) {
         complexDims += column
-      }
-      else if (column.isDimensionColumn) {
+      } else if (column.isDimensionColumn) {
         newOrderedDims += column
-      }
-      else {
+      } else {
         measures += column
       }
 
@@ -333,7 +339,7 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
       // When the column is measure or the specified no inverted index column in DDL,
       // set useInvertedIndex to false, otherwise true.
       if (noInvertedIndexCols.contains(column.getColumnName) ||
-        cm.msrCols.exists(_.column.equalsIgnoreCase(column.getColumnName))) {
+          cm.msrCols.exists(_.column.equalsIgnoreCase(column.getColumnName))) {
         column.setUseInvertedIndex(false)
       } else {
         column.setUseInvertedIndex(true)
@@ -378,25 +384,22 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
             Partitioner(
               "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
               Array(""), part.partitionCount, null)
-          }
-          else {
+          } else {
             // case where partition cols are set and partition class is not set.
             // so setting the default value.
             Partitioner(
               "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
               part.partitionColumn, part.partitionCount, null)
           }
-        }
-        else if (definedpartCols.nonEmpty) {
+        } else if (definedpartCols.nonEmpty) {
           val msg = definedpartCols.mkString(", ")
-          LOGGER.error(s"partition columns specified are not part of Dimension columns : $msg")
+          LOGGER.error(s"partition columns specified are not part of Dimension columns: $msg")
           LOGGER.audit(
             s"Validation failed for Create/Alter Table Operation for " +
-              s"${cm.databaseName}.${cm.tableName} " +
-            s"partition columns specified are not part of Dimension columns : $msg")
-          sys.error(s"partition columns specified are not part of Dimension columns : $msg")
-        }
-        else {
+            s"${ cm.databaseName }.${ cm.tableName } " +
+            s"partition columns specified are not part of Dimension columns: $msg")
+          sys.error(s"partition columns specified are not part of Dimension columns: $msg")
+        } else {
 
           try {
             Class.forName(part.partitionClass).newInstance()
@@ -405,9 +408,9 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
               val cl = part.partitionClass
               LOGGER.audit(
                 s"Validation failed for Create/Alter Table Operation for " +
-                  s"${cm.databaseName}.${cm.tableName} " +
-                s"partition class specified can not be found or loaded : $cl")
-              sys.error(s"partition class specified can not be found or loaded : $cl")
+                s"${ cm.databaseName }.${ cm.tableName } " +
+                s"partition class specified can not be found or loaded: $cl")
+              sys.error(s"partition class specified can not be found or loaded: $cl")
           }
 
           Partitioner(part.partitionClass, columnBuffer.toArray, part.partitionCount, null)
@@ -578,42 +581,42 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
     // Its based on the dimension name and measure name
     levels.groupBy(_.name).foreach(f => if (f._2.size > 1) {
       val name = f._1
-      LOGGER.error(s"Duplicate dimensions found with name : $name")
+      LOGGER.error(s"Duplicate dimensions found with name: $name")
       LOGGER.audit(
-        s"Validation failed for Create/Alter Table Operation " +
-        s"for ${cm.databaseName}.${cm.tableName} " +
-        s"Duplicate dimensions found with name : $name")
-      sys.error(s"Duplicate dimensions found with name : $name")
+        "Validation failed for Create/Alter Table Operation " +
+        s"for ${ cm.databaseName }.${ cm.tableName } " +
+        s"Duplicate dimensions found with name: $name")
+      sys.error(s"Duplicate dimensions found with name: $name")
     })
 
     levels.groupBy(_.column).foreach(f => if (f._2.size > 1) {
       val name = f._1
-      LOGGER.error(s"Duplicate dimensions found with column name : $name")
+      LOGGER.error(s"Duplicate dimensions found with column name: $name")
       LOGGER.audit(
-        s"Validation failed for Create/Alter Table Operation " +
-        s"for ${cm.databaseName}.${cm.tableName} " +
-        s"Duplicate dimensions found with column name : $name")
-      sys.error(s"Duplicate dimensions found with column name : $name")
+        "Validation failed for Create/Alter Table Operation " +
+        s"for ${ cm.databaseName }.${ cm.tableName } " +
+        s"Duplicate dimensions found with column name: $name")
+      sys.error(s"Duplicate dimensions found with column name: $name")
     })
 
     measures.groupBy(_.name).foreach(f => if (f._2.size > 1) {
       val name = f._1
-      LOGGER.error(s"Duplicate measures found with name : $name")
+      LOGGER.error(s"Duplicate measures found with name: $name")
       LOGGER.audit(
         s"Validation failed for Create/Alter Table Operation " +
-        s"for ${cm.databaseName}.${cm.tableName} " +
-        s"Duplicate measures found with name : $name")
-      sys.error(s"Duplicate measures found with name : $name")
+        s"for ${ cm.databaseName }.${ cm.tableName } " +
+        s"Duplicate measures found with name: $name")
+      sys.error(s"Duplicate measures found with name: $name")
     })
 
     measures.groupBy(_.column).foreach(f => if (f._2.size > 1) {
       val name = f._1
-      LOGGER.error(s"Duplicate measures found with column name : $name")
+      LOGGER.error(s"Duplicate measures found with column name: $name")
       LOGGER.audit(
         s"Validation failed for Create/Alter Table Operation " +
-        s"for ${cm.databaseName}.${cm.tableName} " +
-        s"Duplicate measures found with column name : $name")
-      sys.error(s"Duplicate measures found with column name : $name")
+        s"for ${ cm.databaseName }.${ cm.tableName } " +
+        s"Duplicate measures found with column name: $name")
+      sys.error(s"Duplicate measures found with column name: $name")
     })
 
     val levelsArray = levels.map(_.name)
@@ -625,7 +628,7 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
         LOGGER.error(s"Aggregator should not be defined for dimension fields [$fault]")
         LOGGER.audit(
           s"Validation failed for Create/Alter Table Operation for " +
-            s"${cm.databaseName}.${cm.tableName} " +
+          s"${ cm.databaseName }.${ cm.tableName } " +
           s"Aggregator should not be defined for dimension fields [$fault]")
         sys.error(s"Aggregator should not be defined for dimension fields [$fault]")
       }
@@ -633,12 +636,12 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
 
     levelsNdMesures.groupBy(x => x).foreach(f => if (f._2.size > 1) {
       val name = f._1
-      LOGGER.error(s"Dimension and Measure defined with same name : $name")
+      LOGGER.error(s"Dimension and Measure defined with same name: $name")
       LOGGER.audit(
         s"Validation failed for Create/Alter Table Operation " +
-        s"for ${cm.databaseName}.${cm.tableName} " +
-        s"Dimension and Measure defined with same name : $name")
-      sys.error(s"Dimension and Measure defined with same name : $name")
+        s"for ${ cm.databaseName }.${ cm.tableName } " +
+        s"Dimension and Measure defined with same name: $name")
+      sys.error(s"Dimension and Measure defined with same name: $name")
     })
 
     dimSrcDimensions.foreach(d => {
@@ -677,8 +680,7 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
           val matchedMapping = aggs.filter(agg => f.name.equals(agg.msrName))
           if (matchedMapping.isEmpty) {
             f
-          }
-          else {
+          } else {
             Measure(f.name, f.column, f.dataType, matchedMapping.head.aggType)
           }
         }
@@ -708,17 +710,14 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
           Partitioner(
             "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
             Array(""), part.partitionCount, null)
-        }
-        else if (definedpartCols.nonEmpty) {
+        } else if (definedpartCols.nonEmpty) {
           val msg = definedpartCols.mkString(", ")
-          LOGGER.error(s"partition columns specified are not part of Dimension columns : $msg")
+          LOGGER.error(s"partition columns specified are not part of Dimension columns: $msg")
           LOGGER.audit(
             s"Validation failed for Create/Alter Table Operation - " +
-            s"partition columns specified are not part of Dimension columns : $msg")
-          sys.error(s"partition columns specified are not part of Dimension columns : $msg")
-        }
-        else {
-
+            s"partition columns specified are not part of Dimension columns: $msg")
+          sys.error(s"partition columns specified are not part of Dimension columns: $msg")
+        } else {
           try {
             Class.forName(part.partitionClass).newInstance()
           } catch {
@@ -726,9 +725,9 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
               val cl = part.partitionClass
               LOGGER.audit(
                 s"Validation failed for Create/Alter Table Operation for " +
-                  s"${cm.databaseName}.${cm.tableName} " +
-                s"partition class specified can not be found or loaded : $cl")
-              sys.error(s"partition class specified can not be found or loaded : $cl")
+                s"${ cm.databaseName }.${ cm.tableName } " +
+                s"partition class specified can not be found or loaded: $cl")
+              sys.error(s"partition class specified can not be found or loaded: $cl")
           }
 
           Partitioner(part.partitionClass, columnBuffer.toArray, part.partitionCount, null)
@@ -782,8 +781,8 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
     val databaseName = getDB.getDatabaseName(alterTableModel.dbName, sqlContext)
     if (null == org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
       .getCarbonTable(databaseName + "_" + tableName)) {
-      logError("alter table failed. table not found: " + databaseName + "." + tableName)
-      sys.error("alter table failed. table not found: " + databaseName + "." + tableName)
+      logError(s"alter table failed. table not found: $databaseName.$tableName")
+      sys.error(s"alter table failed. table not found: $databaseName.$tableName")
     }
 
     val relation =
@@ -824,17 +823,14 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
           kettleHomePath,
           storeLocation
         )
-    }
-    catch {
+    } catch {
       case e: Exception =>
         if (null != e.getMessage) {
-          sys.error("Compaction failed. Please check logs for more info." + e.getMessage)
-        }
-        else {
+          sys.error(s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
+        } else {
           sys.error("Exception in compaction. Please check logs for more info.")
         }
     }
-
     Seq.empty
   }
 }
@@ -861,9 +857,7 @@ case class CreateTable(cm: tableModel) extends RunnableCommand {
           s"Table [$tbName] already exists under database [$dbName]")
         sys.error(s"Table [$tbName] already exists under database [$dbName]")
       }
-    }
-    else {
-
+    } else {
       // Add Database to catalog and persist
       val catalog = CarbonEnv.getInstance(sqlContext).carbonCatalog
       // Need to fill partitioner class when we support partition
@@ -872,7 +866,7 @@ case class CreateTable(cm: tableModel) extends RunnableCommand {
         sqlContext.sql(
           s"""CREATE TABLE $dbName.$tbName USING carbondata""" +
           s""" OPTIONS (tableName "$dbName.$tbName", tablePath "$tablePath") """)
-              .collect
+          .collect
       } catch {
         case e: Exception =>
           val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
@@ -882,7 +876,7 @@ case class CreateTable(cm: tableModel) extends RunnableCommand {
             .dropTable(catalog.storePath, identifier)(sqlContext)
 
           LOGGER.audit(s"Table creation with Database name [$dbName] " +
-            s"and Table name [$tbName] failed")
+                       s"and Table name [$tbName] failed")
           throw e
       }
 
@@ -940,8 +934,8 @@ private[sql] case class DeleteLoadsById(
         LOGGER.audit(s"Delete segment by Id is successfull for $databaseName.$tableName.")
       }
       else {
-        sys.error("Delete segment by Id is failed. Invalid ID is :"
-                  + invalidLoadIds.mkString(","))
+        sys.error("Delete segment by Id is failed. Invalid ID is:" +
+                  s" ${ invalidLoadIds.mkString(",") }")
       }
     } catch {
       case ex: Exception =>
@@ -963,10 +957,10 @@ private[sql] case class DeleteLoadsById(
 }
 
 private[sql] case class DeleteLoadsByLoadDate(
-   databaseNameOp: Option[String],
-  tableName: String,
-  dateField: String,
-  loadDate: String) extends RunnableCommand {
+    databaseNameOp: Option[String],
+    tableName: String,
+    dateField: String,
+    loadDate: String) extends RunnableCommand {
 
   val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.tablemodel.tableSchema")
 
@@ -980,12 +974,12 @@ private[sql] case class DeleteLoadsByLoadDate(
     if (relation == null) {
       LOGGER
         .audit(s"Delete segment by load date is failed. Table $dbName.$tableName does not " +
-         s"exist")
+               s"exist")
       sys.error(s"Table $dbName.$tableName does not exist")
     }
 
     val timeObj = Cast(Literal(loadDate), TimestampType).eval()
-    if(null == timeObj) {
+    if (null == timeObj) {
       val errorMessage = "Error: Invalid load start time format " + loadDate
       throw new MalformedCarbonCommandException(errorMessage)
     }
@@ -1037,20 +1031,20 @@ case class LoadTable(
     val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
     val identifier = TableIdentifier(tableName, Option(dbName))
     if (isOverwriteExist) {
-      sys.error("Overwrite is not supported for carbon table with " + dbName + "." + tableName)
+      sys.error(s"Overwrite is not supported for carbon table with $dbName.$tableName")
     }
     if (null == org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
       .getCarbonTable(dbName + "_" + tableName)) {
-      logError("Data loading failed. table not found: " + dbName + "." + tableName)
-      LOGGER.audit("Data loading failed. table not found: " + dbName + "." + tableName)
-      sys.error("Data loading failed. table not found: " + dbName + "." + tableName)
+      logError(s"Data loading failed. table not found: $dbName.$tableName")
+      LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
+      sys.error(s"Data loading failed. table not found: $dbName.$tableName")
     }
 
     val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
-        .lookupRelation1(Option(dbName), tableName)(sqlContext)
-        .asInstanceOf[CarbonRelation]
+      .lookupRelation1(Option(dbName), tableName)(sqlContext)
+      .asInstanceOf[CarbonRelation]
     if (relation == null) {
-        sys.error(s"Table $dbName.$tableName does not exist")
+      sys.error(s"Table $dbName.$tableName does not exist")
     }
     CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
     val carbonLock = CarbonLockFactory
@@ -1066,8 +1060,13 @@ case class LoadTable(
         sys.error("Table is locked for updation. Please try after some time")
       }
 
-      val factPath = if (dataFrame.isDefined) "" else FileUtils.getPaths(
-        CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
+      val factPath = if (dataFrame.isDefined) {
+        ""
+      }
+      else {
+        FileUtils.getPaths(
+          CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
+      }
       val carbonLoadModel = new CarbonLoadModel()
       carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
       carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
@@ -1127,7 +1126,8 @@ case class LoadTable(
         case "false" => false
         case illegal =>
           val errorMessage = "Illegal syntax found: [" + illegal + "] .The value multiline in " +
-            "load DDL which you set can only be 'true' or 'false', please check your input DDL."
+                             "load DDL which you set can only be 'true' or 'false', please check " +
+                             "your input DDL."
           throw new MalformedCarbonCommandException(errorMessage)
       }
       val maxColumns = options.getOrElse("maxcolumns", null)
@@ -1165,8 +1165,7 @@ case class LoadTable(
         // First system has to partition the data first and then call the load data
         if (null == relation.tableMeta.partitioner.partitionColumn ||
             relation.tableMeta.partitioner.partitionColumn(0).isEmpty) {
-          LOGGER.info("Initiating Direct Load for the Table : (" +
-                      dbName + "." + tableName + ")")
+          LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
           carbonLoadModel.setFactFilePath(factPath)
           carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter))
           carbonLoadModel.setCsvHeader(fileHeader)
@@ -1185,14 +1184,12 @@ case class LoadTable(
             partitionStatus,
             useKettle,
             dataFrame)
-      }
-      catch {
+      } catch {
         case ex: Exception =>
           LOGGER.error(ex)
           LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
           throw ex
-      }
-      finally {
+      } finally {
         // Once the data load is successful delete the unwanted partition files
         try {
           val fileType = FileFactory.getFileType(partitionLocation)
@@ -1205,7 +1202,7 @@ case class LoadTable(
           case ex: Exception =>
             LOGGER.error(ex)
             LOGGER.audit(s"Dataload failure for $dbName.$tableName. " +
-              "Problem deleting the partition folder")
+                         "Problem deleting the partition folder")
             throw ex
         }
 
@@ -1229,12 +1226,12 @@ case class LoadTable(
     Seq.empty
   }
 
-  private  def validateDateFormat(dateFormat: String, table: CarbonTable): Unit = {
+  private def validateDateFormat(dateFormat: String, table: CarbonTable): Unit = {
     val dimensions = table.getDimensionByTableName(tableName).asScala
     if (dateFormat != null) {
       if (dateFormat.trim == "") {
         throw new MalformedCarbonCommandException("Error: Option DateFormat is set an empty " +
-          "string.")
+                                                  "string.")
       } else {
         var dateFormats: Array[String] = dateFormat.split(CarbonCommonConstants.COMMA)
         for (singleDateFormat <- dateFormats) {
@@ -1242,11 +1239,13 @@ case class LoadTable(
           val columnName = dateFormatSplits(0).trim.toLowerCase
           if (!dimensions.exists(_.getColName.equals(columnName))) {
             throw new MalformedCarbonCommandException("Error: Wrong Column Name " +
-              dateFormatSplits(0) + " is provided in Option DateFormat.")
+                                                      dateFormatSplits(0) +
+                                                      " is provided in Option DateFormat.")
           }
           if (dateFormatSplits.length < 2 || dateFormatSplits(1).trim.isEmpty) {
             throw new MalformedCarbonCommandException("Error: Option DateFormat is not provided " +
-              "for " + "Column " + dateFormatSplits(0) + ".")
+                                                      "for " + "Column " + dateFormatSplits(0) +
+                                                      ".")
           }
         }
       }
@@ -1279,8 +1278,7 @@ private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: O
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
       CarbonEnv.getInstance(sqlContext).carbonCatalog.dropTable(storePath, identifier)(sqlContext)
       LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
-    }
-    finally {
+    } finally {
       if (carbonLock != null && isLocked) {
         if (carbonLock.unlock()) {
           logInfo("Table MetaData Unlocked Successfully after dropping the table")
@@ -1293,7 +1291,7 @@ private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: O
             CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
           }
           // delete bad record log after drop table
-          val badLogPath = CarbonUtil.getBadLogPath(dbName +  File.separator + tableName)
+          val badLogPath = CarbonUtil.getBadLogPath(dbName + File.separator + tableName)
           val badLogFileType = FileFactory.getFileType(badLogPath)
           if (FileFactory.isFileExist(badLogPath, badLogFileType)) {
             val file = FileFactory.getCarbonFile(badLogPath, badLogFileType)
@@ -1353,8 +1351,7 @@ private[sql] case class ShowLoads(
         try {
           val lim = Integer.parseInt(limitLoads)
           loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray.slice(0, lim)
-        }
-        catch {
+        } catch {
           case ex: NumberFormatException => sys.error(s" Entered limit is not a valid Number")
         }
 
@@ -1389,13 +1386,13 @@ private[sql] case class DescribeCommandFormatted(
     var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
       val comment = if (relation.metaData.dims.contains(field.name)) {
         val dimension = relation.metaData.carbonTable.getDimensionByName(
-            relation.tableMeta.carbonTableIdentifier.getTableName,
-            field.name)
+          relation.tableMeta.carbonTableIdentifier.getTableName,
+          field.name)
         if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
           val colprop = mapper.writeValueAsString(dimension.getColumnProperties)
           colProps.append(field.name).append(".")
-          .append(mapper.writeValueAsString(dimension.getColumnProperties))
-          .append(",")
+            .append(mapper.writeValueAsString(dimension.getColumnProperties))
+            .append(",")
         }
         if (dimension.hasEncoding(Encoding.DICTIONARY) &&
             !dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
@@ -1415,11 +1412,11 @@ private[sql] case class DescribeCommandFormatted(
       colProps.toString()
     }
     results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
-    results ++= Seq(("Database Name : ", relation.tableMeta.carbonTableIdentifier
+    results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
       .getDatabaseName, "")
     )
-    results ++= Seq(("Table Name : ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
-    results ++= Seq(("CARBON Store Path : ", relation.tableMeta.storePath, ""))
+    results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
+    results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
     val carbonTable = relation.tableMeta.carbonTable
     results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
     results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
@@ -1438,7 +1435,7 @@ private[sql] case class DescribeCommandFormatted(
 
   private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = {
     var results: Seq[(String, String, String)] =
-        Seq(("", "", ""), ("##Column Group Information", "", ""))
+      Seq(("", "", ""), ("##Column Group Information", "", ""))
     val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter {
       case (groupId, _) => groupId != -1
     }.toSeq.sortBy(_._1)
@@ -1447,7 +1444,7 @@ private[sql] case class DescribeCommandFormatted(
     })
     var index = 1
     groups.map { x =>
-      results = results:+(s"Column Group $index", x, "")
+      results = results :+ (s"Column Group $index", x, "")
       index = index + 1
     }
     results
@@ -1464,7 +1461,6 @@ private[sql] case class DeleteLoadByDate(
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   def run(sqlContext: SQLContext): Seq[Row] = {
-
     val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
     LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
     val identifier = TableIdentifier(tableName, Option(dbName))
@@ -1472,26 +1468,21 @@ private[sql] case class DeleteLoadByDate(
       .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
     var level: String = ""
     val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata
-         .getInstance().getCarbonTable(dbName + '_' + tableName)
+      .getInstance().getCarbonTable(dbName + '_' + tableName)
     if (relation == null) {
       LOGGER.audit(s"The delete load by date is failed. Table $dbName.$tableName does not exist")
       sys.error(s"Table $dbName.$tableName does not exist")
     }
-
     val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
       filter => filter.name.equalsIgnoreCase(dateField) &&
                 filter.dataType.isInstanceOf[TimestampType]).toList
-
     if (matches.isEmpty) {
-      LOGGER.audit(
-        "The delete load by date is failed. " +
-        "Table $dbName.$tableName does not contain date field :" + dateField)
-      sys.error(s"Table $dbName.$tableName does not contain date field " + dateField)
-    }
-    else {
+      LOGGER.audit("The delete load by date is failed. " +
+                   s"Table $dbName.$tableName does not contain date field: $dateField")
+      sys.error(s"Table $dbName.$tableName does not contain date field $dateField")
+    } else {
       level = matches.asJava.get(0).name
     }
-
     val actualColName = relation.metaData.carbonTable.getDimensionByName(tableName, level)
       .getColName
     CarbonDataRDDFactory.deleteLoadByDate(
@@ -1507,6 +1498,7 @@ private[sql] case class DeleteLoadByDate(
     LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.")
     Seq.empty
   }
+
 }
 
 private[sql] case class CleanFiles(
@@ -1544,7 +1536,7 @@ private[sql] case class CleanFiles(
         relation.tableMeta.partitioner)
       LOGGER.audit(s"Clean files request is successfull for $dbName.$tableName.")
     } catch {
-      case ex : Exception =>
+      case ex: Exception =>
         sys.error(ex.getMessage)
     }
     Seq.empty

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
index d551e10..3fe62cc 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
@@ -55,9 +55,9 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil.CarbonSparkUtil
 case class MetaData(var tablesMeta: ArrayBuffer[TableMeta])
 
 case class CarbonMetaData(dims: Seq[String],
-  msrs: Seq[String],
-  carbonTable: CarbonTable,
-  dictionaryMap: DictionaryMap)
+    msrs: Seq[String],
+    carbonTable: CarbonTable,
+    dictionaryMap: DictionaryMap)
 
 case class TableMeta(carbonTableIdentifier: CarbonTableIdentifier, storePath: String,
     var carbonTable: CarbonTable, partitioner: Partitioner)
@@ -176,12 +176,12 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
       ZookeeperInit.getInstance(zookeeperUrl)
       LOGGER.info("Zookeeper url is configured. Taking the zookeeper as lock type.")
       var configuredLockType = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.LOCK_TYPE)
+        .getProperty(CarbonCommonConstants.LOCK_TYPE)
       if (null == configuredLockType) {
         configuredLockType = CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER
         CarbonProperties.getInstance
-            .addProperty(CarbonCommonConstants.LOCK_TYPE,
-                configuredLockType)
+          .addProperty(CarbonCommonConstants.LOCK_TYPE,
+            configuredLockType)
       }
     }
 
@@ -214,7 +214,7 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
             tableFolders.foreach(tableFolder => {
               if (tableFolder.isDirectory) {
                 val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
-                    tableFolder.getName, UUID.randomUUID().toString)
+                  tableFolder.getName, UUID.randomUUID().toString)
                 val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
                   carbonTableIdentifier)
                 val tableMetadataFile = carbonTablePath.getSchemaFilePath
@@ -260,22 +260,17 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
             })
           }
         })
-      }
-      else {
+      } else {
         // Create folders and files.
         FileFactory.mkdirs(databasePath, fileType)
-
       }
-    }
-    catch {
+    } catch {
       case s: java.io.FileNotFoundException =>
         // Create folders and files.
         FileFactory.mkdirs(databasePath, fileType)
-
     }
   }
 
-
   /**
    *
    * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
@@ -286,11 +281,9 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
       tableInfo: org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo,
       dbName: String, tableName: String, partitioner: Partitioner)
     (sqlContext: SQLContext): String = {
-
     if (tableExists(TableIdentifier(tableName, Some(dbName)))(sqlContext)) {
       sys.error(s"Table [$tableName] already exists under Database [$dbName]")
     }
-
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val thriftTableInfo = schemaConverter
       .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
@@ -299,14 +292,13 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
       .add(schemaEvolutionEntry)
 
     val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
-        tableInfo.getFactTable.getTableId)
+      tableInfo.getFactTable.getTableId)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
     tableInfo.setMetaDataFilepath(schemaMetadataPath)
     tableInfo.setStorePath(storePath)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-
     val tableMeta = TableMeta(
       carbonTableIdentifier,
       storePath,
@@ -318,15 +310,13 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
     if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
       FileFactory.mkdirs(schemaMetadataPath, fileType)
     }
-
     val thriftWriter = new ThriftWriter(schemaFilePath, false)
     thriftWriter.open()
     thriftWriter.write(thriftTableInfo)
     thriftWriter.close()
-
     metadata.tablesMeta += tableMeta
     logInfo(s"Table $tableName for Database $dbName created successfully.")
-    LOGGER.info("Table " + tableName + " for Database " + dbName + " created successfully.")
+    LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
     updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
     carbonTablePath.getPath
   }
@@ -392,8 +382,7 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
           if (c.carbonTableIdentifier.getDatabaseName.contains(name)) {
             c.carbonTableIdentifier
               .getDatabaseName
-          }
-          else {
+          } else {
             null
           }
         case _ => c.carbonTableIdentifier.getDatabaseName
@@ -420,8 +409,8 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
   def getAllTables()(sqlContext: SQLContext): Seq[TableIdentifier] = {
     checkSchemasModifiedTimeAndReloadTables()
     metadata.tablesMeta.map { c =>
-        TableIdentifier(c.carbonTableIdentifier.getTableName,
-          Some(c.carbonTableIdentifier.getDatabaseName))
+      TableIdentifier(c.carbonTableIdentifier.getTableName,
+        Some(c.carbonTableIdentifier.getDatabaseName))
     }
   }
 
@@ -526,7 +515,7 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
     if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
       if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
         getLastModifiedTime ==
-        tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
+            tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
         refreshCache()
       }
     }
@@ -636,18 +625,18 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
 object CarbonMetastoreTypes extends RegexParsers {
   protected lazy val primitiveType: Parser[DataType] =
     "string" ^^^ StringType |
-      "float" ^^^ FloatType |
-      "int" ^^^ IntegerType |
-      "tinyint" ^^^ ShortType |
-      "short" ^^^ ShortType |
-      "double" ^^^ DoubleType |
-      "long" ^^^ LongType |
-      "binary" ^^^ BinaryType |
-      "boolean" ^^^ BooleanType |
-      fixedDecimalType |
-      "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
-      "varchar\\((\\d+)\\)".r ^^^ StringType |
-      "timestamp" ^^^ TimestampType
+    "float" ^^^ FloatType |
+    "int" ^^^ IntegerType |
+    "tinyint" ^^^ ShortType |
+    "short" ^^^ ShortType |
+    "double" ^^^ DoubleType |
+    "long" ^^^ LongType |
+    "binary" ^^^ BinaryType |
+    "boolean" ^^^ BooleanType |
+    fixedDecimalType |
+    "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
+    "varchar\\((\\d+)\\)".r ^^^ StringType |
+    "timestamp" ^^^ TimestampType
 
   protected lazy val fixedDecimalType: Parser[DataType] =
     "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 81abbfb..0c13293 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.spark.load.CarbonLoaderUtil
 object DistributionUtil {
   @transient
   val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName)
+
   /*
    * This method will return the list of executers in the cluster.
    * For this we take the  memory status of all node with getExecutorMemoryStatus
@@ -62,13 +63,11 @@ object DistributionUtil {
           addr.getHostName
         }
         nodeNames.toArray
-      }
-      else {
+      } else {
         // For Standalone cluster, node IPs will be returned.
         nodelist.toArray
       }
-    }
-    else {
+    } else {
       Seq(InetAddress.getLocalHost.getHostName).toArray
     }
   }
@@ -111,37 +110,41 @@ object DistributionUtil {
    * @return
    */
   def ensureExecutorsAndGetNodeList(blockList: Array[Distributable],
-    sparkContext: SparkContext):
+      sparkContext: SparkContext):
   Array[String] = {
     val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava)
     var confExecutorsTemp: String = null
     if (sparkContext.getConf.contains("spark.executor.instances")) {
       confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
     } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
-      && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
-      .equalsIgnoreCase("true")) {
+               && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
+                 .equalsIgnoreCase("true")) {
       if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
         confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
       }
     }
 
-    val confExecutors = if (null != confExecutorsTemp) confExecutorsTemp.toInt else 1
+    val confExecutors = if (null != confExecutorsTemp) {
+      confExecutorsTemp.toInt
+    } else {
+      1
+    }
     val requiredExecutors = if (nodeMapping.size > confExecutors) {
       confExecutors
-    } else {nodeMapping.size()}
+    } else { nodeMapping.size() }
 
-    val startTime = System.currentTimeMillis();
+    val startTime = System.currentTimeMillis()
     CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
     var nodes = DistributionUtil.getNodeList(sparkContext)
-    var maxTimes = 30;
+    var maxTimes = 30
     while (nodes.length < requiredExecutors && maxTimes > 0) {
-      Thread.sleep(500);
+      Thread.sleep(500)
       nodes = DistributionUtil.getNodeList(sparkContext)
-      maxTimes = maxTimes - 1;
+      maxTimes = maxTimes - 1
     }
-    val timDiff = System.currentTimeMillis() - startTime;
-    LOGGER.info("Total Time taken to ensure the required executors : " + timDiff)
-    LOGGER.info("Time elapsed to allocate the required executors : " + (30 - maxTimes) * 500)
+    val timDiff = System.currentTimeMillis() - startTime
+    LOGGER.info(s"Total Time taken to ensure the required executors: $timDiff")
+    LOGGER.info(s"Time elapsed to allocate the required executors: ${ (30 - maxTimes) * 500 }")
     nodes.distinct
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index 368a1ad..d60bed4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -75,7 +75,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
   def apply(plan: LogicalPlan): LogicalPlan = {
     if (relations.nonEmpty && !isOptimized(plan)) {
       LOGGER.info("Starting to optimize plan")
-      val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("");
+      val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("")
       val queryStatistic = new QueryStatistic()
       val result = transformCarbonPlan(plan, relations)
       queryStatistic.addStatistics("Time taken for Carbon Optimizer to optimize: ",
@@ -99,8 +99,8 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
   case class ExtraNodeInfo(var hasCarbonRelation: Boolean)
 
   def fillNodeInfo(
-       plan: LogicalPlan,
-       extraNodeInfos: java.util.HashMap[LogicalPlan, ExtraNodeInfo]): ExtraNodeInfo = {
+      plan: LogicalPlan,
+      extraNodeInfos: java.util.HashMap[LogicalPlan, ExtraNodeInfo]): ExtraNodeInfo = {
     plan match {
       case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
         val extraNodeInfo = ExtraNodeInfo(true)
@@ -465,9 +465,10 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
           decoder = true
           cd
         case currentPlan =>
-          hasCarbonRelation(currentPlan) match {
-            case true => addTempDecoder(currentPlan)
-            case false => currentPlan
+          if (hasCarbonRelation(currentPlan)) {
+            addTempDecoder(currentPlan)
+          } else {
+            currentPlan
           }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala b/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
index b683629..e755b2e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
@@ -29,19 +29,18 @@ object FileUtils extends Logging {
    * append all csv file path to a String, file path separated by comma
    */
   private def getPathsFromCarbonFile(carbonFile: CarbonFile, stringBuild: StringBuilder): Unit = {
-    carbonFile.isDirectory match {
-    case true =>
+    if (carbonFile.isDirectory) {
       val files = carbonFile.listFiles()
       for (j <- 0 until files.size) {
         getPathsFromCarbonFile(files(j), stringBuild)
       }
-    case false =>
+    } else {
       val path = carbonFile.getAbsolutePath
       val fileName = carbonFile.getName
       if (carbonFile.getSize == 0) {
         logWarning(s"skip empty input file: $path")
       } else if (fileName.startsWith(CarbonCommonConstants.UNDERSCORE) ||
-          fileName.startsWith(CarbonCommonConstants.POINT)) {
+                 fileName.startsWith(CarbonCommonConstants.POINT)) {
         logWarning(s"skip invisible input file: $path")
       } else {
         stringBuild.append(path.replace('\\', '/')).append(CarbonCommonConstants.COMMA)
@@ -71,7 +70,7 @@ object FileUtils extends Logging {
         stringBuild.substring(0, stringBuild.size - 1)
       } else {
         throw new DataLoadingException("Please check your input path and make sure " +
-          "that files end with '.csv' and content is not empty.")
+                                       "that files end with '.csv' and content is not empty.")
       }
     }
   }
@@ -90,4 +89,5 @@ object FileUtils extends Logging {
       size
     }
   }
+
 }


[2/4] incubator-carbondata git commit: Improved spark module code. * Removed some compliation warnings. * Replace pattern matching for boolean to IF-ELSE. * Improved code according to scala standards. * Removed unnecessary new lines. * Added string inter

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index b8f4087..1a7aaf6 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -32,13 +32,15 @@ import org.apache.spark.sql.hive.DistributionUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TableBlockInfo, TableTaskInfo, TaskBlockInfo}
+import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties,
+TableBlockInfo, TableTaskInfo, TaskBlockInfo}
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter
 import org.apache.carbondata.core.carbon.path.CarbonTablePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CarbonUtilException}
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
-import org.apache.carbondata.integration.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, RowResultMerger}
+import org.apache.carbondata.integration.spark.merger.{CarbonCompactionExecutor,
+CarbonCompactionUtil, RowResultMerger}
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.scan.result.iterator.RawResultIterator
@@ -49,11 +51,11 @@ import org.apache.carbondata.spark.util.QueryPlanUtil
 
 
 class CarbonMergerRDD[K, V](
-  sc: SparkContext,
-  result: MergeResult[K, V],
-  carbonLoadModel: CarbonLoadModel,
-  carbonMergerMapping : CarbonMergerMapping,
-  confExecutorsTemp: String)
+    sc: SparkContext,
+    result: MergeResult[K, V],
+    carbonLoadModel: CarbonLoadModel,
+    carbonMergerMapping: CarbonMergerMapping,
+    confExecutorsTemp: String)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
@@ -66,37 +68,37 @@ class CarbonMergerRDD[K, V](
   val databaseName = carbonMergerMapping.databaseName
   val factTableName = carbonMergerMapping.factTableName
   val tableId = carbonMergerMapping.tableId
+
   override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
 
       carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
       val tempLocationKey: String = CarbonCommonConstants
-        .COMPACTION_KEY_WORD + '_' + carbonLoadModel
-        .getDatabaseName + '_' + carbonLoadModel
-        .getTableName + '_' + carbonLoadModel.getTaskNo
+                                      .COMPACTION_KEY_WORD + '_' + carbonLoadModel
+                                      .getDatabaseName + '_' + carbonLoadModel
+                                      .getTableName + '_' + carbonLoadModel.getTaskNo
 
       // this property is used to determine whether temp location for carbon is inside
       // container temp dir or is yarn application directory.
       val carbonUseLocalDir = CarbonProperties.getInstance()
         .getProperty("carbon.use.local.dir", "false")
 
-      if(carbonUseLocalDir.equalsIgnoreCase("true")) {
+      if (carbonUseLocalDir.equalsIgnoreCase("true")) {
 
         val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
-        if (null != storeLocations && storeLocations.length > 0) {
+        if (null != storeLocations && storeLocations.nonEmpty) {
           storeLocation = storeLocations(Random.nextInt(storeLocations.length))
         }
         if (storeLocation == null) {
           storeLocation = System.getProperty("java.io.tmpdir")
         }
-      }
-      else {
+      } else {
         storeLocation = System.getProperty("java.io.tmpdir")
       }
       storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
       CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
-      LOGGER.info("Temp storeLocation taken is " + storeLocation)
+      LOGGER.info(s"Temp storeLocation taken is $storeLocation")
       var mergeStatus = false
       var mergeNumber = ""
       var exec: CarbonCompactionExecutor = null
@@ -111,7 +113,7 @@ class CarbonMergerRDD[K, V](
           carbonMergerMapping.maxSegmentColCardinality)
 
         // sorting the table block info List.
-        var tableBlockInfoList = carbonSparkPartition.tableBlockInfos
+        val tableBlockInfoList = carbonSparkPartition.tableBlockInfos
 
         Collections.sort(tableBlockInfoList)
 
@@ -123,7 +125,7 @@ class CarbonMergerRDD[K, V](
 
         carbonLoadModel.setStorePath(storePath)
 
-          exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
+        exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
           factTableName, storePath, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
           dataFileMetadataSegMapping
         )
@@ -135,18 +137,18 @@ class CarbonMergerRDD[K, V](
         } catch {
           case e: Throwable =>
             if (null != exec) {
-              exec.finish
+              exec.finish()
             }
             LOGGER.error(e)
             if (null != e.getMessage) {
-              sys.error("Exception occurred in query execution :: " + e.getMessage)
+              sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
             } else {
               sys.error("Exception occurred in query execution.Please check logs.")
             }
         }
         mergeNumber = mergedLoadName
           .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
-            CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
+                     CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
           )
 
         val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
@@ -170,13 +172,11 @@ class CarbonMergerRDD[K, V](
           )
         mergeStatus = merger.mergerSlice()
 
-      }
-      catch {
+      } catch {
         case e: Exception =>
           LOGGER.error(e)
           throw e
-      }
-      finally {
+      } finally {
         // delete temp location data
         val newSlice = CarbonCommonConstants.LOAD_FOLDER + mergeNumber
         try {
@@ -187,9 +187,9 @@ class CarbonMergerRDD[K, V](
           case e: Exception =>
             LOGGER.error(e)
         }
-       if (null != exec) {
-         exec.finish
-       }
+        if (null != exec) {
+          exec.finish
+        }
       }
 
       var finished = false
@@ -198,8 +198,7 @@ class CarbonMergerRDD[K, V](
         if (!finished) {
           finished = true
           finished
-        }
-        else {
+        } else {
           !finished
         }
       }
@@ -261,7 +260,7 @@ class CarbonMergerRDD[K, V](
       )
 
       // keep on assigning till last one is reached.
-      if (null != blocksOfOneSegment && blocksOfOneSegment.size > 0) {
+      if (null != blocksOfOneSegment && blocksOfOneSegment.nonEmpty) {
         blocksOfLastSegment = blocksOfOneSegment.asJava
       }
 
@@ -273,8 +272,7 @@ class CarbonMergerRDD[K, V](
           val blockListTemp = new util.ArrayList[TableBlockInfo]()
           blockListTemp.add(tableBlockInfo)
           taskIdMapping.put(taskNo, blockListTemp)
-        }
-        else {
+        } else {
           blockList.add(tableBlockInfo)
         }
       }
@@ -288,8 +286,7 @@ class CarbonMergerRDD[K, V](
     }
 
     // prepare the details required to extract the segment properties using last segment.
-    if (null != blocksOfLastSegment && blocksOfLastSegment.size > 0)
-    {
+    if (null != blocksOfLastSegment && blocksOfLastSegment.size > 0) {
       val lastBlockInfo = blocksOfLastSegment.get(blocksOfLastSegment.size - 1)
 
       var dataFileFooter: DataFileFooter = null
@@ -317,8 +314,8 @@ class CarbonMergerRDD[K, V](
     } else { nodeMapping.size() }
     CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
     logInfo("No.of Executors required=" + requiredExecutors
-      + " , spark.executor.instances=" + confExecutors
-      + ", no.of.nodes where data present=" + nodeMapping.size())
+            + " , spark.executor.instances=" + confExecutors
+            + ", no.of.nodes where data present=" + nodeMapping.size())
     var nodes = DistributionUtil.getNodeList(sparkContext)
     var maxTimes = 30
     while (nodes.length < requiredExecutors && maxTimes > 0) {
@@ -341,11 +338,11 @@ class CarbonMergerRDD[K, V](
 
       val list = new util.ArrayList[TableBlockInfo]
       entry._2.asScala.foreach(taskInfo => {
-         val blocksPerNode = taskInfo.asInstanceOf[TableTaskInfo]
-         list.addAll(blocksPerNode.getTableBlockInfoList)
+        val blocksPerNode = taskInfo.asInstanceOf[TableTaskInfo]
+        list.addAll(blocksPerNode.getTableBlockInfoList)
         taskBlockList
           .add(new NodeInfo(blocksPerNode.getTaskId, blocksPerNode.getTableBlockInfoList.size))
-       })
+      })
       if (list.size() != 0) {
         result.add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, list))
         i += 1
@@ -354,25 +351,25 @@ class CarbonMergerRDD[K, V](
 
     // print the node info along with task and number of blocks for the task.
 
-    nodeTaskBlocksMap.asScala.foreach((entry : (String, List[NodeInfo])) => {
-      logInfo(s"for the node ${entry._1}" )
+    nodeTaskBlocksMap.asScala.foreach((entry: (String, List[NodeInfo])) => {
+      logInfo(s"for the node ${ entry._1 }")
       for (elem <- entry._2.asScala) {
         logInfo("Task ID is " + elem.TaskId + "no. of blocks is " + elem.noOfBlocks)
       }
-    } )
+    })
 
     val noOfNodes = nodes.length
     val noOfTasks = result.size
     logInfo(s"Identified  no.of.Blocks: $noOfBlocks,"
             + s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
     )
-    logInfo("Time taken to identify Blocks to scan : " + (System
-                                                            .currentTimeMillis() - startTime)
+    logInfo("Time taken to identify Blocks to scan: " + (System
+                                                           .currentTimeMillis() - startTime)
     )
-    for (j <- 0 until result.size ) {
+    for (j <- 0 until result.size) {
       val cp = result.get(j).asInstanceOf[CarbonSparkPartition]
-      logInfo(s"Node : " + cp.locations.toSeq.mkString(",")
-              + ", No.Of Blocks : " + cp.tableBlockInfos.size
+      logInfo(s"Node: " + cp.locations.toSeq.mkString(",")
+              + ", No.Of Blocks: " + cp.tableBlockInfos.size
       )
     }
     result.toArray(new Array[Partition](result.size))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index c496099..d56b00f 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -49,7 +49,6 @@ import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.util.QueryPlanUtil
 
 
-
 class CarbonSparkPartition(rddId: Int, val idx: Int,
     val locations: Array[String],
     val tableBlockInfos: util.List[TableBlockInfo])
@@ -92,7 +91,7 @@ class CarbonScanRDD[V: ClassTag](
     val result = new util.ArrayList[Partition](defaultParallelism)
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val validAndInvalidSegments = new SegmentStatusManager(queryModel.getAbsoluteTableIdentifier)
-        .getValidAndInvalidSegments
+      .getValidAndInvalidSegments
     // set filter resolver tree
     try {
       // before applying filter check whether segments are available in the table.
@@ -110,11 +109,10 @@ class CarbonScanRDD[V: ClassTag](
             queryModel.getAbsoluteTableIdentifier
           )
       }
-    }
-    catch {
+    } catch {
       case e: Exception =>
         LOGGER.error(e)
-        sys.error("Exception occurred in query execution :: " + e.getMessage)
+        sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
     }
     // get splits
     val splits = carbonInputFormat.getSplits(job)
@@ -129,8 +127,8 @@ class CarbonScanRDD[V: ClassTag](
         )
       )
       var activeNodes = Array[String]()
-      if(blockListTemp.nonEmpty) {
-         activeNodes = DistributionUtil
+      if (blockListTemp.nonEmpty) {
+        activeNodes = DistributionUtil
           .ensureExecutorsAndGetNodeList(blockListTemp.toArray, sparkContext)
       }
       defaultParallelism = sparkContext.defaultParallelism
@@ -141,9 +139,9 @@ class CarbonScanRDD[V: ClassTag](
         var statistic = new QueryStatistic()
         // group blocks to nodes, tasks
         val nodeBlockMapping =
-          CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
-            activeNodes.toList.asJava
-          )
+        CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
+          activeNodes.toList.asJava
+        )
         statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
         statisticRecorder.recordStatisticsForDriver(statistic, queryModel.getQueryId())
         statistic = new QueryStatistic()
@@ -173,15 +171,14 @@ class CarbonScanRDD[V: ClassTag](
         statisticRecorder.logStatisticsAsTableDriver()
         result.asScala.foreach { r =>
           val cp = r.asInstanceOf[CarbonSparkPartition]
-          logInfo(s"Node : " + cp.locations.toSeq.mkString(",")
-                  + ", No.Of Blocks : " + cp.tableBlockInfos.size()
+          logInfo(s"Node: ${ cp.locations.toSeq.mkString(",") }" +
+                  s", No.Of Blocks:  ${ cp.tableBlockInfos.size() }"
           )
         }
       } else {
         logInfo("No blocks identified to scan")
       }
-    }
-    else {
+    } else {
       logInfo("No valid segments found to scan")
     }
     result.toArray(new Array[Partition](result.size()))
@@ -200,7 +197,7 @@ class CarbonScanRDD[V: ClassTag](
           queryExecutor.finish
         })
         val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
-        if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
+        if (!carbonSparkPartition.tableBlockInfos.isEmpty) {
           queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx)
           // fill table block info
           queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
@@ -221,7 +218,7 @@ class CarbonScanRDD[V: ClassTag](
         case e: Exception =>
           LOGGER.error(e)
           if (null != e.getMessage) {
-            sys.error("Exception occurred in query execution :: " + e.getMessage)
+            sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
           } else {
             sys.error("Exception occurred in query execution.Please check logs.")
           }
@@ -256,22 +253,22 @@ class CarbonScanRDD[V: ClassTag](
       }
 
       def logStatistics(): Unit = {
-          if (null != queryModel.getStatisticsRecorder) {
-            var queryStatistic = new QueryStatistic()
-            queryStatistic
-              .addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
-                System.currentTimeMillis - queryStartTime
-              )
-            queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
-            // result size
-            queryStatistic = new QueryStatistic()
-            queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
-            queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
-            // print executor query statistics for each task_id
-            queryModel.getStatisticsRecorder.logStatisticsAsTableExecutor()
-          }
+        if (null != queryModel.getStatisticsRecorder) {
+          var queryStatistic = new QueryStatistic()
+          queryStatistic
+            .addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
+              System.currentTimeMillis - queryStartTime
+            )
+          queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
+          // result size
+          queryStatistic = new QueryStatistic()
+          queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
+          queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
+          // print executor query statistics for each task_id
+          queryModel.getStatisticsRecorder.logStatisticsAsTableExecutor()
         }
       }
+    }
 
     iter
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 28c37f3..9c9be8d 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -50,18 +50,18 @@ object Compactor {
     val carbonLoadModel = compactionCallableModel.carbonLoadModel
     val compactionType = compactionCallableModel.compactionType
 
-    val startTime = System.nanoTime();
+    val startTime = System.nanoTime()
     val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
     var finalMergeStatus = false
     val schemaName: String = carbonLoadModel.getDatabaseName
     val factTableName = carbonLoadModel.getTableName
     val validSegments: Array[String] = CarbonDataMergerUtil
       .getValidSegments(loadsToMerge).split(',')
-    val mergeLoadStartTime = CarbonLoaderUtil.readCurrentTime();
+    val mergeLoadStartTime = CarbonLoaderUtil.readCurrentTime()
     val carbonMergerMapping = CarbonMergerMapping(storeLocation,
       storePath,
       partitioner,
-      carbonTable.getMetaDataFilepath(),
+      carbonTable.getMetaDataFilepath,
       mergedLoadName,
       kettleHomePath,
       cubeCreationTime,
@@ -82,19 +82,19 @@ object Compactor {
     )
     )
     carbonLoadModel.setLoadMetadataDetails(segmentStatusManager
-      .readLoadMetadata(carbonTable.getMetaDataFilepath()).toList.asJava
+      .readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava
     )
     var execInstance = "1"
     // in case of non dynamic executor allocation, number of executors are fixed.
     if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
       execInstance = sc.sparkContext.getConf.get("spark.executor.instances")
-      logger.info("spark.executor.instances property is set to =" + execInstance)
+      logger.info(s"spark.executor.instances property is set to = $execInstance")
     } // in case of dynamic executor allocation, taking the max executors of the dynamic allocation.
     else if (sc.sparkContext.getConf.contains("spark.dynamicAllocation.enabled")) {
       if (sc.sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
         .equalsIgnoreCase("true")) {
         execInstance = sc.sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
-        logger.info("spark.dynamicAllocation.maxExecutors property is set to =" + execInstance)
+        logger.info(s"spark.dynamicAllocation.maxExecutors property is set to = $execInstance")
       }
     }
 
@@ -106,51 +106,38 @@ object Compactor {
       execInstance
     ).collect
 
-    if(mergeStatus.length == 0) {
+    if (mergeStatus.length == 0) {
       finalMergeStatus = false
-    }
-    else {
+    } else {
       finalMergeStatus = mergeStatus.forall(_._2)
     }
 
     if (finalMergeStatus) {
-      val endTime = System.nanoTime();
-      logger.info("time taken to merge " + mergedLoadName + " is " + (endTime - startTime))
+      val endTime = System.nanoTime()
+      logger.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
       if (!CarbonDataMergerUtil
-        .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath(),
+        .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
           mergedLoadName, carbonLoadModel, mergeLoadStartTime, compactionType
         )) {
-        logger
-          .audit("Compaction request failed for table " + carbonLoadModel
-            .getDatabaseName + "." + carbonLoadModel.getTableName
-          )
-        logger
-          .error("Compaction request failed for table " + carbonLoadModel
-            .getDatabaseName + "." + carbonLoadModel.getTableName
-          )
-        throw new Exception("Compaction failed to update metadata for table " + carbonLoadModel
-            .getDatabaseName + "." + carbonLoadModel.getTableName)
-      }
-      else {
-        logger
-          .audit("Compaction request completed for table " + carbonLoadModel
-            .getDatabaseName + "." + carbonLoadModel.getTableName
-          )
-        logger
-          .info("Compaction request completed for table " + carbonLoadModel
-            .getDatabaseName + "." + carbonLoadModel.getTableName
-          )
+        logger.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
+                     s"${ carbonLoadModel.getTableName }")
+        logger.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
+                     s"${ carbonLoadModel.getTableName }")
+        throw new Exception(s"Compaction failed to update metadata for table" +
+                            s" ${ carbonLoadModel.getDatabaseName }." +
+                            s"${ carbonLoadModel.getTableName }")
+      } else {
+        logger.audit(s"Compaction request completed for table " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        logger.info("Compaction request completed for table ${ carbonLoadModel.getDatabaseName } " +
+                    s".${ carbonLoadModel.getTableName }")
       }
-    }
-    else {
-      logger
-        .audit("Compaction request failed for table " + carbonLoadModel
-          .getDatabaseName + "." + carbonLoadModel.getTableName
-        )
-      logger
-        .error("Compaction request failed for table " + carbonLoadModel
-          .getDatabaseName + "." + carbonLoadModel.getTableName
-        )
+    } else {
+      logger.audit("Compaction request failed for table ${ carbonLoadModel.getDatabaseName } " +
+                   s".${ carbonLoadModel.getTableName }"
+      )
+      logger.error("Compaction request failed for table ${ carbonLoadModel.getDatabaseName } " +
+                   s".${ carbonLoadModel.getTableName }")
       throw new Exception("Compaction Failure in Merger Rdd.")
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
index d1b8bf3..e23b58d 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable
 import org.apache.carbondata.common.factory.CarbonCommonFactory
 import org.apache.carbondata.core.cache.dictionary.Dictionary
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.core.writer.CarbonDictionaryWriter
 import org.apache.carbondata.spark.rdd.DictionaryLoadModel
 
@@ -63,8 +64,7 @@ class DictionaryWriterTask(valuesBuffer: mutable.HashSet[String],
       if (values.length >= 1) {
         if (model.dictFileExists(columnIndex)) {
           for (value <- values) {
-            val parsedValue = org.apache.carbondata.core.util.DataTypeUtil
-              .normalizeColumnValueForItsDataType(value,
+            val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(value,
                 model.primDimensions(columnIndex))
             if (null != parsedValue && dictionary.getSurrogateKey(parsedValue) ==
               CarbonCommonConstants.INVALID_SURROGATE_KEY) {
@@ -75,8 +75,7 @@ class DictionaryWriterTask(valuesBuffer: mutable.HashSet[String],
 
         } else {
           for (value <- values) {
-            val parsedValue = org.apache.carbondata.core.util.DataTypeUtil
-              .normalizeColumnValueForItsDataType(value,
+            val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(value,
                 model.primDimensions(columnIndex))
             if (null != parsedValue) {
               writer.write(parsedValue)
@@ -88,8 +87,7 @@ class DictionaryWriterTask(valuesBuffer: mutable.HashSet[String],
     } catch {
       case ex: IOException =>
         throw ex
-    }
-    finally {
+    } finally {
       if (null != writer) {
         writer.close()
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
index 30d5871..efedc91 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
@@ -45,7 +45,7 @@ object CarbonThriftServer {
     } catch {
       case e: Exception =>
         val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-        LOG.error("Wrong value for carbon.spark.warmUpTime " + warmUpTime +
+        LOG.error(s"Wrong value for carbon.spark.warmUpTime $warmUpTime " +
                   "Using default Value and proceeding")
         Thread.sleep(30000)
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 8b89f5d..d5051cf 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -36,7 +36,7 @@ object CommonUtil {
       if (noDictionaryDims.contains(x)) {
         throw new MalformedCarbonCommandException(
           "Column group is not supported for no dictionary columns:" + x)
-      } else if (msrs.filter { msr => msr.column.equals(x) }.nonEmpty) {
+      } else if (msrs.exists(msr => msr.column.equals(x))) {
         // if column is measure
         throw new MalformedCarbonCommandException("Column group is not supported for measures:" + x)
       } else if (foundIndExistingColGrp(x)) {
@@ -47,11 +47,11 @@ object CommonUtil {
       } else if (isTimeStampColumn(x, dims)) {
         throw new MalformedCarbonCommandException(
           "Column group doesn't support Timestamp datatype:" + x)
-      }
-      // if invalid column is present
-      else if (dims.filter { dim => dim.column.equalsIgnoreCase(x) }.isEmpty) {
+      }// if invalid column is
+      else if (!dims.exists(dim => dim.column.equalsIgnoreCase(x))) {
+        // present
         throw new MalformedCarbonCommandException(
-          "column in column group is not a valid column :" + x
+          "column in column group is not a valid column: " + x
         )
       }
     }
@@ -110,7 +110,7 @@ object CommonUtil {
             key.length()), value))
         }
     }
-    if (fieldProps.isEmpty()) {
+    if (fieldProps.isEmpty) {
       None
     } else {
       Some(fieldProps)
@@ -211,7 +211,7 @@ object CommonUtil {
     var tableBlockSize: Integer = 0
     if (tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE).isDefined) {
       val blockSizeStr: String =
-        parsePropertyValueStringInMB(tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE).get)
+        parsePropertyValueStringInMB(tableProperties(CarbonCommonConstants.TABLE_BLOCKSIZE))
       try {
         tableBlockSize = Integer.parseInt(blockSizeStr)
       } catch {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
index 1fee428..aa8fcd5 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
@@ -35,7 +35,7 @@ object DataTypeConverterUtil {
       case "timestamp" => DataType.TIMESTAMP
       case "array" => DataType.ARRAY
       case "struct" => DataType.STRUCT
-      case _ => sys.error("Unsupported data type : " + dataType)
+      case _ => sys.error(s"Unsupported data type: $dataType")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index bd295bc..db01367 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -69,9 +69,9 @@ object GlobalDictionaryUtil extends Logging {
   /**
    * find columns which need to generate global dictionary.
    *
-   * @param dimensions  dimension list of schema
-   * @param headers  column headers
-   * @param columns column list of csv file
+   * @param dimensions dimension list of schema
+   * @param headers    column headers
+   * @param columns    column list of csv file
    */
   def pruneDimensions(dimensions: Array[CarbonDimension],
       headers: Array[String],
@@ -96,10 +96,10 @@ object GlobalDictionaryUtil extends Logging {
 
   /**
    * use this method to judge whether CarbonDimension use some encoding or not
-    *
-    * @param dimension   carbonDimension
-   * @param encoding   the coding way of dimension
-   * @param excludeEncoding  the coding way to exclude
+   *
+   * @param dimension       carbonDimension
+   * @param encoding        the coding way of dimension
+   * @param excludeEncoding the coding way to exclude
    */
   def hasEncoding(dimension: CarbonDimension,
       encoding: Encoding,
@@ -129,7 +129,7 @@ object GlobalDictionaryUtil extends Logging {
       if (dimension.hasEncoding(encoding) &&
           (excludeEncoding == null || !dimension.hasEncoding(excludeEncoding))) {
         if ((forPreDefDict && carbonLoadModel.getPredefDictFilePath(dimension) != null) ||
-          (!forPreDefDict && carbonLoadModel.getPredefDictFilePath(dimension) == null)) {
+            (!forPreDefDict && carbonLoadModel.getPredefDictFilePath(dimension) == null)) {
           dimensionsWithEncoding += dimension
         }
       }
@@ -137,8 +137,8 @@ object GlobalDictionaryUtil extends Logging {
   }
 
   def getPrimDimensionWithDict(carbonLoadModel: CarbonLoadModel,
-    dimension: CarbonDimension,
-    forPreDefDict: Boolean): Array[CarbonDimension] = {
+      dimension: CarbonDimension,
+      forPreDefDict: Boolean): Array[CarbonDimension] = {
     val dimensionsWithDict = new ArrayBuffer[CarbonDimension]
     gatherDimensionByEncoding(carbonLoadModel, dimension, Encoding.DICTIONARY,
       Encoding.DIRECT_DICTIONARY,
@@ -267,19 +267,20 @@ object GlobalDictionaryUtil extends Logging {
   }
 
   def isHighCardinalityColumn(columnCardinality: Int,
-                              rowCount: Long,
-                              model: DictionaryLoadModel): Boolean = {
-    (columnCardinality > model.highCardThreshold) && (rowCount > 0) &&
-      (columnCardinality.toDouble / rowCount * 100 > model.rowCountPercentage)
+      rowCount: Long,
+      model: DictionaryLoadModel): Boolean = {
+    (columnCardinality > model.highCardThreshold) &&
+    (rowCount > 0) &&
+    (columnCardinality.toDouble / rowCount * 100 > model.rowCountPercentage)
   }
 
   /**
    * create a instance of DictionaryLoadModel
    *
-   * @param carbonLoadModel  carbon load model
-   * @param table  CarbonTableIdentifier
-   * @param dimensions  column list
-   * @param hdfsLocation  store location in HDFS
+   * @param carbonLoadModel carbon load model
+   * @param table           CarbonTableIdentifier
+   * @param dimensions      column list
+   * @param hdfsLocation    store location in HDFS
    * @param dictfolderPath  path of dictionary folder
    */
   def createDictionaryLoadModel(carbonLoadModel: CarbonLoadModel,
@@ -300,7 +301,7 @@ object GlobalDictionaryUtil extends Logging {
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(hdfsLocation, table)
     val primDimensions = primDimensionsBuffer.map { x => x }.toArray
     val dictDetail = CarbonSparkFactory.getDictionaryDetailService().
-    getDictionaryDetail(dictfolderPath, primDimensions, table, hdfsLocation)
+      getDictionaryDetail(dictfolderPath, primDimensions, table, hdfsLocation)
     val dictFilePaths = dictDetail.dictFilePaths
     val dictFileExists = dictDetail.dictFileExists
     val columnIdentifier = dictDetail.columnIdentifiers
@@ -311,14 +312,14 @@ object GlobalDictionaryUtil extends Logging {
     val zookeeperUrl = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.ZOOKEEPER_URL)
     // load high cardinality identify configure
     val highCardIdentifyEnable = CarbonProperties.getInstance().getProperty(
-        CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE,
-        CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT).toBoolean
+      CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE,
+      CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT).toBoolean
     val highCardThreshold = CarbonProperties.getInstance().getProperty(
-        CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD,
-        CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_DEFAULT).toInt
+      CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD,
+      CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_DEFAULT).toInt
     val rowCountPercentage = CarbonProperties.getInstance().getProperty(
-        CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
-        CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT).toDouble
+      CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
+      CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT).toDouble
 
     // get load count
     if (null == carbonLoadModel.getLoadMetadataDetails) {
@@ -346,8 +347,8 @@ object GlobalDictionaryUtil extends Logging {
   /**
    * load CSV files to DataFrame by using datasource "com.databricks.spark.csv"
    *
-   * @param sqlContext  SQLContext
-   * @param carbonLoadModel  carbon data load model
+   * @param sqlContext      SQLContext
+   * @param carbonLoadModel carbon data load model
    */
   def loadDataFrame(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel): DataFrame = {
@@ -356,16 +357,14 @@ object GlobalDictionaryUtil extends Logging {
       .option("header", {
         if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
           "true"
-        }
-        else {
+        } else {
           "false"
         }
       })
       .option("delimiter", {
         if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
           "" + DEFAULT_SEPARATOR
-        }
-        else {
+        } else {
           carbonLoadModel.getCsvDelimiter
         }
       })
@@ -377,8 +376,7 @@ object GlobalDictionaryUtil extends Logging {
       .option("quote", {
         if (StringUtils.isEmpty(carbonLoadModel.getQuoteChar)) {
           "" + DEFAULT_QUOTE_CHARACTER
-        }
-        else {
+        } else {
           carbonLoadModel.getQuoteChar
         }
       })
@@ -388,9 +386,9 @@ object GlobalDictionaryUtil extends Logging {
   }
 
   private def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
-                                  sqlContext: SQLContext,
-                                  model: DictionaryLoadModel,
-                                  noDictDimension: Array[CarbonDimension]): Unit = {
+      sqlContext: SQLContext,
+      model: DictionaryLoadModel,
+      noDictDimension: Array[CarbonDimension]): Unit = {
 
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
       model.table)
@@ -413,12 +411,11 @@ object GlobalDictionaryUtil extends Logging {
     // update Metadata
     val catalog = CarbonEnv.getInstance(sqlContext).carbonCatalog
     catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
-        model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
+      model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
 
     // update CarbonDataLoadSchema
     val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
-          model.table.getTableName)(sqlContext)
-        .asInstanceOf[CarbonRelation].tableMeta.carbonTable
+      model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
     carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
 
   }
@@ -429,9 +426,9 @@ object GlobalDictionaryUtil extends Logging {
    * @param status checking whether the generating is  successful
    */
   private def checkStatus(carbonLoadModel: CarbonLoadModel,
-                          sqlContext: SQLContext,
-                          model: DictionaryLoadModel,
-                          status: Array[(Int, String, Boolean)]) = {
+      sqlContext: SQLContext,
+      model: DictionaryLoadModel,
+      status: Array[(Int, String, Boolean)]) = {
     var result = false
     val noDictionaryColumns = new ArrayBuffer[CarbonDimension]
     val tableName = model.table.getTableName
@@ -441,7 +438,9 @@ object GlobalDictionaryUtil extends Logging {
         result = true
         logError(s"table:$tableName column:$columnName generate global dictionary file failed")
       }
-      if (x._3) noDictionaryColumns +=  model.primDimensions(x._1)
+      if (x._3) {
+        noDictionaryColumns += model.primDimensions(x._1)
+      }
     }
     if (noDictionaryColumns.nonEmpty) {
       updateTableMetadata(carbonLoadModel, sqlContext, model, noDictionaryColumns.toArray)
@@ -457,23 +456,23 @@ object GlobalDictionaryUtil extends Logging {
   /**
    * get external columns and whose dictionary file path
    *
-   * @param colDictFilePath  external column dict file path
-   * @param table  table identifier
-   * @param dimensions  dimension columns
+   * @param colDictFilePath external column dict file path
+   * @param table           table identifier
+   * @param dimensions      dimension columns
    */
   private def setPredefinedColumnDictPath(carbonLoadModel: CarbonLoadModel,
-                                          colDictFilePath: String,
-                                          table: CarbonTableIdentifier,
-                                          dimensions: Array[CarbonDimension]) = {
+      colDictFilePath: String,
+      table: CarbonTableIdentifier,
+      dimensions: Array[CarbonDimension]) = {
     val colFileMapArray = colDictFilePath.split(",")
     for (colPathMap <- colFileMapArray) {
       val colPathMapTrim = colPathMap.trim
       val colNameWithPath = colPathMapTrim.split(":")
       if (colNameWithPath.length == 1) {
         logError("the format of external column dictionary should be " +
-        "columnName:columnPath, please check")
+                 "columnName:columnPath, please check")
         throw new DataLoadingException("the format of predefined column dictionary" +
-        " should be columnName:columnPath, please check")
+                                       " should be columnName:columnPath, please check")
       }
       setPredefineDict(carbonLoadModel, dimensions, table, colNameWithPath(0),
         FileUtils.getPaths(colPathMapTrim.substring(colNameWithPath(0).length + 1)))
@@ -482,26 +481,25 @@ object GlobalDictionaryUtil extends Logging {
 
   /**
    * set pre defined dictionary for dimension
-    *
-    * @param dimensions  all the dimensions
-   * @param table   carbon table identifier
-   * @param colName  user specified  column name for predefined dict
-   * @param colDictPath  column dictionary file path
-   * @param parentDimName  parent dimenion for complex type
+   *
+   * @param dimensions    all the dimensions
+   * @param table         carbon table identifier
+   * @param colName       user specified  column name for predefined dict
+   * @param colDictPath   column dictionary file path
+   * @param parentDimName parent dimenion for complex type
    */
   def setPredefineDict(carbonLoadModel: CarbonLoadModel,
-                       dimensions: Array[CarbonDimension],
-                       table: CarbonTableIdentifier,
-                       colName: String,
-                       colDictPath: String,
-                       parentDimName: String = "") {
+      dimensions: Array[CarbonDimension],
+      table: CarbonTableIdentifier,
+      colName: String,
+      colDictPath: String,
+      parentDimName: String = "") {
     val middleDimName = colName.split("\\.")(0)
     val dimParent = parentDimName + {
       colName match {
         case "" => colName
         case _ =>
-          if (parentDimName.isEmpty) middleDimName
-          else "." + middleDimName
+          if (parentDimName.isEmpty) middleDimName else "." + middleDimName
       }
     }
     // judge whether the column is exists
@@ -509,9 +507,10 @@ object GlobalDictionaryUtil extends Logging {
       _.getColName.equalsIgnoreCase(dimParent))
     if (preDictDimensionOption.length == 0) {
       logError(s"Column $dimParent is not a key column " +
-        s"in ${table.getDatabaseName}.${table.getTableName}")
+               s"in ${ table.getDatabaseName }.${ table.getTableName }")
       throw new DataLoadingException(s"Column $dimParent is not a key column. " +
-        s"Only key column can be part of dictionary and used in COLUMNDICT option.")
+                                     s"Only key column can be part of dictionary " +
+                                     s"and used in COLUMNDICT option.")
     }
     val preDictDimension = preDictDimensionOption(0)
     if (preDictDimension.isComplex) {
@@ -520,9 +519,11 @@ object GlobalDictionaryUtil extends Logging {
       val currentColName = {
         preDictDimension.getDataType match {
           case DataType.ARRAY =>
-            if (children(0).isComplex) "val." +
-            colName.substring(middleDimName.length + 1)
-        else "val"
+            if (children(0).isComplex) {
+              "val." + colName.substring(middleDimName.length + 1)
+            } else {
+              "val"
+            }
           case _ => colName.substring(middleDimName.length + 1)
         }
       }
@@ -534,27 +535,27 @@ object GlobalDictionaryUtil extends Logging {
   }
 
   /**
-   *  use external dimension column to generate global dictionary
+   * use external dimension column to generate global dictionary
    *
-   * @param colDictFilePath  external column dict file path
-   * @param table  table identifier
-   * @param dimensions  dimension column
-   * @param carbonLoadModel  carbon load model
-   * @param sqlContext   spark sql context
-   * @param hdfsLocation  store location on hdfs
+   * @param colDictFilePath external column dict file path
+   * @param table           table identifier
+   * @param dimensions      dimension column
+   * @param carbonLoadModel carbon load model
+   * @param sqlContext      spark sql context
+   * @param hdfsLocation    store location on hdfs
    * @param dictFolderPath  generated global dict file path
    */
   private def generatePredefinedColDictionary(colDictFilePath: String,
-                                              table: CarbonTableIdentifier,
-                                              dimensions: Array[CarbonDimension],
-                                              carbonLoadModel: CarbonLoadModel,
-                                              sqlContext: SQLContext,
-                                              hdfsLocation: String,
-                                              dictFolderPath: String) = {
+      table: CarbonTableIdentifier,
+      dimensions: Array[CarbonDimension],
+      carbonLoadModel: CarbonLoadModel,
+      sqlContext: SQLContext,
+      hdfsLocation: String,
+      dictFolderPath: String) = {
     // set pre defined dictionary column
     setPredefinedColumnDictPath(carbonLoadModel, colDictFilePath, table, dimensions)
     val dictLoadModel = createDictionaryLoadModel(carbonLoadModel, table, dimensions,
-      hdfsLocation, dictFolderPath, true)
+      hdfsLocation, dictFolderPath, forPreDefDict = true)
     // new RDD to achieve distributed column dict generation
     val extInputRDD = new CarbonColumnDictGenerateRDD(carbonLoadModel, dictLoadModel,
       sqlContext.sparkContext, table, dimensions, hdfsLocation, dictFolderPath)
@@ -600,7 +601,7 @@ object GlobalDictionaryUtil extends Logging {
    * @param csvFileColumns
    */
   private def parseRecord(x: String, accum: Accumulator[Int],
-                  csvFileColumns: Array[String]) : (String, String) = {
+      csvFileColumns: Array[String]): (String, String) = {
     val tokens = x.split("" + DEFAULT_SEPARATOR)
     var columnName: String = ""
     var value: String = ""
@@ -610,7 +611,7 @@ object GlobalDictionaryUtil extends Logging {
       accum += 1
     } else if (tokens.size == 1) {
       // such as "1", "jone", throw ex
-      if (x.contains(",") == false) {
+      if (!x.contains(",")) {
         accum += 1
       } else {
         try {
@@ -644,10 +645,10 @@ object GlobalDictionaryUtil extends Logging {
    * @return allDictionaryRdd
    */
   private def readAllDictionaryFiles(sqlContext: SQLContext,
-                                     csvFileColumns: Array[String],
-                                     requireColumns: Array[String],
-                                     allDictionaryPath: String,
-                                     accumulator: Accumulator[Int]) = {
+      csvFileColumns: Array[String],
+      requireColumns: Array[String],
+      allDictionaryPath: String,
+      accumulator: Accumulator[Int]) = {
     var allDictionaryRdd: RDD[(String, Iterable[String])] = null
     try {
       // read local dictionary file, and spilt (columnIndex, columnValue)
@@ -686,7 +687,7 @@ object GlobalDictionaryUtil extends Logging {
           true
         } else {
           logWarning("No dictionary files found or empty dictionary files! " +
-            "Won't generate new dictionary.")
+                     "Won't generate new dictionary.")
           false
         }
       } else {
@@ -699,7 +700,7 @@ object GlobalDictionaryUtil extends Logging {
           true
         } else {
           logWarning("No dictionary files found or empty dictionary files! " +
-            "Won't generate new dictionary.")
+                     "Won't generate new dictionary.")
           false
         }
       } else {
@@ -726,7 +727,7 @@ object GlobalDictionaryUtil extends Logging {
       } else {
         carbonLoadModel.getCsvDelimiter
       }
-      headers = readLine.toLowerCase().split(delimiter);
+      headers = readLine.toLowerCase().split(delimiter)
     } else {
       logError("Not found file header! Please set fileheader")
       throw new IOException("Failed to get file header")
@@ -737,13 +738,13 @@ object GlobalDictionaryUtil extends Logging {
   /**
    * generate global dictionary with SQLContext and CarbonLoadModel
    *
-   * @param sqlContext  sql context
-   * @param carbonLoadModel  carbon load model
+   * @param sqlContext      sql context
+   * @param carbonLoadModel carbon load model
    */
   def generateGlobalDictionary(sqlContext: SQLContext,
-                               carbonLoadModel: CarbonLoadModel,
-                               storePath: String,
-                               dataFrame: Option[DataFrame] = None): Unit = {
+      carbonLoadModel: CarbonLoadModel,
+      storePath: String,
+      dataFrame: Option[DataFrame] = None): Unit = {
     try {
       var carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
       var carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
@@ -757,7 +758,7 @@ object GlobalDictionaryUtil extends Logging {
       carbonLoadModel.initPredefDictMap()
 
       val allDictionaryPath = carbonLoadModel.getAllDictPath
-      if(StringUtils.isEmpty(allDictionaryPath)) {
+      if (StringUtils.isEmpty(allDictionaryPath)) {
         logInfo("Generate global dictionary from source data files!")
         // load data by using dataSource com.databricks.spark.csv
         var df = if (dataFrame.isDefined) {
@@ -767,8 +768,7 @@ object GlobalDictionaryUtil extends Logging {
         }
         var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
           df.columns
-        }
-        else {
+        } else {
           carbonLoadModel.getCsvHeader.split("" + DEFAULT_SEPARATOR)
         }
         headers = headers.map(headerName => headerName.trim)
@@ -779,8 +779,9 @@ object GlobalDictionaryUtil extends Logging {
             dimensions, carbonLoadModel, sqlContext, storePath, dictfolderPath)
         }
         if (headers.length > df.columns.length) {
-          val msg = "The number of columns in the file header do not match the number of " +
-            "columns in the data file; Either delimiter or fileheader provided is not correct"
+          val msg = "The number of columns in the file header do not match the " +
+                    "number of columns in the data file; Either delimiter " +
+                    "or fileheader provided is not correct"
           logError(msg)
           throw new DataLoadingException(msg)
         }
@@ -829,7 +830,7 @@ object GlobalDictionaryUtil extends Logging {
       } else {
         logInfo("Generate global dictionary from dictionary files!")
         val isNonempty = validateAllDictionaryPath(allDictionaryPath)
-        if(isNonempty) {
+        if (isNonempty) {
           var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
             getHeaderFormFactFile(carbonLoadModel)
           } else {
@@ -837,8 +838,7 @@ object GlobalDictionaryUtil extends Logging {
           }
           headers = headers.map(headerName => headerName.trim)
           // prune columns according to the CSV file header, dimension columns
-          val (requireDimension, requireColumnNames) =
-            pruneDimensions(dimensions, headers, headers)
+          val (requireDimension, requireColumnNames) = pruneDimensions(dimensions, headers, headers)
           if (requireDimension.nonEmpty) {
             val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
               requireDimension, storePath, dictfolderPath, false)
@@ -857,7 +857,7 @@ object GlobalDictionaryUtil extends Logging {
             // if the dictionary contains wrong format record, throw ex
             if (accumulator.value > 0) {
               throw new DataLoadingException("Data Loading failure, dictionary values are " +
-                "not in correct format!")
+                                             "not in correct format!")
             }
           } else {
             logInfo("have no column need to generate global dictionary")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index e27d166..a792eca 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -44,14 +44,14 @@ class CarbonContext(
     metaStorePath: String) extends HiveContext(sc) with Logging {
   self =>
 
-  def this (sc: SparkContext) = {
-    this (sc,
+  def this(sc: SparkContext) = {
+    this(sc,
       new File(CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL).getCanonicalPath,
       new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
   }
 
-  def this (sc: SparkContext, storePath: String) = {
-    this (sc,
+  def this(sc: SparkContext, storePath: String) = {
+    this(sc,
       storePath,
       new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
   }
@@ -107,9 +107,9 @@ class CarbonContext(
     if (sc.hadoopConfiguration.get(CarbonCommonConstants.HIVE_CONNECTION_URL) == null) {
       val metaStorePathAbsolute = new File(metaStorePath).getCanonicalPath
       val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
-      logDebug(s"metastore db is going to be created in location : $hiveMetaStoreDB")
+      logDebug(s"metastore db is going to be created in location: $hiveMetaStoreDB")
       super.configure() ++ Map((CarbonCommonConstants.HIVE_CONNECTION_URL,
-              s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true"),
+        s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true"),
         ("hive.metastore.warehouse.dir", metaStorePathAbsolute + "/hivemetadata"))
     } else {
       super.configure()
@@ -186,7 +186,7 @@ object CarbonContext {
   /**
    *
    * Requesting the extra executors other than the existing ones.
- *
+   *
    * @param sc
    * @param numExecutors
    * @return
@@ -194,12 +194,12 @@ object CarbonContext {
   final def ensureExecutors(sc: SparkContext, numExecutors: Int): Boolean = {
     sc.schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        val requiredExecutors = numExecutors -  b.numExistingExecutors
+        val requiredExecutors = numExecutors - b.numExistingExecutors
         LOGGER
-          .info("number of executors is =" + numExecutors + " existing executors are =" + b
-            .numExistingExecutors
+          .info(s"number of executors is =$numExecutors existing executors are =" +
+                s"${ b.numExistingExecutors }"
           )
-        if(requiredExecutors > 0) {
+        if (requiredExecutors > 0) {
           b.requestExecutors(requiredExecutors)
         }
         true

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index c1a8818..f4fe900 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -111,10 +111,10 @@ case class CarbonDictionaryDecoder(
       case DataType.TIMESTAMP => TimestampType
       case DataType.STRUCT =>
         CarbonMetastoreTypes
-        .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
+          .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
       case DataType.ARRAY =>
         CarbonMetastoreTypes
-        .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
+          .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
     }
   }
 
@@ -131,7 +131,7 @@ case class CarbonDictionaryDecoder(
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
             !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
           (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
-              carbonDimension.getDataType)
+            carbonDimension.getDataType)
         } else {
           (null, null, null)
         }
@@ -160,7 +160,7 @@ case class CarbonDictionaryDecoder(
         (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
       }.toMap
 
-      val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(queryId);
+      val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(queryId)
       if (isRequiredToDecode) {
         val dataTypes = child.output.map { attr => attr.dataType }
         child.execute().mapPartitions { iter =>
@@ -186,7 +186,7 @@ case class CarbonDictionaryDecoder(
             var total = 0L
             override final def hasNext: Boolean = {
               flag = iter.hasNext
-              if (false == flag && total > 0) {
+              if (!flag && total > 0) {
                 val queryStatistic = new QueryStatistic()
                 queryStatistic
                   .addFixedTimeStatistic(QueryStatisticsConstants.PREPARE_RESULT, total)
@@ -202,8 +202,8 @@ case class CarbonDictionaryDecoder(
               dictIndex.foreach { index =>
                 if (data(index) != null) {
                   data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
-                      .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
-                      getDictionaryColumnIds(index)._3)
+                    .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+                    getDictionaryColumnIds(index)._3)
                 }
               }
               val result = unsafeProjection(new GenericMutableRow(data))
@@ -231,7 +231,7 @@ case class CarbonDictionaryDecoder(
       if (f._2 != null) {
         try {
           cache.get(new DictionaryColumnUniqueIdentifier(
-            atiMap.get(f._1).get.getCarbonTableIdentifier,
+            atiMap(f._1).getCarbonTableIdentifier,
             f._2, f._3))
         } catch {
           case _: Throwable => null
@@ -242,4 +242,5 @@ case class CarbonDictionaryDecoder(
     }
     dicts
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 6f149f7..f9a0a9d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -171,39 +171,48 @@ class CarbonSqlParser()
   }
 
   import lexical.Identifier
-  implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
-    s"identifier matching regex ${regex}",
+
+  implicit def regexToParser(regex: Regex): Parser[String] = {
+    acceptMatch(
+    s"identifier matching regex ${ regex }",
     { case Identifier(str) if regex.unapplySeq(str).isDefined => str }
-  )
-  override def parse(input: String): LogicalPlan = synchronized {
-    // Initialize the Keywords.
-    initLexical
-    phrase(start)(new lexical.Scanner(input)) match {
-      case Success(plan, _) => plan match {
-        case x: LoadTable =>
-          x.inputSqlString = input
-          x
-        case logicalPlan => logicalPlan
+    )
+  }
+
+  override def parse(input: String): LogicalPlan = {
+    synchronized {
+      // Initialize the Keywords.
+      initLexical
+      phrase(start)(new lexical.Scanner(input)) match {
+        case Success(plan, _) => plan match {
+          case x: LoadTable =>
+            x.inputSqlString = input
+            x
+          case logicalPlan => logicalPlan
+        }
+        case failureOrError => sys.error(failureOrError.toString)
       }
-      case failureOrError => sys.error(failureOrError.toString)
     }
   }
 
   /**
    * This will convert key word to regular expression.
+   *
    * @param keys
    * @return
    */
-  private def carbonKeyWord(keys: String) =
+  private def carbonKeyWord(keys: String) = {
     ("(?i)" + keys).r
+  }
 
   override protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
 
   protected lazy val startCommand: Parser[LogicalPlan] = createDatabase | dropDatabase |
-    loadManagement | describeTable | showLoads | alterTable | createTable
+                                                         loadManagement | describeTable |
+                                                         showLoads | alterTable | createTable
 
   protected lazy val loadManagement: Parser[LogicalPlan] = deleteLoadsByID | deleteLoadsByLoadDate |
-    cleanFiles | loadDataNew
+                                                           cleanFiles | loadDataNew
 
   protected val escapedIdentifier = "`([^`]+)`".r
 
@@ -248,9 +257,9 @@ class CarbonSqlParser()
     var dimensions: Seq[Field] = Seq()
     dims.foreach { dimension =>
       dimension.dataType.getOrElse("NIL") match {
-        case "Array" => complexDimensions = complexDimensions:+dimension
-        case "Struct" => complexDimensions = complexDimensions:+dimension
-        case _ => dimensions = dimensions:+dimension
+        case "Array" => complexDimensions = complexDimensions :+ dimension
+        case "Struct" => complexDimensions = complexDimensions :+ dimension
+        case _ => dimensions = dimensions :+ dimension
       }
     }
     dimensions ++ complexDimensions
@@ -276,22 +285,22 @@ class CarbonSqlParser()
    * For handling the create table DDl systax compatible to Hive syntax
    */
   protected lazy val createTable: Parser[LogicalPlan] =
-    restInput ^^ {
+  restInput ^^ {
 
-      case statement =>
-        try {
-          // DDl will be parsed and we get the AST tree from the HiveQl
-          val node = HiveQlWrapper.getAst(statement)
-          // processing the AST tree
-          nodeToPlan(node)
-        } catch {
-          // MalformedCarbonCommandException need to be throw directly, parser will catch it
-          case ce: MalformedCarbonCommandException =>
-            throw ce
-          case e: Exception =>
-            sys.error("Parsing error") // no need to do anything.
-        }
-    }
+    case statement =>
+      try {
+        // DDl will be parsed and we get the AST tree from the HiveQl
+        val node = HiveQlWrapper.getAst(statement)
+        // processing the AST tree
+        nodeToPlan(node)
+      } catch {
+        // MalformedCarbonCommandException need to be throw directly, parser will catch it
+        case ce: MalformedCarbonCommandException =>
+          throw ce
+        case e: Exception =>
+          sys.error("Parsing error") // no need to do anything.
+      }
+  }
 
   private def getScaleAndPrecision(dataType: String): (Int, Int) = {
     val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType)
@@ -313,17 +322,17 @@ class CarbonSqlParser()
       case Token("TOK_CREATETABLE", children) =>
 
 
-          var fields: Seq[Field] = Seq[Field]()
-          var tableComment: String = ""
-          var tableProperties = Map[String, String]()
-          var partitionCols: Seq[PartitionerField] = Seq[PartitionerField]()
-          var likeTableName: String = ""
-          var storedBy: String = ""
-          var ifNotExistPresent: Boolean = false
-          var dbName: Option[String] = None
-          var tableName: String = ""
+        var fields: Seq[Field] = Seq[Field]()
+        var tableComment: String = ""
+        var tableProperties = Map[String, String]()
+        var partitionCols: Seq[PartitionerField] = Seq[PartitionerField]()
+        var likeTableName: String = ""
+        var storedBy: String = ""
+        var ifNotExistPresent: Boolean = false
+        var dbName: Option[String] = None
+        var tableName: String = ""
 
-          try {
+        try {
 
           // Checking whether create table request is carbon table
           children.collect {
@@ -332,7 +341,7 @@ class CarbonSqlParser()
             case _ =>
           }
           if (!(storedBy.equals(CarbonContext.datasourceName) ||
-              storedBy.equals(CarbonContext.datasourceShortName))) {
+                storedBy.equals(CarbonContext.datasourceShortName))) {
             sys.error("Not a carbon format request")
           }
 
@@ -362,7 +371,7 @@ class CarbonSqlParser()
                   match {
                     case Success(field, _) => field
                     case failureOrError => throw new MalformedCarbonCommandException(
-                        s"Unsupported data type : $col.getType")
+                      s"Unsupported data type: $col.getType")
                   }
                   // the data type of the decimal type will be like decimal(10,0)
                   // so checking the start of the string and taking the precision and scale.
@@ -405,7 +414,7 @@ class CarbonSqlParser()
               if (repeatedProperties.nonEmpty) {
                 val repeatedPropStr: String = repeatedProperties.mkString(",")
                 throw new MalformedCarbonCommandException("Table properties is repeated: " +
-                  repeatedPropStr)
+                                                          repeatedPropStr)
               }
               tableProperties ++= propertySeq
 
@@ -430,12 +439,17 @@ class CarbonSqlParser()
 
           // get logical plan.
           CreateTable(tableModel)
-        }
-        catch {
+        } catch {
           case ce: MalformedCarbonCommandException =>
-            val message = if (tableName.isEmpty) "Create table command failed. "
-            else if (dbName.isEmpty) s"Create table command failed for $tableName. "
-            else s"Create table command failed for ${dbName.get}.$tableName. "
+            val message = if (tableName.isEmpty) {
+              "Create table command failed. "
+            }
+            else if (dbName.isEmpty) {
+              s"Create table command failed for $tableName. "
+            }
+            else {
+              s"Create table command failed for ${ dbName.get }.$tableName. "
+            }
             LOGGER.audit(message + ce.getMessage)
             throw ce
         }
@@ -488,19 +502,24 @@ class CarbonSqlParser()
    * @return
    */
   protected def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
-                                  , tableName: String, fields: Seq[Field],
-                                  partitionCols: Seq[PartitionerField],
-                                  tableProperties: Map[String, String]): tableModel
+      , tableName: String, fields: Seq[Field],
+      partitionCols: Seq[PartitionerField],
+      tableProperties: Map[String, String]): tableModel
   = {
 
     val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
       fields, tableProperties)
     if (dims.isEmpty) {
-      throw new MalformedCarbonCommandException(s"Table ${dbName.getOrElse(
-        CarbonCommonConstants.DATABASE_DEFAULT_NAME)}.$tableName"
-        + " can not be created without key columns. Please use DICTIONARY_INCLUDE or " +
-        "DICTIONARY_EXCLUDE to set at least one key column " +
-        "if all specified columns are numeric types")
+      throw new MalformedCarbonCommandException(s"Table ${
+        dbName.getOrElse(
+          CarbonCommonConstants.DATABASE_DEFAULT_NAME)
+      }.$tableName"
+                                                +
+                                                " can not be created without key columns. Please " +
+                                                "use DICTIONARY_INCLUDE or " +
+                                                "DICTIONARY_EXCLUDE to set at least one key " +
+                                                "column " +
+                                                "if all specified columns are numeric types")
     }
     val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
 
@@ -508,7 +527,7 @@ class CarbonSqlParser()
     val colProps = extractColumnProperties(fields, tableProperties)
     // get column groups configuration from table properties.
     val groupCols: Seq[String] = updateColumnGroupsInField(tableProperties,
-        noDictionaryDims, msrs, dims)
+      noDictionaryDims, msrs, dims)
 
     // get no inverted index columns from table properties.
     val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
@@ -554,8 +573,7 @@ class CarbonSqlParser()
       }
       // This will  be furthur handled.
       CommonUtil.arrangeColGrpsInSchemaOrder(splittedColGrps, dims)
-    }
-    else {
+    } else {
       null
     }
   }
@@ -603,7 +621,7 @@ class CarbonSqlParser()
    * @return
    */
   protected def getPartitionerObject(partitionCols: Seq[PartitionerField],
-                                     tableProperties: Map[String, String]):
+      tableProperties: Map[String, String]):
   Option[Partitioner] = {
 
     // by default setting partition class empty.
@@ -649,8 +667,8 @@ class CarbonSqlParser()
   }
 
   protected def fillAllChildrenColumnProperty(parent: String, fieldChildren: Option[List[Field]],
-    tableProperties: Map[String, String],
-    colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
+      tableProperties: Map[String, String],
+      colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
     fieldChildren.foreach(fields => {
       fields.foreach(field => {
         fillColumnProperty(Some(parent), field.column, tableProperties, colPropMap)
@@ -661,9 +679,9 @@ class CarbonSqlParser()
   }
 
   protected def fillColumnProperty(parentColumnName: Option[String],
-    columnName: String,
-    tableProperties: Map[String, String],
-    colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
+      columnName: String,
+      tableProperties: Map[String, String],
+      colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
     val (tblPropKey, colProKey) = getKey(parentColumnName, columnName)
     val colProps = CommonUtil.getColumnProperties(tblPropKey, tableProperties)
     if (colProps.isDefined) {
@@ -672,7 +690,7 @@ class CarbonSqlParser()
   }
 
   def getKey(parentColumnName: Option[String],
-    columnName: String): (String, String) = {
+      columnName: String): (String, String) = {
     if (parentColumnName.isDefined) {
       if (columnName == "val") {
         (parentColumnName.get, parentColumnName.get + "." + columnName)
@@ -683,6 +701,7 @@ class CarbonSqlParser()
       (columnName, columnName)
     }
   }
+
   /**
    * This will extract the no inverted columns fields.
    * By default all dimensions use inverted index.
@@ -692,7 +711,7 @@ class CarbonSqlParser()
    * @return
    */
   protected def extractNoInvertedIndexColumns(fields: Seq[Field],
-    tableProperties: Map[String, String]):
+      tableProperties: Map[String, String]):
   Seq[String] = {
     // check whether the column name is in fields
     var noInvertedIdxColsProps: Array[String] = Array[String]()
@@ -703,17 +722,17 @@ class CarbonSqlParser()
         tableProperties.get("NO_INVERTED_INDEX").get.split(',').map(_.trim)
       noInvertedIdxColsProps
         .map { noInvertedIdxColProp =>
-        if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
-          val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
-            " does not exist in table. Please check create table statement."
-          throw new MalformedCarbonCommandException(errormsg)
+          if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
+            val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
+                           " does not exist in table. Please check create table statement."
+            throw new MalformedCarbonCommandException(errormsg)
+          }
         }
-      }
     }
     // check duplicate columns and only 1 col left
     val distinctCols = noInvertedIdxColsProps.toSet
     // extract the no inverted index columns
-    fields.foreach( field => {
+    fields.foreach(field => {
       if (distinctCols.exists(x => x.equalsIgnoreCase(field.column))) {
         noInvertedIdxCols :+= field.column
       }
@@ -731,7 +750,7 @@ class CarbonSqlParser()
    * @return
    */
   protected def extractDimColsAndNoDictionaryFields(fields: Seq[Field],
-                                                    tableProperties: Map[String, String]):
+      tableProperties: Map[String, String]):
   (Seq[Field], Seq[String]) = {
     var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
     var dictExcludeCols: Array[String] = Array[String]()
@@ -746,18 +765,18 @@ class CarbonSqlParser()
         .map { dictExcludeCol =>
           if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
             val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
-              " does not exist in table. Please check create table statement."
+                           " does not exist in table. Please check create table statement."
             throw new MalformedCarbonCommandException(errormsg)
           } else {
-            val dataType = fields.find (x =>
+            val dataType = fields.find(x =>
               x.column.equalsIgnoreCase(dictExcludeCol)).get.dataType.get
             if (isComplexDimDictionaryExclude(dataType)) {
               val errormsg = "DICTIONARY_EXCLUDE is unsupported for complex datatype column: " +
-                dictExcludeCol
+                             dictExcludeCol
               throw new MalformedCarbonCommandException(errormsg)
             } else if (!isStringAndTimestampColDictionaryExclude(dataType)) {
               val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + dataType.toLowerCase() +
-                " data type column: " + dictExcludeCol
+                             " data type column: " + dictExcludeCol
               throw new MalformedCarbonCommandException(errorMsg)
             }
           }
@@ -768,19 +787,19 @@ class CarbonSqlParser()
       dictIncludeCols =
         tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(",").map(_.trim)
       dictIncludeCols.map { distIncludeCol =>
-          if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
-            val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
-              " does not exist in table. Please check create table statement."
-            throw new MalformedCarbonCommandException(errormsg)
-          }
+        if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
+          val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
+                         " does not exist in table. Please check create table statement."
+          throw new MalformedCarbonCommandException(errormsg)
         }
+      }
     }
 
     // include cols should contain exclude cols
     dictExcludeCols.foreach { dicExcludeCol =>
       if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
         val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
-          " with DICTIONARY_INCLUDE. Please check create table statement."
+                       " with DICTIONARY_INCLUDE. Please check create table statement."
         throw new MalformedCarbonCommandException(errormsg)
       }
     }
@@ -794,11 +813,9 @@ class CarbonSqlParser()
           noDictionaryDims :+= field.column
         }
         dimFields += field
-      }
-      else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
+      } else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
         dimFields += (field)
-      }
-      else if (isDetectAsDimentionDatatype(field.dataType.get)) {
+      } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
         dimFields += (field)
       }
     }
@@ -806,11 +823,12 @@ class CarbonSqlParser()
 
     (dimFields.toSeq, noDictionaryDims)
   }
+
   /**
    * It fills non string dimensions in dimFields
    */
   def fillNonStringDimension(dictIncludeCols: Seq[String],
-    field: Field, dimFields: LinkedHashSet[Field]) {
+      field: Field, dimFields: LinkedHashSet[Field]) {
     var dictInclude = false
     if (dictIncludeCols.nonEmpty) {
       dictIncludeCols.foreach(dictIncludeCol =>
@@ -841,9 +859,9 @@ class CarbonSqlParser()
     dimensionType.exists(x => x.equalsIgnoreCase(dimensionDataType))
   }
 
-   /**
-    * detects whether double or decimal column is part of dictionary_exclude
-    */
+  /**
+   * detects whether double or decimal column is part of dictionary_exclude
+   */
   def isStringAndTimestampColDictionaryExclude(columnDataType: String): Boolean = {
     val dataTypes = Array("string", "timestamp")
     dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
@@ -857,7 +875,7 @@ class CarbonSqlParser()
    * @return
    */
   protected def extractMsrColsFromFields(fields: Seq[Field],
-                                         tableProperties: Map[String, String]): Seq[Field] = {
+      tableProperties: Map[String, String]): Seq[Field] = {
     var msrFields: Seq[Field] = Seq[Field]()
     var dictIncludedCols: Array[String] = Array[String]()
     var dictExcludedCols: Array[String] = Array[String]()
@@ -877,10 +895,10 @@ class CarbonSqlParser()
     // by default consider all non string cols as msrs. consider all include/ exclude cols as dims
     fields.foreach(field => {
       if (!isDetectAsDimentionDatatype(field.dataType.get)) {
-          if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
+        if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
             !dictExcludedCols.exists(x => x.equalsIgnoreCase(field.column))) {
-            msrFields :+= field
-          }
+          msrFields :+= field
+        }
       }
     })
 
@@ -905,35 +923,43 @@ class CarbonSqlParser()
     (db, tableName)
   }
 
-  protected def cleanIdentifier(ident: String): String = ident match {
-    case escapedIdentifier(i) => i
-    case plainIdent => plainIdent
+  protected def cleanIdentifier(ident: String): String = {
+    ident match {
+      case escapedIdentifier(i) => i
+      case plainIdent => plainIdent
+    }
   }
 
   protected def getClauses(clauseNames: Seq[String], nodeList: Seq[ASTNode]): Seq[Option[Node]] = {
     var remainingNodes = nodeList
     val clauses = clauseNames.map { clauseName =>
       val (matches, nonMatches) = remainingNodes.partition(_.getText.toUpperCase == clauseName)
-      remainingNodes = nonMatches ++ (if (matches.nonEmpty) matches.tail else Nil)
+      remainingNodes = nonMatches ++ (if (matches.nonEmpty) {
+        matches.tail
+      } else {
+        Nil
+      })
       matches.headOption
     }
 
     if (remainingNodes.nonEmpty) {
       sys.error(
         s"""Unhandled clauses:
-           |You are likely trying to use an unsupported carbon feature."""".stripMargin)
+            |You are likely trying to use an unsupported carbon feature."""".stripMargin)
     }
     clauses
   }
 
   object Token {
     /** @return matches of the form (tokenName, children). */
-    def unapply(t: Any): Option[(String, Seq[ASTNode])] = t match {
-      case t: ASTNode =>
-        CurrentOrigin.setPosition(t.getLine, t.getCharPositionInLine)
-        Some((t.getText,
-          Option(t.getChildren).map(_.asScala.toList).getOrElse(Nil).asInstanceOf[Seq[ASTNode]]))
-      case _ => None
+    def unapply(t: Any): Option[(String, Seq[ASTNode])] = {
+      t match {
+        case t: ASTNode =>
+          CurrentOrigin.setPosition(t.getLine, t.getCharPositionInLine)
+          Some((t.getText,
+            Option(t.getChildren).map(_.asScala.toList).getOrElse(Nil).asInstanceOf[Seq[ASTNode]]))
+        case _ => None
+      }
     }
   }
 
@@ -943,35 +969,39 @@ class CarbonSqlParser()
    * @param node
    * @return
    */
-  protected def getProperties(node: Node): Seq[(String, String)] = node match {
-    case Token("TOK_TABLEPROPLIST", list) =>
-      list.map {
-        case Token("TOK_TABLEPROPERTY", Token(key, Nil) :: Token(value, Nil) :: Nil) =>
-          (unquoteString(key) -> unquoteString(value))
-      }
+  protected def getProperties(node: Node): Seq[(String, String)] = {
+    node match {
+      case Token("TOK_TABLEPROPLIST", list) =>
+        list.map {
+          case Token("TOK_TABLEPROPERTY", Token(key, Nil) :: Token(value, Nil) :: Nil) =>
+            (unquoteString(key) -> unquoteString(value))
+        }
+    }
   }
 
-  protected def unquoteString(str: String) = str match {
-    case singleQuotedString(s) => s.toLowerCase()
-    case doubleQuotedString(s) => s.toLowerCase()
-    case other => other
+  protected def unquoteString(str: String) = {
+    str match {
+      case singleQuotedString(s) => s.toLowerCase()
+      case doubleQuotedString(s) => s.toLowerCase()
+      case other => other
+    }
   }
 
   protected lazy val loadDataNew: Parser[LogicalPlan] =
     LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
-      (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
-      (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
-        case filePath ~ isOverwrite ~ table ~ optionsList =>
-          val (databaseNameOp, tableName) = table match {
-            case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
-          }
-          if(optionsList.isDefined) {
-            validateOptions(optionsList)
-          }
-          val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
-          LoadTable(databaseNameOp, tableName, filePath, Seq(), optionsMap,
-            isOverwrite.isDefined)
-      }
+    (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
+    (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
+      case filePath ~ isOverwrite ~ table ~ optionsList =>
+        val (databaseNameOp, tableName) = table match {
+          case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
+        }
+        if (optionsList.isDefined) {
+          validateOptions(optionsList)
+        }
+        val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
+        LoadTable(databaseNameOp, tableName, filePath, Seq(), optionsMap,
+          isOverwrite.isDefined)
+    }
 
   private def validateOptions(optionList: Option[List[(String, String)]]): Unit = {
 
@@ -999,9 +1029,9 @@ class CarbonSqlParser()
 
     //  COLUMNDICT and ALL_DICTIONARY_PATH can not be used together.
     if (options.exists(_._1.equalsIgnoreCase("COLUMNDICT")) &&
-      options.exists(_._1.equalsIgnoreCase("ALL_DICTIONARY_PATH"))) {
+        options.exists(_._1.equalsIgnoreCase("ALL_DICTIONARY_PATH"))) {
       val errorMessage = "Error: COLUMNDICT and ALL_DICTIONARY_PATH can not be used together" +
-        " in options"
+                         " in options"
       throw new MalformedCarbonCommandException(errorMessage)
     }
 
@@ -1052,20 +1082,21 @@ class CarbonSqlParser()
 
   protected lazy val primitiveTypes =
     STRING ^^^ "string" | INTEGER ^^^ "integer" | TIMESTAMP ^^^
-    "timestamp" | NUMERIC ^^^ "numeric" | BIGINT ^^^ "bigint" |
-       INT ^^^ "int" | DOUBLE ^^^ "double" | decimalType
+                                                  "timestamp" | NUMERIC ^^^ "numeric" |
+    BIGINT ^^^ "bigint" |
+    INT ^^^ "int" | DOUBLE ^^^ "double" | decimalType
 
   /**
    * Matching the decimal(10,0) data type and returning the same.
    */
   private lazy val decimalType =
-    DECIMAL ~ ("(" ~> numericLit <~",") ~ (numericLit <~ ")")  ^^ {
-      case decimal ~ precision ~scale =>
-        s"$decimal($precision, $scale)"
-    }
+  DECIMAL ~ ("(" ~> numericLit <~ ",") ~ (numericLit <~ ")") ^^ {
+    case decimal ~ precision ~ scale =>
+      s"$decimal($precision, $scale)"
+  }
 
   protected lazy val nestedType: Parser[Field] = structFieldType | arrayFieldType |
-    primitiveFieldType
+                                                 primitiveFieldType
 
   protected lazy val anyFieldDef: Parser[Field] =
     (ident | stringLit) ~ ((":").? ~> nestedType) ~ (IN ~> (ident | stringLit)).? ^^ {
@@ -1095,8 +1126,8 @@ class CarbonSqlParser()
 
   protected lazy val measureCol: Parser[Field] =
     (ident | stringLit) ~ (INTEGER ^^^ "integer" | NUMERIC ^^^ "numeric" |
-      BIGINT ^^^ "bigint" | DECIMAL ^^^ "decimal").? ~
-      (AS ~> (ident | stringLit)).? ~ (IN ~> (ident | stringLit)).? ^^ {
+                           BIGINT ^^^ "bigint" | DECIMAL ^^^ "decimal").? ~
+    (AS ~> (ident | stringLit)).? ~ (IN ~> (ident | stringLit)).? ^^ {
       case e1 ~ e2 ~ e3 ~ e4 => Field(e1, e2, e3, Some(null))
     }
 
@@ -1112,11 +1143,11 @@ class CarbonSqlParser()
         if (ef.isDefined && "FORMATTED".equalsIgnoreCase(ef.get)) {
           new DescribeFormattedCommand("describe formatted " + tblIdentifier,
             tblIdentifier)
-        }
-        else {
+        } else {
           new DescribeCommand(UnresolvedRelation(tblIdentifier, None), ef.isDefined)
         }
     }
+
   private def normalizeType(field: Field): Field = {
     val dataType = field.dataType.getOrElse("NIL")
     dataType match {
@@ -1217,22 +1248,23 @@ class CarbonSqlParser()
 
   protected lazy val showLoads: Parser[LogicalPlan] =
     SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
-      (LIMIT ~> numericLit).? <~
-      opt(";") ^^ {
+    (LIMIT ~> numericLit).? <~
+    opt(";") ^^ {
       case databaseName ~ tableName ~ limit =>
         ShowLoadsCommand(databaseName, tableName.toLowerCase(), limit)
     }
 
   protected lazy val segmentId: Parser[String] =
     numericLit ^^ { u => u } |
-      elem("decimal", p => {
-        p.getClass.getSimpleName.equals("FloatLit") ||
-        p.getClass.getSimpleName.equals("DecimalLit") } ) ^^ (_.chars)
+    elem("decimal", p => {
+      p.getClass.getSimpleName.equals("FloatLit") ||
+      p.getClass.getSimpleName.equals("DecimalLit")
+    }) ^^ (_.chars)
 
   protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
     DELETE ~> SEGMENT ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
-      (ident <~ ".").? ~ ident) <~
-      opt(";") ^^ {
+                                                   (ident <~ ".").? ~ ident) <~
+    opt(";") ^^ {
       case loadids ~ table => table match {
         case databaseName ~ tableName =>
           DeleteLoadsById(loadids, databaseName, tableName.toLowerCase())
@@ -1241,8 +1273,8 @@ class CarbonSqlParser()
 
   protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] =
     DELETE ~> SEGMENTS ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
-      (WHERE ~> (STARTTIME <~ BEFORE) ~ stringLit) <~
-      opt(";") ^^ {
+    (WHERE ~> (STARTTIME <~ BEFORE) ~ stringLit) <~
+    opt(";") ^^ {
       case schema ~ table ~ condition =>
         condition match {
           case dateField ~ dateValue =>
@@ -1261,6 +1293,7 @@ class CarbonSqlParser()
         logicalPlan match {
           case plan: CreateTable => ExplainCommand(logicalPlan, extended = isExtended.isDefined)
           case _ => ExplainCommand(OneRowRelation)
-      }
+        }
     }
+
 }



[4/4] incubator-carbondata git commit: [CARBONDATA-328] [Spark] Improve Code and Fix Warnings This closes #279

Posted by ja...@apache.org.
[CARBONDATA-328] [Spark] Improve Code and Fix Warnings This closes #279


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/0a8e782f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/0a8e782f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/0a8e782f

Branch: refs/heads/master
Commit: 0a8e782ffd6c7aa958e291cd4f403859a8dd4104
Parents: c5176f3 6391c2b
Author: jackylk <ja...@huawei.com>
Authored: Sat Nov 19 10:16:23 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sat Nov 19 10:16:23 2016 +0800

----------------------------------------------------------------------
 .../examples/AllDictionaryExample.scala         |   2 +
 .../carbondata/examples/CarbonExample.scala     |   2 +
 .../spark/sql/common/util/QueryTest.scala       |   6 +-
 .../spark/CarbonDataFrameWriter.scala           |   4 +-
 .../apache/carbondata/spark/CarbonFilters.scala |  34 +-
 .../spark/rdd/CarbonCleanFilesRDD.scala         |   2 +-
 .../spark/rdd/CarbonDataLoadRDD.scala           | 267 ++++----
 .../spark/rdd/CarbonDataRDDFactory.scala        | 619 +++++++++----------
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |   2 +-
 .../spark/rdd/CarbonDeleteLoadRDD.scala         |   2 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 101 +--
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  91 ++-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  57 +-
 .../apache/carbondata/spark/rdd/Compactor.scala |  71 +--
 .../spark/tasks/DictionaryWriterTask.scala      |  10 +-
 .../spark/thriftserver/CarbonThriftServer.scala |   2 +-
 .../carbondata/spark/util/CommonUtil.scala      |  14 +-
 .../spark/util/DataTypeConverterUtil.scala      |   2 +-
 .../spark/util/GlobalDictionaryUtil.scala       | 212 +++----
 .../org/apache/spark/sql/CarbonContext.scala    |  22 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  17 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  | 347 ++++++-----
 .../spark/sql/SparkUnknownExpression.scala      |  53 +-
 .../execution/command/carbonTableSchema.scala   | 270 ++++----
 .../spark/sql/hive/CarbonMetastoreCatalog.scala |  65 +-
 .../spark/sql/hive/DistributionUtil.scala       |  35 +-
 .../spark/sql/optimizer/CarbonOptimizer.scala   |  13 +-
 .../scala/org/apache/spark/util/FileUtils.scala |  10 +-
 28 files changed, 1141 insertions(+), 1191 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-carbondata git commit: Improved spark module code. * Removed some compliation warnings. * Replace pattern matching for boolean to IF-ELSE. * Improved code according to scala standards. * Removed unnecessary new lines. * Added string inter

Posted by ja...@apache.org.
Improved spark module code.
* Removed some compliation warnings.
* Replace pattern matching for boolean to IF-ELSE.
* Improved code according to scala standards.
* Removed unnecessary new lines.
* Added string interpolation instead of string concatenation.
* Removed unnecessary semi-colons.
* Fixed indentation.
* add useKettle option for loading
* Fixed indentation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6391c2be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6391c2be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6391c2be

Branch: refs/heads/master
Commit: 6391c2be31f347688a3dbe9f9657e3dd75158684
Parents: c5176f3
Author: Prabhat Kashyap <pr...@knoldus.in>
Authored: Wed Oct 19 22:24:47 2016 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Sat Nov 19 09:51:04 2016 +0800

----------------------------------------------------------------------
 .../examples/AllDictionaryExample.scala         |   2 +
 .../carbondata/examples/CarbonExample.scala     |   2 +
 .../spark/sql/common/util/QueryTest.scala       |   6 +-
 .../spark/CarbonDataFrameWriter.scala           |   4 +-
 .../apache/carbondata/spark/CarbonFilters.scala |  34 +-
 .../spark/rdd/CarbonCleanFilesRDD.scala         |   2 +-
 .../spark/rdd/CarbonDataLoadRDD.scala           | 267 ++++----
 .../spark/rdd/CarbonDataRDDFactory.scala        | 619 +++++++++----------
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |   2 +-
 .../spark/rdd/CarbonDeleteLoadRDD.scala         |   2 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 101 +--
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  91 ++-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  57 +-
 .../apache/carbondata/spark/rdd/Compactor.scala |  71 +--
 .../spark/tasks/DictionaryWriterTask.scala      |  10 +-
 .../spark/thriftserver/CarbonThriftServer.scala |   2 +-
 .../carbondata/spark/util/CommonUtil.scala      |  14 +-
 .../spark/util/DataTypeConverterUtil.scala      |   2 +-
 .../spark/util/GlobalDictionaryUtil.scala       | 212 +++----
 .../org/apache/spark/sql/CarbonContext.scala    |  22 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  17 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  | 347 ++++++-----
 .../spark/sql/SparkUnknownExpression.scala      |  53 +-
 .../execution/command/carbonTableSchema.scala   | 270 ++++----
 .../spark/sql/hive/CarbonMetastoreCatalog.scala |  65 +-
 .../spark/sql/hive/DistributionUtil.scala       |  35 +-
 .../spark/sql/optimizer/CarbonOptimizer.scala   |  13 +-
 .../scala/org/apache/spark/util/FileUtils.scala |  10 +-
 28 files changed, 1141 insertions(+), 1191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
index dcdf41f..9fecadb 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
@@ -21,6 +21,7 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.examples.util.{AllDictionaryUtil, ExampleUtils}
 
 object AllDictionaryExample {
+
   def main(args: Array[String]) {
     val cc = ExampleUtils.createCarbonContext("CarbonExample")
     val testData = ExampleUtils.currentPath + "/src/main/resources/data.csv"
@@ -57,4 +58,5 @@ object AllDictionaryExample {
     // clean local dictionary files
     AllDictionaryUtil.cleanDictionary(allDictFile)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 038f609..f98d46d 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.examples.util.ExampleUtils
 
 object CarbonExample {
+
   def main(args: Array[String]) {
     val cc = ExampleUtils.createCarbonContext("CarbonExample")
     val testData = ExampleUtils.currentPath + "/src/main/resources/data.csv"
@@ -73,4 +74,5 @@ object CarbonExample {
     // Drop table
     cc.sql("DROP TABLE IF EXISTS t3")
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index f9960d3..587013f 100644
--- a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -140,7 +140,7 @@ object QueryTest {
              |$e
              |${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
           """.stripMargin
-        return Some(errorMessage)
+        Some(errorMessage)
     }
 
     if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) {
@@ -157,9 +157,9 @@ object QueryTest {
               prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n")
         }
       """.stripMargin
-      return Some(errorMessage)
+      Some(errorMessage)
     }
 
-    return None
+    None
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
index a02751e..3596393 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -126,8 +126,8 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) extends Logging {
       options.tableName,
       null,
       Seq(),
-      Map(("fileheader" -> header)),
-      false,
+      Map("fileheader" -> header),
+      isOverwriteExist = false,
       null,
       Some(dataFrame)).run(cc)
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
index 711c51c..3162f80 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -121,8 +121,8 @@ object CarbonFilters {
       expr match {
         case or@ Or(left, right) =>
 
-          val leftFilter = translate(left, true)
-          val rightFilter = translate(right, true)
+          val leftFilter = translate(left, or = true)
+          val rightFilter = translate(right, or = true)
           if (leftFilter.isDefined && rightFilter.isDefined) {
             Some( sources.Or(leftFilter.get, rightFilter.get))
           } else {
@@ -265,29 +265,27 @@ object CarbonFilters {
             Some(new EqualToExpression(transformExpression(child).get,
              transformExpression(Literal(null)).get, true))
         case Not(In(a: Attribute, list))
-         if !list.exists(!_.isInstanceOf[Literal]) =>
-         if (list.exists(x => (isNullLiteral(x.asInstanceOf[Literal])))) {
-          Some(new FalseExpression(transformExpression(a).get))
-         }
-        else {
-          Some(new NotInExpression(transformExpression(a).get,
+          if !list.exists(!_.isInstanceOf[Literal]) =>
+          if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
+            Some(new FalseExpression(transformExpression(a).get))
+          } else {
+            Some(new NotInExpression(transformExpression(a).get,
               new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
-            }
+          }
         case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
           Some(new InExpression(transformExpression(a).get,
             new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
         case Not(In(Cast(a: Attribute, _), list))
           if !list.exists(!_.isInstanceOf[Literal]) =>
-        /* if any illogical expression comes in NOT IN Filter like
-         NOT IN('scala',NULL) this will be treated as false expression and will
-         always return no result. */
-          if (list.exists(x => (isNullLiteral(x.asInstanceOf[Literal])))) {
-          Some(new FalseExpression(transformExpression(a).get))
-         }
-        else {
-          Some(new NotInExpression(transformExpression(a).get, new ListExpression(
+          /* if any illogical expression comes in NOT IN Filter like
+           NOT IN('scala',NULL) this will be treated as false expression and will
+           always return no result. */
+          if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
+            Some(new FalseExpression(transformExpression(a).get))
+          } else {
+            Some(new NotInExpression(transformExpression(a).get, new ListExpression(
               convertToJavaList(list.map(transformExpression(_).get)))))
-              }
+          }
         case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
           Some(new InExpression(transformExpression(a).get,
             new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
index 3ba32d2..3a5d952 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -76,7 +76,7 @@ class CarbonCleanFilesRDD[V: ClassTag](
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val theSplit = split.asInstanceOf[CarbonLoadPartition]
     val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
-    logInfo("Host Name : " + s.head + s.length)
+    logInfo("Host Name: " + s.head + s.length)
     s
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 856e67c..2a36f30 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -26,7 +26,8 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 import scala.util.Random
 
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv,
+TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
@@ -78,11 +79,11 @@ class CarbonNodePartition(rddId: Int, val idx: Int, host: String,
 }
 
 class SparkPartitionLoader(model: CarbonLoadModel,
-                           splitIndex: Int,
-                           storePath: String,
-                           kettleHomePath: String,
-                           loadCount: Int,
-                           loadMetadataDetails: LoadMetadataDetails) extends Logging{
+    splitIndex: Int,
+    storePath: String,
+    kettleHomePath: String,
+    loadCount: Int,
+    loadMetadataDetails: LoadMetadataDetails) extends Logging {
 
   var storeLocation: String = ""
 
@@ -106,7 +107,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
     // container temp dir or is yarn application directory.
     val carbonUseLocalDir = CarbonProperties.getInstance()
       .getProperty("carbon.use.local.dir", "false")
-    if(carbonUseLocalDir.equalsIgnoreCase("true")) {
+    if (carbonUseLocalDir.equalsIgnoreCase("true")) {
       val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
       if (null != storeLocations && storeLocations.nonEmpty) {
         storeLocation = storeLocations(Random.nextInt(storeLocations.length))
@@ -114,8 +115,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
       if (storeLocation == null) {
         storeLocation = System.getProperty("java.io.tmpdir")
       }
-    }
-    else {
+    } else {
       storeLocation = System.getProperty("java.io.tmpdir")
     }
     storeLocation = storeLocation + '/' + System.nanoTime() + '/' + splitIndex
@@ -127,7 +127,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
         kettleHomePath)
     } catch {
       case e: DataLoadingException => if (e.getErrorCode ==
-        DataProcessorConstants.BAD_REC_FOUND) {
+                                          DataProcessorConstants.BAD_REC_FOUND) {
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
         logInfo("Bad Record Found")
       } else {
@@ -160,6 +160,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
     }
   }
 }
+
 /**
  * Use this RDD class to load csv data file
  *
@@ -171,7 +172,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
  * @param partitioner           Partitioner which specify how to partition
  * @param columinar             whether it is columinar
  * @param loadCount             Current load count
- * @param tableCreationTime      Time of creating table
+ * @param tableCreationTime     Time of creating table
  * @param schemaLastUpdatedTime Time of last schema update
  * @param blocksGroupBy         Blocks Array which is group by partition or host
  * @param isTableSplitPartition Whether using table split partition
@@ -195,30 +196,29 @@ class DataFileLoaderRDD[K, V](
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
   override def getPartitions: Array[Partition] = {
-    isTableSplitPartition match {
-      case true =>
-        // for table split partition
-        var splits = Array[TableSplit]()
-        if (carbonLoadModel.isDirectLoad) {
-          splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
-            partitioner.nodeList, partitioner.partitionCount)
-        }
-        else {
-          splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
-            carbonLoadModel.getTableName, null, partitioner)
-        }
+    if (isTableSplitPartition) {
+      // for table split partition
+      var splits = Array[TableSplit]()
+      if (carbonLoadModel.isDirectLoad) {
+        splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
+          partitioner.nodeList, partitioner.partitionCount)
+      } else {
+        splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
+          carbonLoadModel.getTableName, null, partitioner)
+      }
 
-        splits.zipWithIndex.map {s =>
-          // filter the same partition unique id, because only one will match, so get 0 element
-          val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
-            p._1 == s._1.getPartition.getUniqueID)(0)._2
-          new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
-        }
-      case false =>
-        // for node partition
-        blocksGroupBy.zipWithIndex.map{b =>
-          new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
-        }
+      splits.zipWithIndex.map { case (split, index) =>
+        // filter the same partition unique id, because only one will match, so get 0 element
+        val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter { case (uniqueId, _) =>
+          uniqueId == split.getPartition.getUniqueID
+        }(0)._2
+        new CarbonTableSplitPartition(id, index, split, blocksDetails)
+      }
+    } else {
+      // for node partition
+      blocksGroupBy.zipWithIndex.map { case ((uniqueId, blockDetails), index) =>
+        new CarbonNodePartition(id, index, uniqueId, blockDetails)
+      }
     }
   }
 
@@ -242,16 +242,14 @@ class DataFileLoaderRDD[K, V](
         setModelAndBlocksInfo()
         val loader = new SparkPartitionLoader(model, theSplit.index, storePath,
           kettleHomePath, loadCount, loadMetadataDetails)
-        loader.initialize
+        loader.initialize()
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
         if (model.isRetentionRequest) {
           recreateAggregationTableForRetention
-        }
-        else if (model.isAggLoadRequest) {
+        } else if (model.isAggLoadRequest) {
           loadMetadataDetails.setLoadStatus(createManualAggregateTable)
-        }
-        else {
-          loader.run
+        } else {
+          loader.run()
         }
       } catch {
         case e: Exception =>
@@ -261,52 +259,50 @@ class DataFileLoaderRDD[K, V](
       }
 
       def setModelAndBlocksInfo(): Unit = {
-        isTableSplitPartition match {
-          case true =>
-            // for table split partition
-            val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
-            logInfo("Input split: " + split.serializableHadoopSplit.value)
-            val blocksID = gernerateBlocksID
-            carbonLoadModel.setBlocksID(blocksID)
-            carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
-            if (carbonLoadModel.isDirectLoad) {
-              model = carbonLoadModel.getCopyWithPartition(
-                split.serializableHadoopSplit.value.getPartition.getUniqueID,
-                split.serializableHadoopSplit.value.getPartition.getFilesPath,
-                carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
-            } else {
-              model = carbonLoadModel.getCopyWithPartition(
-                split.serializableHadoopSplit.value.getPartition.getUniqueID)
-            }
-            partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
-            // get this partition data blocks and put it to global static map
-            GraphGenerator.blockInfo.put(blocksID, split.partitionBlocksDetail)
-            StandardLogService.setThreadName(partitionID, null)
-            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordPartitionBlockMap(
-              partitionID, split.partitionBlocksDetail.length)
-          case false =>
-            // for node partition
-            val split = theSplit.asInstanceOf[CarbonNodePartition]
-            logInfo("Input split: " + split.serializableHadoopSplit)
-            logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
-            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordHostBlockMap(
-              split.serializableHadoopSplit, split.nodeBlocksDetail.length)
-            val blocksID = gernerateBlocksID
-            carbonLoadModel.setBlocksID(blocksID)
-            carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
-            // set this node blocks info to global static map
-            GraphGenerator.blockInfo.put(blocksID, split.nodeBlocksDetail)
-            if (carbonLoadModel.isDirectLoad) {
-              val filelist: java.util.List[String] = new java.util.ArrayList[String](
-                CarbonCommonConstants.CONSTANT_SIZE_TEN)
-              CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
-              model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
-                carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
-            }
-            else {
-              model = carbonLoadModel.getCopyWithPartition(partitionID)
-            }
-            StandardLogService.setThreadName(blocksID, null)
+        if (isTableSplitPartition) {
+          // for table split partition
+          val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
+          logInfo("Input split: " + split.serializableHadoopSplit.value)
+          val blocksID = gernerateBlocksID
+          carbonLoadModel.setBlocksID(blocksID)
+          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+          if (carbonLoadModel.isDirectLoad) {
+            model = carbonLoadModel.getCopyWithPartition(
+              split.serializableHadoopSplit.value.getPartition.getUniqueID,
+              split.serializableHadoopSplit.value.getPartition.getFilesPath,
+              carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+          } else {
+            model = carbonLoadModel.getCopyWithPartition(
+              split.serializableHadoopSplit.value.getPartition.getUniqueID)
+          }
+          partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
+          // get this partition data blocks and put it to global static map
+          GraphGenerator.blockInfo.put(blocksID, split.partitionBlocksDetail)
+          StandardLogService.setThreadName(partitionID, null)
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
+            partitionID, split.partitionBlocksDetail.length)
+        } else {
+          // for node partition
+          val split = theSplit.asInstanceOf[CarbonNodePartition]
+          logInfo("Input split: " + split.serializableHadoopSplit)
+          logInfo("The Block Count in this node: " + split.nodeBlocksDetail.length)
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
+            split.serializableHadoopSplit, split.nodeBlocksDetail.length)
+          val blocksID = gernerateBlocksID
+          carbonLoadModel.setBlocksID(blocksID)
+          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+          // set this node blocks info to global static map
+          GraphGenerator.blockInfo.put(blocksID, split.nodeBlocksDetail)
+          if (carbonLoadModel.isDirectLoad) {
+            val filelist: java.util.List[String] = new java.util.ArrayList[String](
+              CarbonCommonConstants.CONSTANT_SIZE_TEN)
+            CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
+            model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
+              carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+          } else {
+            model = carbonLoadModel.getCopyWithPartition(partitionID)
+          }
+          StandardLogService.setThreadName(blocksID, null)
         }
       }
 
@@ -316,14 +312,13 @@ class DataFileLoaderRDD[K, V](
        * @return
        */
       def gernerateBlocksID: String = {
-        isTableSplitPartition match {
-          case true =>
-            carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
-            theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
-              .getPartition.getUniqueID + "_" + UUID.randomUUID()
-          case false =>
-            carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
-            UUID.randomUUID()
+        if (isTableSplitPartition) {
+          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+          theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
+            .getPartition.getUniqueID + "_" + UUID.randomUUID()
+        } else {
+          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+          UUID.randomUUID()
         }
       }
 
@@ -351,8 +346,7 @@ class DataFileLoaderRDD[K, V](
             CarbonLoaderUtil
               .removeSliceFromMemory(model.getDatabaseName, model.getTableName, newSlice)
             logInfo(s"Aggregate table creation failed")
-          }
-          else {
+          } else {
             logInfo("Aggregate tables creation successfull")
           }
         }
@@ -425,6 +419,7 @@ class DataFileLoaderRDD[K, V](
       }
 
       var finished = false
+
       override def hasNext: Boolean = {
         !finished
       }
@@ -438,46 +433,46 @@ class DataFileLoaderRDD[K, V](
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    isTableSplitPartition match {
-      case true =>
-        // for table split partition
-        val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
-        val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
-        location
-      case false =>
-        // for node partition
-        val theSplit = split.asInstanceOf[CarbonNodePartition]
-        val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
-        logInfo("Preferred Location for split : " + firstOptionLocation(0))
-        val blockMap = new util.LinkedHashMap[String, Integer]()
-        val tableBlocks = theSplit.blocksDetails
-        tableBlocks.foreach(tableBlock => tableBlock.getLocations.foreach(
-          location => {
-            if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
-              val currentCount = blockMap.get(location)
-              if (currentCount == null) {
-                blockMap.put(location, 1)
-              } else {
-                blockMap.put(location, currentCount + 1)
-              }
+    if (isTableSplitPartition) {
+      // for table split partition
+      val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
+      val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
+      location
+    } else {
+      // for node partition
+      val theSplit = split.asInstanceOf[CarbonNodePartition]
+      val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
+      logInfo("Preferred Location for split: " + firstOptionLocation.head)
+      val blockMap = new util.LinkedHashMap[String, Integer]()
+      val tableBlocks = theSplit.blocksDetails
+      tableBlocks.foreach { tableBlock =>
+        tableBlock.getLocations.foreach { location =>
+          if (!firstOptionLocation.exists(location.equalsIgnoreCase)) {
+            val currentCount = blockMap.get(location)
+            if (currentCount == null) {
+              blockMap.put(location, 1)
+            } else {
+              blockMap.put(location, currentCount + 1)
             }
           }
-        )
-        )
-
-        val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
-          nodeCount1.getValue > nodeCount2.getValue
         }
-        )
+      }
+
+      val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
+        nodeCount1.getValue > nodeCount2.getValue
+      }
+      )
 
-        val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
-        firstOptionLocation ++ sortedNodesList
+      val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
+      firstOptionLocation ++ sortedNodesList
     }
   }
+
 }
 
 /**
  * Use this RDD class to load RDD
+ *
  * @param sc
  * @param result
  * @param carbonLoadModel
@@ -512,7 +507,7 @@ class DataFrameLoaderRDD[K, V](
       var partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails()
       var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
-        theSplit.index
+                               theSplit.index
       try {
         loadMetadataDetails.setPartitionCount(partitionID)
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
@@ -521,14 +516,14 @@ class DataFrameLoaderRDD[K, V](
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
         val loader = new SparkPartitionLoader(carbonLoadModel, theSplit.index, storePath,
           kettleHomePath, loadCount, loadMetadataDetails)
-        loader.initialize
+        loader.initialize()
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
         val rddIteratorKey = UUID.randomUUID().toString
-        try{
+        try {
           RddInputUtils.put(rddIteratorKey,
             new RddIterator(firstParent[Row].iterator(theSplit, context), carbonLoadModel))
           carbonLoadModel.setRddIteratorKey(rddIteratorKey)
-          loader.run
+          loader.run()
         } finally {
           RddInputUtils.remove(rddIteratorKey)
         }
@@ -540,6 +535,7 @@ class DataFrameLoaderRDD[K, V](
       }
 
       var finished = false
+
       override def hasNext: Boolean = !finished
 
       override def next(): (K, V) = {
@@ -556,11 +552,12 @@ class DataFrameLoaderRDD[K, V](
 /**
  * This class wrap Scala's Iterator to Java's Iterator.
  * It also convert all columns to string data to use csv data loading flow.
+ *
  * @param rddIter
  * @param carbonLoadModel
  */
 class RddIterator(rddIter: Iterator[Row],
-                  carbonLoadModel: CarbonLoadModel) extends java.util.Iterator[Array[String]] {
+    carbonLoadModel: CarbonLoadModel) extends java.util.Iterator[Array[String]] {
   val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
     .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
   val format = new SimpleDateFormat(formatString)
@@ -570,9 +567,10 @@ class RddIterator(rddIter: Iterator[Row],
   def hasNext: Boolean = rddIter.hasNext
 
   private def getString(value: Any, level: Int = 1): String = {
-    value == null match {
-      case true => ""
-      case false => value match {
+    if (value == null) {
+      ""
+    } else {
+      value match {
         case s: String => s
         case i: java.lang.Integer => i.toString
         case d: java.lang.Double => d.toString
@@ -623,4 +621,5 @@ class RddIterator(rddIter: Iterator[Row],
 
   def remove(): Unit = {
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 4392efe..1382efa 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -32,7 +32,8 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{util => _, _}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner}
+import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel,
+CompactionModel, Partitioner}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.{FileUtils, SplitUtils}
 
@@ -44,7 +45,8 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable, CompactionType}
+import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable,
+CompactionType}
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.etl.DataLoadingException
@@ -56,6 +58,7 @@ import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
 import org.apache.carbondata.spark.splits.TableSplit
 import org.apache.carbondata.spark.util.{CarbonQueryUtil, LoadMetadataUtil}
 
+
 /**
  * This is the factory class which can create different RDD depends on user needs.
  *
@@ -178,8 +181,12 @@ object CarbonDataRDDFactory extends Logging {
   }
 
   def configSplitMaxSize(context: SparkContext, filePaths: String,
-    hadoopConfiguration: Configuration): Unit = {
-    val defaultParallelism = if (context.defaultParallelism < 1) 1 else context.defaultParallelism
+      hadoopConfiguration: Configuration): Unit = {
+    val defaultParallelism = if (context.defaultParallelism < 1) {
+      1
+    } else {
+      context.defaultParallelism
+    }
     val spaceConsumed = FileUtils.getSpaceOccupied(filePaths)
     val blockSize =
       hadoopConfiguration.getLongBytes("dfs.blocksize", CarbonCommonConstants.CARBON_256MB)
@@ -191,30 +198,26 @@ object CarbonDataRDDFactory extends Logging {
         newSplitSize = CarbonCommonConstants.CARBON_16MB
       }
       hadoopConfiguration.set(FileInputFormat.SPLIT_MAXSIZE, newSplitSize.toString)
-      logInfo("totalInputSpaceConsumed : " + spaceConsumed +
-        " , defaultParallelism : " + defaultParallelism)
-      logInfo("mapreduce.input.fileinputformat.split.maxsize : " + newSplitSize.toString)
+      logInfo(s"totalInputSpaceConsumed: $spaceConsumed , defaultParallelism: $defaultParallelism")
+      logInfo(s"mapreduce.input.fileinputformat.split.maxsize: ${ newSplitSize.toString }")
     }
   }
 
   def alterTableForCompaction(sqlContext: SQLContext,
-    alterTableModel: AlterTableModel,
-    carbonLoadModel: CarbonLoadModel, partitioner: Partitioner, storePath: String,
-    kettleHomePath: String, storeLocation: String): Unit = {
+      alterTableModel: AlterTableModel,
+      carbonLoadModel: CarbonLoadModel, partitioner: Partitioner, storePath: String,
+      kettleHomePath: String, storeLocation: String): Unit = {
     var compactionSize: Long = 0
     var compactionType: CompactionType = CompactionType.MINOR_COMPACTION
     if (alterTableModel.compactionType.equalsIgnoreCase("major")) {
       compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION)
       compactionType = CompactionType.MAJOR_COMPACTION
-    }
-    else {
+    } else {
       compactionType = CompactionType.MINOR_COMPACTION
     }
 
-    logger
-      .audit(s"Compaction request received for table " +
-        s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}"
-      )
+    logger.audit(s"Compaction request received for table " +
+                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
       .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
@@ -244,9 +247,7 @@ object CarbonDataRDDFactory extends Logging {
     // if any other request comes at this time then it will create a compaction request file.
     // so that this will be taken up by the compaction process which is executing.
     if (!isConcurrentCompactionAllowed) {
-      logger
-        .info("System level compaction lock is enabled."
-        )
+      logger.info("System level compaction lock is enabled.")
       handleCompactionForSystemLocking(sqlContext,
         carbonLoadModel,
         partitioner,
@@ -257,8 +258,7 @@ object CarbonDataRDDFactory extends Logging {
         carbonTable,
         compactionModel
       )
-    }
-    else {
+    } else {
       // normal flow of compaction
       val lock = CarbonLockFactory
         .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
@@ -266,10 +266,8 @@ object CarbonDataRDDFactory extends Logging {
         )
 
       if (lock.lockWithRetries()) {
-        logger
-          .info("Acquired the compaction lock for table " + carbonLoadModel
-            .getDatabaseName + "." + carbonLoadModel.getTableName
-          )
+        logger.info("Acquired the compaction lock for table" +
+                    s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
           startCompactionThreads(sqlContext,
             carbonLoadModel,
@@ -280,45 +278,37 @@ object CarbonDataRDDFactory extends Logging {
             compactionModel,
             lock
           )
-        }
-        catch {
-          case e : Exception =>
-            logger.error("Exception in start compaction thread. " + e.getMessage)
+        } catch {
+          case e: Exception =>
+            logger.error(s"Exception in start compaction thread. ${ e.getMessage }")
             lock.unlock()
         }
-      }
-      else {
-        logger
-          .audit("Not able to acquire the compaction lock for table " +
-            s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}"
-          )
-        logger
-          .error("Not able to acquire the compaction lock for table " + carbonLoadModel
-            .getDatabaseName + "." + carbonLoadModel.getTableName
-          )
+      } else {
+        logger.audit("Not able to acquire the compaction lock for table " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        logger.error(s"Not able to acquire the compaction lock for table" +
+                     s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         sys.error("Table is already locked for compaction. Please try after some time.")
       }
     }
   }
 
   def handleCompactionForSystemLocking(sqlContext: SQLContext,
-    carbonLoadModel: CarbonLoadModel,
-    partitioner: Partitioner,
-    storePath: String,
-    kettleHomePath: String,
-    storeLocation: String,
-    compactionType: CompactionType,
-    carbonTable: CarbonTable,
-    compactionModel: CompactionModel): Unit = {
+      carbonLoadModel: CarbonLoadModel,
+      partitioner: Partitioner,
+      storePath: String,
+      kettleHomePath: String,
+      storeLocation: String,
+      compactionType: CompactionType,
+      carbonTable: CarbonTable,
+      compactionModel: CompactionModel): Unit = {
     val lock = CarbonLockFactory
       .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
         LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
       )
     if (lock.lockWithRetries()) {
-      logger
-        .info("Acquired the compaction lock for table " + carbonLoadModel
-          .getDatabaseName + "." + carbonLoadModel.getTableName
-        )
+      logger.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
+                  s".${ carbonLoadModel.getTableName }")
       try {
         startCompactionThreads(sqlContext,
           carbonLoadModel,
@@ -329,50 +319,43 @@ object CarbonDataRDDFactory extends Logging {
           compactionModel,
           lock
         )
-      }
-      catch {
-        case e : Exception =>
-          logger.error("Exception in start compaction thread. " + e.getMessage)
+      } catch {
+        case e: Exception =>
+          logger.error(s"Exception in start compaction thread. ${ e.getMessage }")
           lock.unlock()
           // if the compaction is a blocking call then only need to throw the exception.
           if (compactionModel.isDDLTrigger) {
             throw e
           }
       }
-    }
-    else {
-      logger
-        .audit("Not able to acquire the system level compaction lock for table " +
-          s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}"
-        )
-      logger
-        .error("Not able to acquire the compaction lock for table " + carbonLoadModel
-          .getDatabaseName + "." + carbonLoadModel.getTableName
-        )
+    } else {
+      logger.audit("Not able to acquire the system level compaction lock for table " +
+                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+      logger.error("Not able to acquire the compaction lock for table " +
+                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       CarbonCompactionUtil
         .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
       // do sys error only in case of DDL trigger.
-      if(compactionModel.isDDLTrigger) {
-        sys.error("Compaction is in progress, compaction request for table " + carbonLoadModel
-          .getDatabaseName + "." + carbonLoadModel.getTableName + " is in queue.")
-      }
-      else {
-        logger
-          .error("Compaction is in progress, compaction request for table " + carbonLoadModel
-            .getDatabaseName + "." + carbonLoadModel.getTableName + " is in queue."
-          )
+      if (compactionModel.isDDLTrigger) {
+        sys.error("Compaction is in progress, compaction request for table " +
+                  s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+                  " is in queue.")
+      } else {
+        logger.error("Compaction is in progress, compaction request for table " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+                     " is in queue.")
       }
     }
   }
 
   def executeCompaction(carbonLoadModel: CarbonLoadModel,
-    storePath: String,
-    compactionModel: CompactionModel,
-    partitioner: Partitioner,
-    executor: ExecutorService,
-    sqlContext: SQLContext,
-    kettleHomePath: String,
-    storeLocation: String): Unit = {
+      storePath: String,
+      compactionModel: CompactionModel,
+      partitioner: Partitioner,
+      executor: ExecutorService,
+      sqlContext: SQLContext,
+      kettleHomePath: String,
+      storeLocation: String): Unit = {
     val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
       carbonLoadModel.getLoadMetadataDetails
     )
@@ -413,10 +396,9 @@ object CarbonDataRDDFactory extends Logging {
           future.get
         }
         )
-      }
-      catch {
+      } catch {
         case e: Exception =>
-          logger.error("Exception in compaction thread " + e.getMessage)
+          logger.error(s"Exception in compaction thread ${ e.getMessage }")
           throw e
       }
 
@@ -442,22 +424,23 @@ object CarbonDataRDDFactory extends Logging {
       )
     }
   }
+
   /**
    * This will submit the loads to be merged into the executor.
    *
    * @param futureList
    */
   def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
-    loadsToMerge: util
-    .List[LoadMetadataDetails],
-    executor: ExecutorService,
-    storePath: String,
-    sqlContext: SQLContext,
-    compactionModel: CompactionModel,
-    kettleHomePath: String,
-    carbonLoadModel: CarbonLoadModel,
-    partitioner: Partitioner,
-    storeLocation: String): Unit = {
+      loadsToMerge: util
+      .List[LoadMetadataDetails],
+      executor: ExecutorService,
+      storePath: String,
+      sqlContext: SQLContext,
+      compactionModel: CompactionModel,
+      kettleHomePath: String,
+      carbonLoadModel: CarbonLoadModel,
+      partitioner: Partitioner,
+      storeLocation: String): Unit = {
 
     loadsToMerge.asScala.foreach(seg => {
       logger.info("loads identified for merge is " + seg.getLoadName)
@@ -484,13 +467,13 @@ object CarbonDataRDDFactory extends Logging {
   }
 
   def startCompactionThreads(sqlContext: SQLContext,
-    carbonLoadModel: CarbonLoadModel,
-    partitioner: Partitioner,
-    storePath: String,
-    kettleHomePath: String,
-    storeLocation: String,
-    compactionModel: CompactionModel,
-    compactionLock: ICarbonLock): Unit = {
+      carbonLoadModel: CarbonLoadModel,
+      partitioner: Partitioner,
+      storePath: String,
+      kettleHomePath: String,
+      storeLocation: String,
+      compactionModel: CompactionModel,
+      compactionLock: ICarbonLock): Unit = {
     val executor: ExecutorService = Executors.newFixedThreadPool(1)
     // update the updated table status.
     readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -499,138 +482,123 @@ object CarbonDataRDDFactory extends Logging {
     // clean up of the stale segments.
     try {
       CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
-    }
-    catch {
+    } catch {
       case e: Exception =>
-        logger
-          .error("Exception in compaction thread while clean up of stale segments " + e
-            .getMessage
-          )
+        logger.error(s"Exception in compaction thread while clean up of stale segments" +
+                     s" ${ e.getMessage }")
     }
 
-      val compactionThread = new Thread {
-        override def run(): Unit = {
+    val compactionThread = new Thread {
+      override def run(): Unit = {
 
+        try {
+          // compaction status of the table which is triggered by the user.
+          var triggeredCompactionStatus = false
+          var exception: Exception = null
           try {
-            // compaction status of the table which is triggered by the user.
-            var triggeredCompactionStatus = false
-            var exception : Exception = null
-            try {
-              executeCompaction(carbonLoadModel: CarbonLoadModel,
-                storePath: String,
-                compactionModel: CompactionModel,
-                partitioner: Partitioner,
-                executor, sqlContext, kettleHomePath, storeLocation
+            executeCompaction(carbonLoadModel: CarbonLoadModel,
+              storePath: String,
+              compactionModel: CompactionModel,
+              partitioner: Partitioner,
+              executor, sqlContext, kettleHomePath, storeLocation
+            )
+            triggeredCompactionStatus = true
+          } catch {
+            case e: Exception =>
+              logger.error(s"Exception in compaction thread ${ e.getMessage }")
+              exception = e
+          }
+          // continue in case of exception also, check for all the tables.
+          val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+            ).equalsIgnoreCase("true")
+
+          if (!isConcurrentCompactionAllowed) {
+            logger.info("System level compaction lock is enabled.")
+            val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
+            var tableForCompaction = CarbonCompactionUtil
+              .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
+                .tablesMeta.toArray, skipCompactionTables.toList.asJava
               )
-              triggeredCompactionStatus = true
-            }
-            catch {
-              case e: Exception =>
-                logger.error("Exception in compaction thread " + e.getMessage)
-                exception = e
-            }
-            // continue in case of exception also, check for all the tables.
-            val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-                CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-              ).equalsIgnoreCase("true")
-
-            if (!isConcurrentCompactionAllowed) {
-              logger.info("System level compaction lock is enabled.")
-              val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
-              var tableForCompaction = CarbonCompactionUtil
-                .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
-                  .tablesMeta.toArray, skipCompactionTables.toList.asJava
+            while (null != tableForCompaction) {
+              logger.info("Compaction request has been identified for table " +
+                          s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                          s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+              val table: CarbonTable = tableForCompaction.carbonTable
+              val metadataPath = table.getMetaDataFilepath
+              val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
+
+              val newCarbonLoadModel = new CarbonLoadModel()
+              prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
+              val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
+                .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
+                  newCarbonLoadModel.getTableName
                 )
-              while (null != tableForCompaction) {
-                logger
-                  .info("Compaction request has been identified for table " + tableForCompaction
-                    .carbonTable.getDatabaseName + "." + tableForCompaction.carbonTableIdentifier
-                          .getTableName
-                  )
-                val table: CarbonTable = tableForCompaction.carbonTable
-                val metadataPath = table.getMetaDataFilepath
-                val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
-
-                val newCarbonLoadModel = new CarbonLoadModel()
-                prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
-                val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
-                  .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
-                    newCarbonLoadModel.getTableName
-                  )
-
-                val compactionSize = CarbonDataMergerUtil
-                  .getCompactionSize(CompactionType.MAJOR_COMPACTION)
-
-                val newcompactionModel = CompactionModel(compactionSize,
-                  compactionType,
-                  table,
-                  tableCreationTime,
-                  compactionModel.isDDLTrigger
+
+              val compactionSize = CarbonDataMergerUtil
+                .getCompactionSize(CompactionType.MAJOR_COMPACTION)
+
+              val newcompactionModel = CompactionModel(compactionSize,
+                compactionType,
+                table,
+                tableCreationTime,
+                compactionModel.isDDLTrigger
+              )
+              // proceed for compaction
+              try {
+                executeCompaction(newCarbonLoadModel,
+                  newCarbonLoadModel.getStorePath,
+                  newcompactionModel,
+                  partitioner,
+                  executor, sqlContext, kettleHomePath, storeLocation
                 )
-                // proceed for compaction
-                try {
-                  executeCompaction(newCarbonLoadModel,
-                    newCarbonLoadModel.getStorePath,
-                    newcompactionModel,
-                    partitioner,
-                    executor, sqlContext, kettleHomePath, storeLocation
-                  )
+              } catch {
+                case e: Exception =>
+                  logger.error("Exception in compaction thread for table " +
+                               s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                               s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                // not handling the exception. only logging as this is not the table triggered
+                // by user.
+              } finally {
+                // delete the compaction required file in case of failure or success also.
+                if (!CarbonCompactionUtil
+                  .deleteCompactionRequiredFile(metadataPath, compactionType)) {
+                  // if the compaction request file is not been able to delete then
+                  // add those tables details to the skip list so that it wont be considered next.
+                  skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
+                  logger.error("Compaction request file can not be deleted for table " +
+                               s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                               s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
                 }
-                catch {
-                  case e: Exception =>
-                    logger.error("Exception in compaction thread for table " + tableForCompaction
-                      .carbonTable.getDatabaseName + "." +
-                                 tableForCompaction.carbonTableIdentifier
-                                   .getTableName)
-                  // not handling the exception. only logging as this is not the table triggered
-                  // by user.
-                }
-                finally {
-                  // delete the compaction required file in case of failure or success also.
-                  if (!CarbonCompactionUtil
-                    .deleteCompactionRequiredFile(metadataPath, compactionType)) {
-                    // if the compaction request file is not been able to delete then
-                    // add those tables details to the skip list so that it wont be considered next.
-                    skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
-                    logger
-                      .error("Compaction request file can not be deleted for table " +
-                             tableForCompaction
-                               .carbonTable.getDatabaseName + "." + tableForCompaction
-                               .carbonTableIdentifier
-                               .getTableName
-                      )
-
-                  }
-                }
-                // ********* check again for all the tables.
-                tableForCompaction = CarbonCompactionUtil
-                  .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
-                    .tablesMeta.toArray, skipCompactionTables.asJava
-                  )
-              }
-              // giving the user his error for telling in the beeline if his triggered table
-              // compaction is failed.
-              if (!triggeredCompactionStatus) {
-                throw new Exception("Exception in compaction " + exception.getMessage)
               }
+              // ********* check again for all the tables.
+              tableForCompaction = CarbonCompactionUtil
+                .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
+                  .tablesMeta.toArray, skipCompactionTables.asJava
+                )
+            }
+            // giving the user his error for telling in the beeline if his triggered table
+            // compaction is failed.
+            if (!triggeredCompactionStatus) {
+              throw new Exception("Exception in compaction " + exception.getMessage)
             }
           }
-          finally {
-            executor.shutdownNow()
-            deletePartialLoadsInCompaction(carbonLoadModel)
-            compactionLock.unlock()
-          }
+        } finally {
+          executor.shutdownNow()
+          deletePartialLoadsInCompaction(carbonLoadModel)
+          compactionLock.unlock()
         }
       }
+    }
     // calling the run method of a thread to make the call as blocking call.
     // in the future we may make this as concurrent.
     compactionThread.run()
   }
 
   def prepareCarbonLoadModel(storePath: String,
-    table: CarbonTable,
-    newCarbonLoadModel: CarbonLoadModel): Unit = {
+      table: CarbonTable,
+      newCarbonLoadModel: CarbonLoadModel): Unit = {
     newCarbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
     newCarbonLoadModel.setTableName(table.getFactTableName)
     val dataLoadSchema = new CarbonDataLoadSchema(table)
@@ -651,13 +619,10 @@ object CarbonDataRDDFactory extends Logging {
     // so deleting those folders.
     try {
       CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
-    }
-    catch {
+    } catch {
       case e: Exception =>
-        logger
-          .error("Exception in compaction thread while clean up of stale segments " + e
-            .getMessage
-          )
+        logger.error(s"Exception in compaction thread while clean up of stale segments" +
+                     s" ${ e.getMessage }")
     }
   }
 
@@ -674,13 +639,11 @@ object CarbonDataRDDFactory extends Logging {
     val isAgg = false
     // for handling of the segment Merging.
     def handleSegmentMerging(tableCreationTime: Long): Unit = {
-      logger
-        .info("compaction need status is " + CarbonDataMergerUtil.checkIfAutoLoadMergingRequired())
+      logger.info(s"compaction need status is" +
+                  s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
       if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
-        logger
-          .audit("Compaction request received for table " + carbonLoadModel
-            .getDatabaseName + "." + carbonLoadModel.getTableName
-          )
+        logger.audit(s"Compaction request received for table " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         val compactionSize = 0
         val isCompactionTriggerByDDl = false
         val compactionModel = CompactionModel(compactionSize,
@@ -717,8 +680,7 @@ object CarbonDataRDDFactory extends Logging {
             carbonTable,
             compactionModel
           )
-        }
-        else {
+        } else {
           val lock = CarbonLockFactory
             .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
               LockUsage.COMPACTION_LOCK
@@ -736,37 +698,34 @@ object CarbonDataRDDFactory extends Logging {
                 compactionModel,
                 lock
               )
-            }
-            catch {
-              case e : Exception =>
-                logger.error("Exception in start compaction thread. " + e.getMessage)
+            } catch {
+              case e: Exception =>
+                logger.error(s"Exception in start compaction thread. ${ e.getMessage }")
                 lock.unlock()
                 throw e
             }
-          }
-          else {
-            logger
-              .audit("Not able to acquire the compaction lock for table " + carbonLoadModel
-                .getDatabaseName + "." + carbonLoadModel.getTableName
-              )
-            logger
-              .error("Not able to acquire the compaction lock for table " + carbonLoadModel
-                .getDatabaseName + "." + carbonLoadModel.getTableName
-              )
+          } else {
+            logger.audit("Not able to acquire the compaction lock for table " +
+                         s"${ carbonLoadModel.getDatabaseName }.${
+                           carbonLoadModel
+                             .getTableName
+                         }")
+            logger.error("Not able to acquire the compaction lock for table " +
+                         s"${ carbonLoadModel.getDatabaseName }.${
+                           carbonLoadModel
+                             .getTableName
+                         }")
           }
         }
       }
     }
 
     try {
-      logger
-        .audit("Data load request has been received for table " + carbonLoadModel
-          .getDatabaseName + "." + carbonLoadModel.getTableName
-        )
+      logger.audit(s"Data load request has been received for table" +
+                   s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       if (!useKettle) {
-        logger.audit("Data is loading with New Data Flow for table " + carbonLoadModel
-            .getDatabaseName + "." + carbonLoadModel.getTableName
-          )
+        logger.audit("Data is loading with New Data Flow for table " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       }
       // Check if any load need to be deleted before loading new data
       deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, partitioner, storePath,
@@ -801,13 +760,10 @@ object CarbonDataRDDFactory extends Logging {
       // so deleting those folders.
       try {
         CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
-      }
-      catch {
+      } catch {
         case e: Exception =>
           logger
-            .error("Exception in data load while clean up of stale segments " + e
-              .getMessage
-            )
+            .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
       }
 
       // reading the start time of data load.
@@ -826,14 +782,14 @@ object CarbonDataRDDFactory extends Logging {
       var blocksGroupBy: Array[(String, Array[BlockDetails])] = null
       var status: Array[(String, LoadMetadataDetails)] = null
 
-      def loadDataFile(): Unit = { isTableSplitPartition match {
-        case true =>
+      def loadDataFile(): Unit = {
+        if (isTableSplitPartition) {
           /*
-           * when data handle by table split partition
-           * 1) get partition files, direct load or not will get the different files path
-           * 2) get files blocks by using SplitUtils
-           * 3) output Array[(partitionID,Array[BlockDetails])] to blocksGroupBy
-           */
+         * when data handle by table split partition
+         * 1) get partition files, direct load or not will get the different files path
+         * 2) get files blocks by using SplitUtils
+         * 3) output Array[(partitionID,Array[BlockDetails])] to blocksGroupBy
+         */
           var splits = Array[TableSplit]()
           if (carbonLoadModel.isDirectLoad) {
             // get all table Splits, this part means files were divide to different partitions
@@ -865,7 +821,7 @@ object CarbonDataRDDFactory extends Logging {
                 val pathBuilder = new StringBuilder()
                 pathBuilder.append(carbonLoadModel.getFactFilePath)
                 if (!carbonLoadModel.getFactFilePath.endsWith("/")
-                  && !carbonLoadModel.getFactFilePath.endsWith("\\")) {
+                    && !carbonLoadModel.getFactFilePath.endsWith("\\")) {
                   pathBuilder.append("/")
                 }
                 pathBuilder.append(split.getPartition.getUniqueID).append("/")
@@ -873,16 +829,15 @@ object CarbonDataRDDFactory extends Logging {
                   SplitUtils.getSplits(pathBuilder.toString, sqlContext.sparkContext))
             }
           }
-
-        case false =>
+        } else {
           /*
-           * when data load handle by node partition
-           * 1)clone the hadoop configuration,and set the file path to the configuration
-           * 2)use NewHadoopRDD to get split,size:Math.max(minSize, Math.min(maxSize, blockSize))
-           * 3)use DummyLoadRDD to group blocks by host,and let spark balance the block location
-           * 4)DummyLoadRDD output (host,Array[BlockDetails])as the parameter to CarbonDataLoadRDD
-           *   which parititon by host
-           */
+         * when data load handle by node partition
+         * 1)clone the hadoop configuration,and set the file path to the configuration
+         * 2)use NewHadoopRDD to get split,size:Math.max(minSize, Math.min(maxSize, blockSize))
+         * 3)use DummyLoadRDD to group blocks by host,and let spark balance the block location
+         * 4)DummyLoadRDD output (host,Array[BlockDetails])as the parameter to CarbonDataLoadRDD
+         *   which parititon by host
+         */
           val hadoopConfiguration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
           // FileUtils will skip file which is no csv, and return all file path which split by ','
           val filePaths = carbonLoadModel.getFactFilePath
@@ -921,9 +876,11 @@ object CarbonDataRDDFactory extends Logging {
               .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
               .toSeq
           val timeElapsed: Long = System.currentTimeMillis - startTime
-          logInfo("Total Time taken in block allocation : " + timeElapsed)
-          logInfo("Total no of blocks : " + blockList.size
-            + ", No.of Nodes : " + nodeBlockMapping.size
+          logInfo("Total Time taken in block allocation: " + timeElapsed)
+          logInfo(s"Total no of blocks: ${ blockList.length }, No.of Nodes: ${
+            nodeBlockMapping
+              .size
+          }"
           )
           var str = ""
           nodeBlockMapping.foreach(entry => {
@@ -983,7 +940,7 @@ object CarbonDataRDDFactory extends Logging {
         var rdd = dataFrame.get.rdd
         var numPartitions = DistributionUtil.getNodeList(sqlContext.sparkContext).length
         numPartitions = Math.max(1, Math.min(numPartitions, rdd.partitions.length))
-        rdd = rdd.coalesce(numPartitions, false)
+        rdd = rdd.coalesce(numPartitions, shuffle = false)
 
         status = new DataFrameLoaderRDD(sqlContext.sparkContext,
           new DataLoadResultImpl(),
@@ -1061,37 +1018,34 @@ object CarbonDataRDDFactory extends Logging {
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
         logInfo("********clean up done**********")
         logger.audit(s"Data load is failed for " +
-          s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         logWarning("Cannot write load metadata file as data load failed")
         throw new Exception(errorMessage)
       } else {
-          val metadataDetails = status(0)._2
-          if (!isAgg) {
-            val status = CarbonLoaderUtil
-              .recordLoadMetadata(currentLoadCount,
-                metadataDetails,
-                carbonLoadModel,
-                loadStatus,
-                loadStartTime
-              )
-            if (!status) {
-              val errorMessage = "Dataload failed due to failure in table status updation."
-              logger.audit("Data load is failed for " +
-                           s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
-              logger.error("Dataload failed due to failure in table status updation.")
-              throw new Exception(errorMessage)
-            }
-          } else if (!carbonLoadModel.isRetentionRequest) {
-            // TODO : Handle it
-            logInfo("********Database updated**********")
+        val metadataDetails = status(0)._2
+        if (!isAgg) {
+          val status = CarbonLoaderUtil.recordLoadMetadata(currentLoadCount, metadataDetails,
+            carbonLoadModel, loadStatus, loadStartTime)
+          if (!status) {
+            val errorMessage = "Dataload failed due to failure in table status updation."
+            logger.audit("Data load is failed for " +
+                         s"${ carbonLoadModel.getDatabaseName }.${
+                           carbonLoadModel
+                             .getTableName
+                         }")
+            logger.error("Dataload failed due to failure in table status updation.")
+            throw new Exception(errorMessage)
           }
-          logger.audit("Data load is successful for " +
-                       s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
+        } else if (!carbonLoadModel.isRetentionRequest) {
+          // TODO : Handle it
+          logInfo("********Database updated**********")
+        }
+        logger.audit("Data load is successful for " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
           // compaction handling
           handleSegmentMerging(tableCreationTime)
-        }
-        catch {
+        } catch {
           case e: Exception =>
             throw new Exception(
               "Dataload is success. Auto-Compaction has failed. Please check logs.")
@@ -1111,10 +1065,10 @@ object CarbonDataRDDFactory extends Logging {
   }
 
   def deleteLoadsAndUpdateMetadata(
-    carbonLoadModel: CarbonLoadModel,
-    table: CarbonTable, partitioner: Partitioner,
-    storePath: String,
-    isForceDeletion: Boolean) {
+      carbonLoadModel: CarbonLoadModel,
+      table: CarbonTable, partitioner: Partitioner,
+      storePath: String,
+      isForceDeletion: Boolean) {
     if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) {
       val loadMetadataFilePath = CarbonLoaderUtil
         .extractLoadMetadataFileLocation(carbonLoadModel)
@@ -1132,36 +1086,34 @@ object CarbonDataRDDFactory extends Logging {
 
       if (isUpdationRequired) {
         try {
-        // Update load metadate file after cleaning deleted nodes
-        if (carbonTableStatusLock.lockWithRetries()) {
-          logger.info("Table status lock has been successfully acquired.")
+          // Update load metadate file after cleaning deleted nodes
+          if (carbonTableStatusLock.lockWithRetries()) {
+            logger.info("Table status lock has been successfully acquired.")
 
-          // read latest table status again.
-          val latestMetadata = segmentStatusManager
-            .readLoadMetadata(loadMetadataFilePath)
+            // read latest table status again.
+            val latestMetadata = segmentStatusManager.readLoadMetadata(loadMetadataFilePath)
 
-          // update the metadata details from old to new status.
+            // update the metadata details from old to new status.
+            val latestStatus = CarbonLoaderUtil
+              .updateLoadMetadataFromOldToNew(details, latestMetadata)
 
-          val latestStatus = CarbonLoaderUtil
-            .updateLoadMetadataFromOldToNew(details, latestMetadata)
-
-          CarbonLoaderUtil.writeLoadMetadata(
-            carbonLoadModel.getCarbonDataLoadSchema,
-            carbonLoadModel.getDatabaseName,
-            carbonLoadModel.getTableName, latestStatus
-          )
-        }
-        else {
-          val errorMsg = "Clean files request is failed for " + carbonLoadModel.getDatabaseName +
-                         "." + carbonLoadModel.getTableName +
-                         ". Not able to acquire the table status lock due to other operation " +
-                         "running in the background."
-          logger.audit(errorMsg)
-          logger.error(errorMsg)
-          throw new Exception(errorMsg + " Please try after some time.")
+            CarbonLoaderUtil.writeLoadMetadata(
+              carbonLoadModel.getCarbonDataLoadSchema,
+              carbonLoadModel.getDatabaseName,
+              carbonLoadModel.getTableName, latestStatus
+            )
+          } else {
+            val errorMsg = "Clean files request is failed for " +
+                           s"${ carbonLoadModel.getDatabaseName }." +
+                           s"${ carbonLoadModel.getTableName }" +
+                           ". Not able to acquire the table status lock due to other operation " +
+                           "running in the background."
+            logger.audit(errorMsg)
+            logger.error(errorMsg)
+            throw new Exception(errorMsg + " Please try after some time.")
 
-        }
-      } finally {
+          }
+        } finally {
           CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
         }
       }
@@ -1197,10 +1149,9 @@ object CarbonDataRDDFactory extends Logging {
           partitioner,
           storePath,
           isForceDeletion = true)
-      }
-      else {
-        val errorMsg = "Clean files request is failed for " + carbonLoadModel.getDatabaseName +
-                       "." + carbonLoadModel.getTableName +
+      } else {
+        val errorMsg = "Clean files request is failed for " +
+                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
                        ". Not able to acquire the clean files lock due to another clean files " +
                        "operation is running in the background."
         logger.audit(errorMsg)
@@ -1208,10 +1159,10 @@ object CarbonDataRDDFactory extends Logging {
         throw new Exception(errorMsg + " Please try after some time.")
 
       }
-    }
-    finally {
+    } finally {
       CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
     }
   }
 
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
index 8c52249..17b487c 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -86,7 +86,7 @@ class CarbonDeleteLoadByDateRDD[K, V](
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val theSplit = split.asInstanceOf[CarbonLoadPartition]
     val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
-    logInfo("Host Name : " + s.head + s.length)
+    logInfo("Host Name: " + s.head + s.length)
     s
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
index df40ed7..57bf124 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -77,7 +77,7 @@ class CarbonDeleteLoadRDD[V: ClassTag](
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val theSplit = split.asInstanceOf[CarbonLoadPartition]
     val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
-    logInfo("Host Name : " + s.head + s.length)
+    logInfo("Host Name: " + s.head + s.length)
     s
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index b7da579..bce4eb2 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifie
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
@@ -67,6 +67,7 @@ trait GenericParser {
 
 case class DictionaryStats(distinctValues: java.util.List[String],
     dictWriteTime: Long, sortIndexWriteTime: Long)
+
 case class PrimitiveParser(dimension: CarbonDimension,
     setOpt: Option[HashSet[String]]) extends GenericParser {
   val (hasDictEncoding, set: HashSet[String]) = setOpt match {
@@ -164,20 +165,21 @@ case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends S
  * A RDD to combine all dictionary distinct values.
  *
  * @constructor create a RDD with RDD[(String, Iterable[String])]
- * @param prev the input RDD[(String, Iterable[String])]
+ * @param prev  the input RDD[(String, Iterable[String])]
  * @param model a model package load info
  */
 class CarbonAllDictionaryCombineRDD(
-                                       prev: RDD[(String, Iterable[String])],
-                                       model: DictionaryLoadModel)
+    prev: RDD[(String, Iterable[String])],
+    model: DictionaryLoadModel)
   extends RDD[(Int, ColumnDistinctValues)](prev) with Logging {
 
-  override def getPartitions: Array[Partition] =
+  override def getPartitions: Array[Partition] = {
     firstParent[(String, Iterable[String])].partitions
+  }
 
   override def compute(split: Partition, context: TaskContext
-                      ): Iterator[(Int, ColumnDistinctValues)] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass().getName())
+  ): Iterator[(Int, ColumnDistinctValues)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
     val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
     /*
@@ -240,7 +242,7 @@ class CarbonBlockDistinctValuesCombineRDD(
   override def compute(split: Partition,
       context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordLoadCsvfilesToDfTime()
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
     val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
     var rowCount = 0L
     try {
@@ -259,7 +261,7 @@ class CarbonBlockDistinctValuesCombineRDD(
           }
         }
       }
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordLoadCsvfilesToDfTime()
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
     } catch {
       case ex: Exception =>
         LOGGER.error(ex)
@@ -288,7 +290,7 @@ class CarbonGlobalDictionaryGenerateRDD(
   override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
 
   override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass().getName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     var status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     var isHighCardinalityColumn = false
     val iter = new Iterator[(Int, String, Boolean)] {
@@ -303,11 +305,11 @@ class CarbonGlobalDictionaryGenerateRDD(
            model.hdfsTempLocation)
       }
       if (StringUtils.isNotBlank(model.lockType)) {
-         CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE,
-           model.lockType)
+        CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE,
+          model.lockType)
       }
       if (StringUtils.isNotBlank(model.zooKeeperUrl)) {
-         CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL,
+        CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL,
           model.zooKeeperUrl)
       }
       val dictLock = CarbonLockFactory
@@ -320,7 +322,7 @@ class CarbonGlobalDictionaryGenerateRDD(
         val valuesBuffer = new mutable.HashSet[String]
         val rddIter = firstParent[(Int, ColumnDistinctValues)].iterator(split, context)
         var rowCount = 0L
-        CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDicShuffleAndWriteTime()
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
         breakable {
           while (rddIter.hasNext) {
             val distinctValueList = rddIter.next()._2
@@ -329,7 +331,7 @@ class CarbonGlobalDictionaryGenerateRDD(
             // check high cardinality
             if (model.isFirstLoad && model.highCardIdentifyEnable
                 && !model.isComplexes(split.index)
-                && model.dimensions(split.index).isColumnar()) {
+                && model.dimensions(split.index).isColumnar) {
               isHighCardinalityColumn = GlobalDictionaryUtil.isHighCardinalityColumn(
                 valuesBuffer.size, rowCount, model)
               if (isHighCardinalityColumn) {
@@ -338,10 +340,13 @@ class CarbonGlobalDictionaryGenerateRDD(
             }
           }
         }
-        val combineListTime = (System.currentTimeMillis() - t1)
+        val combineListTime = System.currentTimeMillis() - t1
         if (isHighCardinalityColumn) {
-          LOGGER.info("column " + model.table.getTableUniqueName + "." +
-                      model.primDimensions(split.index).getColName + " is high cardinality column")
+          LOGGER.info(s"column ${ model.table.getTableUniqueName }." +
+                      s"${
+                        model.primDimensions(split.index)
+                          .getColName
+                      } is high cardinality column")
         } else {
           isDictionaryLocked = dictLock.lockWithRetries()
           if (isDictionaryLocked) {
@@ -367,7 +372,7 @@ class CarbonGlobalDictionaryGenerateRDD(
           } else {
             null
           }
-          val dictCacheTime = (System.currentTimeMillis - t2)
+          val dictCacheTime = System.currentTimeMillis - t2
           val t3 = System.currentTimeMillis()
           val dictWriteTask = new DictionaryWriterTask(valuesBuffer,
             dictionaryForDistinctValueLookUp,
@@ -375,7 +380,7 @@ class CarbonGlobalDictionaryGenerateRDD(
             split.index)
           // execute dictionary writer task to get distinct values
           val distinctValues = dictWriteTask.execute()
-          val dictWriteTime = (System.currentTimeMillis() - t3)
+          val dictWriteTime = System.currentTimeMillis() - t3
           val t4 = System.currentTimeMillis()
           // if new data came than rewrite sort index file
           if (distinctValues.size() > 0) {
@@ -385,22 +390,21 @@ class CarbonGlobalDictionaryGenerateRDD(
               distinctValues)
             sortIndexWriteTask.execute()
           }
-          val sortIndexWriteTime = (System.currentTimeMillis() - t4)
-          CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDicShuffleAndWriteTime()
+          val sortIndexWriteTime = System.currentTimeMillis() - t4
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
           // After sortIndex writing, update dictionaryMeta
           dictWriteTask.updateMetaData()
           // clear the value buffer after writing dictionary data
           valuesBuffer.clear
-          org.apache.carbondata.core.util.CarbonUtil
-            .clearDictionaryCache(dictionaryForDistinctValueLookUp);
+          CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
           dictionaryForDistinctValueLookUpCleared = true
-          LOGGER.info("\n columnName:" + model.primDimensions(split.index).getColName +
-              "\n columnId:" + model.primDimensions(split.index).getColumnId +
-              "\n new distinct values count:" + distinctValues.size() +
-              "\n combine lists:" + combineListTime +
-              "\n create dictionary cache:" + dictCacheTime +
-              "\n sort list, distinct and write:" + dictWriteTime +
-              "\n write sort info:" + sortIndexWriteTime)
+          LOGGER.info(s"\n columnName: ${ model.primDimensions(split.index).getColName }" +
+                      s"\n columnId: ${ model.primDimensions(split.index).getColumnId }" +
+                      s"\n new distinct values count: ${ distinctValues.size() }" +
+                      s"\n combine lists: $combineListTime" +
+                      s"\n create dictionary cache: $dictCacheTime" +
+                      s"\n sort list, distinct and write: $dictWriteTime" +
+                      s"\n write sort info: $sortIndexWriteTime")
         }
       } catch {
         case ex: Exception =>
@@ -408,11 +412,9 @@ class CarbonGlobalDictionaryGenerateRDD(
           throw ex
       } finally {
         if (!dictionaryForDistinctValueLookUpCleared) {
-          org.apache.carbondata.core.util.CarbonUtil
-            .clearDictionaryCache(dictionaryForDistinctValueLookUp);
+          CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
         }
-        org.apache.carbondata.core.util.CarbonUtil
-          .clearDictionaryCache(dictionaryForSortIndexWriting);
+        CarbonUtil.clearDictionaryCache(dictionaryForSortIndexWriting)
         if (dictLock != null && isDictionaryLocked) {
           if (dictLock.unlock()) {
             logInfo(s"Dictionary ${
@@ -441,14 +443,17 @@ class CarbonGlobalDictionaryGenerateRDD(
         (split.index, status, isHighCardinalityColumn)
       }
     }
+
     iter
   }
+
 }
+
 /**
  * Set column dictionry patition format
  *
- * @param id  partition id
- * @param dimension  current carbon dimension
+ * @param id        partition id
+ * @param dimension current carbon dimension
  */
 class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
   extends Partition {
@@ -460,13 +465,13 @@ class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
 /**
  * Use external column dict to generate global dictionary
  *
- * @param carbonLoadModel  carbon load model
- * @param sparkContext  spark context
- * @param table  carbon table identifier
- * @param dimensions  carbon dimenisons having predefined dict
- * @param hdfsLocation  carbon base store path
+ * @param carbonLoadModel carbon load model
+ * @param sparkContext    spark context
+ * @param table           carbon table identifier
+ * @param dimensions      carbon dimenisons having predefined dict
+ * @param hdfsLocation    carbon base store path
  * @param dictFolderPath  path of dictionary folder
-*/
+ */
 class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     dictionaryLoadModel: DictionaryLoadModel,
     sparkContext: SparkContext,
@@ -505,25 +510,25 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     } catch {
       case ex: Exception =>
         logError(s"Error in reading pre-defined " +
-          s"dictionary file:${ex.getMessage}")
+                 s"dictionary file:${ ex.getMessage }")
         throw ex
     } finally {
       if (csvReader != null) {
         try {
-          csvReader.close
+          csvReader.close()
         } catch {
           case ex: Exception =>
             logError(s"Error in closing csvReader of " +
-              s"pre-defined dictionary file:${ex.getMessage}")
+                     s"pre-defined dictionary file:${ ex.getMessage }")
         }
       }
       if (inputStream != null) {
         try {
-          inputStream.close
+          inputStream.close()
         } catch {
           case ex: Exception =>
             logError(s"Error in closing inputStream of " +
-              s"pre-defined dictionary file:${ex.getMessage}")
+                     s"pre-defined dictionary file:${ ex.getMessage }")
         }
       }
     }