You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/09/18 10:01:43 UTC
[24/51] [abbrv] carbondata git commit: [CARBONDATA-1423] added
integration test cases for presto
[CARBONDATA-1423] added integration test cases for presto
This closes #1303
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b6727d75
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b6727d75
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b6727d75
Branch: refs/heads/branch-1.2
Commit: b6727d75d2a79498c6861959bba24d96fd075108
Parents: 9f0ac24
Author: anubhav100 <an...@knoldus.in>
Authored: Tue Aug 29 14:19:31 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Wed Sep 13 12:11:07 2017 +0800
----------------------------------------------------------------------
integration/presto/pom.xml | 319 +++++++----
.../presto/src/test/resources/alldatatype.csv | 11 +
.../presto/src/test/resources/log4j.properties | 11 +
.../integrationtest/PrestoAllDataTypeTest.scala | 403 +++++++++++++
.../carbondata/presto/server/PrestoServer.scala | 170 ++++++
.../presto/util/CarbonDataStoreCreator.scala | 559 +++++++++++++++++++
6 files changed, 1373 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6727d75/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index 617ce93..924a2be 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -15,9 +15,7 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -38,62 +36,35 @@
</properties>
<dependencies>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.8.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>2.8.1</version>
+ <scope>provided</scope>
+ </dependency>
+
+
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-hadoop</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-network-shuffle_2.11</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sketch_2.11</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.java.dev.jets3t</groupId>
- <artifactId>jets3t</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>javax.servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-math3</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.antlr</groupId>
- <artifactId>antlr4-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.esotericsoftware</groupId>
- <artifactId>minlog</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.janino</groupId>
- <artifactId>janino</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.jpountz.lz4</groupId>
- <artifactId>lz4</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.sf.py4j</groupId>
- <artifactId>py4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.spark-project.spark</groupId>
- <artifactId>unused</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
@@ -172,6 +143,7 @@
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
</exclusion>
+
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
@@ -254,7 +226,30 @@
</exclusion>
</exclusions>
</dependency>
-
+ <dependency>
+ <groupId>com.facebook.presto</groupId>
+ <artifactId>presto-tests</artifactId>
+ <scope>test</scope>
+ <version>${presto.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.openjdk.jol</groupId>
+ <artifactId>jol-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.10</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>bootstrap</artifactId>
@@ -305,26 +300,82 @@
<artifactId>json</artifactId>
<version>0.144</version>
<!--<scope>provided</scope>-->
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+
+ </exclusions>
</dependency>
<dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <version>2.2.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
<version>1.0</version>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- <version>2.6.0</version>
- <scope>provided</scope>
- </dependency>
<!--presto integrated-->
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<version>${presto.version}</version>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.10</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
@@ -335,24 +386,73 @@
<groupId>com.facebook.presto.hadoop</groupId>
<artifactId>hadoop-apache2</artifactId>
<version>2.7.3-1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.facebook.presto</groupId>
+ <artifactId>presto-jdbc</artifactId>
+ <version>${presto.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.4.1</version>
+ </dependency>
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>slice</artifactId>
+ <version>0.27</version>
+ <scope>provided</scope>
<exclusions>
<exclusion>
- <groupId>org.tukaani</groupId>
- <artifactId>xz</artifactId>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
-
- <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-catalyst_2.10 -->
- <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
</dependencies>
<build>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/resources</directory>
+ </resource>
+ <resource>
+ <directory>.</directory>
+ <includes>
+ <include>CARBON_SPARK_INTERFACELogResource.properties</include>
+ </includes>
+ </resource>
+ </resources>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
@@ -367,11 +467,13 @@
<version>2.18</version>
<!-- Note config is repeated in scalatest config -->
<configuration>
+ <skip>false</skip>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
<systemProperties>
<java.awt.headless>true</java.awt.headless>
</systemProperties>
+ <testFailureIgnore>false</testFailureIgnore>
<failIfNoTests>false</failIfNoTests>
</configuration>
</plugin>
@@ -384,30 +486,47 @@
<skip>true</skip>
</configuration>
</plugin>
-
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-enforcer-plugin</artifactId>
- <version>1.4.1</version>
- <configuration>
- <skip>true</skip>
- </configuration>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.15.2</version>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>testCompile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test</phase>
+ </execution>
+ <execution>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
</plugin>
-
<plugin>
- <groupId>com.ning.maven.plugins</groupId>
- <artifactId>maven-dependency-versions-check-plugin</artifactId>
+ <artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <skip>true</skip>
- <failBuildInCaseOfConflict>false</failBuildInCaseOfConflict>
+ <source>1.8</source>
+ <target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.4.1</version>
<configuration>
- <skip>false</skip>
+ <skip>true</skip>
</configuration>
</plugin>
@@ -418,14 +537,12 @@
<skip>true</skip>
</configuration>
</plugin>
-
<plugin>
<groupId>io.takari.maven.plugins</groupId>
<artifactId>presto-maven-plugin</artifactId>
<version>0.1.12</version>
<extensions>true</extensions>
</plugin>
-
<plugin>
<groupId>pl.project13.maven</groupId>
<artifactId>git-commit-id-plugin</artifactId>
@@ -434,28 +551,30 @@
</configuration>
</plugin>
<plugin>
- <groupId>org.scala-tools</groupId>
- <artifactId>maven-scala-plugin</artifactId>
- <version>2.15.2</version>
+
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <version>1.0</version>
+ <!-- Note config is repeated in surefire config -->
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <junitxml>.</junitxml>
+ <testFailureIgnore>false</testFailureIgnore>
+ <filereports>CarbonTestSuite.txt</filereports>
+ <argLine>-ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+ </argLine>
+ <stderr />
+ <environmentVariables>
+ </environmentVariables>
+ <systemProperties>
+ <java.awt.headless>true</java.awt.headless>
+ </systemProperties>
+ </configuration>
<executions>
<execution>
- <id>compile</id>
- <goals>
- <goal>compile</goal>
- </goals>
- <phase>compile</phase>
- </execution>
- <execution>
- <id>testCompile</id>
- <goals>
- <goal>testCompile</goal>
- </goals>
- <phase>test</phase>
- </execution>
- <execution>
- <phase>process-resources</phase>
+ <id>test</id>
<goals>
- <goal>compile</goal>
+ <goal>test</goal>
</goals>
</execution>
</executions>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6727d75/integration/presto/src/test/resources/alldatatype.csv
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/resources/alldatatype.csv b/integration/presto/src/test/resources/alldatatype.csv
new file mode 100644
index 0000000..6b0259a
--- /dev/null
+++ b/integration/presto/src/test/resources/alldatatype.csv
@@ -0,0 +1,11 @@
+ID,date,country,name,phonetype,serialname,salary,bonus,dob,shortfield
+1,2015-07-23,china,anubhav,phone197,ASD69643,5000000.00,1234.444,2016-04-14 15/00/09,10
+2,2015-07-24,china,jatin,phone756,ASD42892,150010.999,1234.5555,2016-04-14 15:00:09,10
+3,2015-07-25,china,liang,phone1904,ASD37014,15002.110,600.777,2016-01-14 15:07:09,8
+4,2015-07-26,china,prince,phone2435,ASD66902,15003.00,9999.999,1992-04-14 13:00:09,4
+5,2015-07-27,china,bhavya,phone2441,ASD90633,15004.00,5000.999,2010-06-19 14:10:06,11
+6,2015-07-28,china,akash,phone294,ASD59961,15005.00,500.59,2013-07-19 12:10:08,18
+7,2015-07-29,china,sahil,phone610,ASD14875,15006.00,500.99,,2007-04-19 11:10:06,17
+8,2015-07-30,china,geetika,phone1848,ASD57308,15007.500,500.88,2008-09-21 11:10:06,10
+9,2015-07-18,china,ravindra,phone706,ASD86717,15008.00,700.999,2009-06-19 15:10:06,1
+9,2015/07/18,china,jitesh,phone706,ASD86717,15008.00,500.414,2001-08-29 13:09:03,12
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6727d75/integration/presto/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/resources/log4j.properties b/integration/presto/src/test/resources/log4j.properties
new file mode 100644
index 0000000..e369916
--- /dev/null
+++ b/integration/presto/src/test/resources/log4j.properties
@@ -0,0 +1,11 @@
+# Root logger option
+log4j.rootLogger=INFO,stdout
+
+
+# Redirect log messages to console
+log4j.appender.debug=org.apache.log4j.RollingFileAppender
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6727d75/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
new file mode 100644
index 0000000..1743be6
--- /dev/null
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.presto.integrationtest
+
+import java.io.File
+
+import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
+import util.CarbonDataStoreCreator
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.presto.server.PrestoServer
+
+
+class PrestoAllDataTypeTest extends FunSuiteLike with BeforeAndAfterAll {
+
+ private val logger = LogServiceFactory
+ .getLogService(classOf[PrestoAllDataTypeTest].getCanonicalName)
+
+ private val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ private val storePath = s"$rootPath/integration/presto/target/store"
+
+ override def beforeAll: Unit = {
+ CarbonDataStoreCreator
+ .createCarbonStore(storePath, s"$rootPath/integration/presto/src/test/resources/alldatatype.csv")
+ logger.info(s"\nCarbon store is created at location: $storePath")
+ PrestoServer.startServer(storePath)
+ }
+
+ override def afterAll(): Unit = {
+ PrestoServer.stopServer()
+ }
+
+ test("test the result for count(*) in presto") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT COUNT(*) AS RESULT FROM TESTDB.TESTTABLE ")
+ val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 10))
+ assert(actualResult.equals(expectedResult))
+ }
+ test("test the result for count() clause with distinct operator in presto") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT COUNT(DISTINCT ID) AS RESULT FROM TESTDB.TESTTABLE ")
+ val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 9))
+ assert(actualResult.equals(expectedResult))
+
+ }
+ test("test the result for sum()in presto") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT SUM(ID) AS RESULT FROM TESTDB.TESTTABLE ")
+ val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 54))
+ assert(actualResult.equals(expectedResult))
+ }
+ test("test the result for sum() wiTh distinct operator in presto") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT SUM(DISTINCT ID) AS RESULT FROM TESTDB.TESTTABLE ")
+ val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 45))
+ assert(actualResult.equals(expectedResult))
+ }
+ test("test the result for avg() with distinct operator in presto") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT AVG(DISTINCT ID) AS RESULT FROM TESTDB.TESTTABLE ")
+ val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 5))
+ assert(actualResult.equals(expectedResult))
+ }
+ test("test the result for min() with distinct operator in presto") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT MIN(DISTINCT ID) AS RESULT FROM TESTDB.TESTTABLE ")
+ val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 1))
+ assert(actualResult.equals(expectedResult))
+ }
+ test("test the result for max() with distinct operator in presto") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT MAX(DISTINCT ID) AS RESULT FROM TESTDB.TESTTABLE ")
+ val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 9))
+ assert(actualResult.equals(expectedResult))
+ }
+ test("test the result for count()clause with distinct operator on decimal column in presto") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT COUNT(DISTINCT BONUS) AS RESULT FROM TESTDB.TESTTABLE ")
+ val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 7))
+ actualResult.equals(expectedResult)
+ }
+ test("test the result for count()clause with out distinct operator on decimal column in presto")
+ {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT COUNT(BONUS) AS RESULT FROM TESTDB.TESTTABLE ")
+ val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 10))
+ actualResult.equals(expectedResult)
+ }
+ test("test the result for sum()with out distinct operator for decimal column in presto") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT SUM(DISTINCT BONUS) AS RESULT FROM TESTDB.TESTTABLE ")
+ val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 54))
+ actualResult.equals(expectedResult)
+ }
+ test("test the result for sum() with distinct operator for decimal column in presto") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT SUM(DISTINCT BONUS) AS RESULT FROM TESTDB.TESTTABLE ")
+ val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 20774.6475))
+ assert(
+ actualResult.head("RESULT").toString.toDouble ==
+ expectedResult.head("RESULT").toString.toDouble)
+ }
+ test("test the result for avg() with distinct operator on decimal coin presto") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT AVG(DISTINCT BONUS) AS RESULT FROM TESTDB.TESTTABLE ")
+ val expectedResult: List[Map[String, Any]] = List(Map("RESULT" -> 8900))
+ actualResult.equals(expectedResult)
+ }
+
+ test("test the result for min() with distinct operator in decimalType of presto") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT MIN(BONUS) AS RESULT FROM TESTDB.TESTTABLE ")
+ val expectedResult: List[Map[String, Any]] = List(Map(
+ "RESULT" -> java.math.BigDecimal.valueOf(500.414).setScale(4)))
+ actualResult.equals(expectedResult)
+ }
+
+ test("test the result for max() with distinct operator in decimalType of presto") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT MAX(BONUS) AS RESULT FROM TESTDB.TESTTABLE ")
+ val expectedResult: List[Map[String, Any]] = List(Map(
+ "RESULT" -> java.math.BigDecimal.valueOf(9999.999).setScale(4)))
+ actualResult.equals(expectedResult)
+ }
+ test("select decimal data type with ORDER BY clause") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT DISTINCT BONUS FROM TESTDB.TESTTABLE ORDER BY BONUS limit 3 ")
+ val expectedResult: List[Map[String, Any]] = List(Map(
+ "BONUS" -> java.math.BigDecimal.valueOf(500.414).setScale(4)),
+ Map("BONUS" -> java.math.BigDecimal.valueOf(500.59).setScale(4)),
+ Map("BONUS" -> java.math.BigDecimal.valueOf(500.88).setScale(4)))
+ assert(actualResult.equals(expectedResult))
+ }
+ test("select string type with order by clause") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT NAME FROM TESTDB.TESTTABLE ORDER BY NAME")
+ val expectedResult: List[Map[String, Any]] = List(Map("NAME" -> "akash"),
+ Map("NAME" -> "anubhav"),
+ Map("NAME" -> "bhavya"),
+ Map("NAME" -> "geetika"),
+ Map("NAME" -> "jatin"),
+ Map("NAME" -> "jitesh"),
+ Map("NAME" -> "liang"),
+ Map("NAME" -> "prince"),
+ Map("NAME" -> "ravindra"),
+ Map("NAME" -> "sahil"))
+ assert(actualResult.equals(expectedResult))
+ }
+ test("select DATE type with order by clause") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT DATE FROM TESTDB.TESTTABLE ORDER BY DATE")
+ val expectedResult: List[Map[String, Any]] = List(Map("DATE" -> "2015-07-18"),
+ Map("DATE" -> "2015-07-23"),
+ Map("DATE" -> "2015-07-24"),
+ Map("DATE" -> "2015-07-25"),
+ Map("DATE" -> "2015-07-26"),
+ Map("DATE" -> "2015-07-27"),
+ Map("DATE" -> "2015-07-28"),
+ Map("DATE" -> "2015-07-29"),
+ Map("DATE" -> "2015-07-30"),
+ Map("DATE" -> null))
+
+ assert(actualResult.filterNot(_.get("DATE") == null).zipWithIndex.forall {
+ case (map, index) => map.get("DATE").toString
+ .equals(expectedResult(index).get("DATE").toString)
+ })
+ assert(actualResult.reverse.head("DATE") == null)
+ }
+ test("select int type with order by clause") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT DISTINCT ID FROM TESTDB.TESTTABLE ORDER BY ID")
+ val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 1),
+ Map("ID" -> 2),
+ Map("ID" -> 3),
+ Map("ID" -> 4),
+ Map("ID" -> 5),
+ Map("ID" -> 6),
+ Map("ID" -> 7),
+ Map("ID" -> 8),
+ Map("ID" -> 9))
+
+ assert(actualResult.equals(expectedResult))
+
+ }
+
+ test("test and filter clause with greater than expression") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery(
+ "SELECT ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY,BONUS FROM TESTDB.TESTTABLE " +
+ "WHERE BONUS>1234 AND ID>2 GROUP BY ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY," +
+ "BONUS ORDER BY ID")
+ val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 4,
+ "NAME" -> "prince",
+ "BONUS" -> java.math.BigDecimal.valueOf(9999.9990).setScale(4),
+ "DATE" -> "2015-07-26",
+ "SALARY" -> 15003.0,
+ "SERIALNAME" -> "ASD66902",
+ "COUNTRY" -> "china",
+ "PHONETYPE" -> "phone2435"),
+ Map("ID" -> 5,
+ "NAME" -> "bhavya",
+ "BONUS" -> java.math.BigDecimal.valueOf(5000.999).setScale(4),
+ "DATE" -> "2015-07-27",
+ "SALARY" -> 15004.0,
+ "SERIALNAME" -> "ASD90633",
+ "COUNTRY" -> "china",
+ "PHONETYPE" -> "phone2441"))
+ assert(actualResult.toString() equals expectedResult.toString())
+
+
+ }
+
+ test("test and filter clause with greater than equal to expression") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery(
+ "SELECT ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY,BONUS FROM TESTDB.TESTTABLE " +
+ "WHERE BONUS>=1234.444 GROUP BY ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY," +
+ "BONUS ORDER BY ID")
+ val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 1,
+ "NAME" -> "anubhav",
+ "BONUS" -> java.math.BigDecimal.valueOf(1234.4440).setScale(4),
+ "DATE" -> "2015-07-23",
+ "SALARY" -> "5000000.0",
+ "SERIALNAME" -> "ASD69643",
+ "COUNTRY" -> "china",
+ "PHONETYPE" -> "phone197"),
+ Map("ID" -> 2,
+ "NAME" -> "jatin",
+ "BONUS" -> java.math.BigDecimal.valueOf(1234.5555).setScale(4)
+ ,
+ "DATE" -> "2015-07-24",
+ "SALARY" -> java.math.BigDecimal.valueOf(150010.9990).setScale(3),
+ "SERIALNAME" -> "ASD42892",
+ "COUNTRY" -> "china",
+ "PHONETYPE" -> "phone756"),
+ Map("ID" -> 4,
+ "NAME" -> "prince",
+ "BONUS" -> java.math.BigDecimal.valueOf(9999.9990).setScale(4),
+ "DATE" -> "2015-07-26",
+ "SALARY" -> java.math.BigDecimal.valueOf(15003.0).setScale(1),
+ "SERIALNAME" -> "ASD66902",
+ "COUNTRY" -> "china",
+ "PHONETYPE" -> "phone2435"),
+ Map("ID" -> 5,
+ "NAME" -> "bhavya",
+ "BONUS" -> java.math.BigDecimal.valueOf(5000.9990).setScale(4),
+ "DATE" -> "2015-07-27",
+ "SALARY" -> java.math.BigDecimal.valueOf(15004.0).setScale(1),
+ "SERIALNAME" -> "ASD90633",
+ "COUNTRY" -> "china",
+ "PHONETYPE" -> "phone2441"))
+ assert(actualResult.toString() equals expectedResult.toString())
+ }
+ test("test and filter clause with less than equal to expression") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery(
+ "SELECT ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY,BONUS FROM TESTDB.TESTTABLE " +
+ "WHERE BONUS<=1234.444 GROUP BY ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY," +
+ "BONUS ORDER BY ID LIMIT 2")
+
+ val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 1,
+ "NAME" -> "anubhav",
+ "BONUS" -> java.math.BigDecimal.valueOf(1234.4440).setScale(4),
+ "DATE" -> "2015-07-23",
+ "SALARY" -> "5000000.0",
+ "SERIALNAME" -> "ASD69643",
+ "COUNTRY" -> "china",
+ "PHONETYPE" -> "phone197"),
+ Map("ID" -> 3,
+ "NAME" -> "liang",
+ "BONUS" -> java.math.BigDecimal.valueOf(600.7770).setScale(4),
+ "DATE" -> "2015-07-25",
+ "SALARY" -> java.math.BigDecimal.valueOf(15002.11).setScale(2),
+ "SERIALNAME" -> "ASD37014",
+ "COUNTRY" -> "china",
+ "PHONETYPE" -> "phone1904"))
+ assert(actualResult.toString() equals expectedResult.toString())
+ }
+ test("test equal to expression on decimal value") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery(
+ "SELECT ID FROM TESTDB.TESTTABLE WHERE BONUS=1234.444")
+
+ val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 1))
+
+ assert(actualResult equals expectedResult)
+ }
+ test("test less than expression with and operator") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery(
+ "SELECT ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY,BONUS FROM TESTDB.TESTTABLE " +
+ "WHERE BONUS>1234 AND ID<2 GROUP BY ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY," +
+ "BONUS ORDER BY ID")
+ val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 1,
+ "NAME" -> "anubhav",
+ "BONUS" -> java.math.BigDecimal.valueOf(1234.4440).setScale(4),
+ "DATE" -> "2015-07-23",
+ "SALARY" -> 5000000.0,
+ "SERIALNAME" -> "ASD69643",
+ "COUNTRY" -> "china",
+ "PHONETYPE" -> "phone197"))
+ assert(actualResult.toString().equals(expectedResult.toString()))
+ }
+ test("test the result for in clause") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT NAME from testdb.testtable WHERE PHONETYPE IN('phone1848','phone706')")
+ val expectedResult: List[Map[String, Any]] = List(
+ Map("NAME" -> "geetika"),
+ Map("NAME" -> "ravindra"),
+ Map("NAME" -> "jitesh"))
+
+ assert(actualResult.equals(expectedResult))
+ }
+ test("test the result for not in clause") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery(
+ "SELECT NAME from testdb.testtable WHERE PHONETYPE NOT IN('phone1848','phone706')")
+ val expectedResult: List[Map[String, Any]] = List(Map("NAME" -> "anubhav"),
+ Map("NAME" -> "jatin"),
+ Map("NAME" -> "liang"),
+ Map("NAME" -> "prince"),
+ Map("NAME" -> "bhavya"),
+ Map("NAME" -> "akash"),
+ Map("NAME" -> "sahil"))
+
+ assert(actualResult.equals(expectedResult))
+ }
+ test("test for null operator on date data type") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT ID FROM TESTDB.TESTTABLE WHERE DATE IS NULL")
+ val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 9))
+ assert(actualResult.equals(expectedResult))
+
+ }
+ test("test for not null operator on date data type") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT NAME FROM TESTDB.TESTTABLE WHERE DATE IS NOT NULL AND ID=9")
+ val expectedResult: List[Map[String, Any]] = List(Map("NAME" -> "ravindra"))
+ assert(actualResult.equals(expectedResult))
+
+ }
+ test("test for not null operator on timestamp type") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT NAME FROM TESTDB.TESTTABLE WHERE DOB IS NOT NULL AND ID=9")
+ val expectedResult: List[Map[String, Any]] = List(Map("NAME" -> "ravindra"),
+ Map("NAME" -> "jitesh"))
+ assert(actualResult.equals(expectedResult))
+
+ }
+ test("test for null operator on timestamp type") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery("SELECT NAME FROM TESTDB.TESTTABLE WHERE DOB IS NULL AND ID=1")
+ val expectedResult: List[Map[String, Any]] = List(Map("NAME" -> "anubhav"))
+ assert(actualResult.equals(expectedResult))
+
+ }
+ test("test the result for short datatype with order by clause") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery(
+ "SELECT DISTINCT SHORTFIELD from testdb.testtable ORDER BY SHORTFIELD ")
+ val expectedResult: List[Map[String, Any]] = List(Map("SHORTFIELD" -> 1),
+ Map("SHORTFIELD" -> 4),
+ Map("SHORTFIELD" -> 8),
+ Map("SHORTFIELD" -> 10),
+ Map("SHORTFIELD" -> 11),
+ Map("SHORTFIELD" -> 12),
+ Map("SHORTFIELD" -> 18),
+ Map("SHORTFIELD" -> null))
+
+ assert(actualResult.equals(expectedResult))
+ }
+ test("test the result for short datatype in clause where field is null") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery(
+ "SELECT ID from testdb.testtable WHERE SHORTFIELD IS NULL ORDER BY SHORTFIELD ")
+ val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 7))
+
+ assert(actualResult.equals(expectedResult))
+ }
+ test("test the result for short datatype with greater than operator") {
+ val actualResult: List[Map[String, Any]] = PrestoServer
+ .executeQuery(
+ "SELECT ID from testdb.testtable WHERE SHORTFIELD>11 ")
+ val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 6), Map("ID" -> 9))
+
+ assert(actualResult.equals(expectedResult))
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6727d75/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
new file mode 100644
index 0000000..3497f47
--- /dev/null
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.presto.server
+
+import java.sql.{Connection, DriverManager, ResultSet}
+import java.util
+import java.util.{Locale, Optional}
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+import com.facebook.presto.Session
+import com.facebook.presto.execution.QueryIdGenerator
+import com.facebook.presto.metadata.SessionPropertyManager
+import com.facebook.presto.spi.`type`.TimeZoneKey.UTC_KEY
+import com.facebook.presto.spi.security.Identity
+import com.facebook.presto.tests.DistributedQueryRunner
+import com.google.common.collect.ImmutableMap
+import org.slf4j.{Logger, LoggerFactory}
+
+import org.apache.carbondata.presto.CarbondataPlugin
+
+object PrestoServer {
+
+ val CARBONDATA_CATALOG = "carbondata"
+ val CARBONDATA_CONNECTOR = "carbondata"
+ val CARBONDATA_SOURCE = "carbondata"
+ val logger: Logger = LoggerFactory.getLogger(this.getClass)
+
+
+ val prestoProperties: util.Map[String, String] = Map(("http-server.http.port", "8086")).asJava
+ createSession
+ val queryRunner = new DistributedQueryRunner(createSession, 4, prestoProperties)
+
+
+ /**
+ * start the presto server
+ *
+ * @param carbonStorePath
+ */
+ def startServer(carbonStorePath: String) = {
+
+ logger.info("======== STARTING PRESTO SERVER ========")
+ val queryRunner: DistributedQueryRunner = createQueryRunner(
+ prestoProperties, carbonStorePath)
+
+ logger.info("STARTED SERVER AT :" + queryRunner.getCoordinator.getBaseUrl)
+ }
+
+ /**
+ * Instantiates the Presto Server to connect with the Apache CarbonData
+ */
+ private def createQueryRunner(extraProperties: util.Map[String, String],
+ carbonStorePath: String): DistributedQueryRunner = {
+ Try {
+ queryRunner.installPlugin(new CarbondataPlugin)
+ val carbonProperties = ImmutableMap.builder[String, String]
+ .put("carbondata-store", carbonStorePath).build
+
+ // CreateCatalog will create a catalog for CarbonData in etc/catalog.
+ queryRunner.createCatalog(CARBONDATA_CATALOG, CARBONDATA_CONNECTOR, carbonProperties)
+ } match {
+ case Success(result) => queryRunner
+ case Failure(exception) => queryRunner.close()
+ throw exception
+ }
+ }
+
+ /**
+ * stop the presto server
+ */
+ def stopServer(): Unit = {
+ queryRunner.close()
+ logger.info("***** Stopping The Server *****")
+ }
+
+ /**
+ * execute the query by establishing the jdbc connection
+ *
+ * @param query
+ * @return
+ */
+ def executeQuery(query: String): List[Map[String, Any]] = {
+
+ Try {
+ val conn: Connection = createJdbcConnection
+ logger.info(s"***** executing the query ***** \n $query")
+ val statement = conn.createStatement()
+ val result: ResultSet = statement.executeQuery(query)
+ convertResultSetToList(result)
+ } match {
+ case Success(result) => result
+ case Failure(jdbcException) => logger
+ .error(s"exception occurs${ jdbcException.getMessage } \n query failed $query")
+ throw jdbcException
+ }
+ }
+
+ /**
+ * Creates a JDBC Client to connect CarbonData to Presto
+ *
+ * @return
+ */
+ private def createJdbcConnection: Connection = {
+ val JDBC_DRIVER = "com.facebook.presto.jdbc.PrestoDriver"
+ val DB_URL = "jdbc:presto://localhost:8086/carbondata/testdb"
+
+ // The database Credentials
+ val USER = "username"
+ val PASS = "password"
+
+ // STEP 2: Register JDBC driver
+ Class.forName(JDBC_DRIVER)
+ // STEP 3: Open a connection
+ DriverManager.getConnection(DB_URL, USER, PASS)
+ }
+
+ /**
+ * convert result set into scala list of map
+ * each map represents a row
+ *
+ * @param queryResult
+ * @return
+ */
+ private def convertResultSetToList(queryResult: ResultSet): List[Map[String, Any]] = {
+ val metadata = queryResult.getMetaData
+ val colNames = (1 to metadata.getColumnCount) map metadata.getColumnName
+ Iterator.continually(buildMapFromQueryResult(queryResult, colNames)).takeWhile(_.isDefined)
+ .map(_.get).toList
+ }
+
+ private def buildMapFromQueryResult(queryResult: ResultSet,
+ colNames: Seq[String]): Option[Map[String, Any]] = {
+ if (queryResult.next()) {
+ Some(colNames.map(name => name -> queryResult.getObject(name)).toMap)
+ }
+ else {
+ None
+ }
+ }
+
+ /**
+ * CreateSession will create a new session in the Server to connect and execute queries.
+ */
+ private def createSession: Session = {
+ logger.info("\n Creating The Presto Server Session")
+ Session.builder(new SessionPropertyManager)
+ .setQueryId(new QueryIdGenerator().createNextQueryId)
+ .setIdentity(new Identity("user", Optional.empty()))
+ .setSource(CARBONDATA_SOURCE).setCatalog(CARBONDATA_CATALOG)
+ .setTimeZoneKey(UTC_KEY).setLocale(Locale.ENGLISH)
+ .setRemoteUserAddress("address")
+ .setUserAgent("agent").build
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6727d75/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
new file mode 100644
index 0000000..6cb97f1
--- /dev/null
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package util
+
+import java.io._
+import java.nio.charset.Charset
+import java.text.SimpleDateFormat
+import java.util
+import java.util.{ArrayList, Date, List, UUID}
+
+import scala.collection.JavaConversions._
+
+import com.google.gson.Gson
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapred.TaskAttemptID
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{RecordReader, TaskType}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier,
+ReverseDictionary}
+import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.fileoperations.{AtomicFileOperations, AtomicFileOperationsImpl,
+FileWriteOperation}
+import org.apache.carbondata.core.metadata.converter.{SchemaConverter,
+ThriftWrapperSchemaConverterImpl}
+import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension,
+CarbonMeasure, ColumnSchema}
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata,
+CarbonTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter,
+CarbonDictionarySortIndexWriterImpl, CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator}
+import org.apache.carbondata.core.writer.{CarbonDictionaryWriter, CarbonDictionaryWriterImpl,
+ThriftWriter}
+import org.apache.carbondata.processing.api.dataloader.SchemaInfo
+import org.apache.carbondata.processing.constants.TableOptionConstant
+import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat,
+CSVRecordReaderIterator, StringArrayWritable}
+import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.newflow.DataLoadExecutor
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
+
+object CarbonDataStoreCreator {
+
+ private val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ /**
+ * Create store without any restructure
+ */
+ def createCarbonStore(storePath: String, dataFilePath: String): Unit = {
+ try {
+ logger.info("Creating The Carbon Store")
+ val dbName: String = "testdb"
+ val tableName: String = "testtable"
+ val absoluteTableIdentifier = new AbsoluteTableIdentifier(
+ storePath,
+ new CarbonTableIdentifier(dbName,
+ tableName,
+ UUID.randomUUID().toString))
+ val factFilePath: String = new File(dataFilePath).getCanonicalPath
+ val storeDir: File = new File(absoluteTableIdentifier.getStorePath)
+ CarbonUtil.deleteFoldersAndFiles(storeDir)
+ CarbonProperties.getInstance.addProperty(
+ CarbonCommonConstants.STORE_LOCATION_HDFS,
+ absoluteTableIdentifier.getStorePath)
+ val table: CarbonTable = createTable(absoluteTableIdentifier)
+ writeDictionary(factFilePath, table, absoluteTableIdentifier)
+ val schema: CarbonDataLoadSchema = new CarbonDataLoadSchema(table)
+ val loadModel: CarbonLoadModel = new CarbonLoadModel()
+ val partitionId: String = "0"
+ loadModel.setCarbonDataLoadSchema(schema)
+ loadModel.setDatabaseName(
+ absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName)
+ loadModel.setTableName(
+ absoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
+ loadModel.setTableName(
+ absoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
+ loadModel.setFactFilePath(factFilePath)
+ loadModel.setLoadMetadataDetails(new ArrayList[LoadMetadataDetails]())
+ loadModel.setStorePath(absoluteTableIdentifier.getStorePath)
+ CarbonProperties.getInstance
+ .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING, "true")
+
+ loadModel.setDefaultTimestampFormat(
+ CarbonProperties.getInstance.getProperty(
+ CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+ loadModel.setDefaultDateFormat(
+ CarbonProperties.getInstance.getProperty(
+ CarbonCommonConstants.CARBON_DATE_FORMAT,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
+ loadModel.setSerializationNullFormat(
+ TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName +
+ "," +
+ "\\N")
+ loadModel.setBadRecordsLoggerEnable(
+ TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName +
+ "," +
+ "false")
+ loadModel.setBadRecordsAction(
+ TableOptionConstant.BAD_RECORDS_ACTION.getName + "," +
+ "force")
+ loadModel.setDirectLoad(true)
+ loadModel.setIsEmptyDataBadRecord(
+ DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD +
+ "," +
+ "true")
+ loadModel.setMaxColumns("15")
+ loadModel.setCsvHeader(
+ "ID,date,country,name,phonetype,serialname,salary,bonus,dob,shortField")
+ loadModel.setCsvHeaderColumns(loadModel.getCsvHeader.split(","))
+ loadModel.setTaskNo("0")
+ loadModel.setSegmentId("0")
+ loadModel.setPartitionId("0")
+ loadModel.setFactTimeStamp(System.currentTimeMillis())
+ loadModel.setMaxColumns("15")
+ executeGraph(loadModel, absoluteTableIdentifier.getStorePath)
+ } catch {
+ case e: Exception => e.printStackTrace()
+
+ }
+ }
+
+ private def createTable(absoluteTableIdentifier: AbsoluteTableIdentifier): CarbonTable = {
+ val tableInfo: TableInfo = new TableInfo()
+ tableInfo.setStorePath(absoluteTableIdentifier.getStorePath)
+ tableInfo.setDatabaseName(
+ absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName)
+ val tableSchema: TableSchema = new TableSchema()
+ tableSchema.setTableName(
+ absoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
+ val columnSchemas: List[ColumnSchema] = new ArrayList[ColumnSchema]()
+ val encodings: ArrayList[Encoding] = new ArrayList[Encoding]()
+ encodings.add(Encoding.INVERTED_INDEX)
+ val id: ColumnSchema = new ColumnSchema()
+ id.setColumnName("ID")
+ id.setColumnar(true)
+ id.setDataType(DataType.INT)
+ id.setEncodingList(encodings)
+ id.setColumnUniqueId(UUID.randomUUID().toString)
+ id.setColumnReferenceId(id.getColumnUniqueId)
+ id.setDimensionColumn(true)
+ id.setColumnGroup(1)
+ columnSchemas.add(id)
+
+ val dictEncoding: util.ArrayList[Encoding] = new util.ArrayList[Encoding]()
+ dictEncoding.add(Encoding.DIRECT_DICTIONARY)
+ dictEncoding.add(Encoding.DICTIONARY)
+ dictEncoding.add(Encoding.INVERTED_INDEX)
+
+ val date: ColumnSchema = new ColumnSchema()
+ date.setColumnName("date")
+ date.setColumnar(true)
+ date.setDataType(DataType.DATE)
+ date.setEncodingList(dictEncoding)
+ date.setColumnUniqueId(UUID.randomUUID().toString)
+ date.setDimensionColumn(true)
+ date.setColumnGroup(2)
+ date.setColumnReferenceId(date.getColumnUniqueId)
+ columnSchemas.add(date)
+
+ val country: ColumnSchema = new ColumnSchema()
+ country.setColumnName("country")
+ country.setColumnar(true)
+ country.setDataType(DataType.STRING)
+ country.setEncodingList(encodings)
+ country.setColumnUniqueId(UUID.randomUUID().toString)
+ country.setColumnReferenceId(country.getColumnUniqueId)
+ country.setDimensionColumn(true)
+ country.setColumnGroup(3)
+ country.setColumnReferenceId(country.getColumnUniqueId)
+ columnSchemas.add(country)
+
+ val name: ColumnSchema = new ColumnSchema()
+ name.setColumnName("name")
+ name.setColumnar(true)
+ name.setDataType(DataType.STRING)
+ name.setEncodingList(encodings)
+ name.setColumnUniqueId(UUID.randomUUID().toString)
+ name.setDimensionColumn(true)
+ name.setColumnGroup(4)
+ name.setColumnReferenceId(name.getColumnUniqueId)
+ columnSchemas.add(name)
+
+ val phonetype: ColumnSchema = new ColumnSchema()
+ phonetype.setColumnName("phonetype")
+ phonetype.setColumnar(true)
+ phonetype.setDataType(DataType.STRING)
+ phonetype.setEncodingList(encodings)
+ phonetype.setColumnUniqueId(UUID.randomUUID().toString)
+ phonetype.setDimensionColumn(true)
+ phonetype.setColumnGroup(5)
+ phonetype.setColumnReferenceId(phonetype.getColumnUniqueId)
+ columnSchemas.add(phonetype)
+
+ val serialname: ColumnSchema = new ColumnSchema()
+ serialname.setColumnName("serialname")
+ serialname.setColumnar(true)
+ serialname.setDataType(DataType.STRING)
+ serialname.setEncodingList(encodings)
+ serialname.setColumnUniqueId(UUID.randomUUID().toString)
+ serialname.setDimensionColumn(true)
+ serialname.setColumnGroup(6)
+ serialname.setColumnReferenceId(serialname.getColumnUniqueId)
+ columnSchemas.add(serialname)
+
+ val salary: ColumnSchema = new ColumnSchema()
+ salary.setColumnName("salary")
+ salary.setColumnar(true)
+ salary.setDataType(DataType.DOUBLE)
+ salary.setEncodingList(encodings)
+ salary.setColumnUniqueId(UUID.randomUUID().toString)
+ salary.setDimensionColumn(false)
+ salary.setColumnGroup(7)
+ salary.setColumnReferenceId(salary.getColumnUniqueId)
+ columnSchemas.add(salary)
+
+ val bonus: ColumnSchema = new ColumnSchema()
+ bonus.setColumnName("bonus")
+ bonus.setColumnar(true)
+ bonus.setDataType(DataType.DECIMAL)
+ bonus.setPrecision(10)
+ bonus.setScale(4)
+ bonus.setEncodingList(encodings)
+ bonus.setColumnUniqueId(UUID.randomUUID().toString)
+ bonus.setDimensionColumn(false)
+ bonus.setColumnGroup(8)
+ bonus.setColumnReferenceId(bonus.getColumnUniqueId)
+ columnSchemas.add(bonus)
+
+ val dob: ColumnSchema = new ColumnSchema()
+ dob.setColumnName("dob")
+ dob.setColumnar(true)
+ dob.setDataType(DataType.TIMESTAMP)
+ dob.setEncodingList(dictEncoding)
+ dob.setColumnUniqueId(UUID.randomUUID().toString)
+ dob.setDimensionColumn(true)
+ dob.setColumnGroup(9)
+ dob.setColumnReferenceId(dob.getColumnUniqueId)
+ columnSchemas.add(dob)
+
+ val shortField: ColumnSchema = new ColumnSchema()
+ shortField.setColumnName("shortField")
+ shortField.setColumnar(true)
+ shortField.setDataType(DataType.SHORT)
+ shortField.setEncodingList(encodings)
+ shortField.setColumnUniqueId(UUID.randomUUID().toString)
+ shortField.setDimensionColumn(false)
+ shortField.setColumnGroup(10)
+ shortField.setColumnReferenceId(shortField.getColumnUniqueId)
+ columnSchemas.add(shortField)
+
+ tableSchema.setListOfColumns(columnSchemas)
+ val schemaEvol: SchemaEvolution = new SchemaEvolution()
+ schemaEvol.setSchemaEvolutionEntryList(
+ new util.ArrayList[SchemaEvolutionEntry]())
+ tableSchema.setSchemaEvalution(schemaEvol)
+ tableSchema.setTableId(UUID.randomUUID().toString)
+ tableInfo.setTableUniqueName(
+ absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName +
+ "_" +
+ absoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
+ tableInfo.setLastUpdatedTime(System.currentTimeMillis())
+ tableInfo.setFactTable(tableSchema)
+ val carbonTablePath: CarbonTablePath = CarbonStorePath.getCarbonTablePath(
+ absoluteTableIdentifier.getStorePath,
+ absoluteTableIdentifier.getCarbonTableIdentifier)
+ val schemaFilePath: String = carbonTablePath.getSchemaFilePath
+ val schemaMetadataPath: String =
+ CarbonTablePath.getFolderContainingFile(schemaFilePath)
+ tableInfo.setMetaDataFilepath(schemaMetadataPath)
+ CarbonMetadata.getInstance.loadTableMetadata(tableInfo)
+ val schemaConverter: SchemaConverter =
+ new ThriftWrapperSchemaConverterImpl()
+ val thriftTableInfo: org.apache.carbondata.format.TableInfo =
+ schemaConverter.fromWrapperToExternalTableInfo(
+ tableInfo,
+ tableInfo.getDatabaseName,
+ tableInfo.getFactTable.getTableName)
+ val schemaEvolutionEntry: org.apache.carbondata.format.SchemaEvolutionEntry =
+ new org.apache.carbondata.format.SchemaEvolutionEntry(
+ tableInfo.getLastUpdatedTime)
+ thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
+ .add(schemaEvolutionEntry)
+ val fileType: FileFactory.FileType =
+ FileFactory.getFileType(schemaMetadataPath)
+ if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
+ FileFactory.mkdirs(schemaMetadataPath, fileType)
+ }
+ val thriftWriter: ThriftWriter = new ThriftWriter(schemaFilePath, false)
+ thriftWriter.open()
+ thriftWriter.write(thriftTableInfo)
+ thriftWriter.close()
+ CarbonMetadata.getInstance.getCarbonTable(tableInfo.getTableUniqueName)
+ }
+
+ private def writeDictionary(factFilePath: String,
+ table: CarbonTable,
+ absoluteTableIdentifier: AbsoluteTableIdentifier): Unit = {
+ val reader: BufferedReader = new BufferedReader(
+ new FileReader(factFilePath))
+ val header: String = reader.readLine()
+ val split: Array[String] = header.split(",")
+ val allCols: util.List[CarbonColumn] = new util.ArrayList[CarbonColumn]()
+ val dims: util.List[CarbonDimension] =
+ table.getDimensionByTableName(table.getFactTableName)
+ allCols.addAll(dims)
+ val msrs: List[CarbonMeasure] =
+ table.getMeasureByTableName(table.getFactTableName)
+ allCols.addAll(msrs)
+ val set: Array[util.Set[String]] = Array.ofDim[util.Set[String]](dims.size)
+ for (i <- set.indices) {
+ set(i) = new util.HashSet[String]()
+ }
+ var line: String = reader.readLine()
+ while (line != null) {
+ val data: Array[String] = line.split(",")
+ for (i <- set.indices) {
+ set(i).add(data(i))
+ }
+ line = reader.readLine()
+ }
+ val dictCache: Cache[DictionaryColumnUniqueIdentifier, ReverseDictionary] = CacheProvider
+ .getInstance.createCache(CacheType.REVERSE_DICTIONARY,
+ absoluteTableIdentifier.getStorePath)
+ for (i <- set.indices) {
+ val columnIdentifier: ColumnIdentifier =
+ new ColumnIdentifier(dims.get(i).getColumnId, null, null)
+ val dictionaryColumnUniqueIdentifier: DictionaryColumnUniqueIdentifier =
+ new DictionaryColumnUniqueIdentifier(
+ table.getCarbonTableIdentifier,
+ columnIdentifier,
+ columnIdentifier.getDataType,
+ CarbonStorePath.getCarbonTablePath(table.getStorePath,
+ table.getCarbonTableIdentifier)
+ )
+ val writer: CarbonDictionaryWriter = new CarbonDictionaryWriterImpl(
+ absoluteTableIdentifier.getStorePath,
+ absoluteTableIdentifier.getCarbonTableIdentifier,
+ dictionaryColumnUniqueIdentifier)
+ for (value <- set(i)) {
+ writer.write(value)
+ }
+ writer.close()
+ writer.commit()
+ val dict: Dictionary = dictCache
+ .get(
+ new DictionaryColumnUniqueIdentifier(
+ absoluteTableIdentifier.getCarbonTableIdentifier,
+ columnIdentifier,
+ dims.get(i).getDataType,
+ CarbonStorePath.getCarbonTablePath(table.getStorePath,
+ table.getCarbonTableIdentifier)
+ ))
+ .asInstanceOf[Dictionary]
+ val preparator: CarbonDictionarySortInfoPreparator =
+ new CarbonDictionarySortInfoPreparator()
+ val newDistinctValues: List[String] = new ArrayList[String]()
+ val dictionarySortInfo: CarbonDictionarySortInfo =
+ preparator.getDictionarySortInfo(newDistinctValues,
+ dict,
+ dims.get(i).getDataType)
+ val carbonDictionaryWriter: CarbonDictionarySortIndexWriter =
+ new CarbonDictionarySortIndexWriterImpl(
+ absoluteTableIdentifier.getCarbonTableIdentifier,
+ dictionaryColumnUniqueIdentifier,
+ absoluteTableIdentifier.getStorePath)
+ try {
+ carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex)
+ carbonDictionaryWriter.writeInvertedSortIndex(
+ dictionarySortInfo.getSortIndexInverted)
+ }
+ catch {
+ case exception: Exception => logger.error(s"exception occurs $exception")
+ }
+ finally carbonDictionaryWriter.close()
+ }
+ reader.close()
+ }
+
+ /**
+ * Execute graph which will further load data
+ *
+ * @param loadModel
+ * @param storeLocation
+ * @throws Exception
+ */
+ private def executeGraph(loadModel: CarbonLoadModel, storeLocation: String): Unit = {
+ new File(storeLocation).mkdirs()
+ val outPutLoc: String = storeLocation + "/etl"
+ val databaseName: String = loadModel.getDatabaseName
+ val tableName: String = loadModel.getTableName
+ val tempLocationKey: String = databaseName + '_' + tableName + "_1"
+ CarbonProperties.getInstance.addProperty(tempLocationKey, storeLocation)
+ CarbonProperties.getInstance
+ .addProperty("store_output_location", outPutLoc)
+ CarbonProperties.getInstance.addProperty("send.signal.load", "false")
+ CarbonProperties.getInstance
+ .addProperty("carbon.is.columnar.storage", "true")
+ CarbonProperties.getInstance
+ .addProperty("carbon.dimension.split.value.in.columnar", "1")
+ CarbonProperties.getInstance
+ .addProperty("carbon.is.fullyfilled.bits", "true")
+ CarbonProperties.getInstance.addProperty("is.int.based.indexer", "true")
+ CarbonProperties.getInstance
+ .addProperty("aggregate.columnar.keyblock", "true")
+ CarbonProperties.getInstance
+ .addProperty("high.cardinality.value", "100000")
+ CarbonProperties.getInstance.addProperty("is.compressed.keyblock", "false")
+ CarbonProperties.getInstance.addProperty("carbon.leaf.node.size", "120000")
+ CarbonProperties.getInstance
+ .addProperty("carbon.direct.dictionary", "true")
+ val graphPath: String = outPutLoc + File.separator + loadModel.getDatabaseName +
+ File.separator +
+ tableName +
+ File.separator +
+ 0 +
+ File.separator +
+ 1 +
+ File.separator +
+ tableName +
+ ".ktr"
+ val path: File = new File(graphPath)
+ if (path.exists()) {
+ path.delete()
+ }
+ val info: SchemaInfo = new SchemaInfo()
+ val blockDetails: BlockDetails = new BlockDetails(
+ new Path(loadModel.getFactFilePath),
+ 0,
+ new File(loadModel.getFactFilePath).length,
+ Array("localhost"))
+ val configuration: Configuration = new Configuration()
+ CSVInputFormat.setCommentCharacter(configuration, loadModel.getCommentChar)
+ CSVInputFormat.setCSVDelimiter(configuration, loadModel.getCsvDelimiter)
+ CSVInputFormat.setEscapeCharacter(configuration, loadModel.getEscapeChar)
+ CSVInputFormat.setHeaderExtractionEnabled(configuration, true)
+ CSVInputFormat.setQuoteCharacter(configuration, loadModel.getQuoteChar)
+ CSVInputFormat.setReadBufferSize(
+ configuration,
+ CarbonProperties.getInstance.getProperty(
+ CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
+ CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT))
+ CSVInputFormat.setNumberOfColumns(
+ configuration,
+ String.valueOf(loadModel.getCsvHeaderColumns.length))
+ CSVInputFormat.setMaxColumns(configuration, "15")
+ val hadoopAttemptContext: TaskAttemptContextImpl =
+ new TaskAttemptContextImpl(configuration,
+ new TaskAttemptID("", 1, TaskType.MAP, 0, 0))
+ val format: CSVInputFormat = new CSVInputFormat()
+ val recordReader: RecordReader[NullWritable, StringArrayWritable] =
+ format.createRecordReader(blockDetails, hadoopAttemptContext)
+ val readerIterator: CSVRecordReaderIterator = new CSVRecordReaderIterator(
+ recordReader,
+ blockDetails,
+ hadoopAttemptContext)
+ new DataLoadExecutor()
+ .execute(loadModel, Array(storeLocation), Array(readerIterator))
+ info.setDatabaseName(databaseName)
+ info.setTableName(tableName)
+ writeLoadMetadata(loadModel.getCarbonDataLoadSchema,
+ loadModel.getTableName,
+ loadModel.getTableName,
+ new ArrayList[LoadMetadataDetails]())
+ val segLocation: String = storeLocation + "/" + databaseName + "/" + tableName +
+ "/Fact/Part0/Segment_0"
+ val file: File = new File(segLocation)
+ var factFile: File = null
+ val folderList: Array[File] = file.listFiles()
+ var folder: File = null
+ for (i <- folderList.indices if folderList(i).isDirectory) {
+ folder = folderList(i)
+ }
+ if (folder.isDirectory) {
+ val files: Array[File] = folder.listFiles()
+ for (i <- files.indices
+ if !files(i).isDirectory && files(i).getName.startsWith("part")) {
+ factFile = files(i)
+ //break
+ }
+ factFile.renameTo(new File(segLocation + "/" + factFile.getName))
+ CarbonUtil.deleteFoldersAndFiles(folder)
+ }
+ }
+
+ private def writeLoadMetadata(
+ schema: CarbonDataLoadSchema,
+ databaseName: String,
+ tableName: String,
+ listOfLoadFolderDetails: util.List[LoadMetadataDetails]): Unit = {
+ try {
+ val loadMetadataDetails: LoadMetadataDetails = new LoadMetadataDetails()
+ loadMetadataDetails.setLoadEndTime(System.currentTimeMillis())
+ loadMetadataDetails.setLoadStatus("SUCCESS")
+ loadMetadataDetails.setLoadName(String.valueOf(0))
+ loadMetadataDetails.setLoadStartTime(
+ loadMetadataDetails.getTimeStamp(readCurrentTime()))
+ listOfLoadFolderDetails.add(loadMetadataDetails)
+ val dataLoadLocation: String = schema.getCarbonTable.getMetaDataFilepath + File.separator +
+ CarbonCommonConstants.LOADMETADATA_FILENAME
+ val gsonObjectToWrite: Gson = new Gson()
+ val writeOperation: AtomicFileOperations = new AtomicFileOperationsImpl(
+ dataLoadLocation,
+ FileFactory.getFileType(dataLoadLocation))
+ val dataOutputStream =
+ writeOperation.openForWrite(FileWriteOperation.OVERWRITE)
+ val brWriter = new BufferedWriter(
+ new OutputStreamWriter(
+ dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)))
+ val metadataInstance: String =
+ gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray())
+ brWriter.write(metadataInstance)
+ if (Option(brWriter).isDefined) {
+ brWriter.flush()
+ }
+ CarbonUtil.closeStreams(brWriter)
+ writeOperation.close()
+ }
+ catch {
+ case exception: Exception => logger.error(s"Exception occurs $exception")
+ }
+ }
+
+ private def readCurrentTime(): String = {
+ val sdf: SimpleDateFormat = new SimpleDateFormat(
+ CarbonCommonConstants.CARBON_TIMESTAMP)
+ sdf.format(new Date())
+ }
+
+}
+