Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/12/27 15:41:30 UTC

[1/3] incubator-carbondata git commit: remove redundant declaration

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master e7b46ccf0 -> a011aafb0


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala
index 86c55d3..c278cd3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala
@@ -40,10 +40,7 @@ object CarbonHiveMetadataUtil {
   def invalidateAndDropTable(schemaName: String,
       cubeName: String,
       sparkSession: SparkSession): Unit = {
-    val tableWithDb = schemaName + "." + cubeName
-    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableWithDb)
     try {
-      // todo(wf): in spark no invalidate method now
       sparkSession.sql(s"DROP TABLE IF EXISTS $schemaName.$cubeName")
     } catch {
       case e: Exception =>
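
The two deleted vals, tableWithDb and the parsed tableIdent, were assigned but never read (the DROP statement interpolates the names directly), so they go away together with the stale TODO. The same dead-declaration cleanup appears again in CarbonSparkSqlParser further down, where the unused comment val is removed. After this hunk the method reduces to roughly the following sketch, assembled from the context lines above rather than the full file:

    def invalidateAndDropTable(schemaName: String,
        cubeName: String,
        sparkSession: SparkSession): Unit = {
      try {
        sparkSession.sql(s"DROP TABLE IF EXISTS $schemaName.$cubeName")
      } catch {
        case e: Exception =>
          // the real handler body lies outside the quoted hunk
      }
    }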

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index b3a7d5a..463faf1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -163,7 +163,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
             Sort(sort.order, sort.global, child)
           }
         case union: Union
-          if !(union.children(0).isInstanceOf[CarbonDictionaryTempDecoder] ||
+          if !(union.children.head.isInstanceOf[CarbonDictionaryTempDecoder] ||
             union.children(1).isInstanceOf[CarbonDictionaryTempDecoder]) =>
           val children = union.children.map { child =>
             val condAttrs = new util.HashSet[AttributeReferenceWrapper]
@@ -173,7 +173,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
               !child.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
               CarbonDictionaryTempDecoder(condAttrs,
                 new util.HashSet[AttributeReferenceWrapper](),
-                union.children(0))
+                union.children.head)
             } else {
               child
             }
@@ -557,24 +557,6 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
-  private def updateRelation(relation: CarbonDatasourceHadoopRelation):
-  CarbonDatasourceHadoopRelation = {
-    val fields = relation.schema.fields
-    val numberOfFields = relation.schema.fields.length
-    val newFields = new Array[StructField](numberOfFields)
-    val dictionaryMap = relation.carbonRelation.metaData.dictionaryMap
-    for (i <- 0 until numberOfFields ) {
-      dictionaryMap.get(fields(i).name) match {
-        case Some(true) =>
-          val field = fields(i)
-          newFields(i) = StructField(field.name, IntegerType, field.nullable, field.metadata)
-        case _ => newFields(i) = fields(i)
-      }
-    }
-    CarbonDatasourceHadoopRelation(relation.sparkSession,
-      relation.paths, relation.parameters, Option(StructType(newFields)))
-  }
-
   private def updateProjection(plan: LogicalPlan): LogicalPlan = {
     val transFormedPlan = plan transform {
       case p@Project(projectList: Seq[NamedExpression], cd: CarbonDictionaryCatalystDecoder) =>
@@ -605,7 +587,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
       case a@Alias(exp, name) =>
         exp match {
           case attr: Attribute => aliasMap.put(a.toAttribute, attr)
-          case _ => aliasMap.put(a.toAttribute, new AttributeReference("", StringType)())
+          case _ => aliasMap.put(a.toAttribute, AttributeReference("", StringType)())
         }
         a
     }
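
Three cleanups land in this file: children(0) becomes children.head, new AttributeReference(...) drops the redundant new (a case class is constructed through its companion apply), and the unused private helper updateRelation is deleted outright. A minimal standalone illustration of the two surviving idioms, with placeholder values rather than project code:

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.StringType

    val children = Seq("a", "b", "c")
    val first = children.head   // preferred over the positional children(0)

    // case-class companion apply; the empty second parameter list
    // takes the default exprId and qualifier
    val attr = AttributeReference("", StringType)()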

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 9a3f828..5a91ad1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -78,7 +78,6 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
       if (ctx.bucketSpec != null) {
         operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
       }
-      val comment = Option(ctx.STRING).map(string)
       val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
       val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns)
       val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index 399b3e6..e72abd7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -36,7 +36,7 @@ object CleanFiles {
   def main(args: Array[String]): Unit = {
 
     if (args.length < 2) {
-      System.err.println("Usage: CleanFiles <store path> <table name>");
+      System.err.println("Usage: CleanFiles <store path> <table name>")
       System.exit(1)
     }
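
The same one-character fix repeats below in Compaction, DeleteSegmentByDate, DeleteSegmentById and ShowSegments: Scala infers the end of a statement at a newline, so a trailing semicolon is legal but redundant, and the style checker flags it. Semicolons only matter when two statements share a line:

    System.err.println("Usage: CleanFiles <store path> <table name>")
    val a = 1; val b = 2   // the one case where ';' is still needed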
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 2db6e48..f0cc19b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -33,7 +33,7 @@ object Compaction {
 
   def main(args: Array[String]): Unit = {
     if (args.length < 3) {
-      System.err.println("Usage: Compaction <store path> <table name> <major|minor>");
+      System.err.println("Usage: Compaction <store path> <table name> <major|minor>")
       System.exit(1)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index 951cd7f..6219f1e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -35,7 +35,7 @@ object DeleteSegmentByDate {
   def main(args: Array[String]): Unit = {
     if (args.length < 3) {
       System.err.println(
-        "Usage: DeleteSegmentByDate <store path> <table name> <before date value>");
+        "Usage: DeleteSegmentByDate <store path> <table name> <before date value>")
       System.exit(1)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index dad9f59..303a062 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -40,7 +40,7 @@ object DeleteSegmentById {
 
     if (args.length < 3) {
       System.err.println(
-        "Usage: DeleteSegmentByID <store path> <table name> <segment id list>");
+        "Usage: DeleteSegmentByID <store path> <table name> <segment id list>")
       System.exit(1)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
index c953089..c7286ee 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
@@ -62,7 +62,7 @@ object ShowSegments {
   def main(args: Array[String]): Unit = {
 
     if (args.length < 2) {
-      System.err.println("Usage: ShowSegments <store path> <table name> [limit]");
+      System.err.println("Usage: ShowSegments <store path> <table name> [limit]")
       System.exit(1)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index 424d8fa..bcc82ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -66,11 +66,11 @@ object TableLoader {
 
   def main(args: Array[String]): Unit = {
     if (args.length < 3) {
-      System.err.println("Usage: TableLoader <properties file> <table name> <input files>");
+      System.err.println("Usage: TableLoader <properties file> <table name> <input files>")
       System.exit(1)
     }
     System.out.println("parameter list:")
-    args.foreach(System.out.println(_))
+    args.foreach(System.out.println)
     val map = extractOptions(TableAPIUtil.escape(args(0)))
     val storePath = extractStorePath(map)
     System.out.println(s"${CarbonCommonConstants.STORE_LOCATION}:$storePath")
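
Passing the method value lets the compiler eta-expand System.out.println into the String => Unit function that foreach expects; the placeholder form built the same lambda by hand. A small sketch of the three equivalent spellings:

    val params = Array("one", "two")
    params.foreach(System.out.println(_))   // explicit placeholder lambda
    params.foreach(System.out.println)      // eta-expanded method value
    params.foreach(println)                 // the usual Scala shorthand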

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
index b7617e8..e9330c8 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
@@ -43,21 +43,26 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists Carbon_automation_hive")
     sql("drop table if exists Carbon_automation_test_hive")
 
-    sql("create table if not exists Carbon_automation_test (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string,contractNumber int, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointId int,gamePointDescription string)  USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default', 'tableName'='Carbon_automation_test','DICTIONARY_INCLUDE'='Latest_MONTH,Latest_DAY,deviceInformationId')");
+    sql("create table if not exists Carbon_automation_test (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string,contractNumber int, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointId int,gamePointDescription string)  USING org.apache.spark.sql.CarbonSource OPTIONS('dbName'='default', 'tableName'='Carbon_automation_test','DICTIONARY_INCLUDE'='Latest_MONTH,Latest_DAY,deviceInformationId')")
+
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
 
-    sql("create table if not exists Carbon_automation_hive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string,contractNumber int, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointId int,gamePointDescription string) row format delimited fields terminated by ','");
-    sql("LOAD DATA LOCAL INPATH '"+currentDirectory+"/src/test/resources/100_olap.csv' INTO table Carbon_automation_hive ");
+    sql("create table if not exists Carbon_automation_hive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string,contractNumber int, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointId int,gamePointDescription string) row format delimited fields terminated by ','")
+
+    sql("LOAD DATA LOCAL INPATH '"+currentDirectory+"/src/test/resources/100_olap.csv' INTO table Carbon_automation_hive ")
+
 
     //hive table
-    sql("create table if not exists Carbon_automation_test_hive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string,contractNumber int, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointId int,gamePointDescription string)row format delimited fields terminated by ','");
-    sql("LOAD DATA LOCAL INPATH '"+currentDirectory+"/src/test/resources/100_olap.csv' INTO table Carbon_automation_test_hive");
+    sql("create table if not exists Carbon_automation_test_hive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string,contractNumber int, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointId int,gamePointDescription string)row format delimited fields terminated by ','")
+
+    sql("LOAD DATA LOCAL INPATH '"+currentDirectory+"/src/test/resources/100_olap.csv' INTO table Carbon_automation_test_hive")
+
 
-    sql("INSERT INTO table Carbon_automation_test select * from Carbon_automation_test_hive");
+    sql("INSERT INTO table Carbon_automation_test select * from Carbon_automation_test_hive")
   }
 
-  def dropAllTable{
+  def dropAllTable(): Unit = {
     sql("drop table if exists Carbon_automation_test")
     sql("drop table if exists Carbon_automation_hive")
     sql("drop table if exists Carbon_automation_test_hive")
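
The dropAllTable signature change replaces Scala's procedure syntax (no parameter list, no =, result type silently Unit) with an explicit declaration; procedure syntax is deprecated and later removed from the language, and the empty parentheses signal a side-effecting method that callers should invoke as dropAllTable(). Schematically:

    def dropAllOld { println("dropping") }           // procedure syntax, implicit Unit
    def dropAll(): Unit = { println("dropping") }    // explicit and preferred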

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 4310d04..57a8475 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -79,7 +79,7 @@ class QueryTest extends PlanTest {
 
   def sql(sqlText: String): DataFrame  = spark.sql(sqlText)
 
-  def clean: Unit = {
+  def clean(): Unit = {
     val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
     clean(storeLocation)
   }
@@ -249,7 +249,7 @@ object QueryTest {
       return Some(errorMessage)
     }
 
-    return None
+    None
   }
 
 }
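
Both edits in QueryTest follow the same conventions: a Scala method returns its last expression, so the final return is redundant (and return inside a closure is implemented by throwing NonLocalReturnControl, which makes it worth avoiding on principle), and a side-effecting no-arg method gets an explicit empty parameter list, as with clean(). A sketch mirroring the shape of the patched method:

    def check(ok: Boolean): Option[String] = {
      if (!ok) {
        return Some("error message")   // an early exit still uses return
      }
      None                             // the last expression is the result
    }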


[2/3] incubator-carbondata git commit: remove redundant declaration

Posted by ch...@apache.org.
remove redundant declaration

clean up

clean up

clean up RDDFactory

fix compile

fix style

fix style


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6fee9930
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6fee9930
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6fee9930

Branch: refs/heads/master
Commit: 6fee9930e768d5019c3d6ee3a9a7c0a983011119
Parents: e7b46cc
Author: jackylk <ja...@huawei.com>
Authored: Tue Dec 27 11:27:59 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Tue Dec 27 23:40:18 2016 +0800

----------------------------------------------------------------------
 .../carbondata/spark/util/CarbonQueryUtil.java  |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 150 +++++------
 .../spark/load/DeleteLoadFromMetadata.java      |  44 ----
 .../carbondata/spark/util/CarbonQueryUtil.java  | 248 -------------------
 .../vectorreader/ColumnarVectorWrapper.java     |   2 +-
 .../VectorizedCarbonRecordReader.java           |   8 +-
 .../spark/CarbonColumnValidator.scala           |   2 +-
 .../apache/carbondata/spark/CarbonFilters.scala |  34 +--
 .../carbondata/spark/CarbonSparkFactory.scala   |   4 +-
 .../org/apache/carbondata/spark/KeyVal.scala    |  89 -------
 .../spark/rdd/CarbonDataRDDFactory.scala        | 150 +++++------
 .../spark/sql/CarbonDictionaryDecoder.scala     |  16 +-
 .../org/apache/spark/sql/CarbonSession.scala    |  16 +-
 .../org/apache/spark/sql/CarbonSource.scala     |   2 +-
 .../org/apache/spark/sql/TableCreator.scala     |  29 ++-
 .../execution/CarbonLateDecodeStrategy.scala    |  12 +-
 .../execution/command/carbonTableSchema.scala   |  37 +--
 .../spark/sql/hive/CarbonHiveMetadataUtil.scala |   3 -
 .../sql/optimizer/CarbonLateDecodeRule.scala    |  24 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   1 -
 .../org/apache/spark/util/CleanFiles.scala      |   2 +-
 .../org/apache/spark/util/Compaction.scala      |   2 +-
 .../apache/spark/util/DeleteSegmentByDate.scala |   2 +-
 .../apache/spark/util/DeleteSegmentById.scala   |   2 +-
 .../org/apache/spark/util/ShowSegments.scala    |   2 +-
 .../org/apache/spark/util/TableLoader.scala     |   4 +-
 .../AllDataTypesTestCaseAggregate.scala         |  19 +-
 .../spark/sql/common/util/QueryTest.scala       |   4 +-
 28 files changed, 242 insertions(+), 668 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
index d2e716f..9d1a281 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
@@ -39,7 +39,7 @@ import org.apache.commons.lang3.StringUtils;
 /**
  * This utilty parses the Carbon query plan to actual query model object.
  */
-public final class CarbonQueryUtil {
+public class CarbonQueryUtil {
 
   private CarbonQueryUtil() {
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 93194c8..ff7bf23 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -81,10 +81,10 @@ object CarbonDataRDDFactory {
     }
 
     LOGGER.audit(s"Compaction request received for table " +
-                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val tableCreationTime = CarbonEnv.get.carbonMetastore
-      .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+        .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
 
     if (null == carbonLoadModel.getLoadMetadataDetails) {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -102,10 +102,10 @@ object CarbonDataRDDFactory {
     )
 
     val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-        CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-      )
-      .equalsIgnoreCase("true")
+        .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+          CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+        )
+        .equalsIgnoreCase("true")
 
     // if system level compaction is enabled then only one compaction can run in the system
     // if any other request comes at this time then it will create a compaction request file.
@@ -124,13 +124,13 @@ object CarbonDataRDDFactory {
     } else {
       // normal flow of compaction
       val lock = CarbonLockFactory
-        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-          LockUsage.COMPACTION_LOCK
-        )
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+            LockUsage.COMPACTION_LOCK
+          )
 
       if (lock.lockWithRetries()) {
         LOGGER.info("Acquired the compaction lock for table" +
-                    s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
           startCompactionThreads(sqlContext,
             carbonLoadModel,
@@ -147,9 +147,9 @@ object CarbonDataRDDFactory {
         }
       } else {
         LOGGER.audit("Not able to acquire the compaction lock for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.error(s"Not able to acquire the compaction lock for table" +
-                     s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         sys.error("Table is already locked for compaction. Please try after some time.")
       }
     }
@@ -164,12 +164,12 @@ object CarbonDataRDDFactory {
       carbonTable: CarbonTable,
       compactionModel: CompactionModel): Unit = {
     val lock = CarbonLockFactory
-      .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
-        LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
-      )
+        .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
+          LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
+        )
     if (lock.lockWithRetries()) {
       LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
-                  s".${ carbonLoadModel.getTableName }")
+          s".${ carbonLoadModel.getTableName }")
       try {
         startCompactionThreads(sqlContext,
           carbonLoadModel,
@@ -190,20 +190,20 @@ object CarbonDataRDDFactory {
       }
     } else {
       LOGGER.audit("Not able to acquire the system level compaction lock for table " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       LOGGER.error("Not able to acquire the compaction lock for table " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       CarbonCompactionUtil
-        .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
+          .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
       // do sys error only in case of DDL trigger.
       if (compactionModel.isDDLTrigger) {
         sys.error("Compaction is in progress, compaction request for table " +
-                  s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
-                  " is in queue.")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+            " is in queue.")
       } else {
         LOGGER.error("Compaction is in progress, compaction request for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
-                     " is in queue.")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+            " is in queue.")
       }
     }
   }
@@ -226,7 +226,7 @@ object CarbonDataRDDFactory {
     } catch {
       case e: Exception =>
         LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
-                     s" ${ e.getMessage }")
+            s" ${ e.getMessage }")
     }
 
     val compactionThread = new Thread {
@@ -250,9 +250,9 @@ object CarbonDataRDDFactory {
           }
           // continue in case of exception also, check for all the tables.
           val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-            ).equalsIgnoreCase("true")
+              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+                CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+              ).equalsIgnoreCase("true")
 
           if (!isConcurrentCompactionAllowed) {
             LOGGER.info("System level compaction lock is enabled.")
@@ -262,8 +262,8 @@ object CarbonDataRDDFactory {
                   skipCompactionTables.toList.asJava)
             while (null != tableForCompaction) {
               LOGGER.info("Compaction request has been identified for table " +
-                          s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                          s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                  s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                  s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
               val table: CarbonTable = tableForCompaction.carbonTable
               val metadataPath = table.getMetaDataFilepath
               val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
@@ -271,12 +271,12 @@ object CarbonDataRDDFactory {
               val newCarbonLoadModel = new CarbonLoadModel()
               DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
               val tableCreationTime = CarbonEnv.get.carbonMetastore
-                .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
-                  newCarbonLoadModel.getTableName
-                )
+                  .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
+                    newCarbonLoadModel.getTableName
+                  )
 
               val compactionSize = CarbonDataMergerUtil
-                .getCompactionSize(CompactionType.MAJOR_COMPACTION)
+                  .getCompactionSize(CompactionType.MAJOR_COMPACTION)
 
               val newcompactionModel = CompactionModel(compactionSize,
                 compactionType,
@@ -294,27 +294,27 @@ object CarbonDataRDDFactory {
               } catch {
                 case e: Exception =>
                   LOGGER.error("Exception in compaction thread for table " +
-                               s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                               s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                      s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                      s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
                 // not handling the exception. only logging as this is not the table triggered
                 // by user.
               } finally {
                 // delete the compaction required file in case of failure or success also.
                 if (!CarbonCompactionUtil
-                  .deleteCompactionRequiredFile(metadataPath, compactionType)) {
+                    .deleteCompactionRequiredFile(metadataPath, compactionType)) {
                   // if the compaction request file is not been able to delete then
                   // add those tables details to the skip list so that it wont be considered next.
                   skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
                   LOGGER.error("Compaction request file can not be deleted for table " +
-                               s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                               s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                      s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                      s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
                 }
               }
               // ********* check again for all the tables.
               tableForCompaction = CarbonCompactionUtil
-                .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
-                  .tablesMeta.toArray, skipCompactionTables.asJava
-                )
+                  .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
+                      .tablesMeta.toArray, skipCompactionTables.asJava
+                  )
             }
             // giving the user his error for telling in the beeline if his triggered table
             // compaction is failed.
@@ -347,10 +347,10 @@ object CarbonDataRDDFactory {
     // for handling of the segment Merging.
     def handleSegmentMerging(tableCreationTime: Long): Unit = {
       LOGGER.info(s"compaction need status is" +
-                  s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
+          s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
       if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
         LOGGER.audit(s"Compaction request received for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         val compactionSize = 0
         val isCompactionTriggerByDDl = false
         val compactionModel = CompactionModel(compactionSize,
@@ -370,10 +370,10 @@ object CarbonDataRDDFactory {
         storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
 
         val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-            CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-          )
-          .equalsIgnoreCase("true")
+            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+            )
+            .equalsIgnoreCase("true")
 
         if (!isConcurrentCompactionAllowed) {
 
@@ -388,9 +388,9 @@ object CarbonDataRDDFactory {
           )
         } else {
           val lock = CarbonLockFactory
-            .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-              LockUsage.COMPACTION_LOCK
-            )
+              .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+                LockUsage.COMPACTION_LOCK
+              )
 
           if (lock.lockWithRetries()) {
             LOGGER.info("Acquired the compaction lock.")
@@ -411,15 +411,15 @@ object CarbonDataRDDFactory {
             }
           } else {
             LOGGER.audit("Not able to acquire the compaction lock for table " +
-                         s"${ carbonLoadModel.getDatabaseName }.${
-                           carbonLoadModel
-                             .getTableName
-                         }")
+                s"${ carbonLoadModel.getDatabaseName }.${
+                  carbonLoadModel
+                      .getTableName
+                }")
             LOGGER.error("Not able to acquire the compaction lock for table " +
-                         s"${ carbonLoadModel.getDatabaseName }.${
-                           carbonLoadModel
-                             .getTableName
-                         }")
+                s"${ carbonLoadModel.getDatabaseName }.${
+                  carbonLoadModel
+                      .getTableName
+                }")
           }
         }
       }
@@ -427,10 +427,10 @@ object CarbonDataRDDFactory {
 
     try {
       LOGGER.audit(s"Data load request has been received for table" +
-                   s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       if (!useKettle) {
         LOGGER.audit("Data is loading with New Data Flow for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       }
       // Check if any load need to be deleted before loading new data
       DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
@@ -468,16 +468,16 @@ object CarbonDataRDDFactory {
       } catch {
         case e: Exception =>
           LOGGER
-            .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
+              .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
       }
 
       // reading the start time of data load.
       val loadStartTime = CarbonLoaderUtil.readCurrentTime()
       carbonLoadModel.setFactTimeStamp(loadStartTime)
       val tableCreationTime = CarbonEnv.get.carbonMetastore
-        .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+          .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
       val schemaLastUpdatedTime = CarbonEnv.get.carbonMetastore
-        .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+          .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
 
       // get partition way from configuration
       // val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
@@ -528,7 +528,7 @@ object CarbonDataRDDFactory {
                 }
                 pathBuilder.append(split.getPartition.getUniqueID).append("/")
                 (split.getPartition.getUniqueID,
-                  SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
+                    SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
             }
           }
         } else {
@@ -570,15 +570,15 @@ object CarbonDataRDDFactory {
           // group blocks to nodes, tasks
           val startTime = System.currentTimeMillis
           val activeNodes = DistributionUtil
-            .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
+              .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
           val nodeBlockMapping =
             CarbonLoaderUtil
-              .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
-              .toSeq
+                .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
+                .toSeq
           val timeElapsed: Long = System.currentTimeMillis - startTime
           LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
           LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
-                      s"No.of Nodes: ${nodeBlockMapping.size}")
+              s"No.of Nodes: ${nodeBlockMapping.size}")
           var str = ""
           nodeBlockMapping.foreach(entry => {
             val tableBlock = entry._2
@@ -588,7 +588,7 @@ object CarbonDataRDDFactory {
                 hostentry.equalsIgnoreCase(entry._1)
               )) {
                 str = str + " , mismatch locations: " + tableBlockInfo.getLocations
-                  .foldLeft("")((a, b) => a + "," + b)
+                    .foldLeft("")((a, b) => a + "," + b)
               }
             )
             str = str + "\n"
@@ -743,7 +743,7 @@ object CarbonDataRDDFactory {
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
         LOGGER.info("********clean up done**********")
         LOGGER.audit(s"Data load is failed for " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.warn("Cannot write load metadata file as data load failed")
         throw new Exception(errorMessage)
       } else {
@@ -754,10 +754,10 @@ object CarbonDataRDDFactory {
           if (!status) {
             val errorMessage = "Dataload failed due to failure in table status updation."
             LOGGER.audit("Data load is failed for " +
-                         s"${ carbonLoadModel.getDatabaseName }.${
-                           carbonLoadModel
-                             .getTableName
-                         }")
+                s"${ carbonLoadModel.getDatabaseName }.${
+                  carbonLoadModel
+                      .getTableName
+                }")
             LOGGER.error("Dataload failed due to failure in table status updation.")
             throw new Exception(errorMessage)
           }
@@ -766,7 +766,7 @@ object CarbonDataRDDFactory {
           LOGGER.info("********Database updated**********")
         }
         LOGGER.audit("Data load is successful for " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
           // compaction handling
           handleSegmentMerging(tableCreationTime)
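
Every hunk in CarbonDataRDDFactory above is whitespace-only: continuation lines that were aligned under the receiver or under the opening of a string argument move to a fixed four-space continuation indent, matching the "fix style" entries in the commit message below. Aligned continuations have to be reflowed whenever an identifier changes length, which is presumably why the style configuration prefers the fixed indent. Schematically, with println and placeholder names standing in for the logger calls:

    val (dbName, tableName) = ("default", "t1")

    // before: aligned under the first argument; renaming reflows every line
    println(s"Compaction request received for table " +
            s"$dbName.$tableName")

    // after: fixed four-space continuation indent, stable under renames
    println(s"Compaction request received for table " +
        s"$dbName.$tableName")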

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
deleted file mode 100644
index 0926e1c..0000000
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Project Name  : Carbon
- * Module Name   : CARBON Data Processor
- * Author    : R00903928
- * Created Date  : 21-Sep-2015
- * FileName   : DeleteLoadFromMetadata.java
- * Description   : Kettle step to generate MD Key
- * Class Version  : 1.0
- */
-package org.apache.carbondata.spark.load;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-public final class DeleteLoadFromMetadata {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DeleteLoadFromMetadata.class.getName());
-
-  private DeleteLoadFromMetadata() {
-
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
deleted file mode 100644
index 04ef665..0000000
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.util;
-
-import java.io.IOException;
-import java.util.*;
-
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.spark.partition.api.Partition;
-import org.apache.carbondata.spark.partition.api.impl.DefaultLoadBalancer;
-import org.apache.carbondata.spark.partition.api.impl.PartitionMultiFileImpl;
-import org.apache.carbondata.spark.partition.api.impl.QueryPartitionHelper;
-import org.apache.carbondata.spark.splits.TableSplit;
-
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * This utilty parses the Carbon query plan to actual query model object.
- */
-public final class CarbonQueryUtil {
-
-  private CarbonQueryUtil() {
-
-  }
-
-
-  /**
-   * It creates the one split for each region server.
-   */
-  public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
-      CarbonQueryPlan queryPlan) throws IOException {
-
-    //Just create splits depends on locations of region servers
-    List<Partition> allPartitions = null;
-    if (queryPlan == null) {
-      allPartitions =
-          QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
-    } else {
-      allPartitions =
-          QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
-    }
-    TableSplit[] splits = new TableSplit[allPartitions.size()];
-    for (int i = 0; i < splits.length; i++) {
-      splits[i] = new TableSplit();
-      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      Partition partition = allPartitions.get(i);
-      String location =
-              QueryPartitionHelper.getInstance().getLocation(partition, databaseName, tableName);
-      locations.add(location);
-      splits[i].setPartition(partition);
-      splits[i].setLocations(locations);
-    }
-
-    return splits;
-  }
-
-  /**
-   * It creates the one split for each region server.
-   */
-  public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) throws Exception {
-
-    //Just create splits depends on locations of region servers
-    FileType fileType = FileFactory.getFileType(sourcePath);
-    DefaultLoadBalancer loadBalancer = null;
-    List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
-    loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
-    TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
-    for (int i = 0; i < tblSplits.length; i++) {
-      tblSplits[i] = new TableSplit();
-      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      Partition partition = allPartitions.get(i);
-      String location = loadBalancer.getNodeForPartitions(partition);
-      locations.add(location);
-      tblSplits[i].setPartition(partition);
-      tblSplits[i].setLocations(locations);
-    }
-    return tblSplits;
-  }
-
-  /**
-   * It creates the one split for each region server.
-   */
-  public static TableSplit[] getPartitionSplits(String sourcePath, String[] nodeList,
-      int partitionCount) throws Exception {
-
-    //Just create splits depends on locations of region servers
-    FileType fileType = FileFactory.getFileType(sourcePath);
-    DefaultLoadBalancer loadBalancer = null;
-    List<Partition> allPartitions = getAllPartitions(sourcePath, fileType, partitionCount);
-    loadBalancer = new DefaultLoadBalancer(Arrays.asList(nodeList), allPartitions);
-    TableSplit[] splits = new TableSplit[allPartitions.size()];
-    for (int i = 0; i < splits.length; i++) {
-      splits[i] = new TableSplit();
-      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      Partition partition = allPartitions.get(i);
-      String location = loadBalancer.getNodeForPartitions(partition);
-      locations.add(location);
-      splits[i].setPartition(partition);
-      splits[i].setLocations(locations);
-    }
-    return splits;
-  }
-
-  public static void getAllFiles(String sourcePath, List<String> partitionsFiles, FileType fileType)
-      throws Exception {
-
-    if (!FileFactory.isFileExist(sourcePath, fileType, false)) {
-      throw new Exception("Source file doesn't exist at path: " + sourcePath);
-    }
-
-    CarbonFile file = FileFactory.getCarbonFile(sourcePath, fileType);
-    if (file.isDirectory()) {
-      CarbonFile[] fileNames = file.listFiles(new CarbonFileFilter() {
-        @Override public boolean accept(CarbonFile pathname) {
-          return true;
-        }
-      });
-      for (int i = 0; i < fileNames.length; i++) {
-        getAllFiles(fileNames[i].getPath(), partitionsFiles, fileType);
-      }
-    } else {
-      // add only csv files
-      if (file.getName().endsWith("csv")) {
-        partitionsFiles.add(file.getPath());
-      }
-    }
-  }
-
-  /**
-   * split sourcePath by comma
-   */
-  public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
-      String separator) {
-    if (StringUtils.isNotEmpty(sourcePath)) {
-      String[] files = sourcePath.split(separator);
-      for (String file : files) {
-        partitionsFiles.add(file);
-      }
-    }
-  }
-
-  private static List<Partition> getAllFilesForDataLoad(String sourcePath) throws Exception {
-    List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
-    List<Partition> partitionList =
-        new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    Map<Integer, List<String>> partitionFiles = new HashMap<Integer, List<String>>();
-
-    partitionFiles.put(0, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
-    partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
-
-    for (int i = 0; i < files.size(); i++) {
-      partitionFiles.get(i % 1).add(files.get(i));
-    }
-    return partitionList;
-  }
-
-  private static List<Partition> getAllPartitions(String sourcePath, FileType fileType,
-      int partitionCount) throws Exception {
-    List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
-    int[] numberOfFilesPerPartition = getNumberOfFilesPerPartition(files.size(), partitionCount);
-    int startIndex = 0;
-    int endIndex = 0;
-    List<Partition> partitionList =
-        new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    if (numberOfFilesPerPartition != null) {
-      for (int i = 0; i < numberOfFilesPerPartition.length; i++) {
-        List<String> partitionFiles =
-            new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-        endIndex += numberOfFilesPerPartition[i];
-        for (int j = startIndex; j < endIndex; j++) {
-          partitionFiles.add(files.get(j));
-        }
-        startIndex += numberOfFilesPerPartition[i];
-        partitionList.add(new PartitionMultiFileImpl(i + "", partitionFiles));
-      }
-    }
-    return partitionList;
-  }
-
-  private static int[] getNumberOfFilesPerPartition(int numberOfFiles, int partitionCount) {
-    int div = numberOfFiles / partitionCount;
-    int mod = numberOfFiles % partitionCount;
-    int[] numberOfNodeToScan = null;
-    if (div > 0) {
-      numberOfNodeToScan = new int[partitionCount];
-      Arrays.fill(numberOfNodeToScan, div);
-    } else if (mod > 0) {
-      numberOfNodeToScan = new int[mod];
-    }
-    for (int i = 0; i < mod; i++) {
-      numberOfNodeToScan[i] = numberOfNodeToScan[i] + 1;
-    }
-    return numberOfNodeToScan;
-  }
-
-  public static List<String> getListOfSlices(LoadMetadataDetails[] details) {
-    List<String> slices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    if (null != details) {
-      for (LoadMetadataDetails oneLoad : details) {
-        if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(oneLoad.getLoadStatus())) {
-          String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getLoadName();
-          slices.add(loadName);
-        }
-      }
-    }
-    return slices;
-  }
-
-  /**
-   * This method will clear the dictionary cache for a given map of columns and dictionary cache
-   * mapping
-   *
-   * @param columnToDictionaryMap
-   */
-  public static void clearColumnDictionaryCache(Map<String, Dictionary> columnToDictionaryMap) {
-    for (Map.Entry<String, Dictionary> entry : columnToDictionaryMap.entrySet()) {
-      CarbonUtil.clearDictionaryCache(entry.getValue());
-    }
-  }
-
-}
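
The helpers deleted above amount to one piece of logic worth keeping in mind: input CSV files are dealt out across a fixed number of partitions as evenly as possible, each partition receiving numberOfFiles / partitionCount files and the first numberOfFiles % partitionCount partitions receiving one extra. A simplified Scala sketch of that distribution (illustrative names; unlike the deleted Java, it always yields partitionCount groups instead of skipping empty ones):

    def distribute(files: Seq[String], partitionCount: Int): Seq[Seq[String]] = {
      val div = files.size / partitionCount
      val mod = files.size % partitionCount
      // group sizes differ by at most one
      val sizes = Seq.tabulate(partitionCount)(i => div + (if (i < mod) 1 else 0))
      sizes.foldLeft((Seq.empty[Seq[String]], files)) { case ((acc, rest), n) =>
        (acc :+ rest.take(n), rest.drop(n))
      }._1
    }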

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 84e5c07..5ed7389 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -21,7 +21,7 @@ import org.apache.carbondata.scan.result.vector.CarbonColumnVector;
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
 import org.apache.spark.sql.types.Decimal;
 
-public class ColumnarVectorWrapper implements CarbonColumnVector {
+class ColumnarVectorWrapper implements CarbonColumnVector {
 
   private ColumnVector columnVector;
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index ba02bca..1beea97 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -55,7 +55,7 @@ import org.apache.spark.sql.types.StructType;
  * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
  * carbondata column APIs and fills the data directly into columns.
  */
-public class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
+class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
 
   private int batchIdx = 0;
 
@@ -166,7 +166,7 @@ public class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
    * before any calls to nextKeyValue/nextBatch.
    */
 
-  public void initBatch(MemoryMode memMode) {
+  private void initBatch(MemoryMode memMode) {
     List<QueryDimension> queryDimension = queryModel.getQueryDimension();
     List<QueryMeasure> queryMeasures = queryModel.getQueryMeasures();
     StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
@@ -232,14 +232,14 @@ public class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
   /*
    * Can be called before any rows are returned to enable returning columnar batches directly.
    */
-  public void enableReturningBatches() {
+  private void enableReturningBatches() {
     returnColumnarBatch = true;
   }
 
   /**
    * Advances to the next batch of rows. Returns false if there are no more.
    */
-  public boolean nextBatch() throws IOException {
+  private boolean nextBatch() {
     columnarBatch.reset();
     carbonColumnarBatch.reset();
     if (iterator.hasNext()) {
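
The visibility tightening above (package-private class, private batch helpers) keeps the usual vectorized-reader contract: callers drive nextKeyValue(), which hands out rows from the current batch and calls nextBatch() to refill once the batch is drained. A hypothetical Scala sketch of that control flow, with stub counters in place of the real ColumnarBatch plumbing:

    var batchIdx = 0        // rows already returned from the current batch
    var numBatched = 0      // rows available in the current batch
    var rowsLeft = 10000    // stub for the underlying data source
    def nextBatch(): Boolean = {
      numBatched = math.min(4096, rowsLeft); rowsLeft -= numBatched; batchIdx = 0
      numBatched > 0
    }
    def nextKeyValue(): Boolean =
      if (batchIdx < numBatched || nextBatch()) { batchIdx += 1; true } else false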

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
index ea97bca..31bbf19 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
@@ -23,7 +23,7 @@ import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
   * Carbon column validator
   */
 class CarbonColumnValidator extends ColumnValidator {
-  def validateColumns(allColumns: Seq[ColumnSchema]) {
+  def validateColumns(allColumns: Seq[ColumnSchema]): Unit = {
     allColumns.foreach { columnSchema =>
       val colWithSameId = allColumns.filter { x =>
         x.getColumnUniqueId.equals(columnSchema.getColumnUniqueId)
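
Spelling out ": Unit =" replaces Scala's procedure syntax (def m { ... }), which is deprecated and silently discards any result type. The same shape in miniature (illustrative method):

    // before: def validate(ids: Seq[Int]) { ... }   (procedure syntax)
    def validate(ids: Seq[Int]): Unit = {
      ids.foreach(id => require(id >= 0, s"negative id: $id"))
    }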

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
index 6d9fb24..0a84891 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.spark
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.optimizer.{AttributeReferenceWrapper}
+import org.apache.spark.sql.optimizer.AttributeReferenceWrapper
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.StructType
 
@@ -151,13 +151,13 @@ object CarbonFilters {
         case EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
           Some(sources.EqualTo(a.name, v))
 
-        case Not(EqualTo(a: Attribute, Literal(v, t))) => new
+        case Not(EqualTo(a: Attribute, Literal(v, t))) =>
             Some(sources.Not(sources.EqualTo(a.name, v)))
-        case Not(EqualTo(Literal(v, t), a: Attribute)) => new
+        case Not(EqualTo(Literal(v, t), a: Attribute)) =>
             Some(sources.Not(sources.EqualTo(a.name, v)))
-        case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => new
+        case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) =>
             Some(sources.Not(sources.EqualTo(a.name, v)))
-        case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => new
+        case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) =>
             Some(sources.Not(sources.EqualTo(a.name, v)))
         case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
         case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
@@ -221,7 +221,7 @@ object CarbonFilters {
           None
       }
     }
-    filters.flatMap(translate(_, false)).toArray
+    filters.flatMap(translate(_)).toArray
   }
 
   def processExpression(exprs: Seq[Expression],
@@ -231,8 +231,8 @@ object CarbonFilters {
     def transformExpression(expr: Expression, or: Boolean = false): Option[CarbonExpression] = {
       expr match {
         case or@ Or(left, right) =>
-          val leftFilter = transformExpression(left, true)
-          val rightFilter = transformExpression(right, true)
+          val leftFilter = transformExpression(left, or = true)
+          val rightFilter = transformExpression(right, or = true)
           if (leftFilter.isDefined && rightFilter.isDefined) {
             Some(new OrExpression(leftFilter.get, rightFilter.get))
           } else {
@@ -247,22 +247,22 @@ object CarbonFilters {
           (transformExpression(left) ++ transformExpression(right)).reduceOption(new
               AndExpression(_, _))
 
-        case EqualTo(a: Attribute, l@Literal(v, t)) => new
+        case EqualTo(a: Attribute, l@Literal(v, t)) =>
             Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(l@Literal(v, t), a: Attribute) => new
+        case EqualTo(l@Literal(v, t), a: Attribute) =>
             Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(Cast(a: Attribute, _), l@Literal(v, t)) => new
+        case EqualTo(Cast(a: Attribute, _), l@Literal(v, t)) =>
             Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(l@Literal(v, t), Cast(a: Attribute, _)) => new
+        case EqualTo(l@Literal(v, t), Cast(a: Attribute, _)) =>
             Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
 
-        case Not(EqualTo(a: Attribute, l@Literal(v, t))) => new
+        case Not(EqualTo(a: Attribute, l@Literal(v, t))) =>
             Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(l@Literal(v, t), a: Attribute)) => new
+        case Not(EqualTo(l@Literal(v, t), a: Attribute)) =>
             Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(Cast(a: Attribute, _), l@Literal(v, t))) => new
+        case Not(EqualTo(Cast(a: Attribute, _), l@Literal(v, t))) =>
             Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(l@Literal(v, t), Cast(a: Attribute, _))) => new
+        case Not(EqualTo(l@Literal(v, t), Cast(a: Attribute, _))) =>
             Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
         case IsNotNull(child: Attribute) =>
             Some(new NotEqualsExpression(transformExpression(child).get,
@@ -357,7 +357,7 @@ object CarbonFilters {
           None
       }
     }
-    exprs.flatMap(transformExpression(_, false)).reduceOption(new AndExpression(_, _))
+    exprs.flatMap(transformExpression(_)).reduceOption(new AndExpression(_, _))
   }
   private def isNullLiteral(exp: Expression): Boolean = {
     if (null != exp
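
The stray "new" keywords removed above were leftovers: "new Some(e)" compiles, since Some is a case class with a public constructor, but the dangling "new" at the end of each match arm was misleading, and the companion apply is the idiomatic form. Similarly, translate(_, false) becomes translate(_), relying on the parameter's default value of false (the transformExpression signature above shows the same defaulting style). A standalone toy illustration:

    val viaConstructor: Option[Int] = new Some(1) // legal but noisy
    val viaApply: Option[Int]       = Some(1)     // idiomatic companion apply

    // toy stand-in for the real translate, which defaults its second parameter
    def translate(x: Int, or: Boolean = false): Int = if (or) -x else x
    Seq(1, 2).map(translate(_))                   // same as translate(_, false)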

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
index 7618558..6e3a1c8 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
@@ -47,14 +47,14 @@ object CarbonSparkFactory {
    /**
     * @return column validator
     */
-  def getCarbonColumnValidator(): ColumnValidator = {
+  def getCarbonColumnValidator: ColumnValidator = {
     new CarbonColumnValidator
   }
 
   /**
    * @return dictionary helper
    */
-  def getDictionaryDetailService(): DictionaryDetailService = {
+  def getDictionaryDetailService: DictionaryDetailService = {
     new DictionaryDetailHelper
   }
 }
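
Dropping the empty parameter lists follows the uniform-access convention: a side-effect-free accessor is declared without parentheses so call sites read like field access, while side-effecting methods keep their (). A minimal illustration (hypothetical object):

    object Factory {
      def validator: String = "pure accessor, no parens"
      def reset(): Unit = println("side effect, keeps parens")
    }
    Factory.validator   // must now be called without ()
    Factory.reset()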

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
deleted file mode 100644
index 254052b..0000000
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * It is just Key value class. I don't get any other alternate to make the RDD class to
- * work with my minimum knowledge in scala.
- * May be I will remove later once I gain good knowledge :)
- *
- */
-
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.load.LoadMetadataDetails
-
-trait Value[V] extends Serializable {
-  def getValue(value: Array[Object]): V
-}
-
-class ValueImpl extends Value[Array[Object]] {
-  override def getValue(value: Array[Object]): Array[Object] = value
-}
-
-trait RawValue[V] extends Serializable {
-  def getValue(value: Array[Any]): V
-}
-
-class RawValueImpl extends RawValue[Array[Any]] {
-  override def getValue(value: Array[Any]): Array[Any] = value
-}
-
-trait DataLoadResult[K, V] extends Serializable {
-  def getKey(key: String, value: LoadMetadataDetails): (K, V)
-}
-
-class DataLoadResultImpl extends DataLoadResult[String, LoadMetadataDetails] {
-  override def getKey(key: String, value: LoadMetadataDetails): (String, LoadMetadataDetails) = {
-    (key, value)
-  }
-}
-
-
-trait PartitionResult[K, V] extends Serializable {
-  def getKey(key: Int, value: Boolean): (K, V)
-
-}
-
-class PartitionResultImpl extends PartitionResult[Int, Boolean] {
-  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}
-
-trait MergeResult[K, V] extends Serializable {
-  def getKey(key: Int, value: Boolean): (K, V)
-
-}
-
-class MergeResultImpl extends MergeResult[Int, Boolean] {
-  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}
-
-trait DeletedLoadResult[K, V] extends Serializable {
-  def getKey(key: String, value: String): (K, V)
-}
-
-class DeletedLoadResultImpl extends DeletedLoadResult[String, String] {
-  override def getKey(key: String, value: String): (String, String) = (key, value)
-}
-
-trait RestructureResult[K, V] extends Serializable {
-  def getKey(key: Int, value: Boolean): (K, V)
-}
-
-class RestructureResultImpl extends RestructureResult[Int, Boolean] {
-  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}
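
Every trait in the deleted KeyVal.scala reduced to tuple construction (getKey(k, v) is just (k, v)) or the identity function, so the spark2 copy was redundant, presumably because the equivalent definitions in the spark-common module serve the same purpose. What such a call site needs is no more than a plain tuple:

    // what MergeResultImpl.getKey(segmentId, isSuccess) returned
    val (segmentId, isSuccess) = (0, true)
    val mergeResult: (Int, Boolean) = (segmentId, isSuccess)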

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index de07707..f451a54 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -79,10 +79,10 @@ object CarbonDataRDDFactory {
     }
 
     LOGGER.audit(s"Compaction request received for table " +
-                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val tableCreationTime = CarbonEnv.get.carbonMetastore
-      .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+        .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
 
     if (null == carbonLoadModel.getLoadMetadataDetails) {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -100,10 +100,10 @@ object CarbonDataRDDFactory {
     )
 
     val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-        CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-      )
-      .equalsIgnoreCase("true")
+        .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+          CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+        )
+        .equalsIgnoreCase("true")
 
     // if system level compaction is enabled then only one compaction can run in the system
     // if any other request comes at this time then it will create a compaction request file.
@@ -122,13 +122,13 @@ object CarbonDataRDDFactory {
     } else {
       // normal flow of compaction
       val lock = CarbonLockFactory
-        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-          LockUsage.COMPACTION_LOCK
-        )
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+            LockUsage.COMPACTION_LOCK
+          )
 
       if (lock.lockWithRetries()) {
         LOGGER.info("Acquired the compaction lock for table" +
-                    s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
           startCompactionThreads(sqlContext,
             carbonLoadModel,
@@ -145,9 +145,9 @@ object CarbonDataRDDFactory {
         }
       } else {
         LOGGER.audit("Not able to acquire the compaction lock for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.error(s"Not able to acquire the compaction lock for table" +
-                     s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         sys.error("Table is already locked for compaction. Please try after some time.")
       }
     }
@@ -162,12 +162,12 @@ object CarbonDataRDDFactory {
       carbonTable: CarbonTable,
       compactionModel: CompactionModel): Unit = {
     val lock = CarbonLockFactory
-      .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
-        LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
-      )
+        .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
+          LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
+        )
     if (lock.lockWithRetries()) {
       LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
-                  s".${ carbonLoadModel.getTableName }")
+          s".${ carbonLoadModel.getTableName }")
       try {
         startCompactionThreads(sqlContext,
           carbonLoadModel,
@@ -188,20 +188,20 @@ object CarbonDataRDDFactory {
       }
     } else {
       LOGGER.audit("Not able to acquire the system level compaction lock for table " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       LOGGER.error("Not able to acquire the compaction lock for table " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       CarbonCompactionUtil
-        .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
+          .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
       // do sys error only in case of DDL trigger.
       if (compactionModel.isDDLTrigger) {
         sys.error("Compaction is in progress, compaction request for table " +
-                  s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
-                  " is in queue.")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+            " is in queue.")
       } else {
         LOGGER.error("Compaction is in progress, compaction request for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
-                     " is in queue.")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+            " is in queue.")
       }
     }
   }
@@ -224,7 +224,7 @@ object CarbonDataRDDFactory {
     } catch {
       case e: Exception =>
         LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
-                     s" ${ e.getMessage }")
+            s" ${ e.getMessage }")
     }
 
     val compactionThread = new Thread {
@@ -248,9 +248,9 @@ object CarbonDataRDDFactory {
           }
           // continue in case of exception also, check for all the tables.
           val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-            ).equalsIgnoreCase("true")
+              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+                CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+              ).equalsIgnoreCase("true")
 
           if (!isConcurrentCompactionAllowed) {
             LOGGER.info("System level compaction lock is enabled.")
@@ -260,8 +260,8 @@ object CarbonDataRDDFactory {
                   skipCompactionTables.toList.asJava)
             while (null != tableForCompaction) {
               LOGGER.info("Compaction request has been identified for table " +
-                          s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                          s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                  s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                  s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
               val table: CarbonTable = tableForCompaction.carbonTable
               val metadataPath = table.getMetaDataFilepath
               val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
@@ -269,12 +269,12 @@ object CarbonDataRDDFactory {
               val newCarbonLoadModel = new CarbonLoadModel()
               DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
               val tableCreationTime = CarbonEnv.get.carbonMetastore
-                .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
-                  newCarbonLoadModel.getTableName
-                )
+                  .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
+                    newCarbonLoadModel.getTableName
+                  )
 
               val compactionSize = CarbonDataMergerUtil
-                .getCompactionSize(CompactionType.MAJOR_COMPACTION)
+                  .getCompactionSize(CompactionType.MAJOR_COMPACTION)
 
               val newcompactionModel = CompactionModel(compactionSize,
                 compactionType,
@@ -292,27 +292,27 @@ object CarbonDataRDDFactory {
               } catch {
                 case e: Exception =>
                   LOGGER.error("Exception in compaction thread for table " +
-                               s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                               s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                      s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                      s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
                 // not handling the exception. only logging as this is not the table triggered
                 // by user.
               } finally {
                 // delete the compaction required file in case of failure or success also.
                 if (!CarbonCompactionUtil
-                  .deleteCompactionRequiredFile(metadataPath, compactionType)) {
+                    .deleteCompactionRequiredFile(metadataPath, compactionType)) {
                   // if the compaction request file is not been able to delete then
                   // add those tables details to the skip list so that it wont be considered next.
                   skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
                   LOGGER.error("Compaction request file can not be deleted for table " +
-                               s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                               s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                      s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                      s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
                 }
               }
               // ********* check again for all the tables.
               tableForCompaction = CarbonCompactionUtil
-                .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
-                  .tablesMeta.toArray, skipCompactionTables.asJava
-                )
+                  .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
+                      .tablesMeta.toArray, skipCompactionTables.asJava
+                  )
             }
             // giving the user his error for telling in the beeline if his triggered table
             // compaction is failed.
@@ -345,10 +345,10 @@ object CarbonDataRDDFactory {
     // for handling of the segment Merging.
     def handleSegmentMerging(tableCreationTime: Long): Unit = {
       LOGGER.info(s"compaction need status is" +
-                  s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
+          s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
       if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
         LOGGER.audit(s"Compaction request received for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         val compactionSize = 0
         val isCompactionTriggerByDDl = false
         val compactionModel = CompactionModel(compactionSize,
@@ -368,10 +368,10 @@ object CarbonDataRDDFactory {
         storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
 
         val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-            CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-          )
-          .equalsIgnoreCase("true")
+            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+            )
+            .equalsIgnoreCase("true")
 
         if (!isConcurrentCompactionAllowed) {
 
@@ -386,9 +386,9 @@ object CarbonDataRDDFactory {
           )
         } else {
           val lock = CarbonLockFactory
-            .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-              LockUsage.COMPACTION_LOCK
-            )
+              .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+                LockUsage.COMPACTION_LOCK
+              )
 
           if (lock.lockWithRetries()) {
             LOGGER.info("Acquired the compaction lock.")
@@ -409,15 +409,15 @@ object CarbonDataRDDFactory {
             }
           } else {
             LOGGER.audit("Not able to acquire the compaction lock for table " +
-                         s"${ carbonLoadModel.getDatabaseName }.${
-                           carbonLoadModel
-                             .getTableName
-                         }")
+                s"${ carbonLoadModel.getDatabaseName }.${
+                  carbonLoadModel
+                      .getTableName
+                }")
             LOGGER.error("Not able to acquire the compaction lock for table " +
-                         s"${ carbonLoadModel.getDatabaseName }.${
-                           carbonLoadModel
-                             .getTableName
-                         }")
+                s"${ carbonLoadModel.getDatabaseName }.${
+                  carbonLoadModel
+                      .getTableName
+                }")
           }
         }
       }
@@ -425,10 +425,10 @@ object CarbonDataRDDFactory {
 
     try {
       LOGGER.audit(s"Data load request has been received for table" +
-                   s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       if (!useKettle) {
         LOGGER.audit("Data is loading with New Data Flow for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       }
       // Check if any load need to be deleted before loading new data
       DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
@@ -466,16 +466,16 @@ object CarbonDataRDDFactory {
       } catch {
         case e: Exception =>
           LOGGER
-            .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
+              .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
       }
 
       // reading the start time of data load.
       val loadStartTime = CarbonLoaderUtil.readCurrentTime()
       carbonLoadModel.setFactTimeStamp(loadStartTime)
       val tableCreationTime = CarbonEnv.get.carbonMetastore
-        .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+          .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
       val schemaLastUpdatedTime = CarbonEnv.get.carbonMetastore
-        .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+          .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
 
       // get partition way from configuration
       // val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
@@ -526,7 +526,7 @@ object CarbonDataRDDFactory {
                 }
                 pathBuilder.append(split.getPartition.getUniqueID).append("/")
                 (split.getPartition.getUniqueID,
-                  SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
+                    SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
             }
           }
         } else {
@@ -568,15 +568,15 @@ object CarbonDataRDDFactory {
           // group blocks to nodes, tasks
           val startTime = System.currentTimeMillis
           val activeNodes = DistributionUtil
-            .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
+              .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
           val nodeBlockMapping =
             CarbonLoaderUtil
-              .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
-              .toSeq
+                .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
+                .toSeq
           val timeElapsed: Long = System.currentTimeMillis - startTime
           LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
           LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
-                      s"No.of Nodes: ${nodeBlockMapping.size}")
+              s"No.of Nodes: ${nodeBlockMapping.size}")
           var str = ""
           nodeBlockMapping.foreach(entry => {
             val tableBlock = entry._2
@@ -586,7 +586,7 @@ object CarbonDataRDDFactory {
                 hostentry.equalsIgnoreCase(entry._1)
               )) {
                 str = str + " , mismatch locations: " + tableBlockInfo.getLocations
-                  .foldLeft("")((a, b) => a + "," + b)
+                    .foldLeft("")((a, b) => a + "," + b)
               }
             )
             str = str + "\n"
@@ -723,7 +723,7 @@ object CarbonDataRDDFactory {
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
         LOGGER.info("********clean up done**********")
         LOGGER.audit(s"Data load is failed for " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.warn("Cannot write load metadata file as data load failed")
         throw new Exception(errorMessage)
       } else {
@@ -734,10 +734,10 @@ object CarbonDataRDDFactory {
           if (!status) {
             val errorMessage = "Dataload failed due to failure in table status updation."
             LOGGER.audit("Data load is failed for " +
-                         s"${ carbonLoadModel.getDatabaseName }.${
-                           carbonLoadModel
-                             .getTableName
-                         }")
+                s"${ carbonLoadModel.getDatabaseName }.${
+                  carbonLoadModel
+                      .getTableName
+                }")
             LOGGER.error("Dataload failed due to failure in table status updation.")
             throw new Exception(errorMessage)
           }
@@ -746,7 +746,7 @@ object CarbonDataRDDFactory {
           LOGGER.info("********Database updated**********")
         }
         LOGGER.audit("Data load is successful for " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
           // compaction handling
           handleSegmentMerging(tableCreationTime)
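
The compaction changes in this file are indentation-only, but the reflowed code makes the locking discipline easier to follow: acquire the table-level or system-level compaction lock with retries, run compaction while holding it, release it when done (the unlock itself falls outside the hunks shown), and surface a clear error when the lock is held elsewhere. A self-contained analog of that pattern, with ReentrantLock standing in for CarbonLockFactory's lock object:

    import java.util.concurrent.locks.ReentrantLock

    val compactionLock = new ReentrantLock()
    if (compactionLock.tryLock()) {        // stands in for lockWithRetries()
      try {
        println("compacting segments")     // stands in for startCompactionThreads(...)
      } finally {
        compactionLock.unlock()            // release on success and on failure
      }
    } else {
      sys.error("Table is already locked for compaction. Please try after some time.")
    }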

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index ce5962d..8deacc0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -159,10 +159,10 @@ case class CarbonDictionaryDecoder(
           // add a task completion listener to clear dictionary that is a decisive factor for
           // LRU eviction policy
           val dictionaryTaskCleaner = TaskContext.get
-          dictionaryTaskCleaner.addTaskCompletionListener(context =>
+          dictionaryTaskCleaner.addTaskCompletionListener(_ =>
             dicts.foreach { dictionary =>
               if (null != dictionary) {
-                dictionary.clear
+                dictionary.clear()
               }
             }
           )
@@ -312,10 +312,10 @@ class CarbonDecoderRDD(
     // add a task completion listener to clear dictionary that is a decisive factor for
     // LRU eviction policy
     val dictionaryTaskCleaner = TaskContext.get
-    dictionaryTaskCleaner.addTaskCompletionListener(context =>
+    dictionaryTaskCleaner.addTaskCompletionListener(_ =>
       dicts.foreach { dictionary =>
         if (null != dictionary) {
-          dictionary.clear
+          dictionary.clear()
         }
       }
     )
@@ -327,7 +327,6 @@ class CarbonDecoderRDD(
       override final def hasNext: Boolean = iter.hasNext
 
       override final def next(): InternalRow = {
-        val startTime = System.currentTimeMillis()
         val row: InternalRow = iter.next()
         val data = row.toSeq(dataTypes).toArray
         dictIndex.foreach { index =>
@@ -342,13 +341,6 @@ class CarbonDecoderRDD(
     }
   }
 
-  private def isRequiredToDecode = {
-    getDictionaryColumnIds.find(p => p._1 != null) match {
-      case Some(value) => true
-      case _ => false
-    }
-  }
-
   private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
                             cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
     val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>
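
Two conventions ride along with the listener cleanup here: the unused context parameter becomes _, and dictionary.clear gains parentheses because it is a side-effecting call. The pattern itself, registering per-task cleanup so the LRU cache is free to evict dictionary handles, looks roughly like this sketch (assumes it runs inside a task, e.g. in an RDD's compute; Closeable and close() stand in for Carbon's Dictionary and clear()):

    import org.apache.spark.TaskContext

    def registerCleanup(dicts: Seq[java.io.Closeable]): Unit =
      TaskContext.get().addTaskCompletionListener { _ =>
        dicts.foreach(d => if (d != null) d.close())
      }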

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 748d292..67ee478 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -50,10 +50,10 @@ object CarbonSession {
 
       // Get the session from current thread's active session.
       var session: SparkSession = SparkSession.getActiveSession match {
-        case Some(session) =>
-          if ((session ne null) && !session.sparkContext.isStopped) {
-            options.foreach { case (k, v) => session.conf.set(k, v) }
-            session
+        case Some(sparkSession) =>
+          if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) {
+            options.foreach { case (k, v) => sparkSession.conf.set(k, v) }
+            sparkSession
           } else {
             null
           }
@@ -67,10 +67,10 @@ object CarbonSession {
       SparkSession.synchronized {
         // If the current thread does not have an active session, get it from the global session.
         session = SparkSession.getDefaultSession match {
-          case Some(session) =>
-            if ((session ne null) && !session.sparkContext.isStopped) {
-              options.foreach { case (k, v) => session.conf.set(k, v) }
-              session
+          case Some(sparkSession) =>
+            if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) {
+              options.foreach { case (k, v) => sparkSession.conf.set(k, v) }
+              sparkSession
             } else {
               null
             }
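
Renaming the bound pattern variable to sparkSession removes the shadowing of the enclosing var session, which made the original match arms easy to misread. The hazard in miniature:

    var session: String = "outer"
    val active: Option[String] = Some("inner")
    active match {
      case Some(session) => println(session) // shadows the outer var; prints "inner"
      case None =>
    }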

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index c78ddf3..8a946c0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -130,7 +130,7 @@ class CarbonSource extends CreatableRelationProvider
           }
           f
         }
-        val map = scala.collection.mutable.Map[String, String]();
+        val map = scala.collection.mutable.Map[String, String]()
         parameters.foreach { x => map.put(x._1, x._2) }
         val cm = TableCreator.prepareTableModel(false, Option(dbName), tableName, fields, Nil, map)
         CreateTable(cm, false).run(sparkSession)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
index 1faaafa..362c951 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
@@ -62,9 +62,9 @@ object TableCreator {
     // All excluded cols should be there in create table cols
     if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
       dictExcludeCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
+        tableProperties(CarbonCommonConstants.DICTIONARY_EXCLUDE).split(',').map(_.trim)
       dictExcludeCols
-        .map { dictExcludeCol =>
+        .foreach { dictExcludeCol =>
           if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
             val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
               " does not exist in table. Please check create table statement."
@@ -87,8 +87,8 @@ object TableCreator {
     // All included cols should be there in create table cols
     if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
       dictIncludeCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(",").map(_.trim)
-      dictIncludeCols.map { distIncludeCol =>
+        tableProperties(CarbonCommonConstants.DICTIONARY_INCLUDE).split(",").map(_.trim)
+      dictIncludeCols.foreach { distIncludeCol =>
         if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
           val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
             " does not exist in table. Please check create table statement."
@@ -117,9 +117,9 @@ object TableCreator {
         }
         dimFields += field
       } else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
-        dimFields += (field)
+        dimFields += field
       } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
-        dimFields += (field)
+        dimFields += field
       }
     }
     )
@@ -143,13 +143,13 @@ object TableCreator {
     // get all included cols
     if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
       dictIncludedCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(',').map(_.trim)
+        tableProperties(CarbonCommonConstants.DICTIONARY_INCLUDE).split(',').map(_.trim)
     }
 
     // get all excluded cols
     if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
       dictExcludedCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
+        tableProperties(CarbonCommonConstants.DICTIONARY_EXCLUDE).split(',').map(_.trim)
     }
 
     // by default consider all non string cols as msrs. consider all include/ exclude cols as dims
@@ -264,7 +264,7 @@ object TableCreator {
     if (tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).isDefined) {
 
       var splittedColGrps: Seq[String] = Seq[String]()
-      val nonSplitCols: String = tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).get
+      val nonSplitCols: String = tableProperties(CarbonCommonConstants.COLUMN_GROUPS)
 
       // row groups will be specified in table properties like -> "(col1,col2),(col3,col4)"
       // here first splitting the value by () . so that the above will be splitted into 2 strings.
@@ -313,9 +313,8 @@ object TableCreator {
 
     if (tableProperties.get("NO_INVERTED_INDEX").isDefined) {
       noInvertedIdxColsProps =
-        tableProperties.get("NO_INVERTED_INDEX").get.split(',').map(_.trim)
-      noInvertedIdxColsProps
-        .map { noInvertedIdxColProp =>
+        tableProperties("NO_INVERTED_INDEX").split(',').map(_.trim)
+      noInvertedIdxColsProps.foreach { noInvertedIdxColProp =>
           if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
             val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
               " does not exist in table. Please check create table statement."
@@ -357,11 +356,11 @@ object TableCreator {
         field.storeType
       )
       case "array" => Field(field.column, Some("Array"), field.name,
-        field.children.map(f => f.map(normalizeType(_))),
+        field.children.map(f => f.map(normalizeType)),
         field.parent, field.storeType
       )
       case "struct" => Field(field.column, Some("Struct"), field.name,
-        field.children.map(f => f.map(normalizeType(_))),
+        field.children.map(f => f.map(normalizeType)),
         field.parent, field.storeType
       )
       case "bigint" => Field(field.column, Some("BigInt"), field.name, Some(null), field.parent,
@@ -372,7 +371,7 @@ object TableCreator {
       // checking if the nested data type contains the child type as decimal(10,0),
       // if it is present then extracting the precision and scale. resetting the data type
       // with Decimal.
-      case _ if (dataType.startsWith("decimal")) =>
+      case _ if dataType.startsWith("decimal") =>
         val (precision, scale) = getScaleAndPrecision(dataType)
         Field(field.column,
           Some("Decimal"),

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 51b79c5..fe8bbe7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -92,7 +92,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       rdd: RDD[InternalRow],
       needDecode: ArrayBuffer[AttributeReference]):
   RDD[InternalRow] = {
-    if (needDecode.size > 0) {
+    if (needDecode.nonEmpty) {
       rdd.asInstanceOf[CarbonScanRDD].setVectorReaderSupport(false)
       getDecoderRDD(relation, needDecode, rdd, output)
     } else {
@@ -249,7 +249,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       needDecoder: ArrayBuffer[AttributeReference],
       updateRequestedColumns: Seq[AttributeReference]): DataSourceScanExec = {
     if (supportBatchedDataSource(relation.relation.sqlContext, updateRequestedColumns) &&
-        needDecoder.length == 0) {
+        needDecoder.isEmpty) {
       BatchedDataSourceScanExec(
         output,
         scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
@@ -362,13 +362,13 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       case EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
         Some(sources.EqualTo(a.name, v))
 
-      case Not(EqualTo(a: Attribute, Literal(v, t))) => new
+      case Not(EqualTo(a: Attribute, Literal(v, t))) =>
           Some(sources.Not(sources.EqualTo(a.name, v)))
-      case Not(EqualTo(Literal(v, t), a: Attribute)) => new
+      case Not(EqualTo(Literal(v, t), a: Attribute)) =>
           Some(sources.Not(sources.EqualTo(a.name, v)))
-      case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => new
+      case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) =>
           Some(sources.Not(sources.EqualTo(a.name, v)))
-      case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => new
+      case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) =>
           Some(sources.Not(sources.EqualTo(a.name, v)))
       case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
       case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
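
Besides repeating the Some-without-new fix from CarbonFilters, this file swaps size and length comparisons for the dedicated emptiness predicates, which state the intent directly and avoid computing a full size where a collection only needs to answer empty or not. Illustration:

    import scala.collection.mutable.ArrayBuffer

    val needDecode = ArrayBuffer[Int]()
    if (needDecode.nonEmpty) println("wrap scan in decoder RDD") // was: size > 0
    if (needDecode.isEmpty)  println("batched scan can be used") // was: length == 0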

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 8f97961..86bd92b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -168,7 +168,7 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
             s"""CREATE TABLE $dbName.$tbName
                 |(${(cm.dimCols ++ cm.msrCols).map(f => f.rawSchema).mkString(",")})
                 |USING org.apache.spark.sql.CarbonSource""".stripMargin +
-            s""" OPTIONS (tableName "$tbName", dbName "${dbName}", tablePath "$tablePath") """)
+            s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath "$tablePath") """)
         } catch {
           case e: Exception =>
             val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
@@ -214,14 +214,6 @@ case class DeleteLoadsById(
 
   }
 
-  // validates load ids
-  private def validateLoadIds: Unit = {
-    if (loadids.isEmpty) {
-      val errorMessage = "Error: Segment id(s) should not be empty."
-      throw new MalformedCarbonCommandException(errorMessage)
-
-    }
-  }
 }
 
 case class DeleteLoadsByLoadDate(
@@ -293,8 +285,8 @@ case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: Lo
       relation.carbonRelation.tableName,
       null,
       Seq(),
-      scala.collection.immutable.Map(("fileheader" -> header)),
-      false,
+      scala.collection.immutable.Map("fileheader" -> header),
+      isOverwriteExist = false,
       null,
       Some(df)).run(sparkSession)
     // updating relation metadata. This is in case of auto detect high cardinality
@@ -326,7 +318,6 @@ case class LoadTable(
     }
 
     val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-    val identifier = TableIdentifier(tableName, Option(dbName))
     if (isOverwriteExist) {
       sys.error(s"Overwrite is not supported for carbon table with $dbName.$tableName")
     }
@@ -378,9 +369,9 @@ case class LoadTable(
       // Need to fill dimension relation
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
 
-      var partitionLocation = relation.tableMeta.storePath + "/partition/" +
-                              relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
-                              relation.tableMeta.carbonTableIdentifier.getTableName + "/"
+      val partitionLocation = relation.tableMeta.storePath + "/partition/" +
+          relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
+          relation.tableMeta.carbonTableIdentifier.getTableName + "/"
 
 
       val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
@@ -412,15 +403,6 @@ case class LoadTable(
       val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
       val dateFormat = options.getOrElse("dateformat", null)
       validateDateFormat(dateFormat, table)
-      val multiLine = options.getOrElse("multiline", "false").trim.toLowerCase match {
-        case "true" => true
-        case "false" => false
-        case illegal =>
-          val errorMessage = "Illegal syntax found: [" + illegal + "] .The value multiline in " +
-                             "load DDL which you set can only be 'true' or 'false', please check " +
-                             "your input DDL."
-          throw new MalformedCarbonCommandException(errorMessage)
-      }
       val maxColumns = options.getOrElse("maxcolumns", null)
       carbonLoadModel.setMaxColumns(maxColumns)
       carbonLoadModel.setEscapeChar(escapeChar)
@@ -451,7 +433,7 @@ case class LoadTable(
       // set local dictionary path, and dictionary file extension
       carbonLoadModel.setAllDictPath(allDictionaryPath)
 
-      var partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+      val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
       try {
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
@@ -521,7 +503,7 @@ case class LoadTable(
         throw new MalformedCarbonCommandException("Error: Option DateFormat is set an empty " +
                                                   "string.")
       } else {
-        var dateFormats: Array[String] = dateFormat.split(CarbonCommonConstants.COMMA)
+        val dateFormats: Array[String] = dateFormat.split(CarbonCommonConstants.COMMA)
         for (singleDateFormat <- dateFormats) {
           val dateFormatSplits: Array[String] = singleDateFormat.split(":", 2)
           val columnName = dateFormatSplits(0).trim.toLowerCase
@@ -667,7 +649,6 @@ private[sql] case class DescribeCommandFormatted(
           relation.tableMeta.carbonTableIdentifier.getTableName,
           field.name)
         if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
-          val colprop = mapper.writeValueAsString(dimension.getColumnProperties)
           colProps.append(field.name).append(".")
             .append(mapper.writeValueAsString(dimension.getColumnProperties))
             .append(",")
@@ -679,7 +660,7 @@ private[sql] case class DescribeCommandFormatted(
           "KEY COLUMN"
         }
       } else {
-        ("MEASURE")
+        "MEASURE"
       }
       (field.name, field.dataType.simpleString, comment)
     }
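
Most of this last hunk deletes dead code: validateLoadIds was never called, the parsed multiline option was never read, and identifier and colprop were computed and then dropped. The surviving locals that are never reassigned move from var to val, and ${dbName} loses its braces because a bare identifier interpolates without them:

    val dbName = "default"
    val a = s"dbName $dbName"                  // no braces needed for an identifier
    val b = s"dbName ${dbName.toUpperCase}"    // braces required for an expression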



[3/3] incubator-carbondata git commit: [CARBONDATA-566] clean up code for carbon-spark2 module This closes #471

Posted by ch...@apache.org.
[CARBONDATA-566] clean up code for carbon-spark2 module This closes #471


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a011aafb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a011aafb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a011aafb

Branch: refs/heads/master
Commit: a011aafb02ba55f0fced009755275627b2671a79
Parents: e7b46cc 6fee993
Author: chenliang613 <ch...@apache.org>
Authored: Tue Dec 27 23:41:10 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Tue Dec 27 23:41:10 2016 +0800

----------------------------------------------------------------------
 .../carbondata/spark/util/CarbonQueryUtil.java  |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 150 +++++------
 .../spark/load/DeleteLoadFromMetadata.java      |  44 ----
 .../carbondata/spark/util/CarbonQueryUtil.java  | 248 -------------------
 .../vectorreader/ColumnarVectorWrapper.java     |   2 +-
 .../VectorizedCarbonRecordReader.java           |   8 +-
 .../spark/CarbonColumnValidator.scala           |   2 +-
 .../apache/carbondata/spark/CarbonFilters.scala |  34 +--
 .../carbondata/spark/CarbonSparkFactory.scala   |   4 +-
 .../org/apache/carbondata/spark/KeyVal.scala    |  89 -------
 .../spark/rdd/CarbonDataRDDFactory.scala        | 150 +++++------
 .../spark/sql/CarbonDictionaryDecoder.scala     |  16 +-
 .../org/apache/spark/sql/CarbonSession.scala    |  16 +-
 .../org/apache/spark/sql/CarbonSource.scala     |   2 +-
 .../org/apache/spark/sql/TableCreator.scala     |  29 ++-
 .../execution/CarbonLateDecodeStrategy.scala    |  12 +-
 .../execution/command/carbonTableSchema.scala   |  37 +--
 .../spark/sql/hive/CarbonHiveMetadataUtil.scala |   3 -
 .../sql/optimizer/CarbonLateDecodeRule.scala    |  24 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   1 -
 .../org/apache/spark/util/CleanFiles.scala      |   2 +-
 .../org/apache/spark/util/Compaction.scala      |   2 +-
 .../apache/spark/util/DeleteSegmentByDate.scala |   2 +-
 .../apache/spark/util/DeleteSegmentById.scala   |   2 +-
 .../org/apache/spark/util/ShowSegments.scala    |   2 +-
 .../org/apache/spark/util/TableLoader.scala     |   4 +-
 .../AllDataTypesTestCaseAggregate.scala         |  19 +-
 .../spark/sql/common/util/QueryTest.scala       |   4 +-
 28 files changed, 242 insertions(+), 668 deletions(-)
----------------------------------------------------------------------