Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/08/24 13:01:35 UTC
[3/4] carbondata git commit: [CARBONDATA-2872] Added Spark FileFormat interface implementation in Carbon
http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/pom.xml b/integration/spark-datasource/pom.xml
new file mode 100644
index 0000000..38cf629
--- /dev/null
+++ b/integration/spark-datasource/pom.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-parent</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>carbondata-spark-datasource</artifactId>
+ <name>Apache CarbonData :: Spark Datasource</name>
+
+ <properties>
+ <dev.path>${basedir}/../../dev</dev.path>
+ <jacoco.append>true</jacoco.append>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-hadoop</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-store-sdk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-repl_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/resources</directory>
+ </resource>
+ <resource>
+ <directory>.</directory>
+ <includes>
+ <include>CARBON_SPARK_INTERFACELogResource.properties</include>
+ </includes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.15.2</version>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>testCompile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test</phase>
+ </execution>
+ <execution>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.18</version>
+ <!-- Note config is repeated in scalatest config -->
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
+ <systemProperties>
+ <java.awt.headless>true</java.awt.headless>
+ <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
+ </systemProperties>
+ <failIfNoTests>false</failIfNoTests>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <version>1.0</version>
+ <!-- Note config is repeated in surefire config -->
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <junitxml>.</junitxml>
+ <filereports>CarbonTestSuite.txt</filereports>
+ <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+ </argLine>
+ <stderr />
+ <environmentVariables>
+ </environmentVariables>
+ <systemProperties>
+ <java.awt.headless>true</java.awt.headless>
+ <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
+ </systemProperties>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <profiles>
+ <profile>
+ <id>build-all</id>
+ <properties>
+ <spark.version>2.2.1</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.11.8</scala.version>
+ </properties>
+ </profile>
+ <profile>
+ <id>sdvtest</id>
+ <properties>
+ <maven.test.skip>true</maven.test.skip>
+ </properties>
+ </profile>
+ </profiles>
+</project>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java
new file mode 100644
index 0000000..7e38691
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.converter;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.util.DataTypeConverter;
+
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Convert Carbon/Java data types and values to Spark data types and values
+ */
+public final class SparkDataTypeConverterImpl implements DataTypeConverter, Serializable {
+
+ private static final long serialVersionUID = -4379212832935070583L;
+
+ @Override
+ public Object convertFromStringToDecimal(Object data) {
+ BigDecimal javaDecVal = new BigDecimal(data.toString());
+ return org.apache.spark.sql.types.Decimal.apply(javaDecVal);
+ }
+
+ @Override
+ public Object convertFromBigDecimalToDecimal(Object data) {
+ if (null == data) {
+ return null;
+ }
+ return org.apache.spark.sql.types.Decimal.apply((BigDecimal)data);
+ }
+
+ @Override
+ public Object convertFromDecimalToBigDecimal(Object data) {
+ return ((org.apache.spark.sql.types.Decimal) data).toJavaBigDecimal();
+ }
+
+ @Override
+ public byte[] convertFromStringToByte(Object data) {
+ if (null == data) {
+ return null;
+ }
+ return UTF8String.fromString((String) data).getBytes();
+ }
+
+ @Override
+ public Object convertFromByteToUTF8String(byte[] data) {
+ if (null == data) {
+ return null;
+ }
+ return UTF8String.fromBytes(data);
+ }
+
+ @Override
+ public byte[] convertFromByteToUTF8Bytes(byte[] data) {
+ return UTF8String.fromBytes(data).getBytes();
+ }
+
+ @Override
+ public Object convertFromStringToUTF8String(Object data) {
+ if (null == data) {
+ return null;
+ }
+ return UTF8String.fromString((String) data);
+ }
+
+ @Override
+ public Object wrapWithGenericArrayData(Object data) {
+ return new GenericArrayData(data);
+ }
+
+ @Override
+ public Object wrapWithGenericRow(Object[] fields) {
+ return new GenericInternalRow(fields);
+ }
+
+ private static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
+ DataType carbonDataType) {
+ if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
+ return DataTypes.StringType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) {
+ return DataTypes.ShortType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) {
+ return DataTypes.IntegerType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
+ return DataTypes.LongType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) {
+ return DataTypes.DoubleType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) {
+ return DataTypes.BooleanType;
+ } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType)) {
+ return DataTypes.createDecimalType();
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
+ return DataTypes.TimestampType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
+ return DataTypes.DateType;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * convert from CarbonColumn array to Spark's StructField array
+ */
+ @Override
+ public Object[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) {
+ StructField[] fields = new StructField[carbonColumns.length];
+ for (int i = 0; i < carbonColumns.length; i++) {
+ CarbonColumn carbonColumn = carbonColumns[i];
+ if (carbonColumn.isDimension()) {
+ if (carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(carbonColumn.getDataType());
+ fields[i] = new StructField(carbonColumn.getColName(),
+ convertCarbonToSparkDataType(generator.getReturnType()), true, null);
+ } else if (!carbonColumn.hasEncoding(Encoding.DICTIONARY)) {
+ fields[i] = new StructField(carbonColumn.getColName(),
+ convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null);
+ } else if (carbonColumn.isComplex()) {
+ fields[i] = new StructField(carbonColumn.getColName(),
+ convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null);
+ } else {
+ fields[i] = new StructField(carbonColumn.getColName(),
+ convertCarbonToSparkDataType(
+ org.apache.carbondata.core.metadata.datatype.DataTypes.INT), true, null);
+ }
+ } else if (carbonColumn.isMeasure()) {
+ DataType dataType = carbonColumn.getDataType();
+ if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN
+ || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT
+ || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT
+ || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
+ fields[i] = new StructField(carbonColumn.getColName(),
+ convertCarbonToSparkDataType(dataType), true, null);
+ } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) {
+ CarbonMeasure measure = (CarbonMeasure) carbonColumn;
+ fields[i] = new StructField(carbonColumn.getColName(),
+ new DecimalType(measure.getPrecision(), measure.getScale()), true, null);
+ } else {
+ fields[i] = new StructField(carbonColumn.getColName(),
+ convertCarbonToSparkDataType(
+ org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE), true, null);
+ }
+ }
+ }
+ return fields;
+ }
+
+}
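
A minimal usage sketch (not part of the patch) that exercises the converter defined above; it only calls methods declared in this file and assumes the carbondata and Spark jars are on the classpath:

import org.apache.carbondata.converter.SparkDataTypeConverterImpl

object ConverterSketch {
  def main(args: Array[String]): Unit = {
    val converter = new SparkDataTypeConverterImpl
    // String -> Spark Decimal
    println(converter.convertFromStringToDecimal("12.345"))
    // String -> UTF8 bytes -> Spark UTF8String round trip
    val bytes = converter.convertFromStringToByte("carbon")
    println(converter.convertFromByteToUTF8String(bytes))
  }
}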
http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java
new file mode 100644
index 0000000..7f1e577
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.vectorreader;
+
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.io.api.Binary;
+
+public class CarbonDictionaryWrapper extends Dictionary {
+
+ private Binary[] binaries;
+
+ CarbonDictionaryWrapper(Encoding encoding, CarbonDictionary dictionary) {
+ super(encoding);
+ binaries = new Binary[dictionary.getDictionarySize()];
+ for (int i = 0; i < binaries.length; i++) {
+ binaries[i] = Binary.fromReusedByteArray(dictionary.getDictionaryValue(i));
+ }
+ }
+
+ @Override public int getMaxId() {
+ return binaries.length - 1;
+ }
+
+ @Override public Binary decodeToBinary(int id) {
+ return binaries[id];
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
new file mode 100644
index 0000000..4f34650
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.vectorreader;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
+import org.apache.parquet.column.Encoding;
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.types.Decimal;
+
+class ColumnarVectorWrapper implements CarbonColumnVector {
+
+ private ColumnVector columnVector;
+
+ private boolean[] filteredRows;
+
+ private int counter;
+
+ private boolean filteredRowsExist;
+
+ private DataType blockDataType;
+
+ private CarbonColumnVector dictionaryVector;
+
+ ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) {
+ this.columnVector = columnVector;
+ this.filteredRows = filteredRows;
+ if (columnVector.getDictionaryIds() != null) {
+ this.dictionaryVector =
+ new ColumnarVectorWrapper(columnVector.getDictionaryIds(), filteredRows);
+ }
+ }
+
+ @Override public void putBoolean(int rowId, boolean value) {
+ if (!filteredRows[rowId]) {
+ columnVector.putBoolean(counter++, value);
+ }
+ }
+
+ @Override public void putFloat(int rowId, float value) {
+ if (!filteredRows[rowId]) {
+ columnVector.putFloat(counter++, value);
+ }
+ }
+
+ @Override public void putShort(int rowId, short value) {
+ if (!filteredRows[rowId]) {
+ columnVector.putShort(counter++, value);
+ }
+ }
+
+ @Override public void putShorts(int rowId, int count, short value) {
+ if (filteredRowsExist) {
+ for (int i = 0; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ columnVector.putShort(counter++, value);
+ }
+ rowId++;
+ }
+ } else {
+ columnVector.putShorts(rowId, count, value);
+ }
+ }
+
+ @Override public void putInt(int rowId, int value) {
+ if (!filteredRows[rowId]) {
+ columnVector.putInt(counter++, value);
+ }
+ }
+
+ @Override public void putInts(int rowId, int count, int value) {
+ if (filteredRowsExist) {
+ for (int i = 0; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ columnVector.putInt(counter++, value);
+ }
+ rowId++;
+ }
+ } else {
+ columnVector.putInts(rowId, count, value);
+ }
+ }
+
+ @Override public void putLong(int rowId, long value) {
+ if (!filteredRows[rowId]) {
+ columnVector.putLong(counter++, value);
+ }
+ }
+
+ @Override public void putLongs(int rowId, int count, long value) {
+ if (filteredRowsExist) {
+ for (int i = 0; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ columnVector.putLong(counter++, value);
+ }
+ rowId++;
+ }
+ } else {
+ columnVector.putLongs(rowId, count, value);
+ }
+ }
+
+ @Override public void putDecimal(int rowId, BigDecimal value, int precision) {
+ if (!filteredRows[rowId]) {
+ Decimal toDecimal = Decimal.apply(value);
+ columnVector.putDecimal(counter++, toDecimal, precision);
+ }
+ }
+
+ @Override public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
+ Decimal decimal = Decimal.apply(value);
+ for (int i = 0; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ columnVector.putDecimal(counter++, decimal, precision);
+ }
+ rowId++;
+ }
+ }
+
+ @Override public void putDouble(int rowId, double value) {
+ if (!filteredRows[rowId]) {
+ columnVector.putDouble(counter++, value);
+ }
+ }
+
+ @Override public void putDoubles(int rowId, int count, double value) {
+ if (filteredRowsExist) {
+ for (int i = 0; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ columnVector.putDouble(counter++, value);
+ }
+ rowId++;
+ }
+ } else {
+ columnVector.putDoubles(rowId, count, value);
+ }
+ }
+
+ @Override public void putBytes(int rowId, byte[] value) {
+ if (!filteredRows[rowId]) {
+ columnVector.putByteArray(counter++, value);
+ }
+ }
+
+ @Override public void putBytes(int rowId, int count, byte[] value) {
+ for (int i = 0; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ columnVector.putByteArray(counter++, value);
+ }
+ rowId++;
+ }
+ }
+
+ @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+ if (!filteredRows[rowId]) {
+ columnVector.putByteArray(counter++, value, offset, length);
+ }
+ }
+
+ @Override public void putNull(int rowId) {
+ if (!filteredRows[rowId]) {
+ columnVector.putNull(counter++);
+ }
+ }
+
+ @Override public void putNulls(int rowId, int count) {
+ if (filteredRowsExist) {
+ for (int i = 0; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ columnVector.putNull(counter++);
+ }
+ rowId++;
+ }
+ } else {
+ columnVector.putNulls(rowId, count);
+ }
+ }
+
+ @Override public void putNotNull(int rowId) {
+ if (!filteredRows[rowId]) {
+ columnVector.putNotNull(counter++);
+ }
+ }
+
+ @Override public void putNotNull(int rowId, int count) {
+ if (filteredRowsExist) {
+ for (int i = 0; i < count; i++) {
+ if (!filteredRows[rowId]) {
+ columnVector.putNotNull(counter++);
+ }
+ rowId++;
+ }
+ } else {
+ columnVector.putNotNulls(rowId, count);
+ }
+ }
+
+ @Override public boolean isNull(int rowId) {
+ return columnVector.isNullAt(rowId);
+ }
+
+ @Override public void putObject(int rowId, Object obj) {
+ //TODO handle complex types
+ }
+
+ @Override public Object getData(int rowId) {
+ //TODO handle complex types
+ return null;
+ }
+
+ @Override public void reset() {
+ counter = 0;
+ filteredRowsExist = false;
+ if (null != dictionaryVector) {
+ dictionaryVector.reset();
+ }
+ }
+
+ @Override public DataType getType() {
+ return CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(columnVector.dataType());
+ }
+
+ @Override
+ public DataType getBlockDataType() {
+ return blockDataType;
+ }
+
+ @Override
+ public void setBlockDataType(DataType blockDataType) {
+ this.blockDataType = blockDataType;
+ }
+
+ @Override public void setFilteredRowsExist(boolean filteredRowsExist) {
+ this.filteredRowsExist = filteredRowsExist;
+ }
+
+ @Override public void setDictionary(CarbonDictionary dictionary) {
+ if (dictionary == null) {
+ columnVector.setDictionary(null);
+ } else {
+ columnVector.setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary));
+ }
+ }
+
+ @Override public boolean hasDictionary() {
+ return columnVector.hasDictionary();
+ }
+
+ @Override public CarbonColumnVector getDictionaryVector() {
+ return dictionaryVector;
+ }
+}
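
ColumnarVectorWrapper above compacts filtered rows out of the Spark ColumnVector: every put* call skips rows flagged in filteredRows and writes the surviving values contiguously through an internal counter. A standalone sketch of that pattern, in plain Scala with no Spark types, for illustration only:

object FilteredWriteSketch {
  def main(args: Array[String]): Unit = {
    // true means the row was filtered out and must not be written
    val filteredRows = Array(false, true, false, true, false)
    val target = new Array[Int](filteredRows.count(!_))
    var counter = 0

    // mirrors ColumnarVectorWrapper.putInt: skip filtered rows, write the rest contiguously
    def putInt(rowId: Int, value: Int): Unit = {
      if (!filteredRows(rowId)) {
        target(counter) = value
        counter += 1
      }
    }

    filteredRows.indices.foreach(i => putInt(i, i * 10))
    println(target.mkString(", ")) // prints: 0, 20, 40
  }
}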
http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
new file mode 100644
index 0000000..2c53d20
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.vectorreader;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.hadoop.AbstractRecordReader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.InputMetricsStats;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
+ * carbondata column APIs and fills the data directly into columns.
+ */
+public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(VectorizedCarbonRecordReader.class.getName());
+
+ private int batchIdx = 0;
+
+ private int numBatched = 0;
+
+ private ColumnarBatch columnarBatch;
+
+ private CarbonColumnarBatch carbonColumnarBatch;
+
+ /**
+ * If true, this class returns batches instead of rows.
+ */
+ private boolean returnColumnarBatch;
+
+ private boolean[] isNoDictStringField;
+
+ /**
+ * The default config on whether columnarBatch should be onheap.
+ */
+ private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
+
+ private QueryModel queryModel;
+
+ private AbstractDetailQueryResultIterator iterator;
+
+ private QueryExecutor queryExecutor;
+
+ private InputMetricsStats inputMetricsStats;
+
+ public VectorizedCarbonRecordReader(QueryModel queryModel, InputMetricsStats inputMetricsStats,
+ String enableBatch) {
+ this.queryModel = queryModel;
+ this.inputMetricsStats = inputMetricsStats;
+ if (enableBatch.equals("true")) {
+ enableReturningBatches();
+ }
+ }
+
+
+ /*
+ * Can be called before any rows are returned to enable returning columnar batches directly.
+ */
+ public void enableReturningBatches() {
+ returnColumnarBatch = true;
+ }
+
+ /**
+ * Implementation of RecordReader API.
+ */
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException, UnsupportedOperationException {
+ // The input split can contain single HDFS block or multiple blocks, so firstly get all the
+ // blocks and then set them in the query model.
+ List<CarbonInputSplit> splitList;
+ if (inputSplit instanceof CarbonInputSplit) {
+ splitList = new ArrayList<>(1);
+ splitList.add((CarbonInputSplit) inputSplit);
+ } else if (inputSplit instanceof CarbonMultiBlockSplit) {
+ // contains multiple blocks, this is an optimization for concurrent query.
+ CarbonMultiBlockSplit multiBlockSplit = (CarbonMultiBlockSplit) inputSplit;
+ splitList = multiBlockSplit.getAllSplits();
+ } else {
+ throw new RuntimeException("unsupported input split type: " + inputSplit);
+ }
+ List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
+ queryModel.setTableBlockInfos(tableBlockInfoList);
+ queryModel.setVectorReader(true);
+ try {
+ queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+ iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
+ } catch (QueryExecutionException e) {
+ if (ExceptionUtils.indexOfThrowable(e, FileNotFoundException.class) > 0) {
+ LOGGER.error(e);
+ throw new InterruptedException(
+ "Insert overwrite may be in progress.Please check " + e.getMessage());
+ }
+ throw new InterruptedException(e.getMessage());
+ } catch (Exception e) {
+ if (ExceptionUtils.indexOfThrowable(e, FileNotFoundException.class) > 0) {
+ LOGGER.error(e);
+ throw new InterruptedException(
+ "Insert overwrite may be in progress.Please check " + e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ logStatistics(rowCount, queryModel.getStatisticsRecorder());
+ if (columnarBatch != null) {
+ columnarBatch.close();
+ columnarBatch = null;
+ }
+ // clear dictionary cache
+ Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
+ if (null != columnToDictionaryMapping) {
+ for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
+ CarbonUtil.clearDictionaryCache(entry.getValue());
+ }
+ }
+ try {
+ queryExecutor.finish();
+ } catch (QueryExecutionException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ resultBatch();
+
+ if (returnColumnarBatch) {
+ return nextBatch();
+ }
+
+ if (batchIdx >= numBatched) {
+ if (!nextBatch()) return false;
+ }
+ ++batchIdx;
+ return true;
+ }
+
+ @Override
+ public Object getCurrentValue() throws IOException, InterruptedException {
+ if (returnColumnarBatch) {
+ int value = columnarBatch.numValidRows();
+ rowCount += value;
+ if (inputMetricsStats != null) {
+ inputMetricsStats.incrementRecordRead((long) value);
+ }
+ return columnarBatch;
+ }
+ rowCount += 1;
+ return columnarBatch.getRow(batchIdx - 1);
+ }
+
+ @Override
+ public Void getCurrentKey() throws IOException, InterruptedException {
+ return null;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ // TODO : Implement it based on total number of rows it is going to retrieve.
+ return 0;
+ }
+
+ /**
+ * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
+ * This object is reused. Calling this enables the vectorized reader. This should be called
+ * before any calls to nextKeyValue/nextBatch.
+ */
+
+ public void initBatch(MemoryMode memMode, StructType partitionColumns,
+ InternalRow partitionValues) {
+ List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions();
+ List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
+ StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
+ this.isNoDictStringField = new boolean[queryDimension.size() + queryMeasures.size()];
+ for (int i = 0; i < queryDimension.size(); i++) {
+ ProjectionDimension dim = queryDimension.get(i);
+ if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(dim.getDimension().getDataType());
+ fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
+ CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, null);
+ } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) {
+ if (dim.getDimension().getDataType() == DataTypes.STRING
+ || dim.getDimension().getDataType() == DataTypes.VARCHAR) {
+ this.isNoDictStringField[dim.getOrdinal()] = true;
+ }
+ fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
+ CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true,
+ null);
+ } else if (dim.getDimension().isComplex()) {
+ fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
+ CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true,
+ null);
+ } else {
+ fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
+ CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(DataTypes.INT), true, null);
+ }
+ }
+
+ for (int i = 0; i < queryMeasures.size(); i++) {
+ ProjectionMeasure msr = queryMeasures.get(i);
+ DataType dataType = msr.getMeasure().getDataType();
+ if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT ||
+ dataType == DataTypes.INT || dataType == DataTypes.LONG) {
+ fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
+ CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
+ null);
+ } else if (DataTypes.isDecimal(dataType)) {
+ fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
+ new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()), true,
+ null);
+ } else {
+ fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
+ CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(DataTypes.DOUBLE), true, null);
+ }
+ }
+
+ StructType schema = new StructType(fields);
+ if (partitionColumns != null) {
+ for (StructField field : partitionColumns.fields()) {
+ schema = schema.add(field);
+ }
+ }
+ columnarBatch = ColumnarBatch.allocate(schema, memMode);
+ if (partitionColumns != null) {
+ int partitionIdx = fields.length;
+ for (int i = 0; i < partitionColumns.fields().length; i++) {
+ ColumnVectorUtils.populate(columnarBatch.column(i + partitionIdx), partitionValues, i);
+ columnarBatch.column(i + partitionIdx).setIsConstant();
+ }
+ }
+ CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
+ boolean[] filteredRows = new boolean[columnarBatch.capacity()];
+ for (int i = 0; i < fields.length; i++) {
+ if (isNoDictStringField[i]) {
+ columnarBatch.column(i).reserveDictionaryIds(columnarBatch.capacity());
+ }
+ vectors[i] = new ColumnarVectorWrapper(columnarBatch.column(i), filteredRows);
+ }
+ carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows);
+ }
+
+ private void initBatch() {
+ initBatch(DEFAULT_MEMORY_MODE, new StructType(), InternalRow.empty());
+ }
+
+ private void resultBatch() {
+ if (columnarBatch == null) initBatch();
+ }
+
+
+
+ /**
+ * Advances to the next batch of rows. Returns false if there are no more.
+ */
+ private boolean nextBatch() {
+ if (null != isNoDictStringField) {
+ for (int i = 0; i < isNoDictStringField.length; i++) {
+ if (isNoDictStringField[i]) {
+ columnarBatch.column(i).getDictionaryIds().reset();
+ }
+ }
+ }
+ columnarBatch.reset();
+ carbonColumnarBatch.reset();
+ if (iterator.hasNext()) {
+ iterator.processNextBatch(carbonColumnarBatch);
+ int actualSize = carbonColumnarBatch.getActualSize();
+ columnarBatch.setNumRows(actualSize);
+ numBatched = actualSize;
+ batchIdx = 0;
+ return true;
+ }
+ return false;
+ }
+
+}
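
A hedged sketch of how a caller could drive the reader above through the standard Hadoop RecordReader lifecycle. The InputSplit, TaskAttemptContext and the already-built reader are parameters here because the framework normally supplies them; it assumes the reader was constructed with enableBatch = "true" so getCurrentValue returns a ColumnarBatch:

import org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader
import org.apache.hadoop.mapreduce.{InputSplit, TaskAttemptContext}
import org.apache.spark.sql.execution.vectorized.ColumnarBatch

object ReaderLoopSketch {
  // drives the reader in batch mode and returns the number of rows read
  def consume(reader: VectorizedCarbonRecordReader,
      split: InputSplit,
      context: TaskAttemptContext): Long = {
    reader.initialize(split, context)
    var rows = 0L
    try {
      while (reader.nextKeyValue()) {
        val batch = reader.getCurrentValue.asInstanceOf[ColumnarBatch]
        rows += batch.numValidRows()
      }
    } finally {
      reader.close()
    }
    rows
  }
}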
http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
new file mode 100644
index 0000000..8471181
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.carbondata.execution.datasources
+
+import java.io.IOException
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.{InMemoryFileIndex, _}
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, HDFSCarbonFile}
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
+import org.apache.carbondata.core.scan.expression.{Expression => CarbonExpression}
+import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.CarbonInputSplit
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}
+
+/**
+ * A custom FileIndex implementation that uses Carbon's driver-side pruning to prune carbondata files
+ * using the carbonindex files.
+ */
+class CarbonFileIndex(
+ sparkSession: SparkSession,
+ dataSchema: StructType,
+ parameters: Map[String, String],
+ fileIndex: FileIndex)
+ extends FileIndex with AbstractCarbonFileIndex {
+
+ override def rootPaths: Seq[Path] = fileIndex.rootPaths
+
+ override def inputFiles: Array[String] = fileIndex.inputFiles
+
+ override def refresh(): Unit = fileIndex.refresh()
+
+ override def sizeInBytes: Long = fileIndex.sizeInBytes
+
+ override def partitionSchema: StructType = fileIndex.partitionSchema
+
+ /**
+ * It lists the pruned files after applying partition and data filters.
+ *
+ * @param partitionFilters
+ * @param dataFilters
+ * @return
+ */
+ override def listFiles(partitionFilters: Seq[Expression],
+ dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+ val method = fileIndex.getClass.getMethods.find(_.getName == "listFiles").get
+ val directories =
+ method.invoke(
+ fileIndex,
+ partitionFilters,
+ dataFilters).asInstanceOf[Seq[PartitionDirectory]]
+ prune(dataFilters, directories)
+ }
+
+ private def prune(dataFilters: Seq[Expression],
+ directories: Seq[PartitionDirectory]) = {
+ val tablePath = parameters.get("path")
+ if (tablePath.nonEmpty) {
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ // convert to Spark's source filters
+ val filters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
+
+ // convert to carbon filter expressions
+ val filter: Option[CarbonExpression] = filters.flatMap { filter =>
+ CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, filter)
+ }.reduceOption(new AndExpression(_, _))
+ val model = CarbonSparkDataSourceUtil.prepareLoadModel(parameters, dataSchema)
+ CarbonInputFormat.setTableInfo(
+ hadoopConf,
+ model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
+ CarbonInputFormat.setTransactionalTable(hadoopConf, false)
+ var totalFiles = 0
+ val indexFiles = directories.flatMap { dir =>
+ totalFiles += dir.files.length
+ dir.files.filter{f =>
+ f.getPath.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
+ f.getPath.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)}.
+ map(new HDFSCarbonFile(_))
+ }.toArray.asInstanceOf[Array[CarbonFile]]
+ if (indexFiles.length == 0 && totalFiles > 0) {
+ throw new IOException("No Index files are present in the table location :" + tablePath.get)
+ }
+ CarbonInputFormat.setReadCommittedScope(
+ hadoopConf,
+ new LatestFilesReadCommittedScope(indexFiles))
+ filter match {
+ case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
+ case None => None
+ }
+ val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
+ val jobConf = new JobConf(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ val splits = format.getSplits(Job.getInstance(jobConf))
+ .asInstanceOf[util.List[CarbonInputSplit]].asScala
+ val prunedDirs = directories.map { dir =>
+ val files = dir.files
+ .filter(d => splits.exists(_.getBlockPath.equalsIgnoreCase(d.getPath.getName)))
+ PartitionDirectory(dir.values, files)
+ }
+ prunedDirs
+ } else {
+ directories
+ }
+ }
+
+ override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
+ val method = fileIndex.getClass.getMethods.find(_.getName == "listFiles").get
+ val directories =
+ method.invoke(fileIndex, filters).asInstanceOf[Seq[PartitionDirectory]]
+ prune(filters, directories)
+ }
+}
+
+/**
+ * It is just a trait to make the code compile against both Spark 2.1 and 2.2
+ */
+trait AbstractCarbonFileIndex {
+
+ def listFiles(partitionFilters: Seq[Expression],
+ dataFilters: Seq[Expression]): Seq[PartitionDirectory]
+
+ def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]
+}
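
A minimal sketch (assumptions: a running SparkSession and a directory of carbondata/carbonindex files at path) of wrapping a plain InMemoryFileIndex with the CarbonFileIndex defined above so that listFiles goes through Carbon's index-based pruning; the InMemoryFileIndex constructor used here mirrors the one called later in this patch:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndex
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import org.apache.spark.sql.types.StructType

object CarbonFileIndexSketch {
  def wrap(spark: SparkSession, dataSchema: StructType, path: String): CarbonFileIndex = {
    val params = Map("path" -> path)
    // the underlying Spark file index that does the raw file listing
    val inMemory = new InMemoryFileIndex(spark, Seq(new Path(path)), params, None)
    new CarbonFileIndex(spark, dataSchema, params, inMemory)
  }
}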
http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
new file mode 100644
index 0000000..ed67f48
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.carbondata.execution.datasources
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, InMemoryFileIndex, InsertIntoHadoopFsRelationCommand, LogicalRelation}
+import org.apache.spark.sql.sources.BaseRelation
+
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+/**
+ * Rule to replace FileIndex with CarbonFileIndex for better driver pruning.
+ */
+class CarbonFileIndexReplaceRule extends Rule[LogicalPlan] {
+
+ /**
+ * This property creates a subfolder for every load
+ */
+ private val createSubFolder = CarbonProperties.getInstance()
+ .getProperty("carbonfileformat.create.folder.perload", "false").toBoolean
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ val transformedPlan = plan.transform {
+ case l: LogicalRelation
+ if l.relation.isInstanceOf[HadoopFsRelation] &&
+ l.relation.asInstanceOf[HadoopFsRelation].fileFormat.toString.equals("carbon") &&
+ !l.relation.asInstanceOf[HadoopFsRelation].location.isInstanceOf[CarbonFileIndex] =>
+ val fsRelation = l.relation.asInstanceOf[HadoopFsRelation]
+ val fileIndex = fsRelation.location
+ val carbonFileIndex = new CarbonFileIndex(fsRelation.sparkSession,
+ fsRelation.schema,
+ fsRelation.options,
+ updateFileIndex(fileIndex, fsRelation))
+ val fsRelationCopy = fsRelation.copy(location = carbonFileIndex)(fsRelation.sparkSession)
+ val logicalRelation = l.copy(relation = fsRelationCopy.asInstanceOf[BaseRelation])
+ logicalRelation
+ case insert: InsertIntoHadoopFsRelationCommand
+ if createSubFolder && insert.fileFormat.toString.equals("carbon") &&
+ FileFactory.getUpdatedFilePath(insert.outputPath.toString).equals(
+ FileFactory.getUpdatedFilePath(insert.options("path"))) &&
+ insert.partitionColumns.isEmpty =>
+ val path = new Path(insert.outputPath, System.nanoTime().toString)
+ insert.copy(outputPath = path)
+ }
+ transformedPlan
+ }
+
+ private def updateFileIndex(fileIndex: FileIndex,
+ hadoopFsRelation: HadoopFsRelation): FileIndex = {
+ if (fileIndex.isInstanceOf[InMemoryFileIndex] && fileIndex.rootPaths.length == 1) {
+ val carbonFile = FileFactory.getCarbonFile(fileIndex.rootPaths.head.toUri.toString)
+ val carbonFiles = carbonFile.listFiles()
+ if (carbonFiles.nonEmpty &&
+ !carbonFiles.exists(_.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT))) {
+ val paths = carbonFiles.map(p => new Path(p.getAbsolutePath)).toSeq
+ new InMemoryFileIndex(hadoopFsRelation.sparkSession,
+ paths,
+ hadoopFsRelation.options,
+ Some(hadoopFsRelation.partitionSchema))
+ } else {
+ fileIndex
+ }
+ } else {
+ fileIndex
+ }
+ }
+}
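
The rule above only rewrites plans that already contain a HadoopFsRelation backed by the carbon file format. One possible way to hook such a Rule[LogicalPlan] into a session for experimentation is via Spark's experimental extra optimizations; the registration point Carbon actually uses is not part of this patch, so treat this as an assumption:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule

object RuleRegistrationSketch {
  def register(spark: SparkSession): Unit = {
    // append the rule to this session's extra optimizer rules
    spark.experimental.extraOptimizations =
      spark.experimental.extraOptimizations :+ new CarbonFileIndexReplaceRule
  }
}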
http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
new file mode 100644
index 0000000..8724fd1
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.carbondata.execution.datasources
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.datatype
+import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
+import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
+import org.apache.carbondata.core.scan.expression.conditional._
+import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema}
+
+object CarbonSparkDataSourceUtil {
+
+ /**
+ * Convert from Carbon data type to Spark's data type
+ */
+ def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
+ if (CarbonDataTypes.isDecimal(dataType)) {
+ DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
+ dataType.asInstanceOf[CarbonDecimalType].getScale)
+ } else {
+ dataType match {
+ case CarbonDataTypes.STRING => StringType
+ case CarbonDataTypes.SHORT => ShortType
+ case CarbonDataTypes.INT => IntegerType
+ case CarbonDataTypes.LONG => LongType
+ case CarbonDataTypes.DOUBLE => DoubleType
+ case CarbonDataTypes.BOOLEAN => BooleanType
+ case CarbonDataTypes.TIMESTAMP => TimestampType
+ case CarbonDataTypes.DATE => DateType
+ case CarbonDataTypes.VARCHAR => StringType
+ }
+ }
+ }
+
+ /**
+ * Convert from Spark's data type to Carbon data type
+ */
+ def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
+ dataType match {
+ case StringType => CarbonDataTypes.STRING
+ case ShortType => CarbonDataTypes.SHORT
+ case IntegerType => CarbonDataTypes.INT
+ case LongType => CarbonDataTypes.LONG
+ case DoubleType => CarbonDataTypes.DOUBLE
+ case FloatType => CarbonDataTypes.FLOAT
+ case DateType => CarbonDataTypes.DATE
+ case BooleanType => CarbonDataTypes.BOOLEAN
+ case TimestampType => CarbonDataTypes.TIMESTAMP
+ case ArrayType(elementType, _) =>
+ CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType))
+ case StructType(fields) =>
+ val carbonFields = new java.util.ArrayList[CarbonStructField]
+ fields.map { field =>
+ carbonFields.add(
+ new CarbonStructField(
+ field.name,
+ convertSparkToCarbonDataType(field.dataType)))
+ }
+ CarbonDataTypes.createStructType(carbonFields)
+ case NullType => CarbonDataTypes.NULL
+ case decimal: DecimalType =>
+ CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
+ case _ => throw new UnsupportedOperationException("getting " + dataType + " from spark")
+ }
+ }
+
+ /**
+ * Converts data sources filters to carbon filter predicates.
+ */
+ def createCarbonFilter(schema: StructType,
+ predicate: sources.Filter): Option[CarbonExpression] = {
+ val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
+
+ def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
+ predicate match {
+
+ case sources.EqualTo(name, value) =>
+ Some(new EqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.Not(sources.EqualTo(name, value)) =>
+ Some(new NotEqualsExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.EqualNullSafe(name, value) =>
+ Some(new EqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.Not(sources.EqualNullSafe(name, value)) =>
+ Some(new NotEqualsExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.GreaterThan(name, value) =>
+ Some(new GreaterThanExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.LessThan(name, value) =>
+ Some(new LessThanExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.GreaterThanOrEqual(name, value) =>
+ Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.LessThanOrEqual(name, value) =>
+ Some(new LessThanEqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case sources.In(name, values) =>
+ if (values.length == 1 && values(0) == null) {
+ Some(new FalseExpression(getCarbonExpression(name)))
+ } else {
+ Some(new InExpression(getCarbonExpression(name),
+ new ListExpression(
+ convertToJavaList(values.filterNot(_ == null)
+ .map(filterValues => getCarbonLiteralExpression(name, filterValues)).toList))))
+ }
+ case sources.Not(sources.In(name, values)) =>
+ if (values.contains(null)) {
+ Some(new FalseExpression(getCarbonExpression(name)))
+ } else {
+ Some(new NotInExpression(getCarbonExpression(name),
+ new ListExpression(
+ convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
+ }
+ case sources.IsNull(name) =>
+ Some(new EqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, null), true))
+ case sources.IsNotNull(name) =>
+ Some(new NotEqualsExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, null), true))
+ case sources.And(lhs, rhs) =>
+ (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
+ case sources.Or(lhs, rhs) =>
+ for {
+ lhsFilter <- createFilter(lhs)
+ rhsFilter <- createFilter(rhs)
+ } yield {
+ new OrExpression(lhsFilter, rhsFilter)
+ }
+ case sources.StringStartsWith(name, value) if value.length > 0 =>
+ Some(new StartsWithExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, value)))
+ case _ => None
+ }
+ }
+
+ def getCarbonExpression(name: String) = {
+ new CarbonColumnExpression(name,
+ CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
+ }
+
+ def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
+ val dataTypeOfAttribute =
+ CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name))
+ val dataType =
+ if (Option(value).isDefined &&
+ dataTypeOfAttribute == CarbonDataTypes.STRING &&
+ value.isInstanceOf[Double]) {
+ CarbonDataTypes.DOUBLE
+ } else {
+ dataTypeOfAttribute
+ }
+ new CarbonLiteralExpression(value, dataType)
+ }
+
+ createFilter(predicate)
+ }
+
+ // Convert a Scala list to a Java list. scalaList.asJava cannot be used because, while deserializing,
+ // it is not able to find the classes inside the Scala list and throws ClassNotFoundException.
+ def convertToJavaList(
+ scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
+ val javaList = new java.util.ArrayList[CarbonExpression]()
+ scalaList.foreach(javaList.add)
+ javaList
+ }
+
+ /**
+ * Create load model for carbon
+ */
+ def prepareLoadModel(options: Map[String, String],
+ dataSchema: StructType): CarbonLoadModel = {
+ val schema = new Schema(dataSchema.fields.map { field =>
+ field.dataType match {
+ case s: StructType =>
+ new Field(field.name,
+ field.dataType.typeName,
+ s.fields
+ .map(f => new datatype.StructField(f.name,
+ CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(f.dataType))).toList.asJava)
+ case a: ArrayType =>
+ new Field(field.name,
+ field.dataType.typeName,
+ Seq(new datatype.StructField(field.name,
+ CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(a.elementType))).toList.asJava)
+ case other =>
+ new Field(field.name, field.dataType.simpleString)
+ }
+ })
+ val builder = new CarbonWriterBuilder
+ builder.isTransactionalTable(false)
+ builder.outputPath(options.getOrElse("path", ""))
+ val blockSize = options.get(CarbonCommonConstants.TABLE_BLOCKSIZE).map(_.toInt)
+ if (blockSize.isDefined) {
+ builder.withBlockSize(blockSize.get)
+ }
+ val blockletSize = options.get("table_blockletsize").map(_.toInt)
+ if (blockletSize.isDefined) {
+ builder.withBlockletSize(blockletSize.get)
+ }
+ builder.enableLocalDictionary(options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+ CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT).toBoolean)
+ builder.localDictionaryThreshold(
+ options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+ CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT).toInt)
+ builder.sortBy(
+ options.get(CarbonCommonConstants.SORT_COLUMNS).map(_.split(",").map(_.trim)).orNull)
+ builder.uniqueIdentifier(System.currentTimeMillis())
+ val model = builder.buildLoadModel(schema)
+ val tableInfo = model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo
+ val properties =
+ tableInfo.getFactTable.getTableProperties
+ // Add the meta cache level
+ options.map{ case (key, value) =>
+ if (key.equalsIgnoreCase(CarbonCommonConstants.COLUMN_META_CACHE)) {
+ val columnsToBeCached = value.split(",").map(x => x.trim.toLowerCase).toSeq
+ // make the columns in create table order and then add it to table properties
+ val createOrder =
+ tableInfo.getFactTable.getListOfColumns.asScala.map(_.getColumnName).filter(
+ col => columnsToBeCached.contains(col))
+ properties.put(CarbonCommonConstants.COLUMN_META_CACHE, createOrder.mkString(","))
+ }
+ }
+ model
+ }
+}
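
For reference, a minimal sketch (not part of this patch) of how the two helpers above can be exercised. The schema, the filter and the output path are hypothetical; only createCarbonFilter and prepareLoadModel are taken from the code in this diff.

  import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
  import org.apache.spark.sql.sources
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

  // Hypothetical two-column schema of the data being read/written
  val dataSchema = StructType(Seq(
    StructField("name", StringType),
    StructField("age", IntegerType)))

  // A pushed-down Spark filter: name = 'bob' AND age > 20
  val sparkFilter = sources.And(sources.EqualTo("name", "bob"), sources.GreaterThan("age", 20))

  // Translated into a carbon expression tree (AndExpression of EqualTo/GreaterThan expressions)
  val carbonFilter = CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, sparkFilter)

  // Load model for a non-transactional write to a hypothetical output directory
  val loadModel = CarbonSparkDataSourceUtil.prepareLoadModel(
    Map("path" -> "/tmp/carbon_out"), dataSchema)
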
http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
new file mode 100644
index 0000000..d321cab
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.carbondata.execution.datasources
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql._
+import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SparkTypeConverter
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.scan.expression.{Expression => CarbonExpression}
+import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.core.statusmanager.{FileFormat => CarbonFileFormatVersion}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader}
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, CarbonTableOutputFormat}
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
+import org.apache.carbondata.processing.loading.complexobjects.{ArrayObject, StructObject}
+import org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader
+
+/**
+ * Used to read and write data stored in carbondata files to/from the Spark execution engine.
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+class SparkCarbonFileFormat extends FileFormat
+ with DataSourceRegister
+ with Logging
+ with Serializable {
+
+ @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+ /**
+ * If the user does not provide a schema while reading the data, Spark calls this method to infer
+ * the schema from the carbondata files. It reads the schema present in the files and returns it.
+ */
+ override def inferSchema(sparkSession: SparkSession,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
+ val tablePath = options.get("path") match {
+ case Some(path) => path
+ case _ => FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString)
+ }
+
+ val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""), false)
+ val table = CarbonTable.buildFromTableInfo(tableInfo)
+ var schema = new StructType
+ tableInfo.getFactTable.getListOfColumns.asScala.foreach { col =>
+ // TODO find better way to know its a child
+ if (!col.getColumnName.contains(".")) {
+ schema = schema.add(
+ col.getColumnName,
+ SparkTypeConverter.convertCarbonToSparkDataType(col, table))
+ }
+ }
+ Some(schema)
+ }
+
+ /**
+ * Prepares a write job and returns an [[OutputWriterFactory]]. Client-side job preparation is
+ * done here.
+ */
+ override def prepareWrite(sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+
+ val conf = job.getConfiguration
+
+ val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
+ model.setLoadWithoutConverterStep(true)
+ CarbonTableOutputFormat.setLoadModel(conf, model)
+
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ val updatedPath = if (path.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
+ new Path(path).getParent.toString
+ } else {
+ path
+ }
+ context.getConfiguration.set("carbon.outputformat.writepath", updatedPath)
+ context.getConfiguration.set("carbon.outputformat.taskno", System.nanoTime() + "")
+ new CarbonOutputWriter(path, context, dataSchema.fields)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+ CarbonTablePath.CARBON_DATA_EXT
+ }
+ }
+ }
+
+ /**
+ * It is just a class to make the code compile with both Spark 2.1 and 2.2
+ */
+ private trait AbstractCarbonOutputWriter {
+ def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+ def writeInternal(row: InternalRow): Unit = {
+ writeCarbon(row)
+ }
+ def write(row: InternalRow): Unit = {
+ writeCarbon(row)
+ }
+ def writeCarbon(row: InternalRow): Unit
+ }
+
+
+ /**
+ * Writer class for carbondata files
+ */
+ private class CarbonOutputWriter(path: String,
+ context: TaskAttemptContext,
+ fieldTypes: Array[StructField]) extends OutputWriter with AbstractCarbonOutputWriter {
+
+ private val writable = new ObjectArrayWritable
+
+ private val cutOffDate = Integer.MAX_VALUE >> 1
+
+ private val recordWriter: RecordWriter[NullWritable, ObjectArrayWritable] =
+ new CarbonTableOutputFormat().getRecordWriter(context)
+
+ /**
+ * Write Spark's internal row to the carbondata record writer
+ */
+ def writeCarbon(row: InternalRow): Unit = {
+ val data: Array[AnyRef] = extractData(row, fieldTypes)
+ writable.set(data)
+ recordWriter.write(NullWritable.get(), writable)
+ }
+
+ override def writeInternal(row: InternalRow): Unit = {
+ writeCarbon(row)
+ }
+
+ /**
+ * Convert the internal row to an object array that carbondata understands
+ */
+ private def extractData(row: InternalRow, fieldTypes: Array[StructField]): Array[AnyRef] = {
+ val data = new Array[AnyRef](fieldTypes.length)
+ var i = 0
+ while (i < fieldTypes.length) {
+ if (!row.isNullAt(i)) {
+ fieldTypes(i).dataType match {
+ case StringType =>
+ data(i) = row.getString(i)
+ case d: DecimalType =>
+ data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+ case s: StructType =>
+ data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
+ case s: ArrayType =>
+ data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
+ case d: DateType =>
+ data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
+ case d: TimestampType =>
+ data(i) = (row.getLong(i) / 1000).asInstanceOf[AnyRef]
+ case other =>
+ data(i) = row.get(i, other)
+ }
+ } else {
+ setNull(fieldTypes(i).dataType, data, i)
+ }
+ i += 1
+ }
+ data
+ }
+
+ private def setNull(dataType: DataType, data: Array[AnyRef], i: Int) = {
+ dataType match {
+ case d: DateType =>
+ // 1 is treated as null in carbon
+ data(i) = 1.asInstanceOf[AnyRef]
+ case _ =>
+ }
+ }
+
+ /**
+ * Convert the internal array data to an object array that carbondata understands
+ */
+ private def extractData(row: ArrayData, dataType: DataType): Array[AnyRef] = {
+ val data = new Array[AnyRef](row.numElements())
+ var i = 0
+ while (i < data.length) {
+ if (!row.isNullAt(i)) {
+ dataType match {
+ case d: DecimalType =>
+ data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+ case s: StructType =>
+ data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
+ case s: ArrayType =>
+ data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
+ case d: DateType =>
+ data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
+ case other => data(i) = row.get(i, dataType)
+ }
+ } else {
+ setNull(dataType, data, i)
+ }
+ i += 1
+ }
+ data
+ }
+
+ override def close(): Unit = {
+ recordWriter.close(context)
+ }
+ }
+
+ override def shortName(): String = "carbon"
+
+ override def toString: String = "carbon"
+
+ override def hashCode(): Int = getClass.hashCode()
+
+ override def equals(other: Any): Boolean = other.isInstanceOf[SparkCarbonFileFormat]
+
+ /**
+ * Whether to use the vector reader while reading data.
+ * It is not supported in case of complex types
+ */
+ private def supportVector(sparkSession: SparkSession, schema: StructType): Boolean = {
+ val vectorizedReader = {
+ if (sparkSession.sqlContext.sparkSession.conf
+ .contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) {
+ sparkSession.sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER)
+ } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
+ System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
+ } else {
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+ CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
+ }
+ }
+ vectorizedReader.toBoolean && schema.forall(_.dataType.isInstanceOf[AtomicType])
+ }
+
+
+ /**
+ * Returns whether this format supports returning a columnar batch or not.
+ */
+ override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
+ val conf = sparkSession.sessionState.conf
+ conf.wholeStageEnabled &&
+ schema.length <= conf.wholeStageMaxNumFields &&
+ schema.forall(_.dataType.isInstanceOf[AtomicType])
+ }
+
+ /**
+ * Returns a function that can be used to read a single carbondata file in as an
+ * Iterator of InternalRow.
+ */
+ override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+ val filter: Option[CarbonExpression] = filters.flatMap { filter =>
+ CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, filter)
+ }.reduceOption(new AndExpression(_, _))
+
+ val projection = requiredSchema.map(_.name).toArray
+ val carbonProjection = new CarbonProjection
+ projection.foreach(carbonProjection.addColumn)
+
+ var supportBatchValue: Boolean = false
+
+ val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+ val readVector = supportVector(sparkSession, resultSchema)
+ if (readVector) {
+ supportBatchValue = supportBatch(sparkSession, resultSchema)
+ }
+ val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
+ CarbonInputFormat
+ .setTableInfo(hadoopConf, model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
+ CarbonInputFormat.setTransactionalTable(hadoopConf, false)
+ CarbonInputFormat.setColumnProjection(hadoopConf, carbonProjection)
+ filter match {
+ case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
+ case None => None
+ }
+ val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+ file: PartitionedFile => {
+ assert(file.partitionValues.numFields == partitionSchema.size)
+
+ if (file.filePath.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
+ val split = new CarbonInputSplit("null",
+ new Path(file.filePath),
+ file.start,
+ file.length,
+ file.locations,
+ CarbonFileFormatVersion.COLUMNAR_V3)
+ // Only format version V3 and above is supported.
+ split.setVersion(ColumnarFormatVersion.V3)
+ val info = new BlockletDetailInfo()
+ split.setDetailInfo(info)
+ info.setBlockSize(file.length)
+ // Read the footer offset and set it on the detail info.
+ val reader = FileFactory.getFileHolder(FileFactory.getFileType(file.filePath))
+ val buffer = reader
+ .readByteBuffer(FileFactory.getUpdatedFilePath(file.filePath), file.length - 8, 8)
+ info.setBlockFooterOffset(buffer.getLong)
+ info.setVersionNumber(split.getVersion.number())
+ info.setUseMinMaxForPruning(true)
+ reader.finish()
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+ val hadoopAttemptContext =
+ new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
+ val model = format.createQueryModel(split, hadoopAttemptContext)
+ model.setConverter(new SparkDataTypeConverterImpl)
+ val carbonReader = if (readVector) {
+ val vectorizedReader = new VectorizedCarbonRecordReader(model,
+ null,
+ supportBatchValue.toString)
+ vectorizedReader.initialize(split, hadoopAttemptContext)
+ vectorizedReader.initBatch(MemoryMode.ON_HEAP, partitionSchema, file.partitionValues)
+ logDebug(s"Appending $partitionSchema ${ file.partitionValues }")
+ vectorizedReader
+ } else {
+ val reader = new CarbonRecordReader(model,
+ new SparkUnsafeRowReadSuport(requiredSchema), null)
+ reader.initialize(split, hadoopAttemptContext)
+ reader
+ }
+
+ val iter = new RecordReaderIterator(carbonReader)
+ Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+
+ if (carbonReader.isInstanceOf[VectorizedCarbonRecordReader] && readVector) {
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } else {
+ val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+ val joinedRow = new JoinedRow()
+ val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+ if (partitionSchema.length == 0) {
+ // There are no partition columns
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } else {
+ iter.asInstanceOf[Iterator[InternalRow]]
+ .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+ }
+ }
+ }
+ else {
+ Iterator.empty
+ }
+ }
+ }
+
+
+}
+
+
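
Since the format registers itself under the short name "carbon", it can be driven through the standard DataFrame API. A usage sketch, assuming a SparkSession named spark, an existing DataFrame df and a hypothetical output directory (none of which are part of this patch):

  // Write a DataFrame as carbondata files through SparkCarbonFileFormat.prepareWrite
  df.write.format("carbon").save("/tmp/carbon_files")

  // Read the files back; the schema is inferred from the carbondata files by inferSchema,
  // and supported filters are pushed down via buildReaderWithPartitionValues
  val readBack = spark.read.format("carbon").load("/tmp/carbon_files")
  readBack.filter("age > 20").show()
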
http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/readsupport/SparkUnsafeRowReadSuport.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/readsupport/SparkUnsafeRowReadSuport.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/readsupport/SparkUnsafeRowReadSuport.scala
new file mode 100644
index 0000000..cffde6c
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/readsupport/SparkUnsafeRowReadSuport.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.carbondata.execution.datasources.readsupport
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+
+/**
+ * Read support class which converts the carbon row array format to Spark's InternalRow.
+ */
+class SparkUnsafeRowReadSuport(requiredSchema: StructType) extends CarbonReadSupport[InternalRow] {
+ private val unsafeProjection = UnsafeProjection.create(requiredSchema)
+ override def initialize(carbonColumns: Array[CarbonColumn],
+ carbonTable: CarbonTable): Unit = {
+ }
+
+ override def readRow(data: Array[AnyRef]): InternalRow = {
+ unsafeProjection(new GenericInternalRow(data.asInstanceOf[Array[Any]]))
+ }
+
+ override def close(): Unit = {
+ // Nothing to close
+ }
+}
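
A small illustration of the read support above (the schema and the row values are hypothetical): the object array produced by the carbon record reader is wrapped in a GenericInternalRow and projected into an UnsafeRow matching the required schema.

  import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  import org.apache.spark.unsafe.types.UTF8String

  val requiredSchema = StructType(Seq(
    StructField("name", StringType),
    StructField("age", IntegerType)))
  val readSupport = new SparkUnsafeRowReadSuport(requiredSchema)

  // One carbon result row as an object array, in the same column order as the required schema
  val unsafeRow = readSupport.readRow(
    Array[AnyRef](UTF8String.fromString("bob"), Integer.valueOf(25)))
  // unsafeRow is an InternalRow (UnsafeRow) that Spark's execution engine can consume directly
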
http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/CarbonMetastoreTypes.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/CarbonMetastoreTypes.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/CarbonMetastoreTypes.scala
new file mode 100644
index 0000000..89eb622
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/CarbonMetastoreTypes.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import scala.util.parsing.combinator.RegexParsers
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.types._
+
+object CarbonMetastoreTypes extends RegexParsers {
+ protected lazy val primitiveType: Parser[DataType] =
+ "string" ^^^ StringType |
+ "varchar" ^^^ StringType |
+ "float" ^^^ FloatType |
+ "int" ^^^ IntegerType |
+ "tinyint" ^^^ ShortType |
+ "short" ^^^ ShortType |
+ "double" ^^^ DoubleType |
+ "long" ^^^ LongType |
+ "binary" ^^^ BinaryType |
+ "boolean" ^^^ BooleanType |
+ fixedDecimalType |
+ "decimal" ^^^ "decimal" ^^^ DecimalType(10, 0) |
+ "varchar\\((\\d+)\\)".r ^^^ StringType |
+ "date" ^^^ DateType |
+ "timestamp" ^^^ TimestampType
+
+ protected lazy val fixedDecimalType: Parser[DataType] =
+ "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
+ case precision ~ scale =>
+ DecimalType(precision.toInt, scale.toInt)
+ }
+
+ protected lazy val arrayType: Parser[DataType] =
+ "array" ~> "<" ~> dataType <~ ">" ^^ {
+ case tpe => ArrayType(tpe)
+ }
+
+ protected lazy val mapType: Parser[DataType] =
+ "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
+ case t1 ~ _ ~ t2 => MapType(t1, t2)
+ }
+
+ protected lazy val structField: Parser[StructField] =
+ "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ {
+ case name ~ _ ~ tpe => StructField(name, tpe, nullable = true)
+ }
+
+ protected lazy val structType: Parser[DataType] =
+ "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
+ case fields => StructType(fields)
+ }
+
+ protected lazy val dataType: Parser[DataType] =
+ arrayType |
+ mapType |
+ structType |
+ primitiveType
+
+ def toDataType(metastoreType: String): DataType = {
+ parseAll(dataType, metastoreType) match {
+ case Success(result, _) => result
+ case _: NoSuccess =>
+ throw new AnalysisException(s"Unsupported dataType: $metastoreType")
+ }
+ }
+
+ def toMetastoreType(dt: DataType): String = {
+ dt match {
+ case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>"
+ case StructType(fields) =>
+ s"struct<${
+ fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }")
+ .mkString(",")
+ }>"
+ case StringType => "string"
+ case FloatType => "float"
+ case IntegerType => "int"
+ case ShortType => "tinyint"
+ case DoubleType => "double"
+ case LongType => "bigint"
+ case BinaryType => "binary"
+ case BooleanType => "boolean"
+ case DecimalType() => "decimal"
+ case TimestampType => "timestamp"
+ case DateType => "date"
+ }
+ }
+}
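
As a quick illustration of the parser and the reverse mapping above (the type strings are arbitrary examples):

  import org.apache.spark.sql.types.{ArrayType, StringType}
  import org.apache.spark.sql.util.CarbonMetastoreTypes

  // Parse a nested metastore type string into a Spark DataType
  val parsed = CarbonMetastoreTypes.toDataType("struct<name:string,marks:array<int>>")
  // parsed is a StructType with a string field "name" and an array<int> field "marks"

  // Convert a Spark DataType back to its metastore representation
  val metastoreString = CarbonMetastoreTypes.toMetastoreType(ArrayType(StringType))
  // metastoreString == "array<string>"
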