Posted to issues@carbondata.apache.org by ravipesala <gi...@git.apache.org> on 2018/08/22 06:04:20 UTC

[GitHub] carbondata pull request #2647: [WIP] Added Spark FileFormat interface implem...

GitHub user ravipesala opened a pull request:

    https://github.com/apache/carbondata/pull/2647

    [WIP] Added Spark FileFormat interface implementation in Carbon

    Be sure to complete all of the following checklist items to help us incorporate
    your contribution quickly and easily:
    
     - [ ] Any interfaces changed?
     
     - [ ] Any backward compatibility impacted?
     
     - [ ] Document update required?
    
     - [ ] Testing done
            Please provide details on:
            - Whether new unit test cases have been added, or why no new tests are required.
            - How was it tested? Please attach the test report.
            - Is it a performance-related change? Please attach the performance test report.
            - Any additional information to help reviewers test this change.
           
     - [ ] For large changes, please consider breaking them into sub-tasks under an umbrella JIRA.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ravipesala/incubator-carbondata spark-fileformat

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2647.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2647
    
----
commit 4df28b3ac90fbd26ff674c10dd4ae3d3a95f956a
Author: ravipesala <ra...@...>
Date:   2018-08-22T06:02:21Z

    Added Spark FileFormat interface implementation in Carbon

----


---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6690/



---

[GitHub] carbondata pull request #2647: [WIP] Added Spark FileFormat interface implem...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r211872774
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
    @@ -66,7 +68,23 @@ public LatestFilesReadCommittedScope(String path, String segmentId) {
        * @param path carbon file path
        */
       public LatestFilesReadCommittedScope(String path) {
    -    this(path, null);
    +    this(path, (String) null);
    +  }
    +
    +  /**
    +   * a new constructor with path
    +   *
    +   * @param path carbon file path
    +   */
    --- End diff --
    
    missing parameter doc
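    For illustration, the completed doc comment might look roughly like this (only a
    sketch; the exact wording and the description of subFolders are up to the author):
    
        /**
         * A new constructor that restricts the read scope to the given sub folders.
         *
         * @param path carbon file path
         * @param subFolders sub folders under the given path to include in the snapshot
         */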


---

[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212250447
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
    @@ -66,7 +68,23 @@ public LatestFilesReadCommittedScope(String path, String segmentId) {
        * @param path carbon file path
        */
       public LatestFilesReadCommittedScope(String path) {
    -    this(path, null);
    +    this(path, (String) null);
    +  }
    +
    +  /**
    +   * a new constructor with path
    +   *
    +   * @param path carbon file path
    +   */
    +  public LatestFilesReadCommittedScope(String path, String[] subFolders) {
    +    Objects.requireNonNull(path);
    +    this.carbonFilePath = path;
    +    this.subFolders = subFolders;
    +    try {
    +      takeCarbonIndexFileSnapShot();
    +    } catch (IOException ex) {
    +      throw new RuntimeException("Error while taking index snapshot", ex);
    --- End diff --
    
    Handling is not required here, so it was removed.


---

[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...

Posted by KanakaKumar <gi...@git.apache.org>.
Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212305587
  
    --- Diff: integration/spark-datasource/pom.xml ---
    @@ -0,0 +1,279 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +    Licensed to the Apache Software Foundation (ASF) under one or more
    +    contributor license agreements.  See the NOTICE file distributed with
    +    this work for additional information regarding copyright ownership.
    +    The ASF licenses this file to You under the Apache License, Version 2.0
    +    (the "License"); you may not use this file except in compliance with
    +    the License.  You may obtain a copy of the License at
    +
    +       http://www.apache.org/licenses/LICENSE-2.0
    +
    +    Unless required by applicable law or agreed to in writing, software
    +    distributed under the License is distributed on an "AS IS" BASIS,
    +    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +    See the License for the specific language governing permissions and
    +    limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +
    +  <modelVersion>4.0.0</modelVersion>
    +
    +  <parent>
    +    <groupId>org.apache.carbondata</groupId>
    +    <artifactId>carbondata-parent</artifactId>
    +    <version>1.5.0-SNAPSHOT</version>
    +    <relativePath>../../pom.xml</relativePath>
    +  </parent>
    +
    +  <artifactId>carbondata-spark-datasource</artifactId>
    +  <name>Apache CarbonData :: Spark Datasource</name>
    +
    +  <properties>
    +    <dev.path>${basedir}/../../dev</dev.path>
    +    <jacoco.append>true</jacoco.append>
    +  </properties>
    +
    +  <dependencies>
    +    <dependency>
    +      <groupId>org.apache.carbondata</groupId>
    +      <artifactId>carbondata-hadoop</artifactId>
    +      <version>${project.version}</version>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.carbondata</groupId>
    +      <artifactId>carbondata-store-sdk</artifactId>
    +      <version>${project.version}</version>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.spark</groupId>
    +      <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.spark</groupId>
    +      <artifactId>spark-repl_${scala.binary.version}</artifactId>
    +    </dependency>
    +    <dependency>
    +      <groupId>junit</groupId>
    +      <artifactId>junit</artifactId>
    +      <scope>test</scope>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.scalatest</groupId>
    +      <artifactId>scalatest_${scala.binary.version}</artifactId>
    +      <scope>test</scope>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.hadoop</groupId>
    +      <artifactId>hadoop-aws</artifactId>
    +      <version>${hadoop.version}</version>
    +      <exclusions>
    +        <exclusion>
    +          <groupId>com.fasterxml.jackson.core</groupId>
    +          <artifactId>jackson-core</artifactId>
    +        </exclusion>
    +        <exclusion>
    +          <groupId>com.fasterxml.jackson.core</groupId>
    +          <artifactId>jackson-annotations</artifactId>
    +        </exclusion>
    +        <exclusion>
    +          <groupId>com.fasterxml.jackson.core</groupId>
    +          <artifactId>jackson-databind</artifactId>
    +        </exclusion>
    +      </exclusions>
    +    </dependency>
    +  </dependencies>
    +
    +  <build>
    +    <testSourceDirectory>src/test/scala</testSourceDirectory>
    +    <resources>
    +      <resource>
    +        <directory>src/resources</directory>
    +      </resource>
    +      <resource>
    +        <directory>.</directory>
    +        <includes>
    +          <include>CARBON_SPARK_INTERFACELogResource.properties</include>
    +        </includes>
    +      </resource>
    +    </resources>
    +    <plugins>
    +      <plugin>
    +        <groupId>org.scala-tools</groupId>
    +        <artifactId>maven-scala-plugin</artifactId>
    +        <version>2.15.2</version>
    +        <executions>
    +          <execution>
    +            <id>compile</id>
    +            <goals>
    +              <goal>compile</goal>
    +            </goals>
    +            <phase>compile</phase>
    +          </execution>
    +          <execution>
    +            <id>testCompile</id>
    +            <goals>
    +              <goal>testCompile</goal>
    +            </goals>
    +            <phase>test</phase>
    +          </execution>
    +          <execution>
    +            <phase>process-resources</phase>
    +            <goals>
    +              <goal>compile</goal>
    +            </goals>
    +          </execution>
    +        </executions>
    +      </plugin>
    +      <plugin>
    +        <artifactId>maven-compiler-plugin</artifactId>
    +        <configuration>
    +          <source>1.7</source>
    +          <target>1.7</target>
    +        </configuration>
    +      </plugin>
    +      <plugin>
    +        <groupId>org.apache.maven.plugins</groupId>
    +        <artifactId>maven-surefire-plugin</artifactId>
    +        <version>2.18</version>
    +        <!-- Note config is repeated in scalatest config -->
    +        <configuration>
    +          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
    +          <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
    +          <systemProperties>
    +            <java.awt.headless>true</java.awt.headless>
    +            <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
    +          </systemProperties>
    +          <failIfNoTests>false</failIfNoTests>
    +        </configuration>
    +      </plugin>
    +      <plugin>
    +        <groupId>org.scalatest</groupId>
    +        <artifactId>scalatest-maven-plugin</artifactId>
    +        <version>1.0</version>
    +        <!-- Note config is repeated in surefire config -->
    +        <configuration>
    +          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
    +          <junitxml>.</junitxml>
    +          <filereports>CarbonTestSuite.txt</filereports>
    +          <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
    +          </argLine>
    +          <stderr />
    +          <environmentVariables>
    +          </environmentVariables>
    +          <systemProperties>
    +            <java.awt.headless>true</java.awt.headless>
    +            <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
    +          </systemProperties>
    +        </configuration>
    +        <executions>
    +          <execution>
    +            <id>test</id>
    +            <goals>
    +              <goal>test</goal>
    +            </goals>
    +          </execution>
    +        </executions>
    +      </plugin>
    +    </plugins>
    +  </build>
    +  <profiles>
    +    <profile>
    +      <id>build-all</id>
    +      <properties>
    +        <spark.version>2.2.1</spark.version>
    +        <scala.binary.version>2.11</scala.binary.version>
    +        <scala.version>2.11.8</scala.version>
    +      </properties>
    +    </profile>
    +    <profile>
    +      <id>sdvtest</id>
    +      <properties>
    +        <maven.test.skip>true</maven.test.skip>
    +      </properties>
    +    </profile>
    +    <profile>
    +      <id>spark-2.1</id>
    +      <properties>
    +        <spark.version>2.1.0</spark.version>
    +        <scala.binary.version>2.11</scala.binary.version>
    +        <scala.version>2.11.8</scala.version>
    +      </properties>
    +      <build>
    +        <plugins>
    +          <plugin>
    +            <groupId>org.apache.maven.plugins</groupId>
    +            <artifactId>maven-compiler-plugin</artifactId>
    +            <configuration>
    +              <excludes>
    +                <exclude>src/main/spark2.2</exclude>
    +              </excludes>
    +            </configuration>
    +          </plugin>
    +          <plugin>
    +            <groupId>org.codehaus.mojo</groupId>
    +            <artifactId>build-helper-maven-plugin</artifactId>
    +            <version>3.0.0</version>
    +            <executions>
    +              <execution>
    +                <id>add-source</id>
    +                <phase>generate-sources</phase>
    +                <goals>
    +                  <goal>add-source</goal>
    +                </goals>
    +                <configuration>
    +                  <sources>
    +                    <source>src/main/spark2.1</source>
    +                  </sources>
    +                </configuration>
    +              </execution>
    +            </executions>
    +          </plugin>
    +        </plugins>
    +      </build>
    +    </profile>
    +    <profile>
    +      <id>spark-2.2</id>
    +      <activation>
    +        <activeByDefault>true</activeByDefault>
    +      </activation>
    +      <properties>
    +        <spark.version>2.2.1</spark.version>
    +        <scala.binary.version>2.11</scala.binary.version>
    +        <scala.version>2.11.8</scala.version>
    +      </properties>
    +      <build>
    --- End diff --
    
    I think profiles are not required


---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6336/



---

[GitHub] carbondata pull request #2647: [WIP] Added Spark FileFormat interface implem...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r211872899
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
    @@ -36,12 +36,14 @@
      */
     @InterfaceAudience.Internal
     @InterfaceStability.Stable
    -public class LatestFilesReadCommittedScope implements ReadCommittedScope {
    +public class LatestFilesReadCommittedScope
    +    implements ReadCommittedScope {
    --- End diff --
    
    no need to modify


---

[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212250488
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
    @@ -36,12 +36,14 @@
      */
     @InterfaceAudience.Internal
     @InterfaceStability.Stable
    -public class LatestFilesReadCommittedScope implements ReadCommittedScope {
    +public class LatestFilesReadCommittedScope
    +    implements ReadCommittedScope {
    --- End diff --
    
    ok


---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6319/



---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7975/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7999/



---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6699/



---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6316/



---

[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...

Posted by KanakaKumar <gi...@git.apache.org>.
Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212317536
  
    --- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.carbondata.execution.datasources
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.types._
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.metadata.datatype
    +import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
    +import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
    +import org.apache.carbondata.core.scan.expression.conditional._
    +import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema}
    +
    +object CarbonSparkDataSourceUtil {
    +
    +  /**
    +   * Convert from carbon datatype to sparks datatype
    +   */
    +  def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
    +    if (CarbonDataTypes.isDecimal(dataType)) {
    +      DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
    +        dataType.asInstanceOf[CarbonDecimalType].getScale)
    +    } else {
    +      dataType match {
    +        case CarbonDataTypes.STRING => StringType
    +        case CarbonDataTypes.SHORT => ShortType
    +        case CarbonDataTypes.INT => IntegerType
    +        case CarbonDataTypes.LONG => LongType
    +        case CarbonDataTypes.DOUBLE => DoubleType
    +        case CarbonDataTypes.BOOLEAN => BooleanType
    +        case CarbonDataTypes.TIMESTAMP => TimestampType
    +        case CarbonDataTypes.DATE => DateType
    +        case CarbonDataTypes.VARCHAR => StringType
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Convert from sparks datatype to carbon datatype
    +   */
    +  def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
    +    dataType match {
    +      case StringType => CarbonDataTypes.STRING
    +      case ShortType => CarbonDataTypes.SHORT
    +      case IntegerType => CarbonDataTypes.INT
    +      case LongType => CarbonDataTypes.LONG
    +      case DoubleType => CarbonDataTypes.DOUBLE
    +      case FloatType => CarbonDataTypes.FLOAT
    +      case DateType => CarbonDataTypes.DATE
    +      case BooleanType => CarbonDataTypes.BOOLEAN
    +      case TimestampType => CarbonDataTypes.TIMESTAMP
    +      case ArrayType(elementType, _) =>
    +        CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType))
    +      case StructType(fields) =>
    +        val carbonFields = new java.util.ArrayList[CarbonStructField]
    +        fields.map { field =>
    +          carbonFields.add(
    +            new CarbonStructField(
    +              field.name,
    +              convertSparkToCarbonDataType(field.dataType)))
    +        }
    +        CarbonDataTypes.createStructType(carbonFields)
    +      case NullType => CarbonDataTypes.NULL
    +      case decimal: DecimalType =>
    +        CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
    +      case _ => throw new UnsupportedOperationException("getting " + dataType + " from spark")
    +    }
    +  }
    +
    +  /**
    +   * Converts data sources filters to carbon filter predicates.
    +   */
    +  def createCarbonFilter(schema: StructType,
    +      predicate: sources.Filter): Option[CarbonExpression] = {
    +    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
    +
    +    def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
    +      predicate match {
    +
    +        case sources.EqualTo(name, value) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.Not(sources.EqualTo(name, value)) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.EqualNullSafe(name, value) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.Not(sources.EqualNullSafe(name, value)) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.GreaterThan(name, value) =>
    +          Some(new GreaterThanExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.LessThan(name, value) =>
    +          Some(new LessThanExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.GreaterThanOrEqual(name, value) =>
    +          Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.LessThanOrEqual(name, value) =>
    +          Some(new LessThanEqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.In(name, values) =>
    +          if (values.length == 1 && values(0) == null) {
    +            Some(new FalseExpression(getCarbonExpression(name)))
    +          } else {
    +            Some(new InExpression(getCarbonExpression(name),
    +              new ListExpression(
    +                convertToJavaList(values.filterNot(_ == null)
    +                  .map(filterValues => getCarbonLiteralExpression(name, filterValues)).toList))))
    +          }
    +        case sources.Not(sources.In(name, values)) =>
    +          if (values.contains(null)) {
    +            Some(new FalseExpression(getCarbonExpression(name)))
    +          } else {
    +            Some(new NotInExpression(getCarbonExpression(name),
    +              new ListExpression(
    +                convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
    +          }
    +        case sources.IsNull(name) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, null), true))
    +        case sources.IsNotNull(name) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, null), true))
    +        case sources.And(lhs, rhs) =>
    +          (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
    +        case sources.Or(lhs, rhs) =>
    +          for {
    +            lhsFilter <- createFilter(lhs)
    +            rhsFilter <- createFilter(rhs)
    +          } yield {
    +            new OrExpression(lhsFilter, rhsFilter)
    +          }
    +        case sources.StringStartsWith(name, value) if value.length > 0 =>
    +          Some(new StartsWithExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case _ => None
    +      }
    +    }
    +
    +    def getCarbonExpression(name: String) = {
    +      new CarbonColumnExpression(name,
    +        CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
    +    }
    +
    +    def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
    +      val dataTypeOfAttribute =
    +        CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name))
    +      val dataType =
    +        if (Option(value).isDefined &&
    +            dataTypeOfAttribute == CarbonDataTypes.STRING &&
    +            value.isInstanceOf[Double]) {
    +        CarbonDataTypes.DOUBLE
    +      } else {
    +        dataTypeOfAttribute
    +      }
    +      new CarbonLiteralExpression(value, dataType)
    +    }
    +
    +    createFilter(predicate)
    +  }
    +
    +  // Convert scala list to java list, Cannot use scalaList.asJava as while deserializing it is
    +  // not able find the classes inside scala list and gives ClassNotFoundException.
    +  private def convertToJavaList(
    +      scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
    +    val javaList = new java.util.ArrayList[CarbonExpression]()
    +    scalaList.foreach(javaList.add)
    +    javaList
    +  }
    +
    +  /**
    +   * Create load model for carbon
    +   */
    +  def prepareLoadModel(options: Map[String, String],
    +      dataSchema: StructType): CarbonLoadModel = {
    +    val schema = new Schema(dataSchema.fields.map { field =>
    +      field.dataType match {
    +        case s: StructType =>
    +          new Field(field.name,
    +            field.dataType.typeName,
    +            s.fields
    +              .map(f => new datatype.StructField(f.name,
    +                CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(f.dataType))).toList.asJava)
    +        case a: ArrayType =>
    +          new Field(field.name,
    +            field.dataType.typeName,
    +            Seq(new datatype.StructField(field.name,
    +              CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(a.elementType))).toList.asJava)
    +        case other =>
    +          new Field(field.name, field.dataType.simpleString)
    +      }
    +    })
    +    val builder = new CarbonWriterBuilder
    +    builder.isTransactionalTable(false)
    +    builder.outputPath(options.getOrElse("path", ""))
    +    val blockSize = options.get(CarbonCommonConstants.TABLE_BLOCKSIZE).map(_.toInt)
    +    if (blockSize.isDefined) {
    +      builder.withBlockSize(blockSize.get)
    +    }
    +    val blockletSize = options.get("table_blockletsize").map(_.toInt)
    +    if (blockletSize.isDefined) {
    +      builder.withBlockletSize(blockletSize.get)
    +    }
    +    builder.enableLocalDictionary(options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
    +      CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT).toBoolean)
    +    builder.localDictionaryThreshold(
    +      options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
    +        CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT).toInt)
    +    builder.sortBy(
    +      options.get(CarbonCommonConstants.SORT_COLUMNS).map(_.split(",").map(_.trim)).orNull)
    +    builder.isTransactionalTable(false)
    --- End diff --
    
    duplicate line
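    For reference, a minimal sketch of the builder setup with the call made only once
    (this assumes the surrounding prepareLoadModel code stays as in the diff above):
    
        val builder = new CarbonWriterBuilder
        builder.isTransactionalTable(false)  // set once; the later duplicate call can be dropped
        builder.outputPath(options.getOrElse("path", ""))
        // optional sizes are applied only when the corresponding option is present
        options.get(CarbonCommonConstants.TABLE_BLOCKSIZE).map(_.toInt)
          .foreach(size => builder.withBlockSize(size))
        options.get("table_blockletsize").map(_.toInt)
          .foreach(size => builder.withBlockletSize(size))
        builder.sortBy(
          options.get(CarbonCommonConstants.SORT_COLUMNS).map(_.split(",").map(_.trim)).orNull)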


---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7985/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6351/



---

[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212250560
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
    @@ -66,7 +68,23 @@ public LatestFilesReadCommittedScope(String path, String segmentId) {
        * @param path carbon file path
        */
       public LatestFilesReadCommittedScope(String path) {
    -    this(path, null);
    +    this(path, (String) null);
    +  }
    +
    +  /**
    +   * a new constructor with path
    +   *
    +   * @param path carbon file path
    +   */
    +  public LatestFilesReadCommittedScope(String path, String[] subFolders) {
    +    Objects.requireNonNull(path);
    +    this.carbonFilePath = path;
    +    this.subFolders = subFolders;
    --- End diff --
    
    ok


---

[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...

Posted by KanakaKumar <gi...@git.apache.org>.
Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212318460
  
    --- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.carbondata.execution.datasources
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.types._
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.metadata.datatype
    +import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
    +import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
    +import org.apache.carbondata.core.scan.expression.conditional._
    +import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema}
    +
    +object CarbonSparkDataSourceUtil {
    +
    +  /**
    +   * Convert from carbon datatype to sparks datatype
    +   */
    +  def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
    +    if (CarbonDataTypes.isDecimal(dataType)) {
    +      DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
    +        dataType.asInstanceOf[CarbonDecimalType].getScale)
    +    } else {
    +      dataType match {
    +        case CarbonDataTypes.STRING => StringType
    +        case CarbonDataTypes.SHORT => ShortType
    +        case CarbonDataTypes.INT => IntegerType
    +        case CarbonDataTypes.LONG => LongType
    +        case CarbonDataTypes.DOUBLE => DoubleType
    +        case CarbonDataTypes.BOOLEAN => BooleanType
    +        case CarbonDataTypes.TIMESTAMP => TimestampType
    +        case CarbonDataTypes.DATE => DateType
    +        case CarbonDataTypes.VARCHAR => StringType
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Convert from sparks datatype to carbon datatype
    +   */
    +  def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
    +    dataType match {
    +      case StringType => CarbonDataTypes.STRING
    +      case ShortType => CarbonDataTypes.SHORT
    +      case IntegerType => CarbonDataTypes.INT
    +      case LongType => CarbonDataTypes.LONG
    +      case DoubleType => CarbonDataTypes.DOUBLE
    +      case FloatType => CarbonDataTypes.FLOAT
    +      case DateType => CarbonDataTypes.DATE
    +      case BooleanType => CarbonDataTypes.BOOLEAN
    +      case TimestampType => CarbonDataTypes.TIMESTAMP
    +      case ArrayType(elementType, _) =>
    +        CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType))
    +      case StructType(fields) =>
    +        val carbonFields = new java.util.ArrayList[CarbonStructField]
    +        fields.map { field =>
    +          carbonFields.add(
    +            new CarbonStructField(
    +              field.name,
    +              convertSparkToCarbonDataType(field.dataType)))
    +        }
    +        CarbonDataTypes.createStructType(carbonFields)
    +      case NullType => CarbonDataTypes.NULL
    +      case decimal: DecimalType =>
    +        CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
    +      case _ => throw new UnsupportedOperationException("getting " + dataType + " from spark")
    +    }
    +  }
    +
    +  /**
    +   * Converts data sources filters to carbon filter predicates.
    +   */
    +  def createCarbonFilter(schema: StructType,
    +      predicate: sources.Filter): Option[CarbonExpression] = {
    +    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
    +
    +    def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
    +      predicate match {
    +
    +        case sources.EqualTo(name, value) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.Not(sources.EqualTo(name, value)) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.EqualNullSafe(name, value) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.Not(sources.EqualNullSafe(name, value)) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.GreaterThan(name, value) =>
    +          Some(new GreaterThanExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.LessThan(name, value) =>
    +          Some(new LessThanExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.GreaterThanOrEqual(name, value) =>
    +          Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.LessThanOrEqual(name, value) =>
    +          Some(new LessThanEqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.In(name, values) =>
    +          if (values.length == 1 && values(0) == null) {
    +            Some(new FalseExpression(getCarbonExpression(name)))
    +          } else {
    +            Some(new InExpression(getCarbonExpression(name),
    +              new ListExpression(
    +                convertToJavaList(values.filterNot(_ == null)
    +                  .map(filterValues => getCarbonLiteralExpression(name, filterValues)).toList))))
    +          }
    +        case sources.Not(sources.In(name, values)) =>
    +          if (values.contains(null)) {
    +            Some(new FalseExpression(getCarbonExpression(name)))
    +          } else {
    +            Some(new NotInExpression(getCarbonExpression(name),
    +              new ListExpression(
    +                convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
    +          }
    +        case sources.IsNull(name) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, null), true))
    +        case sources.IsNotNull(name) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, null), true))
    +        case sources.And(lhs, rhs) =>
    +          (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
    +        case sources.Or(lhs, rhs) =>
    +          for {
    +            lhsFilter <- createFilter(lhs)
    +            rhsFilter <- createFilter(rhs)
    +          } yield {
    +            new OrExpression(lhsFilter, rhsFilter)
    +          }
    +        case sources.StringStartsWith(name, value) if value.length > 0 =>
    +          Some(new StartsWithExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case _ => None
    +      }
    +    }
    +
    +    def getCarbonExpression(name: String) = {
    +      new CarbonColumnExpression(name,
    +        CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
    +    }
    +
    +    def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
    +      val dataTypeOfAttribute =
    +        CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name))
    +      val dataType =
    +        if (Option(value).isDefined &&
    +            dataTypeOfAttribute == CarbonDataTypes.STRING &&
    +            value.isInstanceOf[Double]) {
    +        CarbonDataTypes.DOUBLE
    +      } else {
    +        dataTypeOfAttribute
    +      }
    +      new CarbonLiteralExpression(value, dataType)
    +    }
    +
    +    createFilter(predicate)
    +  }
    +
    +  // Convert scala list to java list, Cannot use scalaList.asJava as while deserializing it is
    +  // not able find the classes inside scala list and gives ClassNotFoundException.
    +  private def convertToJavaList(
    +      scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
    +    val javaList = new java.util.ArrayList[CarbonExpression]()
    +    scalaList.foreach(javaList.add)
    +    javaList
    +  }
    +
    +  /**
    +   * Create load model for carbon
    +   */
    +  def prepareLoadModel(options: Map[String, String],
    +      dataSchema: StructType): CarbonLoadModel = {
    +    val schema = new Schema(dataSchema.fields.map { field =>
    +      field.dataType match {
    +        case s: StructType =>
    +          new Field(field.name,
    +            field.dataType.typeName,
    +            s.fields
    +              .map(f => new datatype.StructField(f.name,
    +                CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(f.dataType))).toList.asJava)
    +        case a: ArrayType =>
    +          new Field(field.name,
    +            field.dataType.typeName,
    +            Seq(new datatype.StructField(field.name,
    +              CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(a.elementType))).toList.asJava)
    +        case other =>
    +          new Field(field.name, field.dataType.simpleString)
    +      }
    +    })
    +    val builder = new CarbonWriterBuilder
    +    builder.isTransactionalTable(false)
    +    builder.outputPath(options.getOrElse("path", ""))
    +    val blockSize = options.get(CarbonCommonConstants.TABLE_BLOCKSIZE).map(_.toInt)
    +    if (blockSize.isDefined) {
    +      builder.withBlockSize(blockSize.get)
    +    }
    +    val blockletSize = options.get("table_blockletsize").map(_.toInt)
    +    if (blockletSize.isDefined) {
    +      builder.withBlockletSize(blockletSize.get)
    +    }
    +    builder.enableLocalDictionary(options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
    +      CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT).toBoolean)
    +    builder.localDictionaryThreshold(
    +      options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
    +        CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT).toInt)
    +    builder.sortBy(
    +      options.get(CarbonCommonConstants.SORT_COLUMNS).map(_.split(",").map(_.trim)).orNull)
    +    builder.isTransactionalTable(false)
    +    builder.uniqueIdentifier(System.currentTimeMillis())
    --- End diff --
    
    Please also support the column_meta_cache and cache_level options.
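    A hedged sketch of how the two options could be picked up in prepareLoadModel
    (the option keys are taken from the table-property names; how they are handed to
    the writer is not shown, since the exact SDK hook is not part of this diff):
    
        // Read the extra table options from the data source options map.
        val columnMetaCache: Option[String] = options.get("column_meta_cache") // e.g. "col1,col2"
        val cacheLevel: Option[String] = options.get("cache_level")            // e.g. "BLOCK" or "BLOCKLET"
        // Forwarding these to the writer would use whatever table-property hook the
        // SDK builder exposes; that wiring is intentionally left out of this sketch.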


---

[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...

Posted by KanakaKumar <gi...@git.apache.org>.
Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212333785
  
    --- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/readsupport/SparkUnsafeRowReadSuport.scala ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.carbondata.execution.datasources.readsupport
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
    +import org.apache.spark.sql.types.StructType
    +
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
    +import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport
    +
    +/**
    + * Read support class which converts carbon row array format to sparks Internal row.
    + */
    +class SparkUnsafeRowReadSuport(requiredSchema: StructType)
    --- End diff --
    
    Directly implement CarbonReadSupport
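    A rough sketch of that approach (assuming CarbonReadSupport from carbondata-hadoop
    exposes initialize/readRow/close; the class name and the projection detail are
    illustrative only, not the final implementation):
    
        import org.apache.spark.sql.catalyst.InternalRow
        import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
        import org.apache.spark.sql.types.StructType
        
        import org.apache.carbondata.core.metadata.schema.table.CarbonTable
        import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
        import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
        
        class SparkUnsafeRowReadSupport(requiredSchema: StructType)
          extends CarbonReadSupport[InternalRow] {
        
          // Convert each carbon row (Object[]) into Spark's unsafe row format.
          private val projection = UnsafeProjection.create(requiredSchema)
        
          override def initialize(carbonColumns: Array[CarbonColumn],
              carbonTable: CarbonTable): Unit = {
            // Nothing to set up when no dictionary decoding is needed.
          }
        
          override def readRow(data: Array[AnyRef]): InternalRow =
            projection(new GenericInternalRow(data.asInstanceOf[Array[Any]]))
        
          override def close(): Unit = {}
        }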


---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6384/



---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7983/



---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6326/



---

[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212348045
  
    --- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.carbondata.execution.datasources
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.types._
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.metadata.datatype
    +import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
    +import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
    +import org.apache.carbondata.core.scan.expression.conditional._
    +import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema}
    +
    +object CarbonSparkDataSourceUtil {
    +
    +  /**
    +   * Convert from carbon datatype to sparks datatype
    +   */
    +  def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
    +    if (CarbonDataTypes.isDecimal(dataType)) {
    +      DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
    +        dataType.asInstanceOf[CarbonDecimalType].getScale)
    +    } else {
    +      dataType match {
    +        case CarbonDataTypes.STRING => StringType
    +        case CarbonDataTypes.SHORT => ShortType
    +        case CarbonDataTypes.INT => IntegerType
    +        case CarbonDataTypes.LONG => LongType
    +        case CarbonDataTypes.DOUBLE => DoubleType
    +        case CarbonDataTypes.BOOLEAN => BooleanType
    +        case CarbonDataTypes.TIMESTAMP => TimestampType
    +        case CarbonDataTypes.DATE => DateType
    +        case CarbonDataTypes.VARCHAR => StringType
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Convert from sparks datatype to carbon datatype
    +   */
    +  def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
    +    dataType match {
    +      case StringType => CarbonDataTypes.STRING
    +      case ShortType => CarbonDataTypes.SHORT
    +      case IntegerType => CarbonDataTypes.INT
    +      case LongType => CarbonDataTypes.LONG
    +      case DoubleType => CarbonDataTypes.DOUBLE
    +      case FloatType => CarbonDataTypes.FLOAT
    +      case DateType => CarbonDataTypes.DATE
    +      case BooleanType => CarbonDataTypes.BOOLEAN
    +      case TimestampType => CarbonDataTypes.TIMESTAMP
    +      case ArrayType(elementType, _) =>
    +        CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType))
    +      case StructType(fields) =>
    +        val carbonFields = new java.util.ArrayList[CarbonStructField]
    +        fields.map { field =>
    +          carbonFields.add(
    +            new CarbonStructField(
    +              field.name,
    +              convertSparkToCarbonDataType(field.dataType)))
    +        }
    +        CarbonDataTypes.createStructType(carbonFields)
    +      case NullType => CarbonDataTypes.NULL
    +      case decimal: DecimalType =>
    +        CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
    +      case _ => throw new UnsupportedOperationException("getting " + dataType + " from spark")
    +    }
    +  }
    +
    +  /**
    +   * Converts data sources filters to carbon filter predicates.
    +   */
    +  def createCarbonFilter(schema: StructType,
    +      predicate: sources.Filter): Option[CarbonExpression] = {
    +    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
    +
    +    def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
    +      predicate match {
    +
    +        case sources.EqualTo(name, value) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.Not(sources.EqualTo(name, value)) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.EqualNullSafe(name, value) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.Not(sources.EqualNullSafe(name, value)) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.GreaterThan(name, value) =>
    +          Some(new GreaterThanExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.LessThan(name, value) =>
    +          Some(new LessThanExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.GreaterThanOrEqual(name, value) =>
    +          Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.LessThanOrEqual(name, value) =>
    +          Some(new LessThanEqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.In(name, values) =>
    +          if (values.length == 1 && values(0) == null) {
    +            Some(new FalseExpression(getCarbonExpression(name)))
    +          } else {
    +            Some(new InExpression(getCarbonExpression(name),
    +              new ListExpression(
    +                convertToJavaList(values.filterNot(_ == null)
    +                  .map(filterValues => getCarbonLiteralExpression(name, filterValues)).toList))))
    +          }
    +        case sources.Not(sources.In(name, values)) =>
    +          if (values.contains(null)) {
    +            Some(new FalseExpression(getCarbonExpression(name)))
    +          } else {
    +            Some(new NotInExpression(getCarbonExpression(name),
    +              new ListExpression(
    +                convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
    +          }
    +        case sources.IsNull(name) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, null), true))
    +        case sources.IsNotNull(name) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, null), true))
    +        case sources.And(lhs, rhs) =>
    +          (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
    +        case sources.Or(lhs, rhs) =>
    +          for {
    +            lhsFilter <- createFilter(lhs)
    +            rhsFilter <- createFilter(rhs)
    +          } yield {
    +            new OrExpression(lhsFilter, rhsFilter)
    +          }
    +        case sources.StringStartsWith(name, value) if value.length > 0 =>
    +          Some(new StartsWithExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case _ => None
    +      }
    +    }
    +
    +    def getCarbonExpression(name: String) = {
    +      new CarbonColumnExpression(name,
    +        CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
    +    }
    +
    +    def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
    +      val dataTypeOfAttribute =
    +        CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name))
    +      val dataType =
    +        if (Option(value).isDefined &&
    +            dataTypeOfAttribute == CarbonDataTypes.STRING &&
    +            value.isInstanceOf[Double]) {
    +        CarbonDataTypes.DOUBLE
    +      } else {
    +        dataTypeOfAttribute
    +      }
    +      new CarbonLiteralExpression(value, dataType)
    +    }
    +
    +    createFilter(predicate)
    +  }
    +
    +  // Convert scala list to java list, Cannot use scalaList.asJava as while deserializing it is
    +  // not able find the classes inside scala list and gives ClassNotFoundException.
    +  private def convertToJavaList(
    +      scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
    +    val javaList = new java.util.ArrayList[CarbonExpression]()
    +    scalaList.foreach(javaList.add)
    +    javaList
    +  }
    +
    +  /**
    +   * Create load model for carbon
    +   */
    +  def prepareLoadModel(options: Map[String, String],
    +      dataSchema: StructType): CarbonLoadModel = {
    +    val schema = new Schema(dataSchema.fields.map { field =>
    +      field.dataType match {
    +        case s: StructType =>
    +          new Field(field.name,
    +            field.dataType.typeName,
    +            s.fields
    +              .map(f => new datatype.StructField(f.name,
    +                CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(f.dataType))).toList.asJava)
    +        case a: ArrayType =>
    +          new Field(field.name,
    +            field.dataType.typeName,
    +            Seq(new datatype.StructField(field.name,
    +              CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(a.elementType))).toList.asJava)
    +        case other =>
    +          new Field(field.name, field.dataType.simpleString)
    +      }
    +    })
    +    val builder = new CarbonWriterBuilder
    +    builder.isTransactionalTable(false)
    +    builder.outputPath(options.getOrElse("path", ""))
    +    val blockSize = options.get(CarbonCommonConstants.TABLE_BLOCKSIZE).map(_.toInt)
    +    if (blockSize.isDefined) {
    +      builder.withBlockSize(blockSize.get)
    +    }
    +    val blockletSize = options.get("table_blockletsize").map(_.toInt)
    +    if (blockletSize.isDefined) {
    +      builder.withBlockletSize(blockletSize.get)
    +    }
    +    builder.enableLocalDictionary(options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
    +      CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT).toBoolean)
    +    builder.localDictionaryThreshold(
    +      options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
    +        CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT).toInt)
    +    builder.sortBy(
    +      options.get(CarbonCommonConstants.SORT_COLUMNS).map(_.split(",").map(_.trim)).orNull)
    +    builder.isTransactionalTable(false)
    +    builder.uniqueIdentifier(System.currentTimeMillis())
    --- End diff --
    
    ok
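
    As an aside, a minimal write-path sketch of how the options handled by prepareLoadModel above are expected to surface through the Spark DataFrame API. The format name "carbon" and the option keys mirror the quoted diff; the SparkSession, sample data and output path are invented for illustration and are not part of the PR.

        import org.apache.spark.sql.SparkSession
        import org.apache.carbondata.core.constants.CarbonCommonConstants

        val spark = SparkSession.builder()
          .appName("carbon-fileformat-write-sketch")
          .master("local[2]")
          .getOrCreate()
        import spark.implicits._

        Seq((1, "a"), (2, "b")).toDF("id", "name")
          .write
          .format("carbon")                                     // SparkCarbonFileFormat.shortName()
          .option(CarbonCommonConstants.SORT_COLUMNS, "name")   // picked up by builder.sortBy(...)
          .option(CarbonCommonConstants.TABLE_BLOCKSIZE, "256") // builder.withBlockSize(256)
          .option("table_blockletsize", "64")                   // builder.withBlockletSize(64)
          .save("/tmp/carbon_fileformat_demo")                  // hypothetical output path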


---

[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...

Posted by KanakaKumar <gi...@git.apache.org>.
Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212314571
  
    --- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.carbondata.execution.datasources
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.types._
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.metadata.datatype
    +import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
    +import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
    +import org.apache.carbondata.core.scan.expression.conditional._
    +import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema}
    +
    +object CarbonSparkDataSourceUtil {
    +
    +  /**
    +   * Convert from carbon datatype to Spark's datatype
    +   */
    +  def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
    +    if (CarbonDataTypes.isDecimal(dataType)) {
    +      DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
    +        dataType.asInstanceOf[CarbonDecimalType].getScale)
    +    } else {
    +      dataType match {
    +        case CarbonDataTypes.STRING => StringType
    +        case CarbonDataTypes.SHORT => ShortType
    +        case CarbonDataTypes.INT => IntegerType
    +        case CarbonDataTypes.LONG => LongType
    +        case CarbonDataTypes.DOUBLE => DoubleType
    +        case CarbonDataTypes.BOOLEAN => BooleanType
    +        case CarbonDataTypes.TIMESTAMP => TimestampType
    +        case CarbonDataTypes.DATE => DateType
    +        case CarbonDataTypes.VARCHAR => StringType
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Convert from Spark's datatype to carbon datatype
    +   */
    +  def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
    +    dataType match {
    +      case StringType => CarbonDataTypes.STRING
    +      case ShortType => CarbonDataTypes.SHORT
    +      case IntegerType => CarbonDataTypes.INT
    +      case LongType => CarbonDataTypes.LONG
    +      case DoubleType => CarbonDataTypes.DOUBLE
    +      case FloatType => CarbonDataTypes.FLOAT
    +      case DateType => CarbonDataTypes.DATE
    +      case BooleanType => CarbonDataTypes.BOOLEAN
    +      case TimestampType => CarbonDataTypes.TIMESTAMP
    +      case ArrayType(elementType, _) =>
    +        CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType))
    +      case StructType(fields) =>
    +        val carbonFields = new java.util.ArrayList[CarbonStructField]
    +        fields.map { field =>
    +          carbonFields.add(
    +            new CarbonStructField(
    +              field.name,
    +              convertSparkToCarbonDataType(field.dataType)))
    +        }
    +        CarbonDataTypes.createStructType(carbonFields)
    +      case NullType => CarbonDataTypes.NULL
    +      case decimal: DecimalType =>
    +        CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
    +      case _ => throw new UnsupportedOperationException("getting " + dataType + " from spark")
    +    }
    +  }
    +
    +  /**
    +   * Converts data sources filters to carbon filter predicates.
    +   */
    +  def createCarbonFilter(schema: StructType,
    +      predicate: sources.Filter): Option[CarbonExpression] = {
    +    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
    +
    +    def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
    +      predicate match {
    +
    +        case sources.EqualTo(name, value) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.Not(sources.EqualTo(name, value)) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.EqualNullSafe(name, value) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.Not(sources.EqualNullSafe(name, value)) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.GreaterThan(name, value) =>
    +          Some(new GreaterThanExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.LessThan(name, value) =>
    +          Some(new LessThanExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.GreaterThanOrEqual(name, value) =>
    +          Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.LessThanOrEqual(name, value) =>
    +          Some(new LessThanEqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.In(name, values) =>
    +          if (values.length == 1 && values(0) == null) {
    +            Some(new FalseExpression(getCarbonExpression(name)))
    +          } else {
    +            Some(new InExpression(getCarbonExpression(name),
    +              new ListExpression(
    +                convertToJavaList(values.filterNot(_ == null)
    +                  .map(filterValues => getCarbonLiteralExpression(name, filterValues)).toList))))
    +          }
    +        case sources.Not(sources.In(name, values)) =>
    +          if (values.contains(null)) {
    +            Some(new FalseExpression(getCarbonExpression(name)))
    +          } else {
    +            Some(new NotInExpression(getCarbonExpression(name),
    +              new ListExpression(
    +                convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
    +          }
    +        case sources.IsNull(name) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, null), true))
    +        case sources.IsNotNull(name) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, null), true))
    +        case sources.And(lhs, rhs) =>
    +          (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
    +        case sources.Or(lhs, rhs) =>
    +          for {
    +            lhsFilter <- createFilter(lhs)
    +            rhsFilter <- createFilter(rhs)
    +          } yield {
    +            new OrExpression(lhsFilter, rhsFilter)
    +          }
    +        case sources.StringStartsWith(name, value) if value.length > 0 =>
    +          Some(new StartsWithExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case _ => None
    +      }
    +    }
    +
    +    def getCarbonExpression(name: String) = {
    --- End diff --
    
    Remove the duplicate methods (CarbonFilters already has these).
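
    As an aside, a small sketch of what the conversion helper does, whichever class it ends up living in. The schema and the pushed-down filter below are invented for illustration; only the CarbonSparkDataSourceUtil.createCarbonFilter signature is taken from the quoted diff.

        import org.apache.spark.sql.sources
        import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
        import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil

        val schema = StructType(Seq(
          StructField("id", IntegerType),
          StructField("name", StringType)))

        // Spark pushes down sources.Filter instances; the utility maps them to Carbon expressions.
        val expr = CarbonSparkDataSourceUtil.createCarbonFilter(
          schema, sources.GreaterThan("id", 10))
        // expr is Option[CarbonExpression]; here a GreaterThanExpression over column "id"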


---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6740/



---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6332/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8033/



---

[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212336190
  
    --- Diff: integration/spark-datasource/pom.xml ---
    @@ -0,0 +1,279 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +    Licensed to the Apache Software Foundation (ASF) under one or more
    +    contributor license agreements.  See the NOTICE file distributed with
    +    this work for additional information regarding copyright ownership.
    +    The ASF licenses this file to You under the Apache License, Version 2.0
    +    (the "License"); you may not use this file except in compliance with
    +    the License.  You may obtain a copy of the License at
    +
    +       http://www.apache.org/licenses/LICENSE-2.0
    +
    +    Unless required by applicable law or agreed to in writing, software
    +    distributed under the License is distributed on an "AS IS" BASIS,
    +    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +    See the License for the specific language governing permissions and
    +    limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +
    +  <modelVersion>4.0.0</modelVersion>
    +
    +  <parent>
    +    <groupId>org.apache.carbondata</groupId>
    +    <artifactId>carbondata-parent</artifactId>
    +    <version>1.5.0-SNAPSHOT</version>
    +    <relativePath>../../pom.xml</relativePath>
    +  </parent>
    +
    +  <artifactId>carbondata-spark-datasource</artifactId>
    +  <name>Apache CarbonData :: Spark Datasource</name>
    +
    +  <properties>
    +    <dev.path>${basedir}/../../dev</dev.path>
    +    <jacoco.append>true</jacoco.append>
    +  </properties>
    +
    +  <dependencies>
    +    <dependency>
    +      <groupId>org.apache.carbondata</groupId>
    +      <artifactId>carbondata-hadoop</artifactId>
    +      <version>${project.version}</version>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.carbondata</groupId>
    +      <artifactId>carbondata-store-sdk</artifactId>
    +      <version>${project.version}</version>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.spark</groupId>
    +      <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.spark</groupId>
    +      <artifactId>spark-repl_${scala.binary.version}</artifactId>
    +    </dependency>
    +    <dependency>
    +      <groupId>junit</groupId>
    +      <artifactId>junit</artifactId>
    +      <scope>test</scope>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.scalatest</groupId>
    +      <artifactId>scalatest_${scala.binary.version}</artifactId>
    +      <scope>test</scope>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.hadoop</groupId>
    +      <artifactId>hadoop-aws</artifactId>
    +      <version>${hadoop.version}</version>
    +      <exclusions>
    +        <exclusion>
    +          <groupId>com.fasterxml.jackson.core</groupId>
    +          <artifactId>jackson-core</artifactId>
    +        </exclusion>
    +        <exclusion>
    +          <groupId>com.fasterxml.jackson.core</groupId>
    +          <artifactId>jackson-annotations</artifactId>
    +        </exclusion>
    +        <exclusion>
    +          <groupId>com.fasterxml.jackson.core</groupId>
    +          <artifactId>jackson-databind</artifactId>
    +        </exclusion>
    +      </exclusions>
    +    </dependency>
    +  </dependencies>
    +
    +  <build>
    +    <testSourceDirectory>src/test/scala</testSourceDirectory>
    +    <resources>
    +      <resource>
    +        <directory>src/resources</directory>
    +      </resource>
    +      <resource>
    +        <directory>.</directory>
    +        <includes>
    +          <include>CARBON_SPARK_INTERFACELogResource.properties</include>
    +        </includes>
    +      </resource>
    +    </resources>
    +    <plugins>
    +      <plugin>
    +        <groupId>org.scala-tools</groupId>
    +        <artifactId>maven-scala-plugin</artifactId>
    +        <version>2.15.2</version>
    +        <executions>
    +          <execution>
    +            <id>compile</id>
    +            <goals>
    +              <goal>compile</goal>
    +            </goals>
    +            <phase>compile</phase>
    +          </execution>
    +          <execution>
    +            <id>testCompile</id>
    +            <goals>
    +              <goal>testCompile</goal>
    +            </goals>
    +            <phase>test</phase>
    +          </execution>
    +          <execution>
    +            <phase>process-resources</phase>
    +            <goals>
    +              <goal>compile</goal>
    +            </goals>
    +          </execution>
    +        </executions>
    +      </plugin>
    +      <plugin>
    +        <artifactId>maven-compiler-plugin</artifactId>
    +        <configuration>
    +          <source>1.7</source>
    +          <target>1.7</target>
    +        </configuration>
    +      </plugin>
    +      <plugin>
    +        <groupId>org.apache.maven.plugins</groupId>
    +        <artifactId>maven-surefire-plugin</artifactId>
    +        <version>2.18</version>
    +        <!-- Note config is repeated in scalatest config -->
    +        <configuration>
    +          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
    +          <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
    +          <systemProperties>
    +            <java.awt.headless>true</java.awt.headless>
    +            <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
    +          </systemProperties>
    +          <failIfNoTests>false</failIfNoTests>
    +        </configuration>
    +      </plugin>
    +      <plugin>
    +        <groupId>org.scalatest</groupId>
    +        <artifactId>scalatest-maven-plugin</artifactId>
    +        <version>1.0</version>
    +        <!-- Note config is repeated in surefire config -->
    +        <configuration>
    +          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
    +          <junitxml>.</junitxml>
    +          <filereports>CarbonTestSuite.txt</filereports>
    +          <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
    +          </argLine>
    +          <stderr />
    +          <environmentVariables>
    +          </environmentVariables>
    +          <systemProperties>
    +            <java.awt.headless>true</java.awt.headless>
    +            <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
    +          </systemProperties>
    +        </configuration>
    +        <executions>
    +          <execution>
    +            <id>test</id>
    +            <goals>
    +              <goal>test</goal>
    +            </goals>
    +          </execution>
    +        </executions>
    +      </plugin>
    +    </plugins>
    +  </build>
    +  <profiles>
    +    <profile>
    +      <id>build-all</id>
    +      <properties>
    +        <spark.version>2.2.1</spark.version>
    +        <scala.binary.version>2.11</scala.binary.version>
    +        <scala.version>2.11.8</scala.version>
    +      </properties>
    +    </profile>
    +    <profile>
    +      <id>sdvtest</id>
    +      <properties>
    +        <maven.test.skip>true</maven.test.skip>
    +      </properties>
    +    </profile>
    +    <profile>
    +      <id>spark-2.1</id>
    +      <properties>
    +        <spark.version>2.1.0</spark.version>
    +        <scala.binary.version>2.11</scala.binary.version>
    +        <scala.version>2.11.8</scala.version>
    +      </properties>
    +      <build>
    +        <plugins>
    +          <plugin>
    +            <groupId>org.apache.maven.plugins</groupId>
    +            <artifactId>maven-compiler-plugin</artifactId>
    +            <configuration>
    +              <excludes>
    +                <exclude>src/main/spark2.2</exclude>
    +              </excludes>
    +            </configuration>
    +          </plugin>
    +          <plugin>
    +            <groupId>org.codehaus.mojo</groupId>
    +            <artifactId>build-helper-maven-plugin</artifactId>
    +            <version>3.0.0</version>
    +            <executions>
    +              <execution>
    +                <id>add-source</id>
    +                <phase>generate-sources</phase>
    +                <goals>
    +                  <goal>add-source</goal>
    +                </goals>
    +                <configuration>
    +                  <sources>
    +                    <source>src/main/spark2.1</source>
    +                  </sources>
    +                </configuration>
    +              </execution>
    +            </executions>
    +          </plugin>
    +        </plugins>
    +      </build>
    +    </profile>
    +    <profile>
    +      <id>spark-2.2</id>
    +      <activation>
    +        <activeByDefault>true</activeByDefault>
    +      </activation>
    +      <properties>
    +        <spark.version>2.2.1</spark.version>
    +        <scala.binary.version>2.11</scala.binary.version>
    +        <scala.version>2.11.8</scala.version>
    +      </properties>
    +      <build>
    --- End diff --
    
    ok


---

[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212340928
  
    --- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.carbondata.execution.datasources
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.types._
    +
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.metadata.datatype
    +import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
    +import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
    +import org.apache.carbondata.core.scan.expression.conditional._
    +import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    +import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema}
    +
    +object CarbonSparkDataSourceUtil {
    +
    +  /**
    +   * Convert from carbon datatype to Spark's datatype
    +   */
    +  def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
    +    if (CarbonDataTypes.isDecimal(dataType)) {
    +      DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
    +        dataType.asInstanceOf[CarbonDecimalType].getScale)
    +    } else {
    +      dataType match {
    +        case CarbonDataTypes.STRING => StringType
    +        case CarbonDataTypes.SHORT => ShortType
    +        case CarbonDataTypes.INT => IntegerType
    +        case CarbonDataTypes.LONG => LongType
    +        case CarbonDataTypes.DOUBLE => DoubleType
    +        case CarbonDataTypes.BOOLEAN => BooleanType
    +        case CarbonDataTypes.TIMESTAMP => TimestampType
    +        case CarbonDataTypes.DATE => DateType
    +        case CarbonDataTypes.VARCHAR => StringType
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Convert from Spark's datatype to carbon datatype
    +   */
    +  def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
    +    dataType match {
    +      case StringType => CarbonDataTypes.STRING
    +      case ShortType => CarbonDataTypes.SHORT
    +      case IntegerType => CarbonDataTypes.INT
    +      case LongType => CarbonDataTypes.LONG
    +      case DoubleType => CarbonDataTypes.DOUBLE
    +      case FloatType => CarbonDataTypes.FLOAT
    +      case DateType => CarbonDataTypes.DATE
    +      case BooleanType => CarbonDataTypes.BOOLEAN
    +      case TimestampType => CarbonDataTypes.TIMESTAMP
    +      case ArrayType(elementType, _) =>
    +        CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType))
    +      case StructType(fields) =>
    +        val carbonFields = new java.util.ArrayList[CarbonStructField]
    +        fields.map { field =>
    +          carbonFields.add(
    +            new CarbonStructField(
    +              field.name,
    +              convertSparkToCarbonDataType(field.dataType)))
    +        }
    +        CarbonDataTypes.createStructType(carbonFields)
    +      case NullType => CarbonDataTypes.NULL
    +      case decimal: DecimalType =>
    +        CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
    +      case _ => throw new UnsupportedOperationException("getting " + dataType + " from spark")
    +    }
    +  }
    +
    +  /**
    +   * Converts data sources filters to carbon filter predicates.
    +   */
    +  def createCarbonFilter(schema: StructType,
    +      predicate: sources.Filter): Option[CarbonExpression] = {
    +    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
    +
    +    def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
    +      predicate match {
    +
    +        case sources.EqualTo(name, value) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.Not(sources.EqualTo(name, value)) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.EqualNullSafe(name, value) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.Not(sources.EqualNullSafe(name, value)) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.GreaterThan(name, value) =>
    +          Some(new GreaterThanExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.LessThan(name, value) =>
    +          Some(new LessThanExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.GreaterThanOrEqual(name, value) =>
    +          Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.LessThanOrEqual(name, value) =>
    +          Some(new LessThanEqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case sources.In(name, values) =>
    +          if (values.length == 1 && values(0) == null) {
    +            Some(new FalseExpression(getCarbonExpression(name)))
    +          } else {
    +            Some(new InExpression(getCarbonExpression(name),
    +              new ListExpression(
    +                convertToJavaList(values.filterNot(_ == null)
    +                  .map(filterValues => getCarbonLiteralExpression(name, filterValues)).toList))))
    +          }
    +        case sources.Not(sources.In(name, values)) =>
    +          if (values.contains(null)) {
    +            Some(new FalseExpression(getCarbonExpression(name)))
    +          } else {
    +            Some(new NotInExpression(getCarbonExpression(name),
    +              new ListExpression(
    +                convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
    +          }
    +        case sources.IsNull(name) =>
    +          Some(new EqualToExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, null), true))
    +        case sources.IsNotNull(name) =>
    +          Some(new NotEqualsExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, null), true))
    +        case sources.And(lhs, rhs) =>
    +          (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
    +        case sources.Or(lhs, rhs) =>
    +          for {
    +            lhsFilter <- createFilter(lhs)
    +            rhsFilter <- createFilter(rhs)
    +          } yield {
    +            new OrExpression(lhsFilter, rhsFilter)
    +          }
    +        case sources.StringStartsWith(name, value) if value.length > 0 =>
    +          Some(new StartsWithExpression(getCarbonExpression(name),
    +            getCarbonLiteralExpression(name, value)))
    +        case _ => None
    +      }
    +    }
    +
    +    def getCarbonExpression(name: String) = {
    --- End diff --
    
    ok


---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6757/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6340/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6349/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6377/



---

[GitHub] carbondata pull request #2647: [WIP] Added Spark FileFormat interface implem...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r211873452
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
    @@ -66,7 +68,23 @@ public LatestFilesReadCommittedScope(String path, String segmentId) {
        * @param path carbon file path
        */
       public LatestFilesReadCommittedScope(String path) {
    -    this(path, null);
    +    this(path, (String) null);
    +  }
    +
    +  /**
    +   * a new constructor with path
    +   *
    +   * @param path carbon file path
    +   */
    +  public LatestFilesReadCommittedScope(String path, String[] subFolders) {
    +    Objects.requireNonNull(path);
    +    this.carbonFilePath = path;
    +    this.subFolders = subFolders;
    +    try {
    +      takeCarbonIndexFileSnapShot();
    +    } catch (IOException ex) {
    +      throw new RuntimeException("Error while taking index snapshot", ex);
    --- End diff --
    
    Can we use a more specific exception here?


---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7997/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6370/



---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6701/



---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6693/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7989/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8017/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6354/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6725/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6382/



---

[GitHub] carbondata pull request #2647: [WIP] Added Spark FileFormat interface implem...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r211873872
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
    @@ -66,7 +68,23 @@ public LatestFilesReadCommittedScope(String path, String segmentId) {
        * @param path carbon file path
        */
       public LatestFilesReadCommittedScope(String path) {
    -    this(path, null);
    +    this(path, (String) null);
    +  }
    +
    +  /**
    +   * a new constructor with path
    +   *
    +   * @param path carbon file path
    +   */
    +  public LatestFilesReadCommittedScope(String path, String[] subFolders) {
    +    Objects.requireNonNull(path);
    +    this.carbonFilePath = path;
    +    this.subFolders = subFolders;
    --- End diff --
    
    Suggest changing `subFolders` to `dataFolders`, and adding javadoc to explain that these folders contain the carbondata files and carbonindex files.


---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7966/



---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6331/



---

[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212350692
  
    --- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/readsupport/SparkUnsafeRowReadSuport.scala ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.carbondata.execution.datasources.readsupport
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
    +import org.apache.spark.sql.types.StructType
    +
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
    +import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport
    +
    +/**
    + * Read support class which converts the carbon row array format to Spark's InternalRow.
    + */
    +class SparkUnsafeRowReadSuport(requiredSchema: StructType)
    --- End diff --
    
    ok
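
    As an aside, a rough sketch (not the PR code) of the general idea behind such a read support class: a decoded carbon row arrives as an object array and is projected into Spark's unsafe row layout. The real SparkUnsafeRowReadSuport also handles dictionary decoding and string conversion, which this toy example skips; the schema and values below are invented.

        import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
        import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

        val requiredSchema = StructType(Seq(
          StructField("id", IntegerType),
          StructField("ts", LongType)))

        // Reused per task: projects any InternalRow into the UnsafeRow layout Spark expects.
        val toUnsafe = UnsafeProjection.create(requiredSchema)

        // A decoded carbon row (already in Spark-compatible primitives for this toy example).
        val carbonRow: Array[Any] = Array(42, 1534896000000L)
        val internalRow = toUnsafe(new GenericInternalRow(carbonRow))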


---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8002/



---

[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...

Posted by KanakaKumar <gi...@git.apache.org>.
Github user KanakaKumar commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212329045
  
    --- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala ---
    @@ -0,0 +1,400 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.carbondata.execution.datasources
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileStatus, Path}
    +import org.apache.hadoop.io.NullWritable
    +import org.apache.hadoop.mapreduce._
    +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    +import org.apache.spark.{SparkException, TaskContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.memory.MemoryMode
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.JoinedRow
    +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
    +import org.apache.spark.sql.catalyst.util.ArrayData
    +import org.apache.spark.sql.execution.datasources._
    +import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
    +import org.apache.spark.sql.types._
    +import org.apache.spark.sql.util.SparkTypeConverter
    +import org.apache.spark.util.SerializableConfiguration
    +
    +import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.converter.SparkDataTypeConverterImpl
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.indexstore.BlockletDetailInfo
    +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnarFormatVersion}
    +import org.apache.carbondata.core.metadata.schema.SchemaReader
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.scan.expression.{Expression => CarbonExpression}
    +import org.apache.carbondata.core.scan.expression.logical.AndExpression
    +import org.apache.carbondata.core.statusmanager.{FileFormat => CarbonFileFormatVersion}
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader}
    +import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, CarbonTableOutputFormat}
    +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
    +import org.apache.carbondata.processing.loading.complexobjects.{ArrayObject, StructObject}
    +import org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader
    +
    +/**
    + * Used to read and write data stored in carbondata files to/from the spark execution engine.
    + */
    +@InterfaceAudience.User
    +@InterfaceStability.Evolving
    +class SparkCarbonFileFormat extends FileFormat
    +  with DataSourceRegister
    +  with Logging
    +  with Serializable {
    +
    +  @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    +
    +  /**
    +   * If the user does not provide a schema while reading the data, Spark calls this method to infer
    +   * the schema from the carbondata files. It reads the schema present in the carbondata files and returns it.
    +   */
    +  override def inferSchema(sparkSession: SparkSession,
    +      options: Map[String, String],
    +      files: Seq[FileStatus]): Option[StructType] = {
    +    val tablePath = options.get("path") match {
    +      case Some(path) => path
    +      case _ => FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString)
    +    }
    +
    +    val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""), false)
    +    val table = CarbonTable.buildFromTableInfo(tableInfo)
    +    var schema = new StructType
    +    tableInfo.getFactTable.getListOfColumns.asScala.foreach { col =>
    +      // TODO find a better way to know whether it is a child column
    +      if (!col.getColumnName.contains(".")) {
    +        schema = schema.add(
    +          col.getColumnName,
    +          SparkTypeConverter.convertCarbonToSparkDataType(col, table))
    +      }
    +    }
    +    Some(schema)
    +  }
    +
    +
    +  /**
    +   * Prepares a write job and returns an [[OutputWriterFactory]].  Client side job preparation is
    +   * done here.
    +   */
    +  override def prepareWrite(sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +
    +    val conf = job.getConfiguration
    +
    +    val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
    +    model.setLoadWithoutConverterStep(true)
    +    CarbonTableOutputFormat.setLoadModel(conf, model)
    +
    +    new OutputWriterFactory {
    +      override def newInstance(
    +          path: String,
    +          dataSchema: StructType,
    +          context: TaskAttemptContext): OutputWriter = {
    +        val updatedPath = if (path.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
    +          new Path(path).getParent.toString
    +        } else {
    +          path
    +        }
    +        context.getConfiguration.set("carbon.outputformat.writepath", updatedPath)
    +        context.getConfiguration.set("carbon.outputformat.taskno", System.nanoTime() + "")
    +        new CarbonOutputWriter(path, context, dataSchema.fields)
    +      }
    +
    +      override def getFileExtension(context: TaskAttemptContext): String = {
    +        CarbonTablePath.CARBON_DATA_EXT
    +      }
    +    }
    +  }
    +
    +  /**
    +   * It is just a trait to make the code compile against both Spark 2.1 and 2.2
    +   */
    +  private trait AbstractCarbonOutputWriter {
    +    def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
    +    def writeInternal(row: InternalRow): Unit = {
    +      writeCarbon(row)
    +    }
    +    def write(row: InternalRow): Unit = {
    +      writeCarbon(row)
    +    }
    +    def writeCarbon(row: InternalRow): Unit
    +  }
    +
    +
    +  /**
    +   * Writer class for carbondata files
    +   */
    +  private class CarbonOutputWriter(path: String,
    +      context: TaskAttemptContext,
    +      fieldTypes: Array[StructField]) extends OutputWriter with AbstractCarbonOutputWriter {
    +
    +    private val writable = new ObjectArrayWritable
    +
    +    private val cutOffDate = Integer.MAX_VALUE >> 1
    +
    +    private val recordWriter: RecordWriter[NullWritable, ObjectArrayWritable] =
    +      new CarbonTableOutputFormat().getRecordWriter(context)
    +
    +    /**
    +     * Write Spark's internal row to the carbondata record writer
    +     */
    +    def writeCarbon(row: InternalRow): Unit = {
    +      val data: Array[AnyRef] = extractData(row, fieldTypes)
    +      writable.set(data)
    +      recordWriter.write(NullWritable.get(), writable)
    +    }
    +
    +    override def writeInternal(row: InternalRow): Unit = {
    +      writeCarbon(row)
    +    }
    +
    +    /**
    +     * Convert the internal row to an object that carbondata can understand
    +     */
    +    private def extractData(row: InternalRow, fieldTypes: Array[StructField]): Array[AnyRef] = {
    +      val data = new Array[AnyRef](fieldTypes.length)
    +      var i = 0
    +      while (i < fieldTypes.length) {
    +        if (!row.isNullAt(i)) {
    +          fieldTypes(i).dataType match {
    +            case StringType =>
    +              data(i) = row.getString(i)
    +            case d: DecimalType =>
    +              data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
    +            case s: StructType =>
    +              data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
    +            case s: ArrayType =>
    +              data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
    +            case d: DateType =>
    +              data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
    +            case d: TimestampType =>
    +              data(i) = (row.getLong(i) / 1000).asInstanceOf[AnyRef]
    +            case other =>
    +              data(i) = row.get(i, other)
    +          }
    +        } else {
    +          setNull(fieldTypes(i).dataType, data, i)
    +        }
    +        i += 1
    +      }
    +      data
    +    }
    +
    +    private def setNull(dataType: DataType, data: Array[AnyRef], i: Int) = {
    +      dataType match {
    +        case d: DateType =>
    +          // 1 is treated as null in carbon
    +          data(i) = 1.asInstanceOf[AnyRef]
    +        case _ =>
    +      }
    +    }
    +
    +    /**
    +     * Convert the internal row to an object that carbondata can understand
    +     */
    +    private def extractData(row: ArrayData, dataType: DataType): Array[AnyRef] = {
    +      val data = new Array[AnyRef](row.numElements())
    +      var i = 0
    +      while (i < data.length) {
    +        if (!row.isNullAt(i)) {
    +          dataType match {
    +            case d: DecimalType =>
    +              data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
    +            case s: StructType =>
    +              data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
    +            case s: ArrayType =>
    +              data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
    +            case d: DateType =>
    +              data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
    +            case other => data(i) = row.get(i, dataType)
    +          }
    +        } else {
    +          setNull(dataType, data, i)
    +        }
    +        i += 1
    +      }
    +      data
    +    }
    +
    +    override def close(): Unit = {
    +      recordWriter.close(context)
    +    }
    +  }
    +
    +  override def shortName(): String = "carbon"
    +
    +  override def toString: String = "carbon"
    +
    +  override def hashCode(): Int = getClass.hashCode()
    +
    +  override def equals(other: Any): Boolean = other.isInstanceOf[SparkCarbonFileFormat]
    +
    +  /**
    +   * Whether to use the vector reader while reading data.
    +   * It is not supported for complex types, so the row reader is used in that case.
    +   */
    +  private def supportVector(sparkSession: SparkSession, schema: StructType): Boolean = {
    +    val vectorizedReader = {
    +      if (sparkSession.sqlContext.sparkSession.conf
    +        .contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) {
    +        sparkSession.sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER)
    +      } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
    +        System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
    +      } else {
    +        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
    +          CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
    +      }
    +    }
    +    vectorizedReader.toBoolean && schema.forall(_.dataType.isInstanceOf[AtomicType])
    +  }
    +
    +
    +  /**
    +   * Returns whether this format support returning columnar batch or not.
    +   */
    +  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
    +    val conf = sparkSession.sessionState.conf
    +    conf.wholeStageEnabled &&
    +    schema.length <= conf.wholeStageMaxNumFields &&
    +    schema.forall(_.dataType.isInstanceOf[AtomicType])
    +  }
    +
    +  /**
    +   * Returns a function that can be used to read a single carbondata file in as an
    +   * Iterator of InternalRow.
    +   */
    +  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
    +      dataSchema: StructType,
    +      partitionSchema: StructType,
    +      requiredSchema: StructType,
    +      filters: Seq[Filter],
    +      options: Map[String, String],
    +      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
    +    val filter: Option[CarbonExpression] = filters.flatMap { filter =>
    +      CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, filter)
    +    }.reduceOption(new AndExpression(_, _))
    +
    +    val projection = requiredSchema.map(_.name).toArray
    +    val carbonProjection = new CarbonProjection
    +    projection.foreach(carbonProjection.addColumn)
    +
    +    var supportBatchValue: Boolean = false
    +
    +    val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
    +    val readVector = supportVector(sparkSession, resultSchema)
    +    if (readVector) {
    +      supportBatchValue = supportBatch(sparkSession, resultSchema)
    +    }
    +    val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
    +    CarbonInputFormat
    +      .setTableInfo(hadoopConf, model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
    +    CarbonInputFormat.setTransactionalTable(hadoopConf, false)
    +    CarbonInputFormat.setColumnProjection(hadoopConf, carbonProjection)
    +    filter match {
    +      case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
    +      case None => None
    +    }
    +    val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
    +    val broadcastedHadoopConf =
    +      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
    +
    +    file: PartitionedFile => {
    +      assert(file.partitionValues.numFields == partitionSchema.size)
    +
    +      if (!(file.filePath.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
    --- End diff --
    
    Can we just check against the carbondata extension?
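
    A tiny sketch of the check being suggested, assuming CarbonTablePath.CARBON_DATA_EXT is the ".carbondata" suffix constant already used elsewhere in this file; the helper name and the sample file name below are invented.

        import org.apache.carbondata.core.util.path.CarbonTablePath

        // True only for data files, so index and other files are skipped up front.
        def isCarbonDataFile(filePath: String): Boolean =
          filePath.endsWith(CarbonTablePath.CARBON_DATA_EXT)

        // e.g. isCarbonDataFile("part-0-0_batchno0-0-1534912345678.carbondata") returns true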


---

[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/carbondata/pull/2647


---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6713/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6366/



---

[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212250514
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
    @@ -66,7 +68,23 @@ public LatestFilesReadCommittedScope(String path, String segmentId) {
        * @param path carbon file path
        */
       public LatestFilesReadCommittedScope(String path) {
    -    this(path, null);
    +    this(path, (String) null);
    +  }
    +
    +  /**
    +   * a new constructor with path
    +   *
    +   * @param path carbon file path
    +   */
    --- End diff --
    
    ok


---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6722/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8022/



---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6324/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6376/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6760/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6378/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6747/



---

[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2647#discussion_r212350177
  
    --- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala ---
    @@ -0,0 +1,400 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.carbondata.execution.datasources
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileStatus, Path}
    +import org.apache.hadoop.io.NullWritable
    +import org.apache.hadoop.mapreduce._
    +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    +import org.apache.spark.{SparkException, TaskContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.memory.MemoryMode
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.JoinedRow
    +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
    +import org.apache.spark.sql.catalyst.util.ArrayData
    +import org.apache.spark.sql.execution.datasources._
    +import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
    +import org.apache.spark.sql.types._
    +import org.apache.spark.sql.util.SparkTypeConverter
    +import org.apache.spark.util.SerializableConfiguration
    +
    +import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
    +import org.apache.carbondata.common.logging.LogServiceFactory
    +import org.apache.carbondata.converter.SparkDataTypeConverterImpl
    +import org.apache.carbondata.core.constants.CarbonCommonConstants
    +import org.apache.carbondata.core.datastore.impl.FileFactory
    +import org.apache.carbondata.core.indexstore.BlockletDetailInfo
    +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnarFormatVersion}
    +import org.apache.carbondata.core.metadata.schema.SchemaReader
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    +import org.apache.carbondata.core.scan.expression.{Expression => CarbonExpression}
    +import org.apache.carbondata.core.scan.expression.logical.AndExpression
    +import org.apache.carbondata.core.statusmanager.{FileFormat => CarbonFileFormatVersion}
    +import org.apache.carbondata.core.util.CarbonProperties
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader}
    +import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, CarbonTableOutputFormat}
    +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
    +import org.apache.carbondata.processing.loading.complexobjects.{ArrayObject, StructObject}
    +import org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader
    +
    +/**
    + * Used to read and write data stored in carbondata files to/from the Spark execution engine.
    + */
    +@InterfaceAudience.User
    +@InterfaceStability.Evolving
    +class SparkCarbonFileFormat extends FileFormat
    +  with DataSourceRegister
    +  with Logging
    +  with Serializable {
    +
    +  @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    +
    +  /**
    +   * If the user does not provide a schema while reading, Spark calls this method to infer it
    +   * from the carbondata files. It reads the schema present in the carbondata files and returns it.
    +   */
    +  override def inferSchema(sparkSession: SparkSession,
    +      options: Map[String, String],
    +      files: Seq[FileStatus]): Option[StructType] = {
    +    val tablePath = options.get("path") match {
    +      case Some(path) => path
    +      case _ => FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString)
    +    }
    +
    +    val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""), false)
    +    val table = CarbonTable.buildFromTableInfo(tableInfo)
    +    var schema = new StructType
    +    tableInfo.getFactTable.getListOfColumns.asScala.foreach { col =>
    +      // TODO find a better way to know whether it is a child column
    +      if (!col.getColumnName.contains(".")) {
    +        schema = schema.add(
    +          col.getColumnName,
    +          SparkTypeConverter.convertCarbonToSparkDataType(col, table))
    +      }
    +    }
    +    Some(schema)
    +  }
    +
    +
    +  /**
    +   * Prepares a write job and returns an [[OutputWriterFactory]].  Client side job preparation is
    +   * done here.
    +   */
    +  override def prepareWrite(sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +
    +    val conf = job.getConfiguration
    +
    +    val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
    +    model.setLoadWithoutConverterStep(true)
    +    CarbonTableOutputFormat.setLoadModel(conf, model)
    +
    +    new OutputWriterFactory {
    +      override def newInstance(
    +          path: String,
    +          dataSchema: StructType,
    +          context: TaskAttemptContext): OutputWriter = {
    +        val updatedPath = if (path.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
    +          new Path(path).getParent.toString
    +        } else {
    +          path
    +        }
    +        context.getConfiguration.set("carbon.outputformat.writepath", updatedPath)
    +        context.getConfiguration.set("carbon.outputformat.taskno", System.nanoTime() + "")
    +        new CarbonOutputWriter(path, context, dataSchema.fields)
    +      }
    +
    +      override def getFileExtension(context: TaskAttemptContext): String = {
    +        CarbonTablePath.CARBON_DATA_EXT
    +      }
    +    }
    +  }
    +
    +  /**
    +   * It is just a class to make the code compile with both Spark 2.1 and 2.2
    +   */
    +  private trait AbstractCarbonOutputWriter {
    +    def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
    +    def writeInternal(row: InternalRow): Unit = {
    +      writeCarbon(row)
    +    }
    +    def write(row: InternalRow): Unit = {
    +      writeCarbon(row)
    +    }
    +    def writeCarbon(row: InternalRow): Unit
    +  }
    +
    +
    +  /**
    +   * Writer class for carbondata files
    +   */
    +  private class CarbonOutputWriter(path: String,
    +      context: TaskAttemptContext,
    +      fieldTypes: Array[StructField]) extends OutputWriter with AbstractCarbonOutputWriter {
    +
    +    private val writable = new ObjectArrayWritable
    +
    +    private val cutOffDate = Integer.MAX_VALUE >> 1
    +
    +    private val recordWriter: RecordWriter[NullWritable, ObjectArrayWritable] =
    +      new CarbonTableOutputFormat().getRecordWriter(context)
    +
    +    /**
    +     * Write Spark's internal row to the carbondata record writer
    +     */
    +    def writeCarbon(row: InternalRow): Unit = {
    +      val data: Array[AnyRef] = extractData(row, fieldTypes)
    +      writable.set(data)
    +      recordWriter.write(NullWritable.get(), writable)
    +    }
    +
    +    override def writeInternal(row: InternalRow): Unit = {
    +      writeCarbon(row)
    +    }
    +
    +    /**
    +     * Convert the internal row to objects that carbondata understands
    +     */
    +    private def extractData(row: InternalRow, fieldTypes: Array[StructField]): Array[AnyRef] = {
    +      val data = new Array[AnyRef](fieldTypes.length)
    +      var i = 0
    +      while (i < fieldTypes.length) {
    +        if (!row.isNullAt(i)) {
    +          fieldTypes(i).dataType match {
    +            case StringType =>
    +              data(i) = row.getString(i)
    +            case d: DecimalType =>
    +              data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
    +            case s: StructType =>
    +              data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
    +            case s: ArrayType =>
    +              data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
    +            case d: DateType =>
    +              data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
    +            case d: TimestampType =>
    +              data(i) = (row.getLong(i) / 1000).asInstanceOf[AnyRef]
    +            case other =>
    +              data(i) = row.get(i, other)
    +          }
    +        } else {
    +          setNull(fieldTypes(i).dataType, data, i)
    +        }
    +        i += 1
    +      }
    +      data
    +    }
    +
    +    private def setNull(dataType: DataType, data: Array[AnyRef], i: Int) = {
    +      dataType match {
    +        case d: DateType =>
    +          // 1 is treated as null in carbon
    +          data(i) = 1.asInstanceOf[AnyRef]
    +        case _ =>
    +      }
    +    }
    +
    +    /**
    +     * Convert the internal row to objects that carbondata understands
    +     */
    +    private def extractData(row: ArrayData, dataType: DataType): Array[AnyRef] = {
    +      val data = new Array[AnyRef](row.numElements())
    +      var i = 0
    +      while (i < data.length) {
    +        if (!row.isNullAt(i)) {
    +          dataType match {
    +            case d: DecimalType =>
    +              data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
    +            case s: StructType =>
    +              data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
    +            case s: ArrayType =>
    +              data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
    +            case d: DateType =>
    +              data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
    +            case other => data(i) = row.get(i, dataType)
    +          }
    +        } else {
    +          setNull(dataType, data, i)
    +        }
    +        i += 1
    +      }
    +      data
    +    }
    +
    +    override def close(): Unit = {
    +      recordWriter.close(context)
    +    }
    +  }
    +
    +  override def shortName(): String = "carbon"
    +
    +  override def toString: String = "carbon"
    +
    +  override def hashCode(): Int = getClass.hashCode()
    +
    +  override def equals(other: Any): Boolean = other.isInstanceOf[SparkCarbonFileFormat]
    +
    +  /**
    +   * Whether to use the vector reader while reading data.
    +   * It is not supported in case of complex types.
    +   */
    +  private def supportVector(sparkSession: SparkSession, schema: StructType): Boolean = {
    +    val vectorizedReader = {
    +      if (sparkSession.sqlContext.sparkSession.conf
    +        .contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) {
    +        sparkSession.sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER)
    +      } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
    +        System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
    +      } else {
    +        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
    +          CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
    +      }
    +    }
    +    vectorizedReader.toBoolean && schema.forall(_.dataType.isInstanceOf[AtomicType])
    +  }
    +
    +
    +  /**
    +   * Returns whether this format support returning columnar batch or not.
    +   */
    +  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
    +    val conf = sparkSession.sessionState.conf
    +    conf.wholeStageEnabled &&
    +    schema.length <= conf.wholeStageMaxNumFields &&
    +    schema.forall(_.dataType.isInstanceOf[AtomicType])
    +  }
    +
    +  /**
    +   * Returns a function that can be used to read a single carbondata file in as an
    +   * Iterator of InternalRow.
    +   */
    +  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
    +      dataSchema: StructType,
    +      partitionSchema: StructType,
    +      requiredSchema: StructType,
    +      filters: Seq[Filter],
    +      options: Map[String, String],
    +      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
    +    val filter: Option[CarbonExpression] = filters.flatMap { filter =>
    +      CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, filter)
    +    }.reduceOption(new AndExpression(_, _))
    +
    +    val projection = requiredSchema.map(_.name).toArray
    +    val carbonProjection = new CarbonProjection
    +    projection.foreach(carbonProjection.addColumn)
    +
    +    var supportBatchValue: Boolean = false
    +
    +    val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
    +    val readVector = supportVector(sparkSession, resultSchema)
    +    if (readVector) {
    +      supportBatchValue = supportBatch(sparkSession, resultSchema)
    +    }
    +    val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
    +    CarbonInputFormat
    +      .setTableInfo(hadoopConf, model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
    +    CarbonInputFormat.setTransactionalTable(hadoopConf, false)
    +    CarbonInputFormat.setColumnProjection(hadoopConf, carbonProjection)
    +    filter match {
    +      case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
    +      case None => None
    +    }
    +    val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
    +    val broadcastedHadoopConf =
    +      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
    +
    +    file: PartitionedFile => {
    +      assert(file.partitionValues.numFields == partitionSchema.size)
    +
    +      if (!(file.filePath.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
    --- End diff --
    
    ok
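
    As a usage sketch only (the SparkSession named spark, the DataFrame and the output path
    below are assumptions, not taken from the patch): once the format is registered under the
    short name "carbon" as above, it can be exercised through the standard Spark DataFrame API.

        // Hypothetical example: write carbondata files to a plain path and read them back.
        val df = spark.range(0, 10).toDF("id")
        df.write.format("carbon").save("/tmp/carbon_files")

        // The schema is inferred from the carbondata files when it is not supplied explicitly.
        val readBack = spark.read.format("carbon").load("/tmp/carbon_files")
        readBack.show()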


---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7977/



---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6709/



---

[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8039/



---

[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2647
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7969/



---