Posted to issues@carbondata.apache.org by ravipesala <gi...@git.apache.org> on 2018/08/22 06:04:20 UTC
[GitHub] carbondata pull request #2647: [WIP] Added Spark FileFormat interface implem...
GitHub user ravipesala opened a pull request:
https://github.com/apache/carbondata/pull/2647
[WIP] Added Spark FileFormat interface implementation in Carbon
Be sure to complete all of the following checklist items to help us incorporate
your contribution quickly and easily:
- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [ ] Testing done
Please provide details on:
- Whether new unit test cases have been added, or why no new tests are required.
- How was it tested? Please attach the test report.
- Is it a performance-related change? Please attach the performance test report.
- Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ravipesala/incubator-carbondata spark-fileformat
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/carbondata/pull/2647.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2647
----
commit 4df28b3ac90fbd26ff674c10dd4ae3d3a95f956a
Author: ravipesala <ra...@...>
Date: 2018-08-22T06:02:21Z
Added Spark FileFormat interface implementation in Carbon
----
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6690/
---
[GitHub] carbondata pull request #2647: [WIP] Added Spark FileFormat interface implem...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r211872774
--- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
@@ -66,7 +68,23 @@ public LatestFilesReadCommittedScope(String path, String segmentId) {
* @param path carbon file path
*/
public LatestFilesReadCommittedScope(String path) {
- this(path, null);
+ this(path, (String) null);
+ }
+
+ /**
+ * a new constructor with path
+ *
+ * @param path carbon file path
+ */
--- End diff --
missing parameter doc
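For illustration, a completed doc comment for the new constructor might look like the following (a minimal sketch; the parameter descriptions are assumptions, not text from the PR):

    /**
     * Constructor that takes a path and, optionally, a set of sub folders to scan.
     *
     * @param path       carbon file path
     * @param subFolders sub folders under the path whose index files should be
     *                   included in the snapshot, or null to scan only the path
     */
    public LatestFilesReadCommittedScope(String path, String[] subFolders) {
      // body as in the PR
    }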
---
[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212250447
--- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
@@ -66,7 +68,23 @@ public LatestFilesReadCommittedScope(String path, String segmentId) {
* @param path carbon file path
*/
public LatestFilesReadCommittedScope(String path) {
- this(path, null);
+ this(path, (String) null);
+ }
+
+ /**
+ * a new constructor with path
+ *
+ * @param path carbon file path
+ */
+ public LatestFilesReadCommittedScope(String path, String[] subFolders) {
+ Objects.requireNonNull(path);
+ this.carbonFilePath = path;
+ this.subFolders = subFolders;
+ try {
+ takeCarbonIndexFileSnapShot();
+ } catch (IOException ex) {
+ throw new RuntimeException("Error while taking index snapshot", ex);
--- End diff --
Handling is not required here, so I removed it.
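If the wrapping is dropped, the checked exception can simply propagate; a minimal sketch of the resulting constructor, assuming the callers are adjusted to handle the IOException:

    public LatestFilesReadCommittedScope(String path, String[] subFolders) throws IOException {
      Objects.requireNonNull(path);
      this.carbonFilePath = path;
      this.subFolders = subFolders;
      // No try/catch: an IOException from the snapshot now propagates to the caller.
      takeCarbonIndexFileSnapShot();
    }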
---
[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...
Posted by KanakaKumar <gi...@git.apache.org>.
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212305587
--- Diff: integration/spark-datasource/pom.xml ---
@@ -0,0 +1,279 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-parent</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>carbondata-spark-datasource</artifactId>
+ <name>Apache CarbonData :: Spark Datasource</name>
+
+ <properties>
+ <dev.path>${basedir}/../../dev</dev.path>
+ <jacoco.append>true</jacoco.append>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-hadoop</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-store-sdk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-repl_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/resources</directory>
+ </resource>
+ <resource>
+ <directory>.</directory>
+ <includes>
+ <include>CARBON_SPARK_INTERFACELogResource.properties</include>
+ </includes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.15.2</version>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>testCompile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test</phase>
+ </execution>
+ <execution>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.18</version>
+ <!-- Note config is repeated in scalatest config -->
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
+ <systemProperties>
+ <java.awt.headless>true</java.awt.headless>
+ <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
+ </systemProperties>
+ <failIfNoTests>false</failIfNoTests>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <version>1.0</version>
+ <!-- Note config is repeated in surefire config -->
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <junitxml>.</junitxml>
+ <filereports>CarbonTestSuite.txt</filereports>
+ <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+ </argLine>
+ <stderr />
+ <environmentVariables>
+ </environmentVariables>
+ <systemProperties>
+ <java.awt.headless>true</java.awt.headless>
+ <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
+ </systemProperties>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <profiles>
+ <profile>
+ <id>build-all</id>
+ <properties>
+ <spark.version>2.2.1</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ </profile>
+ <profile>
+ <id>sdvtest</id>
+ <properties>
+ <maven.test.skip>true</maven.test.skip>
+ </properties>
+ </profile>
+ <profile>
+ <id>spark-2.1</id>
+ <properties>
+ <spark.version>2.1.0</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/main/spark2.2</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/spark2.1</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>spark-2.2</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <properties>
+ <spark.version>2.2.1</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ <build>
--- End diff --
I think the profiles are not required here.
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6336/
---
[GitHub] carbondata pull request #2647: [WIP] Added Spark FileFormat interface implem...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r211872899
--- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
@@ -36,12 +36,14 @@
*/
@InterfaceAudience.Internal
@InterfaceStability.Stable
-public class LatestFilesReadCommittedScope implements ReadCommittedScope {
+public class LatestFilesReadCommittedScope
+ implements ReadCommittedScope {
--- End diff --
no need to modify
---
[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212250488
--- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
@@ -36,12 +36,14 @@
*/
@InterfaceAudience.Internal
@InterfaceStability.Stable
-public class LatestFilesReadCommittedScope implements ReadCommittedScope {
+public class LatestFilesReadCommittedScope
+ implements ReadCommittedScope {
--- End diff --
ok
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6319/
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7975/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7999/
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6699/
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6316/
---
[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...
Posted by KanakaKumar <gi...@git.apache.org>.
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212317536
--- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.carbondata.execution.datasources
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.datatype
+import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
+import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
+import org.apache.carbondata.core.scan.expression.conditional._
+import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema}
+
+object CarbonSparkDataSourceUtil {
+
+ /**
+ * Convert from carbon datatype to Spark's datatype
+ */
+ def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
+ if (CarbonDataTypes.isDecimal(dataType)) {
+ DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
+ dataType.asInstanceOf[CarbonDecimalType].getScale)
+ } else {
+ dataType match {
+ case CarbonDataTypes.STRING => StringType
+ case CarbonDataTypes.SHORT => ShortType
+ case CarbonDataTypes.INT => IntegerType
+ case CarbonDataTypes.LONG => LongType
+ case CarbonDataTypes.DOUBLE => DoubleType
+ case CarbonDataTypes.BOOLEAN => BooleanType
+ case CarbonDataTypes.TIMESTAMP => TimestampType
+ case CarbonDataTypes.DATE => DateType
+ case CarbonDataTypes.VARCHAR => StringType
+ }
+ }
+ }
+
+ /**
+ * Convert from Spark's datatype to carbon datatype
+ */
+ def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
+ dataType match {
+ case StringType => CarbonDataTypes.STRING
+ case ShortType => CarbonDataTypes.SHORT
+ case IntegerType => CarbonDataTypes.INT
+ case LongType => CarbonDataTypes.LONG
+ case DoubleType => CarbonDataTypes.DOUBLE
+ case FloatType => CarbonDataTypes.FLOAT
+ case DateType => CarbonDataTypes.DATE
+ case BooleanType => CarbonDataTypes.BOOLEAN
+ case TimestampType => CarbonDataTypes.TIMESTAMP
+ case ArrayType(elementType, _) =>
+ CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType))
+ case StructType(fields) =>
+ val carbonFields = new java.util.ArrayList[CarbonStructField]
+ fields.map { field =>
+ carbonFields.add(
+ new CarbonStructField(
+ field.name,
+ convertSparkToCarbonDataType(field.dataType)))
+ }
+ CarbonDataTypes.createStructType(carbonFields)
+ case NullType => CarbonDataTypes.NULL
+ case decimal: DecimalType =>
+ CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
+ case _ => throw new UnsupportedOperationException("getting " + dataType + " from spark")
+ }
+ }
+
+ /**
+ * Converts data sources filters to carbon filter predicates.
+ */
+ def createCarbonFilter(schema: StructType,
+ predicate: sources.Filter): Option[CarbonExpression] = {
+ val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
+
+ def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
+ predicate match {
+
+ case sources.EqualTo(name, value) =>
+ Some(new EqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.Not(sources.EqualTo(name, value)) =>
+ Some(new NotEqualsExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.EqualNullSafe(name, value) =>
+ Some(new EqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.Not(sources.EqualNullSafe(name, value)) =>
+ Some(new NotEqualsExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.GreaterThan(name, value) =>
+ Some(new GreaterThanExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.LessThan(name, value) =>
+ Some(new LessThanExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.GreaterThanOrEqual(name, value) =>
+ Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.LessThanOrEqual(name, value) =>
+ Some(new LessThanEqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.In(name, values) =>
+ if (values.length == 1 && values(0) == null) {
+ Some(new FalseExpression(getCarbonExpression(name)))
+ } else {
+ Some(new InExpression(getCarbonExpression(name),
+ new ListExpression(
+ convertToJavaList(values.filterNot(_ == null)
+ .map(filterValues => getCarbonLiteralExpression(name, filterValues)).toList))))
+ }
+ case sources.Not(sources.In(name, values)) =>
+ if (values.contains(null)) {
+ Some(new FalseExpression(getCarbonExpression(name)))
+ } else {
+ Some(new NotInExpression(getCarbonExpression(name),
+ new ListExpression(
+ convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
+ }
+ case sources.IsNull(name) =>
+ Some(new EqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, null), true))
+ case sources.IsNotNull(name) =>
+ Some(new NotEqualsExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, null), true))
+ case sources.And(lhs, rhs) =>
+ (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
+ case sources.Or(lhs, rhs) =>
+ for {
+ lhsFilter <- createFilter(lhs)
+ rhsFilter <- createFilter(rhs)
+ } yield {
+ new OrExpression(lhsFilter, rhsFilter)
+ }
+ case sources.StringStartsWith(name, value) if value.length > 0 =>
+ Some(new StartsWithExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case _ => None
+ }
+ }
+
+ def getCarbonExpression(name: String) = {
+ new CarbonColumnExpression(name,
+ CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
+ }
+
+ def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
+ val dataTypeOfAttribute =
+ CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name))
+ val dataType =
+ if (Option(value).isDefined &&
+ dataTypeOfAttribute == CarbonDataTypes.STRING &&
+ value.isInstanceOf[Double]) {
+ CarbonDataTypes.DOUBLE
+ } else {
+ dataTypeOfAttribute
+ }
+ new CarbonLiteralExpression(value, dataType)
+ }
+
+ createFilter(predicate)
+ }
+
+ // Convert a scala list to a java list. Cannot use scalaList.asJava because, while deserializing,
+ // it is not able to find the classes inside the scala list and throws ClassNotFoundException.
+ private def convertToJavaList(
+ scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
+ val javaList = new java.util.ArrayList[CarbonExpression]()
+ scalaList.foreach(javaList.add)
+ javaList
+ }
+
+ /**
+ * Create load model for carbon
+ */
+ def prepareLoadModel(options: Map[String, String],
+ dataSchema: StructType): CarbonLoadModel = {
+ val schema = new Schema(dataSchema.fields.map { field =>
+ field.dataType match {
+ case s: StructType =>
+ new Field(field.name,
+ field.dataType.typeName,
+ s.fields
+ .map(f => new datatype.StructField(f.name,
+ CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(f.dataType))).toList.asJava)
+ case a: ArrayType =>
+ new Field(field.name,
+ field.dataType.typeName,
+ Seq(new datatype.StructField(field.name,
+ CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(a.elementType))).toList.asJava)
+ case other =>
+ new Field(field.name, field.dataType.simpleString)
+ }
+ })
+ val builder = new CarbonWriterBuilder
+ builder.isTransactionalTable(false)
+ builder.outputPath(options.getOrElse("path", ""))
+ val blockSize = options.get(CarbonCommonConstants.TABLE_BLOCKSIZE).map(_.toInt)
+ if (blockSize.isDefined) {
+ builder.withBlockSize(blockSize.get)
+ }
+ val blockletSize = options.get("table_blockletsize").map(_.toInt)
+ if (blockletSize.isDefined) {
+ builder.withBlockletSize(blockletSize.get)
+ }
+ builder.enableLocalDictionary(options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+ CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT).toBoolean)
+ builder.localDictionaryThreshold(
+ options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+ CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT).toInt)
+ builder.sortBy(
+ options.get(CarbonCommonConstants.SORT_COLUMNS).map(_.split(",").map(_.trim)).orNull)
+ builder.isTransactionalTable(false)
--- End diff --
duplicate line
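The fix is to drop the second isTransactionalTable(false) call. As a sketch, writing the setup as one fluent chain makes a repeated call easier to spot (this assumes the SDK builder methods return the builder, as the fluent CarbonWriterBuilder API suggests):

    val builder = new CarbonWriterBuilder
    builder
      .isTransactionalTable(false)
      .outputPath(options.getOrElse("path", ""))
      .enableLocalDictionary(options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
        CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT).toBoolean)
      .localDictionaryThreshold(options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
        CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT).toInt)
      .sortBy(options.get(CarbonCommonConstants.SORT_COLUMNS)
        .map(_.split(",").map(_.trim)).orNull)
    // Optional sizes are applied only when the options are present.
    options.get(CarbonCommonConstants.TABLE_BLOCKSIZE).map(_.toInt)
      .foreach(size => builder.withBlockSize(size))
    options.get("table_blockletsize").map(_.toInt)
      .foreach(size => builder.withBlockletSize(size))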
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7985/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6351/
---
[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212250560
--- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
@@ -66,7 +68,23 @@ public LatestFilesReadCommittedScope(String path, String segmentId) {
* @param path carbon file path
*/
public LatestFilesReadCommittedScope(String path) {
- this(path, null);
+ this(path, (String) null);
+ }
+
+ /**
+ * a new constructor with path
+ *
+ * @param path carbon file path
+ */
+ public LatestFilesReadCommittedScope(String path, String[] subFolders) {
+ Objects.requireNonNull(path);
+ this.carbonFilePath = path;
+ this.subFolders = subFolders;
--- End diff --
ok
---
[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...
Posted by KanakaKumar <gi...@git.apache.org>.
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212318460
--- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala ---
@@ -0,0 +1,238 @@
[... snip: diff context identical to the quote earlier in this thread ...]
+ val builder = new CarbonWriterBuilder
+ builder.isTransactionalTable(false)
+ builder.outputPath(options.getOrElse("path", ""))
+ val blockSize = options.get(CarbonCommonConstants.TABLE_BLOCKSIZE).map(_.toInt)
+ if (blockSize.isDefined) {
+ builder.withBlockSize(blockSize.get)
+ }
+ val blockletSize = options.get("table_blockletsize").map(_.toInt)
+ if (blockletSize.isDefined) {
+ builder.withBlockletSize(blockletSize.get)
+ }
+ builder.enableLocalDictionary(options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+ CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT).toBoolean)
+ builder.localDictionaryThreshold(
+ options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+ CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT).toInt)
+ builder.sortBy(
+ options.get(CarbonCommonConstants.SORT_COLUMNS).map(_.split(",").map(_.trim)).orNull)
+ builder.isTransactionalTable(false)
+ builder.uniqueIdentifier(System.currentTimeMillis())
--- End diff --
Please also support column_meta_cache and cache_level.
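A sketch of how the two table properties could be threaded through from the datasource options; withTableProperty here is a hypothetical pass-through setter, not a confirmed SDK method:

    // Hypothetical: forward the table properties to the writer builder.
    options.get("column_meta_cache").foreach { cols =>
      builder.withTableProperty("column_meta_cache", cols) // hypothetical setter
    }
    options.get("cache_level").foreach { level =>
      builder.withTableProperty("cache_level", level) // hypothetical setter
    }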
---
[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...
Posted by KanakaKumar <gi...@git.apache.org>.
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212333785
--- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/readsupport/SparkUnsafeRowReadSuport.scala ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.carbondata.execution.datasources.readsupport
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport
+
+/**
+ * Read support class which converts the carbon row array format to Spark's InternalRow.
+ */
+class SparkUnsafeRowReadSuport(requiredSchema: StructType)
--- End diff --
Directly implement CarbonReadSupport
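A minimal sketch of the suggested shape, implementing CarbonReadSupport directly rather than extending DictionaryDecodeReadSupport (the interface signatures are assumed from carbondata-hadoop; imports as in the quoted file, plus CarbonReadSupport itself):

    class SparkUnsafeRowReadSuport(requiredSchema: StructType)
      extends CarbonReadSupport[InternalRow] {

      private val unsafeProjection = UnsafeProjection.create(requiredSchema)

      override def initialize(carbonColumns: Array[CarbonColumn],
          carbonTable: CarbonTable): Unit = {
        // Nothing to set up once no dictionary-decode state is inherited.
      }

      override def readRow(data: Array[AnyRef]): InternalRow = {
        // Wrap the carbon row array and project it to an unsafe row.
        unsafeProjection(new GenericInternalRow(data.asInstanceOf[Array[Any]]))
      }

      override def close(): Unit = {}
    }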
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6384/
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7983/
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6326/
---
[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212348045
--- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala ---
@@ -0,0 +1,238 @@
[... snip: diff context identical to the quote earlier in this thread ...]
+ builder.isTransactionalTable(false)
+ builder.uniqueIdentifier(System.currentTimeMillis())
--- End diff --
ok
---
[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...
Posted by KanakaKumar <gi...@git.apache.org>.
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212314571
--- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala ---
@@ -0,0 +1,238 @@
[... snip: diff context identical to the quote earlier in this thread ...]
+ def getCarbonExpression(name: String) = {
--- End diff --
Remove the duplicate methods (CarbonFilters also has them).
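One way to deduplicate, as a sketch: keep the expression building in the shared utility and let the other call site delegate to it (the CarbonFilters signature shown is illustrative, not copied from the codebase):

    // Hypothetical delegation so the filter conversion logic lives in one place.
    object CarbonFilters {
      def createCarbonFilter(schema: StructType,
          predicate: sources.Filter): Option[CarbonExpression] =
        CarbonSparkDataSourceUtil.createCarbonFilter(schema, predicate)
    }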
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6740/
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6332/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8033/
---
[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212336190
--- Diff: integration/spark-datasource/pom.xml ---
@@ -0,0 +1,279 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-parent</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>carbondata-spark-datasource</artifactId>
+ <name>Apache CarbonData :: Spark Datasource</name>
+
+ <properties>
+ <dev.path>${basedir}/../../dev</dev.path>
+ <jacoco.append>true</jacoco.append>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-hadoop</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-store-sdk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-repl_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/resources</directory>
+ </resource>
+ <resource>
+ <directory>.</directory>
+ <includes>
+ <include>CARBON_SPARK_INTERFACELogResource.properties</include>
+ </includes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.15.2</version>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>testCompile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test</phase>
+ </execution>
+ <execution>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.18</version>
+ <!-- Note config is repeated in scalatest config -->
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
+ <systemProperties>
+ <java.awt.headless>true</java.awt.headless>
+ <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
+ </systemProperties>
+ <failIfNoTests>false</failIfNoTests>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <version>1.0</version>
+ <!-- Note config is repeated in surefire config -->
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <junitxml>.</junitxml>
+ <filereports>CarbonTestSuite.txt</filereports>
+ <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+ </argLine>
+ <stderr />
+ <environmentVariables>
+ </environmentVariables>
+ <systemProperties>
+ <java.awt.headless>true</java.awt.headless>
+ <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
+ </systemProperties>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <profiles>
+ <profile>
+ <id>build-all</id>
+ <properties>
+ <spark.version>2.2.1</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ </profile>
+ <profile>
+ <id>sdvtest</id>
+ <properties>
+ <maven.test.skip>true</maven.test.skip>
+ </properties>
+ </profile>
+ <profile>
+ <id>spark-2.1</id>
+ <properties>
+ <spark.version>2.1.0</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/main/spark2.2</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/spark2.1</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>spark-2.2</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <properties>
+ <spark.version>2.2.1</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ <build>
--- End diff --
ok
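Worth noting for anyone building this module: per the profiles above, Spark 2.2.1 is the default (the `spark-2.2` profile is `activeByDefault`), so building against Spark 2.1.0 presumably means activating the other profile explicitly, e.g. `mvn -Pspark-2.1 clean install`; the exact invocation is an assumption here, not taken from the project docs.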
---
[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212340928
--- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.carbondata.execution.datasources
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.datatype
+import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
+import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
+import org.apache.carbondata.core.scan.expression.conditional._
+import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema}
+
+object CarbonSparkDataSourceUtil {
+
+ /**
+ * Convert from a carbon datatype to Spark's datatype
+ */
+ def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
+ if (CarbonDataTypes.isDecimal(dataType)) {
+ DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
+ dataType.asInstanceOf[CarbonDecimalType].getScale)
+ } else {
+ dataType match {
+ case CarbonDataTypes.STRING => StringType
+ case CarbonDataTypes.SHORT => ShortType
+ case CarbonDataTypes.INT => IntegerType
+ case CarbonDataTypes.LONG => LongType
+ case CarbonDataTypes.DOUBLE => DoubleType
+ case CarbonDataTypes.BOOLEAN => BooleanType
+ case CarbonDataTypes.TIMESTAMP => TimestampType
+ case CarbonDataTypes.DATE => DateType
+ case CarbonDataTypes.VARCHAR => StringType
+ }
+ }
+ }
+
+ /**
+ * Convert from Spark's datatype to a carbon datatype
+ */
+ def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
+ dataType match {
+ case StringType => CarbonDataTypes.STRING
+ case ShortType => CarbonDataTypes.SHORT
+ case IntegerType => CarbonDataTypes.INT
+ case LongType => CarbonDataTypes.LONG
+ case DoubleType => CarbonDataTypes.DOUBLE
+ case FloatType => CarbonDataTypes.FLOAT
+ case DateType => CarbonDataTypes.DATE
+ case BooleanType => CarbonDataTypes.BOOLEAN
+ case TimestampType => CarbonDataTypes.TIMESTAMP
+ case ArrayType(elementType, _) =>
+ CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType))
+ case StructType(fields) =>
+ val carbonFields = new java.util.ArrayList[CarbonStructField]
+ fields.map { field =>
+ carbonFields.add(
+ new CarbonStructField(
+ field.name,
+ convertSparkToCarbonDataType(field.dataType)))
+ }
+ CarbonDataTypes.createStructType(carbonFields)
+ case NullType => CarbonDataTypes.NULL
+ case decimal: DecimalType =>
+ CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
+ case _ => throw new UnsupportedOperationException("getting " + dataType + " from spark")
+ }
+ }
+
+ /**
+ * Converts data source filters to carbon filter predicates.
+ */
+ def createCarbonFilter(schema: StructType,
+ predicate: sources.Filter): Option[CarbonExpression] = {
+ val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
+
+ def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
+ predicate match {
+
+ case sources.EqualTo(name, value) =>
+ Some(new EqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.Not(sources.EqualTo(name, value)) =>
+ Some(new NotEqualsExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.EqualNullSafe(name, value) =>
+ Some(new EqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.Not(sources.EqualNullSafe(name, value)) =>
+ Some(new NotEqualsExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.GreaterThan(name, value) =>
+ Some(new GreaterThanExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.LessThan(name, value) =>
+ Some(new LessThanExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.GreaterThanOrEqual(name, value) =>
+ Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.LessThanOrEqual(name, value) =>
+ Some(new LessThanEqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.In(name, values) =>
+ if (values.length == 1 && values(0) == null) {
+ Some(new FalseExpression(getCarbonExpression(name)))
+ } else {
+ Some(new InExpression(getCarbonExpression(name),
+ new ListExpression(
+ convertToJavaList(values.filterNot(_ == null)
+ .map(filterValues => getCarbonLiteralExpression(name, filterValues)).toList))))
+ }
+ case sources.Not(sources.In(name, values)) =>
+ if (values.contains(null)) {
+ Some(new FalseExpression(getCarbonExpression(name)))
+ } else {
+ Some(new NotInExpression(getCarbonExpression(name),
+ new ListExpression(
+ convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
+ }
+ case sources.IsNull(name) =>
+ Some(new EqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, null), true))
+ case sources.IsNotNull(name) =>
+ Some(new NotEqualsExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, null), true))
+ case sources.And(lhs, rhs) =>
+ (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
+ case sources.Or(lhs, rhs) =>
+ for {
+ lhsFilter <- createFilter(lhs)
+ rhsFilter <- createFilter(rhs)
+ } yield {
+ new OrExpression(lhsFilter, rhsFilter)
+ }
+ case sources.StringStartsWith(name, value) if value.length > 0 =>
+ Some(new StartsWithExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case _ => None
+ }
+ }
+
+ def getCarbonExpression(name: String) = {
--- End diff --
ok
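For readers, a small usage sketch of the conversion above; the schema and values are invented for illustration:

import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)))

// Spark pushes down sources.GreaterThan("age", 18); Carbon should receive
// a GreaterThanExpression over the "age" column with a literal 18.
val carbonExpr = CarbonSparkDataSourceUtil.createCarbonFilter(
  schema, sources.GreaterThan("age", 18))
assert(carbonExpr.isDefined)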
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6757/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6340/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6349/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6377/
---
[GitHub] carbondata pull request #2647: [WIP] Added Spark FileFormat interface implem...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r211873452
--- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
@@ -66,7 +68,23 @@ public LatestFilesReadCommittedScope(String path, String segmentId) {
* @param path carbon file path
*/
public LatestFilesReadCommittedScope(String path) {
- this(path, null);
+ this(path, (String) null);
+ }
+
+ /**
+ * a new constructor with path
+ *
+ * @param path carbon file path
+ */
+ public LatestFilesReadCommittedScope(String path, String[] subFolders) {
+ Objects.requireNonNull(path);
+ this.carbonFilePath = path;
+ this.subFolders = subFolders;
+ try {
+ takeCarbonIndexFileSnapShot();
+ } catch (IOException ex) {
+ throw new RuntimeException("Error while taking index snapshot", ex);
--- End diff --
Can we use a more specific exception?
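As a hedged sketch of that suggestion (the file under review is Java; this Scala-flavored version only shows the shape, and the exception name is invented):

import java.io.IOException

// Hypothetical dedicated unchecked exception instead of a bare RuntimeException.
class IndexSnapshotException(message: String, cause: Throwable)
  extends RuntimeException(message, cause)

// The constructor body would then read (takeCarbonIndexFileSnapShot is the
// method already defined on this class):
try {
  takeCarbonIndexFileSnapShot()
} catch {
  case ex: IOException =>
    throw new IndexSnapshotException("Error while taking index snapshot", ex)
}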
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7997/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6370/
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6701/
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6693/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7989/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8017/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6354/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6725/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6382/
---
[GitHub] carbondata pull request #2647: [WIP] Added Spark FileFormat interface implem...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r211873872
--- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
@@ -66,7 +68,23 @@ public LatestFilesReadCommittedScope(String path, String segmentId) {
* @param path carbon file path
*/
public LatestFilesReadCommittedScope(String path) {
- this(path, null);
+ this(path, (String) null);
+ }
+
+ /**
+ * a new constructor with path
+ *
+ * @param path carbon file path
+ */
+ public LatestFilesReadCommittedScope(String path, String[] subFolders) {
+ Objects.requireNonNull(path);
+ this.carbonFilePath = path;
+ this.subFolders = subFolders;
--- End diff --
Suggest changing `subFolders` to `dataFolders` and adding javadoc explaining that these folders contain the carbondata and carbonindex files.
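A sketch of what the suggested javadoc might look like; the wording is illustrative only:

/**
 * Constructor that scopes the read to specific data folders.
 *
 * @param path        carbon file path
 * @param dataFolders folders under the path containing the carbondata and
 *                    carbonindex files to read
 */
public LatestFilesReadCommittedScope(String path, String[] dataFolders) { ... }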
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7966/
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6331/
---
[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212350692
--- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/readsupport/SparkUnsafeRowReadSuport.scala ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.carbondata.execution.datasources.readsupport
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport
+
+/**
+ * Read support class which converts the carbon row array format to Spark's InternalRow.
+ */
+class SparkUnsafeRowReadSuport(requiredSchema: StructType)
--- End diff --
ok
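For readers following along, a hedged sketch of the shape such a read support typically takes, assuming DictionaryDecodeReadSupport exposes a readRow(Object[]) hook (the override name is an assumption, not confirmed from the PR):

class SparkUnsafeRowReadSuport(requiredSchema: StructType)
  extends DictionaryDecodeReadSupport[InternalRow] {

  // Reuse Spark's generated projection to turn the decoded carbon row
  // array into an UnsafeRow matching the required schema.
  private val unsafeProjection = UnsafeProjection.create(requiredSchema)

  override def readRow(data: Array[AnyRef]): InternalRow = {
    unsafeProjection(new GenericInternalRow(data.asInstanceOf[Array[Any]]))
  }
}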
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8002/
---
[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...
Posted by KanakaKumar <gi...@git.apache.org>.
Github user KanakaKumar commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212329045
--- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala ---
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.carbondata.execution.datasources
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql._
+import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SparkTypeConverter
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.scan.expression.{Expression => CarbonExpression}
+import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.core.statusmanager.{FileFormat => CarbonFileFormatVersion}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader}
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, CarbonTableOutputFormat}
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
+import org.apache.carbondata.processing.loading.complexobjects.{ArrayObject, StructObject}
+import org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader
+
+/**
+ * Used to read and write data stored in carbondata files to/from the Spark execution engine.
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+class SparkCarbonFileFormat extends FileFormat
+ with DataSourceRegister
+ with Logging
+ with Serializable {
+
+ @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+ /**
+ * If the user does not provide a schema while reading the data, Spark calls this method to infer
+ * the schema from the carbondata files. It reads the schema present in the carbondata files and returns it.
+ */
+ override def inferSchema(sparkSession: SparkSession,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
+ val tablePath = options.get("path") match {
+ case Some(path) => path
+ case _ => FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString)
+ }
+
+ val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""), false)
+ val table = CarbonTable.buildFromTableInfo(tableInfo)
+ var schema = new StructType
+ tableInfo.getFactTable.getListOfColumns.asScala.foreach { col =>
+ // TODO find a better way to know it's a child
+ if (!col.getColumnName.contains(".")) {
+ schema = schema.add(
+ col.getColumnName,
+ SparkTypeConverter.convertCarbonToSparkDataType(col, table))
+ }
+ }
+ Some(schema)
+ }
+
+
+ /**
+ * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation is
+ * done here.
+ */
+ override def prepareWrite(sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+
+ val conf = job.getConfiguration
+
+ val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
+ model.setLoadWithoutConverterStep(true)
+ CarbonTableOutputFormat.setLoadModel(conf, model)
+
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ val updatedPath = if (path.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
+ new Path(path).getParent.toString
+ } else {
+ path
+ }
+ context.getConfiguration.set("carbon.outputformat.writepath", updatedPath)
+ context.getConfiguration.set("carbon.outputformat.taskno", System.nanoTime() + "")
+ new CarbonOutputWriter(path, context, dataSchema.fields)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+ CarbonTablePath.CARBON_DATA_EXT
+ }
+ }
+ }
+
+ /**
+ * It is just a trait to make the code compile against both Spark 2.1 and 2.2
+ */
+ private trait AbstractCarbonOutputWriter {
+ def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+ def writeInternal(row: InternalRow): Unit = {
+ writeCarbon(row)
+ }
+ def write(row: InternalRow): Unit = {
+ writeCarbon(row)
+ }
+ def writeCarbon(row: InternalRow): Unit
+ }
+
+
+ /**
+ * Writer class for carbondata files
+ */
+ private class CarbonOutputWriter(path: String,
+ context: TaskAttemptContext,
+ fieldTypes: Array[StructField]) extends OutputWriter with AbstractCarbonOutputWriter {
+
+ private val writable = new ObjectArrayWritable
+
+ private val cutOffDate = Integer.MAX_VALUE >> 1
+
+ private val recordWriter: RecordWriter[NullWritable, ObjectArrayWritable] =
+ new CarbonTableOutputFormat().getRecordWriter(context)
+
+ /**
+ * Write Spark's internal row to the carbondata record writer
+ */
+ def writeCarbon(row: InternalRow): Unit = {
+ val data: Array[AnyRef] = extractData(row, fieldTypes)
+ writable.set(data)
+ recordWriter.write(NullWritable.get(), writable)
+ }
+
+ override def writeInternal(row: InternalRow): Unit = {
+ writeCarbon(row)
+ }
+
+ /**
+ * Convert the internal row to objects that carbondata understands
+ */
+ private def extractData(row: InternalRow, fieldTypes: Array[StructField]): Array[AnyRef] = {
+ val data = new Array[AnyRef](fieldTypes.length)
+ var i = 0
+ while (i < fieldTypes.length) {
+ if (!row.isNullAt(i)) {
+ fieldTypes(i).dataType match {
+ case StringType =>
+ data(i) = row.getString(i)
+ case d: DecimalType =>
+ data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+ case s: StructType =>
+ data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
+ case s: ArrayType =>
+ data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
+ case d: DateType =>
+ data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
+ case d: TimestampType =>
+ data(i) = (row.getLong(i) / 1000).asInstanceOf[AnyRef]
+ case other =>
+ data(i) = row.get(i, other)
+ }
+ } else {
+ setNull(fieldTypes(i).dataType, data, i)
+ }
+ i += 1
+ }
+ data
+ }
+
+ private def setNull(dataType: DataType, data: Array[AnyRef], i: Int) = {
+ dataType match {
+ case d: DateType =>
+ // 1 is treated as null in carbon
+ data(i) = 1.asInstanceOf[AnyRef]
+ case _ =>
+ }
+ }
+
+ /**
+ * Convert the internal row to objects that carbondata understands
+ */
+ private def extractData(row: ArrayData, dataType: DataType): Array[AnyRef] = {
+ val data = new Array[AnyRef](row.numElements())
+ var i = 0
+ while (i < data.length) {
+ if (!row.isNullAt(i)) {
+ dataType match {
+ case d: DecimalType =>
+ data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+ case s: StructType =>
+ data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
+ case s: ArrayType =>
+ data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
+ case d: DateType =>
+ data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
+ case other => data(i) = row.get(i, dataType)
+ }
+ } else {
+ setNull(dataType, data, i)
+ }
+ i += 1
+ }
+ data
+ }
+
+ override def close(): Unit = {
+ recordWriter.close(context)
+ }
+ }
+
+ override def shortName(): String = "carbon"
+
+ override def toString: String = "carbon"
+
+ override def hashCode(): Int = getClass.hashCode()
+
+ override def equals(other: Any): Boolean = other.isInstanceOf[SparkCarbonFileFormat]
+
+ /**
+ * Whether to use the vectorized reader while reading data.
+ * It is not supported for complex types.
+ */
+ private def supportVector(sparkSession: SparkSession, schema: StructType): Boolean = {
+ val vectorizedReader = {
+ if (sparkSession.sqlContext.sparkSession.conf
+ .contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) {
+ sparkSession.sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER)
+ } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
+ System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
+ } else {
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+ CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
+ }
+ }
+ vectorizedReader.toBoolean && schema.forall(_.dataType.isInstanceOf[AtomicType])
+ }
+
+
+ /**
+ * Returns whether this format supports returning columnar batches or not.
+ */
+ override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
+ val conf = sparkSession.sessionState.conf
+ conf.wholeStageEnabled &&
+ schema.length <= conf.wholeStageMaxNumFields &&
+ schema.forall(_.dataType.isInstanceOf[AtomicType])
+ }
+
+ /**
+ * Returns a function that can be used to read a single carbondata file in as an
+ * Iterator of InternalRow.
+ */
+ override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+ val filter: Option[CarbonExpression] = filters.flatMap { filter =>
+ CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, filter)
+ }.reduceOption(new AndExpression(_, _))
+
+ val projection = requiredSchema.map(_.name).toArray
+ val carbonProjection = new CarbonProjection
+ projection.foreach(carbonProjection.addColumn)
+
+ var supportBatchValue: Boolean = false
+
+ val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+ val readVector = supportVector(sparkSession, resultSchema)
+ if (readVector) {
+ supportBatchValue = supportBatch(sparkSession, resultSchema)
+ }
+ val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
+ CarbonInputFormat
+ .setTableInfo(hadoopConf, model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
+ CarbonInputFormat.setTransactionalTable(hadoopConf, false)
+ CarbonInputFormat.setColumnProjection(hadoopConf, carbonProjection)
+ filter match {
+ case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
+ case None => None
+ }
+ val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+ file: PartitionedFile => {
+ assert(file.partitionValues.numFields == partitionSchema.size)
+
+ if (!(file.filePath.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
--- End diff --
Can we just check with the carbondata extension?
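A minimal sketch of the simplification being suggested, i.e. keying only on the data file extension; buildCarbonIterator is a hypothetical helper standing in for the record-reader construction the real code performs inline:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.carbondata.core.util.path.CarbonTablePath

// Accept only carbondata data files; everything else (index files, etc.)
// yields an empty iterator.
def read(file: PartitionedFile): Iterator[InternalRow] = {
  if (file.filePath.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
    buildCarbonIterator(file)
  } else {
    Iterator.empty
  }
}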
---
[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/carbondata/pull/2647
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6713/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6366/
---
[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212250514
--- Diff: core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---
@@ -66,7 +68,23 @@ public LatestFilesReadCommittedScope(String path, String segmentId) {
* @param path carbon file path
*/
public LatestFilesReadCommittedScope(String path) {
- this(path, null);
+ this(path, (String) null);
+ }
+
+ /**
+ * a new constructor with path
+ *
+ * @param path carbon file path
+ */
--- End diff --
ok
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6722/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8022/
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6324/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6376/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6760/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2647
SDV Build Failed, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6378/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6747/
---
[GitHub] carbondata pull request #2647: [CARBONDATA-2872] Added Spark FileFormat inte...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2647#discussion_r212350177
--- Diff: integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala ---
+ val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+ file: PartitionedFile => {
+ assert(file.partitionValues.numFields == partitionSchema.size)
+
+ if (!(file.filePath.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
--- End diff --
ok
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7977/
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6709/
---
[GitHub] carbondata issue #2647: [CARBONDATA-2872] Added Spark FileFormat interface i...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8039/
---
[GitHub] carbondata issue #2647: [WIP] Added Spark FileFormat interface implementatio...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2647
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7969/
---