Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/09/18 10:01:43 UTC

[24/51] [abbrv] carbondata git commit: [CARBONDATA-1423] added integration test cases for presto

[CARBONDATA-1423] added integration test cases for presto

This closes #1303


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b6727d75
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b6727d75
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b6727d75

Branch: refs/heads/branch-1.2
Commit: b6727d75d2a79498c6861959bba24d96fd075108
Parents: 9f0ac24
Author: anubhav100 <an...@knoldus.in>
Authored: Tue Aug 29 14:19:31 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Wed Sep 13 12:11:07 2017 +0800

----------------------------------------------------------------------
 integration/presto/pom.xml                      | 319 +++++++----
 .../presto/src/test/resources/alldatatype.csv   |  11 +
 .../presto/src/test/resources/log4j.properties  |  11 +
 .../integrationtest/PrestoAllDataTypeTest.scala | 403 +++++++++++++
 .../carbondata/presto/server/PrestoServer.scala | 170 ++++++
 .../presto/util/CarbonDataStoreCreator.scala    | 559 +++++++++++++++++++
 6 files changed, 1373 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6727d75/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index 617ce93..924a2be 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -15,9 +15,7 @@
     See the License for the specific language governing permissions and
     limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
   <modelVersion>4.0.0</modelVersion>
 
@@ -38,62 +36,35 @@
   </properties>
 
   <dependencies>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.8.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>2.8.1</version>
+      <scope>provided</scope>
+    </dependency>
+
+
     <dependency>
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-hadoop</artifactId>
       <version>${project.version}</version>
       <exclusions>
         <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-network-shuffle_2.11</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-sketch_2.11</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>net.java.dev.jets3t</groupId>
-          <artifactId>jets3t</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>javax.servlet-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.commons</groupId>
-          <artifactId>commons-math3</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.httpcomponents</groupId>
-          <artifactId>httpclient</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.antlr</groupId>
-          <artifactId>antlr4-runtime</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.esotericsoftware</groupId>
-          <artifactId>minlog</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.codehaus.janino</groupId>
-          <artifactId>janino</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>net.jpountz.lz4</groupId>
-          <artifactId>lz4</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>net.sf.py4j</groupId>
-          <artifactId>py4j</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.spark-project.spark</groupId>
-          <artifactId>unused</artifactId>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
         </exclusion>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
@@ -172,6 +143,7 @@
           <groupId>io.dropwizard.metrics</groupId>
           <artifactId>metrics-graphite</artifactId>
         </exclusion>
+
         <exclusion>
           <groupId>com.google.code.findbugs</groupId>
           <artifactId>jsr305</artifactId>
@@ -254,7 +226,30 @@
         </exclusion>
       </exclusions>
     </dependency>
-
+    <dependency>
+      <groupId>com.facebook.presto</groupId>
+      <artifactId>presto-tests</artifactId>
+      <scope>test</scope>
+      <version>${presto.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.openjdk.jol</groupId>
+          <artifactId>jol-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.antlr</groupId>
+          <artifactId>antlr4-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-sql_2.10</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>io.airlift</groupId>
       <artifactId>bootstrap</artifactId>
@@ -305,26 +300,82 @@
       <artifactId>json</artifactId>
       <version>0.144</version>
       <!--<scope>provided</scope>-->
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>log4j-over-slf4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+
+      </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <version>2.2.1</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>io.airlift</groupId>
       <artifactId>units</artifactId>
       <version>1.0</version>
       <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>log4j-over-slf4j</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-annotations</artifactId>
-      <version>2.6.0</version>
-      <scope>provided</scope>
-    </dependency>
     <!--presto integrated-->
     <dependency>
       <groupId>com.facebook.presto</groupId>
       <artifactId>presto-spi</artifactId>
       <version>${presto.version}</version>
       <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.antlr</groupId>
+          <artifactId>antlr4-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-sql_2.10</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>commons-lang</groupId>
@@ -335,24 +386,73 @@
       <groupId>com.facebook.presto.hadoop</groupId>
       <artifactId>hadoop-apache2</artifactId>
       <version>2.7.3-1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.antlr</groupId>
+          <artifactId>antlr4-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.facebook.presto</groupId>
+      <artifactId>presto-jdbc</artifactId>
+      <version>${presto.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.antlr</groupId>
+          <artifactId>antlr4-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-compress</artifactId>
       <version>1.4.1</version>
+    </dependency>
+    <dependency>
+      <groupId>io.airlift</groupId>
+      <artifactId>slice</artifactId>
+      <version>0.27</version>
+      <scope>provided</scope>
       <exclusions>
         <exclusion>
-          <groupId>org.tukaani</groupId>
-          <artifactId>xz</artifactId>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>log4j-over-slf4j</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
-
-    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-catalyst_2.10 -->
-    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
   </dependencies>
 
   <build>
+    <testSourceDirectory>src/test/scala</testSourceDirectory>
+    <resources>
+      <resource>
+        <directory>src/resources</directory>
+      </resource>
+      <resource>
+        <directory>.</directory>
+        <includes>
+          <include>CARBON_SPARK_INTERFACELogResource.properties</include>
+        </includes>
+      </resource>
+    </resources>
     <plugins>
       <plugin>
         <artifactId>maven-compiler-plugin</artifactId>
@@ -367,11 +467,13 @@
         <version>2.18</version>
         <!-- Note config is repeated in scalatest config -->
         <configuration>
+          <skip>false</skip>
           <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
           <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
           <systemProperties>
             <java.awt.headless>true</java.awt.headless>
           </systemProperties>
+          <testFailureIgnore>false</testFailureIgnore>
           <failIfNoTests>false</failIfNoTests>
         </configuration>
       </plugin>
@@ -384,30 +486,47 @@
           <skip>true</skip>
         </configuration>
       </plugin>
-
       <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-enforcer-plugin</artifactId>
-        <version>1.4.1</version>
-        <configuration>
-          <skip>true</skip>
-        </configuration>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+          <execution>
+            <id>testCompile</id>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+            <phase>test</phase>
+          </execution>
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
       </plugin>
-
       <plugin>
-        <groupId>com.ning.maven.plugins</groupId>
-        <artifactId>maven-dependency-versions-check-plugin</artifactId>
+        <artifactId>maven-compiler-plugin</artifactId>
         <configuration>
-          <skip>true</skip>
-          <failBuildInCaseOfConflict>false</failBuildInCaseOfConflict>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
 
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-dependency-plugin</artifactId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.4.1</version>
         <configuration>
-          <skip>false</skip>
+          <skip>true</skip>
         </configuration>
       </plugin>
 
@@ -418,14 +537,12 @@
           <skip>true</skip>
         </configuration>
       </plugin>
-
       <plugin>
         <groupId>io.takari.maven.plugins</groupId>
         <artifactId>presto-maven-plugin</artifactId>
         <version>0.1.12</version>
         <extensions>true</extensions>
       </plugin>
-
       <plugin>
         <groupId>pl.project13.maven</groupId>
         <artifactId>git-commit-id-plugin</artifactId>
@@ -434,28 +551,30 @@
         </configuration>
       </plugin>
       <plugin>
-        <groupId>org.scala-tools</groupId>
-        <artifactId>maven-scala-plugin</artifactId>
-        <version>2.15.2</version>
+
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>1.0</version>
+        <!-- Note config is repeated in surefire config -->
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <testFailureIgnore>false</testFailureIgnore>
+          <filereports>CarbonTestSuite.txt</filereports>
+          <argLine>-ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+          </argLine>
+          <stderr />
+          <environmentVariables>
+          </environmentVariables>
+          <systemProperties>
+            <java.awt.headless>true</java.awt.headless>
+          </systemProperties>
+        </configuration>
         <executions>
           <execution>
-            <id>compile</id>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-            <phase>compile</phase>
-          </execution>
-          <execution>
-            <id>testCompile</id>
-            <goals>
-              <goal>testCompile</goal>
-            </goals>
-            <phase>test</phase>
-          </execution>
-          <execution>
-            <phase>process-resources</phase>
+            <id>test</id>
             <goals>
-              <goal>compile</goal>
+              <goal>test</goal>
             </goals>
           </execution>
         </executions>
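
With the build section above, Scala sources are compiled by the maven-scala-plugin and the suites are executed by the scalatest-maven-plugin bound to the test goal, picked up from the declared src/test/scala test source directory. As a minimal sketch (hypothetical suite name, assuming only the scalatest 2.2.1 dependency declared above), any class of this shape under that directory runs as part of mvn test:

    import org.scalatest.FunSuiteLike

    class ExampleSketchSuite extends FunSuiteLike {
      test("an example assertion") {
        assert(1 + 1 == 2)
      }
    }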

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6727d75/integration/presto/src/test/resources/alldatatype.csv
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/resources/alldatatype.csv b/integration/presto/src/test/resources/alldatatype.csv
new file mode 100644
index 0000000..6b0259a
--- /dev/null
+++ b/integration/presto/src/test/resources/alldatatype.csv
@@ -0,0 +1,11 @@
+ID,date,country,name,phonetype,serialname,salary,bonus,dob,shortfield
+1,2015-07-23,china,anubhav,phone197,ASD69643,5000000.00,1234.444,2016-04-14 15/00/09,10
+2,2015-07-24,china,jatin,phone756,ASD42892,150010.999,1234.5555,2016-04-14 15:00:09,10
+3,2015-07-25,china,liang,phone1904,ASD37014,15002.110,600.777,2016-01-14 15:07:09,8
+4,2015-07-26,china,prince,phone2435,ASD66902,15003.00,9999.999,1992-04-14 13:00:09,4
+5,2015-07-27,china,bhavya,phone2441,ASD90633,15004.00,5000.999,2010-06-19 14:10:06,11
+6,2015-07-28,china,akash,phone294,ASD59961,15005.00,500.59,2013-07-19 12:10:08,18
+7,2015-07-29,china,sahil,phone610,ASD14875,15006.00,500.99,,2007-04-19 11:10:06,17
+8,2015-07-30,china,geetika,phone1848,ASD57308,15007.500,500.88,2008-09-21 11:10:06,10
+9,2015-07-18,china,ravindra,phone706,ASD86717,15008.00,700.999,2009-06-19 15:10:06,1
+9,2015/07/18,china,jitesh,phone706,ASD86717,15008.00,500.414,2001-08-29 13:09:03,12
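
Note that the data is deliberately dirty: row 1 carries a malformed timestamp (2016-04-14 15/00/09), row 7 an empty dob plus an extra trailing field, and the last row a malformed date (2015/07/18). The load in CarbonDataStoreCreator below runs with the bad-records action "force", so these values are stored as null, which is exactly what the IS NULL tests in PrestoAllDataTypeTest assert against. A minimal sketch of why the last row's date fails, assuming the Carbon default yyyy-MM-dd date format:

    import java.text.SimpleDateFormat
    import scala.util.Try

    val strict = new SimpleDateFormat("yyyy-MM-dd")
    strict.setLenient(false)
    // '/' does not match the '-' literal in the pattern, so parsing fails
    // and the "force" bad-records action stores the value as null instead
    Try(strict.parse("2015/07/18")).isSuccess // false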

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6727d75/integration/presto/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/resources/log4j.properties b/integration/presto/src/test/resources/log4j.properties
new file mode 100644
index 0000000..e369916
--- /dev/null
+++ b/integration/presto/src/test/resources/log4j.properties
@@ -0,0 +1,11 @@
+# Root logger option
+log4j.rootLogger=INFO,stdout
+
+
+# Redirect log messages to console
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+
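
This sends everything at level INFO and above to the console. With the ConversionPattern above, an emitted line looks roughly like the following (logger name and line number are illustrative):

    2017-09-13 12:11:07 INFO  PrestoServer:42 - ======== STARTING PRESTO SERVER ========

Here %-5p left-justifies the level in a five-character field, %c{1} keeps only the last segment of the logger name, and %L appends the source line number.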

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6727d75/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
new file mode 100644
index 0000000..1743be6
--- /dev/null
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.presto.integrationtest
+
+import java.io.File
+
+import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
+import util.CarbonDataStoreCreator
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.presto.server.PrestoServer
+
+
+class PrestoAllDataTypeTest extends FunSuiteLike with BeforeAndAfterAll {
+
+  private val logger = LogServiceFactory
+    .getLogService(classOf[PrestoAllDataTypeTest].getCanonicalName)
+
+  private val rootPath = new File(this.getClass.getResource("/").getPath
+                                  + "../../../..").getCanonicalPath
+  private val storePath = s"$rootPath/integration/presto/target/store"
+
+  override def beforeAll: Unit = {
+    CarbonDataStoreCreator
+      .createCarbonStore(storePath, s"$rootPath/integration/presto/src/test/resources/alldatatype.csv")
+    logger.info(s"\nCarbon store is created at location: $storePath")
+    PrestoServer.startServer(storePath)
+  }
+
+  override def afterAll(): Unit = {
+    PrestoServer.stopServer()
+  }
+
+  test("test the result for count(*) in presto") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT COUNT(*) AS RESULT FROM TESTDB.TESTTABLE ")
+    val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 10))
+    assert(actualResult.equals(expectedResult))
+  }
+  test("test the result for count() clause with distinct operator in presto") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT COUNT(DISTINCT ID) AS RESULT FROM TESTDB.TESTTABLE ")
+    val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 9))
+    assert(actualResult.equals(expectedResult))
+
+  }
+  test("test the result for sum()in presto") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT SUM(ID) AS RESULT FROM TESTDB.TESTTABLE ")
+    val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 54))
+    assert(actualResult.equals(expectedResult))
+  }
+  test("test the result for sum() wiTh distinct operator in presto") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT SUM(DISTINCT ID) AS RESULT FROM TESTDB.TESTTABLE ")
+    val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 45))
+    assert(actualResult.equals(expectedResult))
+  }
+  test("test the result for avg() with distinct operator in presto") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT AVG(DISTINCT ID) AS RESULT FROM TESTDB.TESTTABLE ")
+    val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 5))
+    assert(actualResult.equals(expectedResult))
+  }
+  test("test the result for min() with distinct operator in presto") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT MIN(DISTINCT ID) AS RESULT FROM TESTDB.TESTTABLE ")
+    val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 1))
+    assert(actualResult.equals(expectedResult))
+  }
+  test("test the result for max() with distinct operator in presto") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT MAX(DISTINCT ID) AS RESULT FROM TESTDB.TESTTABLE ")
+    val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 9))
+    assert(actualResult.equals(expectedResult))
+  }
+  test("test the result for count()clause with distinct operator on decimal column in presto") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT COUNT(DISTINCT BONUS) AS RESULT FROM TESTDB.TESTTABLE ")
+    val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 7))
+    actualResult.equals(expectedResult)
+  }
+  test("test the result for count()clause with out  distinct operator on decimal column in presto")
+  {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT COUNT(BONUS) AS RESULT FROM TESTDB.TESTTABLE ")
+    val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 10))
+    actualResult.equals(expectedResult)
+  }
+  test("test the result for sum()with out distinct operator for decimal column in presto") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT SUM(DISTINCT BONUS) AS RESULT FROM TESTDB.TESTTABLE ")
+    val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 54))
+    actualResult.equals(expectedResult)
+  }
+  test("test the result for sum() with distinct operator for decimal column in presto") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT SUM(DISTINCT BONUS) AS RESULT FROM TESTDB.TESTTABLE ")
+    val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 20774.6475))
+    assert(
+      actualResult.head("RESULT").toString.toDouble ==
+      expectedResult.head("RESULT").toString.toDouble)
+  }
+  test("test the result for avg() with distinct operator on decimal coin presto") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT AVG(DISTINCT BONUS) AS RESULT FROM TESTDB.TESTTABLE ")
+    val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 8900))
+    actualResult.equals(expectedResult)
+  }
+
+  test("test the result for min() with distinct operator in decimalType of presto") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT MIN(BONUS) AS RESULT FROM TESTDB.TESTTABLE ")
+    val expectedResult: List[Map[String, Any]] = List(Map(
+      "RESULT" -> java.math.BigDecimal.valueOf(500.414).setScale(4)))
+    actualResult.equals(expectedResult)
+  }
+
+  test("test the result for max() with distinct operator in decimalType of presto") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT MAX(BONUS) AS RESULT FROM TESTDB.TESTTABLE ")
+    val expectedResult: List[Map[String, Any]] = List(Map(
+      "RESULT" -> java.math.BigDecimal.valueOf(9999.999).setScale(4)))
+    actualResult.equals(expectedResult)
+  }
+  test("select decimal data type with ORDER BY  clause") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT DISTINCT BONUS FROM TESTDB.TESTTABLE ORDER BY BONUS limit 3 ")
+    val expectedResult: List[Map[String, Any]] = List(Map(
+      "BONUS" -> java.math.BigDecimal.valueOf(500.414).setScale(4)),
+      Map("BONUS" -> java.math.BigDecimal.valueOf(500.59).setScale(4)),
+      Map("BONUS" -> java.math.BigDecimal.valueOf(500.88).setScale(4)))
+    assert(actualResult.equals(expectedResult))
+  }
+  test("select string type with order by clause") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT NAME FROM TESTDB.TESTTABLE ORDER BY NAME")
+    val expectedResult: List[Map[String, Any]] = List(Map("NAME" -> "akash"),
+      Map("NAME" -> "anubhav"),
+      Map("NAME" -> "bhavya"),
+      Map("NAME" -> "geetika"),
+      Map("NAME" -> "jatin"),
+      Map("NAME" -> "jitesh"),
+      Map("NAME" -> "liang"),
+      Map("NAME" -> "prince"),
+      Map("NAME" -> "ravindra"),
+      Map("NAME" -> "sahil"))
+    assert(actualResult.equals(expectedResult))
+  }
+  test("select DATE type with order by clause") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT DATE FROM TESTDB.TESTTABLE ORDER BY DATE")
+    val expectedResult: List[Map[String, Any]] = List(Map("DATE" -> "2015-07-18"),
+      Map("DATE" -> "2015-07-23"),
+      Map("DATE" -> "2015-07-24"),
+      Map("DATE" -> "2015-07-25"),
+      Map("DATE" -> "2015-07-26"),
+      Map("DATE" -> "2015-07-27"),
+      Map("DATE" -> "2015-07-28"),
+      Map("DATE" -> "2015-07-29"),
+      Map("DATE" -> "2015-07-30"),
+      Map("DATE" -> null))
+
+    // compare row by row; the Option wrapper stringifies the null DATE row
+    // on both sides to Some(null), so it compares equal as well
+    assert(actualResult.zipWithIndex.forall {
+      case (map, index) => map.get("DATE").toString
+        .equals(expectedResult(index).get("DATE").toString)
+    })
+    assert(actualResult.reverse.head("DATE") == null)
+  }
+  test("select int type with order by clause") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT DISTINCT ID FROM TESTDB.TESTTABLE ORDER BY ID")
+    val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 1),
+      Map("ID" -> 2),
+      Map("ID" -> 3),
+      Map("ID" -> 4),
+      Map("ID" -> 5),
+      Map("ID" -> 6),
+      Map("ID" -> 7),
+      Map("ID" -> 8),
+      Map("ID" -> 9))
+
+    assert(actualResult.equals(expectedResult))
+
+  }
+
+  test("test and filter clause with greater than expression") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery(
+        "SELECT ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY,BONUS FROM TESTDB.TESTTABLE " +
+        "WHERE BONUS>1234 AND ID>2 GROUP BY ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY," +
+        "BONUS ORDER BY ID")
+    val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 4,
+      "NAME" -> "prince",
+      "BONUS" -> java.math.BigDecimal.valueOf(9999.9990).setScale(4),
+      "DATE" -> "2015-07-26",
+      "SALARY" -> 15003.0,
+      "SERIALNAME" -> "ASD66902",
+      "COUNTRY" -> "china",
+      "PHONETYPE" -> "phone2435"),
+      Map("ID" -> 5,
+        "NAME" -> "bhavya",
+        "BONUS" -> java.math.BigDecimal.valueOf(5000.999).setScale(4),
+        "DATE" -> "2015-07-27",
+        "SALARY" -> 15004.0,
+        "SERIALNAME" -> "ASD90633",
+        "COUNTRY" -> "china",
+        "PHONETYPE" -> "phone2441"))
+    assert(actualResult.toString() equals expectedResult.toString())
+
+
+  }
+
+  test("test and filter clause with greater than equal to expression") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery(
+        "SELECT ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY,BONUS FROM TESTDB.TESTTABLE " +
+        "WHERE BONUS>=1234.444 GROUP BY ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY," +
+        "BONUS ORDER BY ID")
+    val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 1,
+      "NAME" -> "anubhav",
+      "BONUS" -> java.math.BigDecimal.valueOf(1234.4440).setScale(4),
+      "DATE" -> "2015-07-23",
+      "SALARY" -> "5000000.0",
+      "SERIALNAME" -> "ASD69643",
+      "COUNTRY" -> "china",
+      "PHONETYPE" -> "phone197"),
+      Map("ID" -> 2,
+        "NAME" -> "jatin",
+        "BONUS" -> java.math.BigDecimal.valueOf(1234.5555).setScale(4)
+        ,
+        "DATE" -> "2015-07-24",
+        "SALARY" -> java.math.BigDecimal.valueOf(150010.9990).setScale(3),
+        "SERIALNAME" -> "ASD42892",
+        "COUNTRY" -> "china",
+        "PHONETYPE" -> "phone756"),
+      Map("ID" -> 4,
+        "NAME" -> "prince",
+        "BONUS" -> java.math.BigDecimal.valueOf(9999.9990).setScale(4),
+        "DATE" -> "2015-07-26",
+        "SALARY" -> java.math.BigDecimal.valueOf(15003.0).setScale(1),
+        "SERIALNAME" -> "ASD66902",
+        "COUNTRY" -> "china",
+        "PHONETYPE" -> "phone2435"),
+      Map("ID" -> 5,
+        "NAME" -> "bhavya",
+        "BONUS" -> java.math.BigDecimal.valueOf(5000.9990).setScale(4),
+        "DATE" -> "2015-07-27",
+        "SALARY" -> java.math.BigDecimal.valueOf(15004.0).setScale(1),
+        "SERIALNAME" -> "ASD90633",
+        "COUNTRY" -> "china",
+        "PHONETYPE" -> "phone2441"))
+    assert(actualResult.toString() equals expectedResult.toString())
+  }
+  test("test and filter clause with less than equal to expression") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery(
+        "SELECT ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY,BONUS FROM TESTDB.TESTTABLE " +
+        "WHERE BONUS<=1234.444 GROUP BY ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY," +
+        "BONUS ORDER BY ID LIMIT 2")
+
+    val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 1,
+      "NAME" -> "anubhav",
+      "BONUS" -> java.math.BigDecimal.valueOf(1234.4440).setScale(4),
+      "DATE" -> "2015-07-23",
+      "SALARY" -> "5000000.0",
+      "SERIALNAME" -> "ASD69643",
+      "COUNTRY" -> "china",
+      "PHONETYPE" -> "phone197"),
+      Map("ID" -> 3,
+        "NAME" -> "liang",
+        "BONUS" -> java.math.BigDecimal.valueOf(600.7770).setScale(4),
+        "DATE" -> "2015-07-25",
+        "SALARY" -> java.math.BigDecimal.valueOf(15002.11).setScale(2),
+        "SERIALNAME" -> "ASD37014",
+        "COUNTRY" -> "china",
+        "PHONETYPE" -> "phone1904"))
+    assert(actualResult.toString() equals expectedResult.toString())
+  }
+  test("test equal to expression on decimal value") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery(
+        "SELECT ID FROM TESTDB.TESTTABLE WHERE BONUS=1234.444")
+
+    val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 1))
+
+    assert(actualResult equals expectedResult)
+  }
+  test("test less than expression with and operator") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery(
+        "SELECT ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY,BONUS FROM TESTDB.TESTTABLE " +
+        "WHERE BONUS>1234 AND ID<2 GROUP BY ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY," +
+        "BONUS ORDER BY ID")
+    val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 1,
+      "NAME" -> "anubhav",
+      "BONUS" -> java.math.BigDecimal.valueOf(1234.4440).setScale(4),
+      "DATE" -> "2015-07-23",
+      "SALARY" -> 5000000.0,
+      "SERIALNAME" -> "ASD69643",
+      "COUNTRY" -> "china",
+      "PHONETYPE" -> "phone197"))
+    assert(actualResult.toString().equals(expectedResult.toString()))
+  }
+  test("test the result for in clause") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT NAME from testdb.testtable WHERE PHONETYPE IN('phone1848','phone706')")
+    val expectedResult: List[Map[String, Any]] = List(
+      Map("NAME" -> "geetika"),
+      Map("NAME" -> "ravindra"),
+      Map("NAME" -> "jitesh"))
+
+    assert(actualResult.equals(expectedResult))
+  }
+  test("test the result for not in clause") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery(
+        "SELECT NAME from testdb.testtable WHERE PHONETYPE NOT IN('phone1848','phone706')")
+    val expectedResult: List[Map[String, Any]] = List(Map("NAME" -> "anubhav"),
+      Map("NAME" -> "jatin"),
+      Map("NAME" -> "liang"),
+      Map("NAME" -> "prince"),
+      Map("NAME" -> "bhavya"),
+      Map("NAME" -> "akash"),
+      Map("NAME" -> "sahil"))
+
+    assert(actualResult.equals(expectedResult))
+  }
+  test("test for null operator on date data type") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT ID FROM TESTDB.TESTTABLE WHERE DATE IS NULL")
+    val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 9))
+    assert(actualResult.equals(expectedResult))
+
+  }
+  test("test for not null operator on date data type") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT NAME FROM TESTDB.TESTTABLE WHERE DATE IS NOT NULL AND ID=9")
+    val expectedResult: List[Map[String, Any]] = List(Map("NAME" -> "ravindra"))
+    assert(actualResult.equals(expectedResult))
+
+  }
+  test("test for not null operator on timestamp type") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT NAME FROM TESTDB.TESTTABLE WHERE DOB IS NOT NULL AND ID=9")
+    val expectedResult: List[Map[String, Any]] = List(Map("NAME" -> "ravindra"),
+      Map("NAME" -> "jitesh"))
+    assert(actualResult.equals(expectedResult))
+
+  }
+  test("test for null operator on timestamp type") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery("SELECT NAME FROM TESTDB.TESTTABLE WHERE DOB IS NULL AND ID=1")
+    val expectedResult: List[Map[String, Any]] = List(Map("NAME" -> "anubhav"))
+    assert(actualResult.equals(expectedResult))
+
+  }
+  test("test the result for short datatype with order by clause") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery(
+        "SELECT DISTINCT SHORTFIELD from testdb.testtable ORDER BY SHORTFIELD ")
+    val expectedResult: List[Map[String, Any]] = List(Map("SHORTFIELD" -> 1),
+      Map("SHORTFIELD" -> 4),
+      Map("SHORTFIELD" -> 8),
+      Map("SHORTFIELD" -> 10),
+      Map("SHORTFIELD" -> 11),
+      Map("SHORTFIELD" -> 12),
+      Map("SHORTFIELD" -> 18),
+      Map("SHORTFIELD" -> null))
+
+    assert(actualResult.equals(expectedResult))
+  }
+  test("test the result for short datatype in clause where field is null") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery(
+        "SELECT ID from testdb.testtable WHERE SHORTFIELD IS NULL ORDER BY SHORTFIELD ")
+    val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 7))
+
+    assert(actualResult.equals(expectedResult))
+  }
+  test("test the result for short datatype with greater than operator") {
+    val actualResult: List[Map[String, Any]] = PrestoServer
+      .executeQuery(
+        "SELECT ID from testdb.testtable WHERE SHORTFIELD>11 ")
+    val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 6), Map("ID" -> 9))
+
+    assert(actualResult.equals(expectedResult))
+  }
+}
\ No newline at end of file
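
Every case in this suite follows the same shape: issue SQL through the embedded server, model the expected rows as List[Map[String, Any]], and assert equality. A hypothetical additional case in the same style (not part of this commit; "anubhav" is the only name in alldatatype.csv beginning with "an"):

    test("hypothetical: like filter on a string column") {
      val actualResult: List[Map[String, Any]] = PrestoServer
        .executeQuery("SELECT NAME FROM TESTDB.TESTTABLE WHERE NAME LIKE 'an%'")
      val expectedResult: List[Map[String, Any]] = List(Map("NAME" -> "anubhav"))
      assert(actualResult.equals(expectedResult))
    }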

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6727d75/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
new file mode 100644
index 0000000..3497f47
--- /dev/null
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.presto.server
+
+import java.sql.{Connection, DriverManager, ResultSet}
+import java.util
+import java.util.{Locale, Optional}
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+import com.facebook.presto.Session
+import com.facebook.presto.execution.QueryIdGenerator
+import com.facebook.presto.metadata.SessionPropertyManager
+import com.facebook.presto.spi.`type`.TimeZoneKey.UTC_KEY
+import com.facebook.presto.spi.security.Identity
+import com.facebook.presto.tests.DistributedQueryRunner
+import com.google.common.collect.ImmutableMap
+import org.slf4j.{Logger, LoggerFactory}
+
+import org.apache.carbondata.presto.CarbondataPlugin
+
+object PrestoServer {
+
+  val CARBONDATA_CATALOG = "carbondata"
+  val CARBONDATA_CONNECTOR = "carbondata"
+  val CARBONDATA_SOURCE = "carbondata"
+  val logger: Logger = LoggerFactory.getLogger(this.getClass)
+
+
+  val prestoProperties: util.Map[String, String] = Map(("http-server.http.port", "8086")).asJava
+  val queryRunner = new DistributedQueryRunner(createSession, 4, prestoProperties)
+
+
+  /**
+   * Start the Presto server.
+   *
+   * @param carbonStorePath path of the CarbonData store backing the carbondata catalog
+   */
+  def startServer(carbonStorePath: String) = {
+
+    logger.info("======== STARTING PRESTO SERVER ========")
+    val queryRunner: DistributedQueryRunner = createQueryRunner(
+      prestoProperties, carbonStorePath)
+
+    logger.info("STARTED SERVER AT :" + queryRunner.getCoordinator.getBaseUrl)
+  }
+
+  /**
+   * Instantiates the query runner and registers the CarbonData plugin and catalog.
+   */
+  private def createQueryRunner(extraProperties: util.Map[String, String],
+      carbonStorePath: String): DistributedQueryRunner = {
+    Try {
+      queryRunner.installPlugin(new CarbondataPlugin)
+      val carbonProperties = ImmutableMap.builder[String, String]
+        .put("carbondata-store", carbonStorePath).build
+
+      // CreateCatalog will create a catalog for CarbonData in etc/catalog.
+      queryRunner.createCatalog(CARBONDATA_CATALOG, CARBONDATA_CONNECTOR, carbonProperties)
+    } match {
+      case Success(result) => queryRunner
+      case Failure(exception) => queryRunner.close()
+        throw exception
+    }
+  }
+
+  /**
+   * stop the presto server
+   */
+  def stopServer(): Unit = {
+    queryRunner.close()
+    logger.info("***** Stopping The Server *****")
+  }
+
+  /**
+   * Executes the query over a JDBC connection.
+   *
+   * @param query the SQL statement to run
+   * @return the result rows, one column-name -> value map per row
+   */
+  def executeQuery(query: String): List[Map[String, Any]] = {
+
+    Try {
+      val conn: Connection = createJdbcConnection
+      logger.info(s"***** executing the query ***** \n $query")
+      val statement = conn.createStatement()
+      val result: ResultSet = statement.executeQuery(query)
+      convertResultSetToList(result)
+    } match {
+      case Success(result) => result
+      case Failure(jdbcException) => logger
+        .error(s"exception occurs${ jdbcException.getMessage } \n query failed $query")
+        throw jdbcException
+    }
+  }
+
+  /**
+   * Creates a JDBC connection to the embedded Presto server.
+   *
+   * @return an open java.sql.Connection
+   */
+  private def createJdbcConnection: Connection = {
+    val JDBC_DRIVER = "com.facebook.presto.jdbc.PrestoDriver"
+    val DB_URL = "jdbc:presto://localhost:8086/carbondata/testdb"
+
+    // the database credentials for the JDBC connection
+    val USER = "username"
+    val PASS = "password"
+
+    // register the JDBC driver
+    Class.forName(JDBC_DRIVER)
+    // open a connection
+    DriverManager.getConnection(DB_URL, USER, PASS)
+  }
+
+  /**
+   * Converts a result set into a Scala list of maps; each map represents one row.
+   *
+   * @param queryResult the JDBC result set to convert
+   * @return one column-name -> value map per row
+   */
+  private def convertResultSetToList(queryResult: ResultSet): List[Map[String, Any]] = {
+    val metadata = queryResult.getMetaData
+    val colNames = (1 to metadata.getColumnCount) map metadata.getColumnName
+    Iterator.continually(buildMapFromQueryResult(queryResult, colNames)).takeWhile(_.isDefined)
+      .map(_.get).toList
+  }
+
+  private def buildMapFromQueryResult(queryResult: ResultSet,
+      colNames: Seq[String]): Option[Map[String, Any]] = {
+    if (queryResult.next()) {
+      Some(colNames.map(name => name -> queryResult.getObject(name)).toMap)
+    }
+    else {
+      None
+    }
+  }
+
+  /**
+   * Creates a new session on the server used to connect and execute queries.
+   */
+  private def createSession: Session = {
+    logger.info("\n Creating The Presto Server Session")
+    Session.builder(new SessionPropertyManager)
+      .setQueryId(new QueryIdGenerator().createNextQueryId)
+      .setIdentity(new Identity("user", Optional.empty()))
+      .setSource(CARBONDATA_SOURCE).setCatalog(CARBONDATA_CATALOG)
+      .setTimeZoneKey(UTC_KEY).setLocale(Locale.ENGLISH)
+      .setRemoteUserAddress("address")
+      .setUserAgent("agent").build
+  }
+
+}
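
Taken together, the object gives the suite a small lifecycle API: boot an embedded coordinator on port 8086, run SQL over the Presto JDBC driver, and shut down. A sketch of the intended call sequence (storePath is whatever CarbonDataStoreCreator wrote):

    // boot the coordinator and register the carbondata catalog
    PrestoServer.startServer(storePath)
    // each row comes back as a column-name -> value map
    val rows: List[Map[String, Any]] =
      PrestoServer.executeQuery("SELECT COUNT(*) AS RESULT FROM TESTDB.TESTTABLE")
    println(rows.head("RESULT")) // 10 for alldatatype.csv
    PrestoServer.stopServer()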

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6727d75/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
new file mode 100644
index 0000000..6cb97f1
--- /dev/null
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package util
+
+import java.io._
+import java.nio.charset.Charset
+import java.text.SimpleDateFormat
+import java.util
+import java.util.{ArrayList, Date, List, UUID}
+
+import scala.collection.JavaConversions._
+
+import com.google.gson.Gson
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapred.TaskAttemptID
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{RecordReader, TaskType}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier,
+ReverseDictionary}
+import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.fileoperations.{AtomicFileOperations, AtomicFileOperationsImpl,
+FileWriteOperation}
+import org.apache.carbondata.core.metadata.converter.{SchemaConverter,
+ThriftWrapperSchemaConverterImpl}
+import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension,
+CarbonMeasure, ColumnSchema}
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata,
+CarbonTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter,
+CarbonDictionarySortIndexWriterImpl, CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator}
+import org.apache.carbondata.core.writer.{CarbonDictionaryWriter, CarbonDictionaryWriterImpl,
+ThriftWriter}
+import org.apache.carbondata.processing.api.dataloader.SchemaInfo
+import org.apache.carbondata.processing.constants.TableOptionConstant
+import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat,
+CSVRecordReaderIterator, StringArrayWritable}
+import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.newflow.DataLoadExecutor
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
+
+object CarbonDataStoreCreator {
+
+  private val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * Create store without any restructure
+   */
+  def createCarbonStore(storePath: String, dataFilePath: String): Unit = {
+    try {
+      logger.info("Creating The Carbon Store")
+      val dbName: String = "testdb"
+      val tableName: String = "testtable"
+      val absoluteTableIdentifier = new AbsoluteTableIdentifier(
+        storePath,
+        new CarbonTableIdentifier(dbName,
+          tableName,
+          UUID.randomUUID().toString))
+      val factFilePath: String = new File(dataFilePath).getCanonicalPath
+      val storeDir: File = new File(absoluteTableIdentifier.getStorePath)
+      CarbonUtil.deleteFoldersAndFiles(storeDir)
+      CarbonProperties.getInstance.addProperty(
+        CarbonCommonConstants.STORE_LOCATION_HDFS,
+        absoluteTableIdentifier.getStorePath)
+      val table: CarbonTable = createTable(absoluteTableIdentifier)
+      writeDictionary(factFilePath, table, absoluteTableIdentifier)
+      val schema: CarbonDataLoadSchema = new CarbonDataLoadSchema(table)
+      val loadModel: CarbonLoadModel = new CarbonLoadModel()
+      val partitionId: String = "0"
+      loadModel.setCarbonDataLoadSchema(schema)
+      loadModel.setDatabaseName(
+        absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName)
+      loadModel.setTableName(
+        absoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
+      loadModel.setFactFilePath(factFilePath)
+      loadModel.setLoadMetadataDetails(new ArrayList[LoadMetadataDetails]())
+      loadModel.setStorePath(absoluteTableIdentifier.getStorePath)
+      CarbonProperties.getInstance
+        .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING, "true")
+
+      loadModel.setDefaultTimestampFormat(
+        CarbonProperties.getInstance.getProperty(
+          CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+          CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+      loadModel.setDefaultDateFormat(
+        CarbonProperties.getInstance.getProperty(
+          CarbonCommonConstants.CARBON_DATE_FORMAT,
+          CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
+      loadModel.setSerializationNullFormat(
+        TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName +
+        "," +
+        "\\N")
+      loadModel.setBadRecordsLoggerEnable(
+        TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName +
+        "," +
+        "false")
+      loadModel.setBadRecordsAction(
+        TableOptionConstant.BAD_RECORDS_ACTION.getName + "," +
+        "force")
+      loadModel.setDirectLoad(true)
+      loadModel.setIsEmptyDataBadRecord(
+        DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD +
+        "," +
+        "true")
+      loadModel.setMaxColumns("15")
+      loadModel.setCsvHeader(
+        "ID,date,country,name,phonetype,serialname,salary,bonus,dob,shortField")
+      loadModel.setCsvHeaderColumns(loadModel.getCsvHeader.split(","))
+      loadModel.setTaskNo("0")
+      loadModel.setSegmentId("0")
+      loadModel.setPartitionId("0")
+      loadModel.setFactTimeStamp(System.currentTimeMillis())
+      loadModel.setMaxColumns("15")
+      executeGraph(loadModel, absoluteTableIdentifier.getStorePath)
+    } catch {
+      case e: Exception => e.printStackTrace()
+
+    }
+  }
+
+  private def createTable(absoluteTableIdentifier: AbsoluteTableIdentifier): CarbonTable = {
+    val tableInfo: TableInfo = new TableInfo()
+    tableInfo.setStorePath(absoluteTableIdentifier.getStorePath)
+    tableInfo.setDatabaseName(
+      absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName)
+    val tableSchema: TableSchema = new TableSchema()
+    tableSchema.setTableName(
+      absoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
+    val columnSchemas: List[ColumnSchema] = new ArrayList[ColumnSchema]()
+    val encodings: ArrayList[Encoding] = new ArrayList[Encoding]()
+    encodings.add(Encoding.INVERTED_INDEX)
+    val id: ColumnSchema = new ColumnSchema()
+    id.setColumnName("ID")
+    id.setColumnar(true)
+    id.setDataType(DataType.INT)
+    id.setEncodingList(encodings)
+    id.setColumnUniqueId(UUID.randomUUID().toString)
+    id.setColumnReferenceId(id.getColumnUniqueId)
+    id.setDimensionColumn(true)
+    id.setColumnGroup(1)
+    columnSchemas.add(id)
+
+    val dictEncoding: util.ArrayList[Encoding] = new util.ArrayList[Encoding]()
+    dictEncoding.add(Encoding.DIRECT_DICTIONARY)
+    dictEncoding.add(Encoding.DICTIONARY)
+    dictEncoding.add(Encoding.INVERTED_INDEX)
+
+    val date: ColumnSchema = new ColumnSchema()
+    date.setColumnName("date")
+    date.setColumnar(true)
+    date.setDataType(DataType.DATE)
+    date.setEncodingList(dictEncoding)
+    date.setColumnUniqueId(UUID.randomUUID().toString)
+    date.setDimensionColumn(true)
+    date.setColumnGroup(2)
+    date.setColumnReferenceId(date.getColumnUniqueId)
+    columnSchemas.add(date)
+
+    val country: ColumnSchema = new ColumnSchema()
+    country.setColumnName("country")
+    country.setColumnar(true)
+    country.setDataType(DataType.STRING)
+    country.setEncodingList(encodings)
+    country.setColumnUniqueId(UUID.randomUUID().toString)
+    country.setColumnReferenceId(country.getColumnUniqueId)
+    country.setDimensionColumn(true)
+    country.setColumnGroup(3)
+    columnSchemas.add(country)
+
+    val name: ColumnSchema = new ColumnSchema()
+    name.setColumnName("name")
+    name.setColumnar(true)
+    name.setDataType(DataType.STRING)
+    name.setEncodingList(encodings)
+    name.setColumnUniqueId(UUID.randomUUID().toString)
+    name.setDimensionColumn(true)
+    name.setColumnGroup(4)
+    name.setColumnReferenceId(name.getColumnUniqueId)
+    columnSchemas.add(name)
+
+    val phonetype: ColumnSchema = new ColumnSchema()
+    phonetype.setColumnName("phonetype")
+    phonetype.setColumnar(true)
+    phonetype.setDataType(DataType.STRING)
+    phonetype.setEncodingList(encodings)
+    phonetype.setColumnUniqueId(UUID.randomUUID().toString)
+    phonetype.setDimensionColumn(true)
+    phonetype.setColumnGroup(5)
+    phonetype.setColumnReferenceId(phonetype.getColumnUniqueId)
+    columnSchemas.add(phonetype)
+
+    val serialname: ColumnSchema = new ColumnSchema()
+    serialname.setColumnName("serialname")
+    serialname.setColumnar(true)
+    serialname.setDataType(DataType.STRING)
+    serialname.setEncodingList(encodings)
+    serialname.setColumnUniqueId(UUID.randomUUID().toString)
+    serialname.setDimensionColumn(true)
+    serialname.setColumnGroup(6)
+    serialname.setColumnReferenceId(serialname.getColumnUniqueId)
+    columnSchemas.add(serialname)
+
+    val salary: ColumnSchema = new ColumnSchema()
+    salary.setColumnName("salary")
+    salary.setColumnar(true)
+    salary.setDataType(DataType.DOUBLE)
+    salary.setEncodingList(encodings)
+    salary.setColumnUniqueId(UUID.randomUUID().toString)
+    salary.setDimensionColumn(false)
+    salary.setColumnGroup(7)
+    salary.setColumnReferenceId(salary.getColumnUniqueId)
+    columnSchemas.add(salary)
+
+    val bonus: ColumnSchema = new ColumnSchema()
+    bonus.setColumnName("bonus")
+    bonus.setColumnar(true)
+    bonus.setDataType(DataType.DECIMAL)
+    bonus.setPrecision(10)
+    bonus.setScale(4)
+    bonus.setEncodingList(encodings)
+    bonus.setColumnUniqueId(UUID.randomUUID().toString)
+    bonus.setDimensionColumn(false)
+    bonus.setColumnGroup(8)
+    bonus.setColumnReferenceId(bonus.getColumnUniqueId)
+    columnSchemas.add(bonus)
+
+    val dob: ColumnSchema = new ColumnSchema()
+    dob.setColumnName("dob")
+    dob.setColumnar(true)
+    dob.setDataType(DataType.TIMESTAMP)
+    dob.setEncodingList(dictEncoding)
+    dob.setColumnUniqueId(UUID.randomUUID().toString)
+    dob.setDimensionColumn(true)
+    dob.setColumnGroup(9)
+    dob.setColumnReferenceId(dob.getColumnUniqueId)
+    columnSchemas.add(dob)
+
+    val shortField: ColumnSchema = new ColumnSchema()
+    shortField.setColumnName("shortField")
+    shortField.setColumnar(true)
+    shortField.setDataType(DataType.SHORT)
+    shortField.setEncodingList(encodings)
+    shortField.setColumnUniqueId(UUID.randomUUID().toString)
+    shortField.setDimensionColumn(false)
+    shortField.setColumnGroup(10)
+    shortField.setColumnReferenceId(shortField.getColumnUniqueId)
+    columnSchemas.add(shortField)
+
+    tableSchema.setListOfColumns(columnSchemas)
+    val schemaEvol: SchemaEvolution = new SchemaEvolution()
+    schemaEvol.setSchemaEvolutionEntryList(
+      new util.ArrayList[SchemaEvolutionEntry]())
+    tableSchema.setSchemaEvalution(schemaEvol)
+    tableSchema.setTableId(UUID.randomUUID().toString)
+    tableInfo.setTableUniqueName(
+      absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName +
+      "_" +
+      absoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
+    tableInfo.setLastUpdatedTime(System.currentTimeMillis())
+    tableInfo.setFactTable(tableSchema)
+    val carbonTablePath: CarbonTablePath = CarbonStorePath.getCarbonTablePath(
+      absoluteTableIdentifier.getStorePath,
+      absoluteTableIdentifier.getCarbonTableIdentifier)
+    val schemaFilePath: String = carbonTablePath.getSchemaFilePath
+    val schemaMetadataPath: String =
+      CarbonTablePath.getFolderContainingFile(schemaFilePath)
+    tableInfo.setMetaDataFilepath(schemaMetadataPath)
+    CarbonMetadata.getInstance.loadTableMetadata(tableInfo)
+    val schemaConverter: SchemaConverter =
+      new ThriftWrapperSchemaConverterImpl()
+    val thriftTableInfo: org.apache.carbondata.format.TableInfo =
+      schemaConverter.fromWrapperToExternalTableInfo(
+        tableInfo,
+        tableInfo.getDatabaseName,
+        tableInfo.getFactTable.getTableName)
+    val schemaEvolutionEntry: org.apache.carbondata.format.SchemaEvolutionEntry =
+      new org.apache.carbondata.format.SchemaEvolutionEntry(
+        tableInfo.getLastUpdatedTime)
+    thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
+      .add(schemaEvolutionEntry)
+    val fileType: FileFactory.FileType =
+      FileFactory.getFileType(schemaMetadataPath)
+    if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
+      FileFactory.mkdirs(schemaMetadataPath, fileType)
+    }
+    val thriftWriter: ThriftWriter = new ThriftWriter(schemaFilePath, false)
+    thriftWriter.open()
+    thriftWriter.write(thriftTableInfo)
+    thriftWriter.close()
+    CarbonMetadata.getInstance.getCarbonTable(tableInfo.getTableUniqueName)
+  }
+
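+  /**
+   * Scans the fact CSV and writes the dictionary and sort-index files for
+   * every dimension column of the given table.
+   */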
+  private def writeDictionary(factFilePath: String,
+      table: CarbonTable,
+      absoluteTableIdentifier: AbsoluteTableIdentifier): Unit = {
+    val reader: BufferedReader = new BufferedReader(
+      new FileReader(factFilePath))
+    val header: String = reader.readLine()
+    val split: Array[String] = header.split(",")
+    val allCols: util.List[CarbonColumn] = new util.ArrayList[CarbonColumn]()
+    val dims: util.List[CarbonDimension] =
+      table.getDimensionByTableName(table.getFactTableName)
+    allCols.addAll(dims)
+    val msrs: util.List[CarbonMeasure] =
+      table.getMeasureByTableName(table.getFactTableName)
+    allCols.addAll(msrs)
+    val set: Array[util.Set[String]] = Array.ofDim[util.Set[String]](dims.size)
+    for (i <- set.indices) {
+      set(i) = new util.HashSet[String]()
+    }
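+    // Collect the distinct values of each dimension column from the CSV
+    // body; these become the dictionary entries.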
+    var line: String = reader.readLine()
+    while (line != null) {
+      val data: Array[String] = line.split(",")
+      for (i <- set.indices) {
+        set(i).add(data(i))
+      }
+      line = reader.readLine()
+    }
+    val dictCache: Cache[DictionaryColumnUniqueIdentifier, ReverseDictionary] = CacheProvider
+      .getInstance.createCache(CacheType.REVERSE_DICTIONARY,
+      absoluteTableIdentifier.getStorePath)
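+    // For each dimension: write its distinct values as a dictionary, then
+    // build and persist the forward and inverted sort indexes over it.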
+    for (i <- set.indices) {
+      val columnIdentifier: ColumnIdentifier =
+        new ColumnIdentifier(dims.get(i).getColumnId, null, null)
+      val dictionaryColumnUniqueIdentifier: DictionaryColumnUniqueIdentifier =
+        new DictionaryColumnUniqueIdentifier(
+          table.getCarbonTableIdentifier,
+          columnIdentifier,
+          columnIdentifier.getDataType,
+          CarbonStorePath.getCarbonTablePath(table.getStorePath,
+            table.getCarbonTableIdentifier)
+        )
+      val writer: CarbonDictionaryWriter = new CarbonDictionaryWriterImpl(
+        absoluteTableIdentifier.getStorePath,
+        absoluteTableIdentifier.getCarbonTableIdentifier,
+        dictionaryColumnUniqueIdentifier)
+      for (value <- set(i)) {
+        writer.write(value)
+      }
+      writer.close()
+      writer.commit()
+      val dict: Dictionary = dictCache
+        .get(
+          new DictionaryColumnUniqueIdentifier(
+            absoluteTableIdentifier.getCarbonTableIdentifier,
+            columnIdentifier,
+            dims.get(i).getDataType,
+            CarbonStorePath.getCarbonTablePath(table.getStorePath,
+              table.getCarbonTableIdentifier)
+          ))
+        .asInstanceOf[Dictionary]
+      val preparator: CarbonDictionarySortInfoPreparator =
+        new CarbonDictionarySortInfoPreparator()
+      val newDistinctValues: util.List[String] = new util.ArrayList[String]()
+      val dictionarySortInfo: CarbonDictionarySortInfo =
+        preparator.getDictionarySortInfo(newDistinctValues,
+          dict,
+          dims.get(i).getDataType)
+      val carbonDictionaryWriter: CarbonDictionarySortIndexWriter =
+        new CarbonDictionarySortIndexWriterImpl(
+          absoluteTableIdentifier.getCarbonTableIdentifier,
+          dictionaryColumnUniqueIdentifier,
+          absoluteTableIdentifier.getStorePath)
+      try {
+        carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex)
+        carbonDictionaryWriter.writeInvertedSortIndex(
+          dictionarySortInfo.getSortIndexInverted)
+      } catch {
+        case exception: Exception =>
+          logger.error(s"Exception occurred while writing the sort index: $exception")
+      } finally {
+        carbonDictionaryWriter.close()
+      }
+    }
+    reader.close()
+  }
+
+  /**
+   * Executes the data load graph: reads the fact CSV through the CSV input
+   * format and writes the carbon data files for segment 0 of the table.
+   *
+   * @param loadModel     load model describing the table and the fact file
+   * @param storeLocation carbon store location to write the segment into
+   */
+  private def executeGraph(loadModel: CarbonLoadModel, storeLocation: String): Unit = {
+    new File(storeLocation).mkdirs()
+    val outPutLoc: String = storeLocation + "/etl"
+    val databaseName: String = loadModel.getDatabaseName
+    val tableName: String = loadModel.getTableName
+    val tempLocationKey: String = databaseName + '_' + tableName + "_1"
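+    // Configure the load-time properties for a local, single-task load.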
+    CarbonProperties.getInstance.addProperty(tempLocationKey, storeLocation)
+    CarbonProperties.getInstance
+      .addProperty("store_output_location", outPutLoc)
+    CarbonProperties.getInstance.addProperty("send.signal.load", "false")
+    CarbonProperties.getInstance
+      .addProperty("carbon.is.columnar.storage", "true")
+    CarbonProperties.getInstance
+      .addProperty("carbon.dimension.split.value.in.columnar", "1")
+    CarbonProperties.getInstance
+      .addProperty("carbon.is.fullyfilled.bits", "true")
+    CarbonProperties.getInstance.addProperty("is.int.based.indexer", "true")
+    CarbonProperties.getInstance
+      .addProperty("aggregate.columnar.keyblock", "true")
+    CarbonProperties.getInstance
+      .addProperty("high.cardinality.value", "100000")
+    CarbonProperties.getInstance.addProperty("is.compressed.keyblock", "false")
+    CarbonProperties.getInstance.addProperty("carbon.leaf.node.size", "120000")
+    CarbonProperties.getInstance
+      .addProperty("carbon.direct.dictionary", "true")
+    val graphPath: String = outPutLoc + File.separator + loadModel.getDatabaseName +
+                            File.separator +
+                            tableName +
+                            File.separator +
+                            0 +
+                            File.separator +
+                            1 +
+                            File.separator +
+                            tableName +
+                            ".ktr"
+    val path: File = new File(graphPath)
+    if (path.exists()) {
+      path.delete()
+    }
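+    // Describe the input as a single local block and configure the CSV
+    // input format from the load model.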
+    val info: SchemaInfo = new SchemaInfo()
+    val blockDetails: BlockDetails = new BlockDetails(
+      new Path(loadModel.getFactFilePath),
+      0,
+      new File(loadModel.getFactFilePath).length,
+      Array("localhost"))
+    val configuration: Configuration = new Configuration()
+    CSVInputFormat.setCommentCharacter(configuration, loadModel.getCommentChar)
+    CSVInputFormat.setCSVDelimiter(configuration, loadModel.getCsvDelimiter)
+    CSVInputFormat.setEscapeCharacter(configuration, loadModel.getEscapeChar)
+    CSVInputFormat.setHeaderExtractionEnabled(configuration, true)
+    CSVInputFormat.setQuoteCharacter(configuration, loadModel.getQuoteChar)
+    CSVInputFormat.setReadBufferSize(
+      configuration,
+      CarbonProperties.getInstance.getProperty(
+        CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
+        CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT))
+    CSVInputFormat.setNumberOfColumns(
+      configuration,
+      String.valueOf(loadModel.getCsvHeaderColumns.length))
+    CSVInputFormat.setMaxColumns(configuration, "15")
+    val hadoopAttemptContext: TaskAttemptContextImpl =
+      new TaskAttemptContextImpl(configuration,
+        new TaskAttemptID("", 1, TaskType.MAP, 0, 0))
+    val format: CSVInputFormat = new CSVInputFormat()
+    val recordReader: RecordReader[NullWritable, StringArrayWritable] =
+      format.createRecordReader(blockDetails, hadoopAttemptContext)
+    val readerIterator: CSVRecordReaderIterator = new CSVRecordReaderIterator(
+      recordReader,
+      blockDetails,
+      hadoopAttemptContext)
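+    // Run the load: the executor consumes the CSV iterator and writes the
+    // carbon data files into storeLocation.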
+    new DataLoadExecutor()
+      .execute(loadModel, Array(storeLocation), Array(readerIterator))
+    info.setDatabaseName(databaseName)
+    info.setTableName(tableName)
+    writeLoadMetadata(loadModel.getCarbonDataLoadSchema,
+      loadModel.getDatabaseName,
+      loadModel.getTableName,
+      new util.ArrayList[LoadMetadataDetails]())
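+    // The executor writes its output under a temporary task folder inside
+    // the segment; move the part file up to the segment directory and clean
+    // up the temporary folder.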
+    val segLocation: String = storeLocation + "/" + databaseName + "/" + tableName +
+                              "/Fact/Part0/Segment_0"
+    val file: File = new File(segLocation)
+    var factFile: File = null
+    val folderList: Array[File] = file.listFiles()
+    var folder: File = null
+    for (i <- folderList.indices if folderList(i).isDirectory) {
+      folder = folderList(i)
+    }
+    if (folder != null && folder.isDirectory) {
+      val files: Array[File] = folder.listFiles()
+      // Scala has no break: the loop keeps the last matching part file, and
+      // a single-task load produces exactly one.
+      for (i <- files.indices
+           if !files(i).isDirectory && files(i).getName.startsWith("part")) {
+        factFile = files(i)
+      }
+      if (factFile != null) {
+        factFile.renameTo(new File(segLocation + "/" + factFile.getName))
+      }
+      CarbonUtil.deleteFoldersAndFiles(folder)
+    }
+  }
+
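+  /**
+   * Writes the load metadata (table status) file that marks segment 0 as a
+   * successful load, so that readers treat the written files as a valid
+   * segment.
+   */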
+  private def writeLoadMetadata(
+      schema: CarbonDataLoadSchema,
+      databaseName: String,
+      tableName: String,
+      listOfLoadFolderDetails: util.List[LoadMetadataDetails]): Unit = {
+    try {
+      val loadMetadataDetails: LoadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setLoadEndTime(System.currentTimeMillis())
+      loadMetadataDetails.setLoadStatus("SUCCESS")
+      loadMetadataDetails.setLoadName(String.valueOf(0))
+      loadMetadataDetails.setLoadStartTime(
+        loadMetadataDetails.getTimeStamp(readCurrentTime()))
+      listOfLoadFolderDetails.add(loadMetadataDetails)
+      val dataLoadLocation: String = schema.getCarbonTable.getMetaDataFilepath + File.separator +
+                                     CarbonCommonConstants.LOADMETADATA_FILENAME
+      val gsonObjectToWrite: Gson = new Gson()
+      val writeOperation: AtomicFileOperations = new AtomicFileOperationsImpl(
+        dataLoadLocation,
+        FileFactory.getFileType(dataLoadLocation))
+      val dataOutputStream =
+        writeOperation.openForWrite(FileWriteOperation.OVERWRITE)
+      val brWriter = new BufferedWriter(
+        new OutputStreamWriter(
+          dataOutputStream,
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)))
+      val metadataInstance: String =
+        gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray())
+      brWriter.write(metadataInstance)
+      // brWriter is always non-null at this point, so flush unconditionally.
+      brWriter.flush()
+      CarbonUtil.closeStreams(brWriter)
+      writeOperation.close()
+    } catch {
+      case exception: Exception =>
+        logger.error(s"Exception occurred while writing load metadata: $exception")
+    }
+  }
+
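+  /** Formats the current time using the carbon timestamp pattern. */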
+  private def readCurrentTime(): String = {
+    val sdf: SimpleDateFormat = new SimpleDateFormat(
+      CarbonCommonConstants.CARBON_TIMESTAMP)
+    sdf.format(new Date())
+  }
+
+}
+