Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/20 15:44:41 UTC
[08/10] carbondata git commit: [CARBONDATA-2224][File Level Reader Support] External File level reader support
[CARBONDATA-2224][File Level Reader Support] External File level reader support
The file level reader reads any carbondata file placed at an arbitrary external file path.
This closes #2055
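
For reference, a minimal end-to-end sketch of the feature, pieced together from the SDK writer and the 'Carbonfile' provider exercised by the tests in this patch (the output path and the `spark` session are illustrative assumptions, not part of the patch):

    import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

    // write a carbondata file to an arbitrary external path with the SDK (no metastore involved)
    val schema = Schema.parseJson("""[ {"name":"string"}, {"age":"int"}, {"height":"double"} ]""")
    val writer = CarbonWriter.builder()
      .withSchema(schema)
      .outputPath("/tmp/carbon-external")          // illustrative path
      .buildWriterForCSVInput()
    writer.write(Array("robot0", "0", "0.0"))
    writer.close()

    // read it back through the file level reader
    spark.sql("CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION '/tmp/carbon-external'")
    spark.sql("SELECT * FROM sdkOutputTable").show()
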
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/223c25de
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/223c25de
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/223c25de
Branch: refs/heads/carbonfile
Commit: 223c25de091fa5410875695492398dc5bfafa257
Parents: f5cdd5c
Author: sounakr <so...@gmail.com>
Authored: Sat Feb 24 07:55:14 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Tue Mar 20 23:44:00 2018 +0800
----------------------------------------------------------------------
.../core/metadata/schema/table/CarbonTable.java | 6 +
.../apache/carbondata/core/util/CarbonUtil.java | 209 +++++-
.../hadoop/api/CarbonFileInputFormat.java | 682 +++++++++++++++++++
.../carbondata/hadoop/util/SchemaReader.java | 17 +-
integration/spark-common-test/pom.xml | 6 +
...FileInputFormatWithExternalCarbonTable.scala | 240 +++++++
...tCreateTableUsingSparkCarbonFileFormat.scala | 327 +++++++++
...tSparkCarbonFileFormatWithSparkSession.scala | 176 +++++
.../carbondata/spark/rdd/CarbonScanRDD.scala | 64 +-
.../VectorizedCarbonRecordReader.java | 22 +-
.../management/CarbonLoadDataCommand.scala | 4 +-
.../command/table/CarbonDropTableCommand.scala | 2 +-
.../datasources/CarbonFileFormat.scala | 443 ------------
.../datasources/SparkCarbonFileFormat.scala | 269 ++++++++
.../datasources/SparkCarbonTableFormat.scala | 443 ++++++++++++
.../sql/execution/strategy/DDLStrategy.scala | 27 +-
.../spark/sql/hive/CarbonAnalysisRules.scala | 2 +-
.../spark/sql/parser/CarbonSparkSqlParser.scala | 47 +-
.../spark/sql/hive/CarbonSessionState.scala | 17 +-
.../spark/sql/hive/CarbonSessionState.scala | 16 +-
....apache.spark.sql.sources.DataSourceRegister | 3 +-
.../sdk/file/CSVCarbonWriterTest.java | 6 +-
22 files changed, 2522 insertions(+), 506 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/223c25de/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index f14672f..278dc96 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -826,6 +826,12 @@ public class CarbonTable implements Serializable {
return external != null && external.equalsIgnoreCase("true");
}
+ public boolean isFileLevelExternalTable() {
+ String external = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal");
+ return external != null && external.equalsIgnoreCase("true");
+ }
+
+
public long size() throws IOException {
Map<String, Long> dataIndexSize = CarbonUtil.calculateDataIndexSize(this);
Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/223c25de/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 06511f8..5a5f65d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
@@ -52,18 +53,26 @@ import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypeAdapter;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
import org.apache.carbondata.core.reader.ThriftReader;
import org.apache.carbondata.core.reader.ThriftReader.TBaseCreator;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
@@ -77,6 +86,8 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.BlockletHeader;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.DataChunk3;
+import org.apache.carbondata.format.FileHeader;
+
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -1279,7 +1290,7 @@ public final class CarbonUtil {
int counter = 0;
for (int i = 0; i < wrapperColumnSchemaList.size(); i++) {
if (CarbonUtil.hasEncoding(wrapperColumnSchemaList.get(i).getEncodingList(),
- org.apache.carbondata.core.metadata.encoder.Encoding.DICTIONARY)) {
+ Encoding.DICTIONARY)) {
cardinality.add(dictionaryColumnCardinality[counter]);
counter++;
} else if (!wrapperColumnSchemaList.get(i).isDimensionColumn()) {
@@ -2068,6 +2079,202 @@ public final class CarbonUtil {
return tableInfo;
}
+ public static ColumnSchema thriftColumnSchmeaToWrapperColumnSchema(
+ org.apache.carbondata.format.ColumnSchema externalColumnSchema) {
+ ColumnSchema wrapperColumnSchema = new ColumnSchema();
+ wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id());
+ wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name());
+ wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar());
+ DataType dataType = thriftDataTyopeToWrapperDataType(externalColumnSchema.data_type);
+ if (DataTypes.isDecimal(dataType)) {
+ DecimalType decimalType = (DecimalType) dataType;
+ decimalType.setPrecision(externalColumnSchema.getPrecision());
+ decimalType.setScale(externalColumnSchema.getScale());
+ }
+ wrapperColumnSchema.setDataType(dataType);
+ wrapperColumnSchema.setDimensionColumn(externalColumnSchema.isDimension());
+ List<Encoding> encoders = new ArrayList<Encoding>();
+ for (org.apache.carbondata.format.Encoding encoder : externalColumnSchema.getEncoders()) {
+ encoders.add(fromExternalToWrapperEncoding(encoder));
+ }
+ wrapperColumnSchema.setEncodingList(encoders);
+ wrapperColumnSchema.setNumberOfChild(externalColumnSchema.getNum_child());
+ wrapperColumnSchema.setPrecision(externalColumnSchema.getPrecision());
+ wrapperColumnSchema.setColumnGroup(externalColumnSchema.getColumn_group_id());
+ wrapperColumnSchema.setScale(externalColumnSchema.getScale());
+ wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value());
+ wrapperColumnSchema.setSchemaOrdinal(externalColumnSchema.getSchemaOrdinal());
+ Map<String, String> properties = externalColumnSchema.getColumnProperties();
+ if (properties != null) {
+ if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
+ wrapperColumnSchema.setSortColumn(true);
+ }
+ }
+ wrapperColumnSchema.setFunction(externalColumnSchema.getAggregate_function());
+ List<org.apache.carbondata.format.ParentColumnTableRelation> parentColumnTableRelation =
+ externalColumnSchema.getParentColumnTableRelations();
+ if (null != parentColumnTableRelation) {
+ wrapperColumnSchema.setParentColumnTableRelations(
+ fromThriftToWrapperParentTableColumnRelations(parentColumnTableRelation));
+ }
+ return wrapperColumnSchema;
+ }
+
+ static List<ParentColumnTableRelation> fromThriftToWrapperParentTableColumnRelations(
+ List<org.apache.carbondata.format.ParentColumnTableRelation> thirftParentColumnRelation) {
+ List<ParentColumnTableRelation> parentColumnTableRelationList = new ArrayList<>();
+ for (org.apache.carbondata.format.ParentColumnTableRelation carbonTableRelation :
+ thirftParentColumnRelation) {
+ RelationIdentifier relationIdentifier =
+ new RelationIdentifier(carbonTableRelation.getRelationIdentifier().getDatabaseName(),
+ carbonTableRelation.getRelationIdentifier().getTableName(),
+ carbonTableRelation.getRelationIdentifier().getTableId());
+ ParentColumnTableRelation parentColumnTableRelation =
+ new ParentColumnTableRelation(relationIdentifier, carbonTableRelation.getColumnId(),
+ carbonTableRelation.getColumnName());
+ parentColumnTableRelationList.add(parentColumnTableRelation);
+ }
+ return parentColumnTableRelationList;
+ }
+
+ static Encoding fromExternalToWrapperEncoding(
+ org.apache.carbondata.format.Encoding encoderThrift) {
+ switch (encoderThrift) {
+ case DICTIONARY:
+ return Encoding.DICTIONARY;
+ case DELTA:
+ return Encoding.DELTA;
+ case RLE:
+ return Encoding.RLE;
+ case INVERTED_INDEX:
+ return Encoding.INVERTED_INDEX;
+ case BIT_PACKED:
+ return Encoding.BIT_PACKED;
+ case DIRECT_DICTIONARY:
+ return Encoding.DIRECT_DICTIONARY;
+ default:
+ throw new IllegalArgumentException(encoderThrift.toString() + " is not supported");
+ }
+ }
+
+ static DataType thriftDataTyopeToWrapperDataType(
+ org.apache.carbondata.format.DataType dataTypeThrift) {
+ switch (dataTypeThrift) {
+ case BOOLEAN:
+ return DataTypes.BOOLEAN;
+ case STRING:
+ return DataTypes.STRING;
+ case SHORT:
+ return DataTypes.SHORT;
+ case INT:
+ return DataTypes.INT;
+ case LONG:
+ return DataTypes.LONG;
+ case DOUBLE:
+ return DataTypes.DOUBLE;
+ case DECIMAL:
+ return DataTypes.createDefaultDecimalType();
+ case DATE:
+ return DataTypes.DATE;
+ case TIMESTAMP:
+ return DataTypes.TIMESTAMP;
+ case ARRAY:
+ return DataTypes.createDefaultArrayType();
+ case STRUCT:
+ return DataTypes.createDefaultStructType();
+ default:
+ return DataTypes.STRING;
+ }
+ }
+
+ public static List<String> getFilePathExternalFilePath(String path) {
+
+ // return the list of carbondata files in the given path.
+ CarbonFile segment = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+ CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+
+ if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+ return true;
+ }
+ return false;
+ }
+ });
+ List<String> filePaths = new ArrayList<>(dataFiles.length);
+ for (CarbonFile dfiles : dataFiles) {
+ filePaths.add(dfiles.getAbsolutePath());
+ }
+ return filePaths;
+ }
+
+ /**
+ * Infer the table schema. If no schema file exists, the schema is rebuilt from the header
+ * of the first carbondata file under the given path; otherwise the schema file at
+ * carbonDataFilePath is read with a ThriftReader.
+ * @return thrift TableInfo built from the data file header or the schema file
+ */
+ public static org.apache.carbondata.format.TableInfo inferSchemaFileExternalTable(
+ String carbonDataFilePath, AbsoluteTableIdentifier absoluteTableIdentifier,
+ boolean schemaExists) throws IOException {
+ TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+ public org.apache.thrift.TBase<org.apache.carbondata.format.TableInfo,
+ org.apache.carbondata.format.TableInfo._Fields> create() {
+ return new org.apache.carbondata.format.TableInfo();
+ }
+ };
+ if (schemaExists == false) {
+ List<String> filePaths =
+ getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null");
+ String firstFilePath = null;
+ try {
+ firstFilePath = filePaths.get(0);
+ } catch (Exception e) {
+ LOGGER.error("CarbonData file is not present in the table location");
+ }
+ CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(firstFilePath);
+ FileHeader fileHeader = carbonHeaderReader.readHeader();
+ List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+ List<org.apache.carbondata.format.ColumnSchema> table_columns = fileHeader.getColumn_schema();
+ for (int i = 0; i < table_columns.size(); i++) {
+ ColumnSchema col = thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i));
+ col.setColumnReferenceId(col.getColumnUniqueId());
+ columnSchemaList.add(col);
+ }
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setTableName(absoluteTableIdentifier.getTableName());
+ tableSchema.setBucketingInfo(null);
+ tableSchema.setSchemaEvalution(null);
+ tableSchema.setTableId(UUID.randomUUID().toString());
+ tableSchema.setListOfColumns(columnSchemaList);
+
+ ThriftWrapperSchemaConverterImpl thriftWrapperSchemaConverter =
+ new ThriftWrapperSchemaConverterImpl();
+ SchemaEvolutionEntry schemaEvolutionEntry = new SchemaEvolutionEntry();
+ schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis());
+ SchemaEvolution schemaEvol = new SchemaEvolution();
+ List<SchemaEvolutionEntry> schEntryList = new ArrayList<>();
+ schEntryList.add(schemaEvolutionEntry);
+ schemaEvol.setSchemaEvolutionEntryList(schEntryList);
+ tableSchema.setSchemaEvalution(schemaEvol);
+
+ org.apache.carbondata.format.TableSchema thriftFactTable =
+ thriftWrapperSchemaConverter.fromWrapperToExternalTableSchema(tableSchema);
+ org.apache.carbondata.format.TableInfo tableInfo =
+ new org.apache.carbondata.format.TableInfo(thriftFactTable,
+ new ArrayList<org.apache.carbondata.format.TableSchema>());
+
+ tableInfo.setDataMapSchemas(null);
+ return tableInfo;
+ } else {
+ ThriftReader thriftReader = new ThriftReader(carbonDataFilePath, createTBase);
+ thriftReader.open();
+ org.apache.carbondata.format.TableInfo tableInfo =
+ (org.apache.carbondata.format.TableInfo) thriftReader.read();
+ thriftReader.close();
+ return tableInfo;
+ }
+ }
+
public static void dropDatabaseDirectory(String databasePath)
throws IOException, InterruptedException {
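
As a rough sketch of how these new CarbonUtil helpers fit together (the path, database and table names are hypothetical; SchemaReader.inferSchema further down wraps exactly this call):

    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
    import org.apache.carbondata.core.util.CarbonUtil

    val tablePath  = "/tmp/carbon-external"   // hypothetical external location written by the SDK
    val identifier = AbsoluteTableIdentifier.from(tablePath, "default", "sdkoutputtable")

    // list the .carbondata files the reader will consider under the implicit Segment_null folder
    val dataFiles = CarbonUtil.getFilePathExternalFilePath(tablePath + "/Fact/Part0/Segment_null")

    // no schema file present, so infer a thrift TableInfo from the header of the first data file
    val thriftTableInfo = CarbonUtil.inferSchemaFileExternalTable(tablePath, identifier, false)
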
http://git-wip-us.apache.org/repos/asf/carbondata/blob/223c25de/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
new file mode 100644
index 0000000..b86b1cc
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.SingleTableProvider;
+import org.apache.carbondata.core.scan.filter.TableProvider;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeConverter;
+import org.apache.carbondata.core.util.DataTypeConverterImpl;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+
+/**
+ * Input format of CarbonData file.
+ *
+ * @param <T>
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implements Serializable {
+
+ public static final String READ_SUPPORT_CLASS = "carbon.read.support.class";
+ // comma separated list of input segment numbers
+ public static final String INPUT_SEGMENT_NUMBERS =
+ "mapreduce.input.carboninputformat.segmentnumbers";
+ private static final String VALIDATE_INPUT_SEGMENT_IDs =
+ "mapreduce.input.carboninputformat.validsegments";
+ // comma separated list of input files
+ public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
+ private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
+ private static final Log LOG = LogFactory.getLog(CarbonFileInputFormat.class);
+ private static final String FILTER_PREDICATE =
+ "mapreduce.input.carboninputformat.filter.predicate";
+ private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
+ private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
+ private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
+ private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
+ private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
+ public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
+ public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
+ private static final String PARTITIONS_TO_PRUNE =
+ "mapreduce.input.carboninputformat.partitions.to.prune";
+ public static final String UPADTE_T =
+ "mapreduce.input.carboninputformat.partitions.to.prune";
+
+ // a cache for carbon table, it will be used in task side
+ private CarbonTable carbonTable;
+
+ /**
+ * Set the `tableInfo` in `configuration`
+ */
+ public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
+ throws IOException {
+ if (null != tableInfo) {
+ configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize()));
+ }
+ }
+
+ /**
+ * Get TableInfo object from `configuration`
+ */
+ private static TableInfo getTableInfo(Configuration configuration) throws IOException {
+ String tableInfoStr = configuration.get(TABLE_INFO);
+ if (tableInfoStr == null) {
+ return null;
+ } else {
+ TableInfo output = new TableInfo();
+ output.readFields(
+ new DataInputStream(
+ new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr))));
+ return output;
+ }
+ }
+
+
+ public static void setTablePath(Configuration configuration, String tablePath) {
+ configuration.set(FileInputFormat.INPUT_DIR, tablePath);
+ }
+
+ public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
+ configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
+ }
+
+
+ public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
+ throws IOException {
+ if (dataMapJob != null) {
+ String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
+ configuration.set(DATA_MAP_DSTR, toString);
+ }
+ }
+
+ public static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
+ String jobString = configuration.get(DATA_MAP_DSTR);
+ if (jobString != null) {
+ return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
+ }
+ return null;
+ }
+
+ /**
+ * It sets unresolved filter expression.
+ *
+ * @param configuration
+ * @param filterExpression
+ */
+ public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
+ if (filterExpression == null) {
+ return;
+ }
+ try {
+ String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
+ configuration.set(FILTER_PREDICATE, filterString);
+ } catch (Exception e) {
+ throw new RuntimeException("Error while setting filter expression to Job", e);
+ }
+ }
+
+ public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
+ if (projection == null || projection.isEmpty()) {
+ return;
+ }
+ String[] allColumns = projection.getAllColumns();
+ StringBuilder builder = new StringBuilder();
+ for (String column : allColumns) {
+ builder.append(column).append(",");
+ }
+ String columnString = builder.toString();
+ columnString = columnString.substring(0, columnString.length() - 1);
+ configuration.set(COLUMN_PROJECTION, columnString);
+ }
+
+ public static String getColumnProjection(Configuration configuration) {
+ return configuration.get(COLUMN_PROJECTION);
+ }
+
+
+ /**
+ * Set list of segments to access
+ */
+ public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) {
+ configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments));
+ }
+
+ /**
+ * Set `CARBON_INPUT_SEGMENTS` from property to configuration
+ */
+ public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) {
+ String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase();
+ String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase();
+ String segmentNumbersFromProperty = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*");
+ if (!segmentNumbersFromProperty.trim().equals("*")) {
+ CarbonFileInputFormat
+ .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(",")));
+ }
+ }
+
+ /**
+ * set list of segment to access
+ */
+ public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) {
+ configuration.set(CarbonFileInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
+ }
+
+ /**
+ * get list of segment to access
+ */
+ public static boolean getValidateSegmentsToAccess(Configuration configuration) {
+ return configuration.get(CarbonFileInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true")
+ .equalsIgnoreCase("true");
+ }
+
+ /**
+ * set list of partitions to prune
+ */
+ public static void setPartitionsToPrune(Configuration configuration,
+ List<PartitionSpec> partitions) {
+ if (partitions == null) {
+ return;
+ }
+ try {
+ String partitionString =
+ ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions));
+ configuration.set(PARTITIONS_TO_PRUNE, partitionString);
+ } catch (Exception e) {
+ throw new RuntimeException("Error while setting patition information to Job", e);
+ }
+ }
+
+ /**
+ * get list of partitions to prune
+ */
+ private static List<PartitionSpec> getPartitionsToPrune(Configuration configuration)
+ throws IOException {
+ String partitionString = configuration.get(PARTITIONS_TO_PRUNE);
+ if (partitionString != null) {
+ return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString);
+ }
+ return null;
+ }
+
+ public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+ throws IOException {
+ String tablePath = configuration.get(INPUT_DIR, "");
+ try {
+ return AbsoluteTableIdentifier
+ .from(tablePath, getDatabaseName(configuration), getTableName(configuration));
+ } catch (InvalidConfigurationException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * Configurations FileInputFormat.INPUT_DIR
+ * are used to get table path to read.
+ *
+ * @param job
+ * @return List<InputSplit> list of CarbonInputSplit
+ * @throws IOException
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext job) throws IOException {
+ AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
+ CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
+ if (null == carbonTable) {
+ throw new IOException("Missing/Corrupt schema file for table.");
+ }
+ // TableDataMap blockletMap = DataMapStoreManager.getInstance()
+ // .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
+
+ if (getValidateSegmentsToAccess(job.getConfiguration())) {
+ // get all valid segments and set them into the configuration
+ // check for externalTable segment (Segment_null)
+ // process and resolve the expression
+ Expression filter = getFilterPredicates(job.getConfiguration());
+ TableProvider tableProvider = new SingleTableProvider(carbonTable);
+ // this will be null in case of corrupt schema file.
+ PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
+ CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
+
+ FilterResolverIntf filterInterface = CarbonInputFormatUtil
+ .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider);
+
+ String segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null");
+ FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
+ if (FileFactory.isFileExist(segmentDir, fileType)) {
+ // if external table Segments are found, add it to the List
+ List<Segment> externalTableSegments = new ArrayList<Segment>();
+ Segment seg = new Segment("null", null);
+ externalTableSegments.add(seg);
+
+ Map<String, String> indexFiles =
+ new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir);
+
+ if (indexFiles.size() == 0) {
+ throw new RuntimeException("Index file not present to read the carbondata file");
+ }
+ // do block filtering and get split
+ List<InputSplit> splits =
+ getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null);
+
+ return splits;
+ }
+ }
+ return null;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ * Configurations FileInputFormat.INPUT_DIR, CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
+ * are used to get table path to read.
+ *
+ * @return
+ * @throws IOException
+ */
+ private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
+ List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
+ List<Integer> oldPartitionIdList) throws IOException {
+
+ List<InputSplit> result = new LinkedList<InputSplit>();
+ UpdateVO invalidBlockVOForSegmentId = null;
+ Boolean isIUDTable = false;
+
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+ SegmentUpdateStatusManager updateStatusManager =
+ new SegmentUpdateStatusManager(absoluteTableIdentifier);
+
+ isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
+
+ // for each segment fetch blocks matching filter in Driver BTree
+ List<CarbonInputSplit> dataBlocksOfSegment =
+ getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions,
+ validSegments, partitionInfo, oldPartitionIdList);
+ for (CarbonInputSplit inputSplit : dataBlocksOfSegment) {
+
+ // Get the UpdateVO for those tables on which IUD operations being performed.
+ if (isIUDTable) {
+ invalidBlockVOForSegmentId =
+ updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId());
+ }
+ String[] deleteDeltaFilePath = null;
+ if (isIUDTable) {
+ // In case IUD is not performed in this table avoid searching for
+ // invalidated blocks.
+ if (CarbonUtil
+ .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(),
+ invalidBlockVOForSegmentId, updateStatusManager)) {
+ continue;
+ }
+ // When iud is done then only get delete delta files for a block
+ try {
+ deleteDeltaFilePath = updateStatusManager
+ .getDeleteDeltaFilePath(inputSplit.getPath().toString(), inputSplit.getSegmentId());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath);
+ result.add(inputSplit);
+ }
+ return result;
+ }
+
+ protected Expression getFilterPredicates(Configuration configuration) {
+ try {
+ String filterExprString = configuration.get(FILTER_PREDICATE);
+ if (filterExprString == null) {
+ return null;
+ }
+ Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
+ return (Expression) filter;
+ } catch (IOException e) {
+ throw new RuntimeException("Error while reading filter expression", e);
+ }
+ }
+
+ /**
+ * get data blocks of given segment
+ */
+ private List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
+ AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
+ BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
+ List<Integer> oldPartitionIdList) throws IOException {
+
+ QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
+ QueryStatistic statistic = new QueryStatistic();
+
+ // get tokens for all the required FileSystem for table path
+ TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+ new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
+ boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+ CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
+ DataMapExprWrapper dataMapExprWrapper =
+ DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver);
+ DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
+ List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
+ List<ExtendedBlocklet> prunedBlocklets;
+ if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
+ DistributableDataMapFormat datamapDstr =
+ new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper,
+ segmentIds, partitionsToPrune,
+ BlockletDataMapFactory.class.getName());
+ prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
+ // Apply expression on the blocklets.
+ prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
+ } else {
+ prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
+ }
+
+ List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
+ int partitionIndex = 0;
+ List<Integer> partitionIdList = new ArrayList<>();
+ if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
+ partitionIdList = partitionInfo.getPartitionIds();
+ }
+ for (ExtendedBlocklet blocklet : prunedBlocklets) {
+ long partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
+ CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath()));
+
+ // oldPartitionIdList is only used by the alter table partition command, because it changes
+ // the partition info first and then reads data.
+ // Other normal queries should use the newest partitionIdList.
+ if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
+ if (oldPartitionIdList != null) {
+ partitionIndex = oldPartitionIdList.indexOf((int)partitionId);
+ } else {
+ partitionIndex = partitionIdList.indexOf((int)partitionId);
+ }
+ }
+ if (partitionIndex != -1) {
+ // matchedPartitions variable will be null in two cases as follows
+ // 1. the table is not a partition table
+ // 2. the table is a partition table, and all partitions are matched by query
+ // for a partition table, the task id in the carbondata file name is the partition id.
+ // if this partition is not required, here will skip it.
+ if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
+ CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet);
+ if (inputSplit != null) {
+ resultFilterredBlocks.add(inputSplit);
+ }
+ }
+ }
+ }
+ statistic
+ .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+ recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+ return resultFilterredBlocks;
+ }
+
+ private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException {
+ CarbonInputSplit split =
+ CarbonInputSplit.from(blocklet.getSegmentId(),
+ blocklet.getBlockletId(), new FileSplit(new Path(blocklet.getPath()), 0,
+ blocklet.getLength(), blocklet.getLocations()),
+ ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()),
+ blocklet.getDataMapWriterPath());
+ split.setDetailInfo(blocklet.getDetailInfo());
+ return split;
+ }
+
+ @Override
+ public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ Configuration configuration = taskAttemptContext.getConfiguration();
+ QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
+ CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
+ return new CarbonRecordReader<T>(queryModel, readSupport);
+ }
+
+ public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException {
+ Configuration configuration = taskAttemptContext.getConfiguration();
+ CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
+ TableProvider tableProvider = new SingleTableProvider(carbonTable);
+
+ // query plan includes projection column
+ String projectionString = getColumnProjection(configuration);
+ String[] projectionColumnNames = null;
+ if (projectionString != null) {
+ projectionColumnNames = projectionString.split(",");
+ }
+ QueryModel queryModel = carbonTable.createQueryWithProjection(
+ projectionColumnNames, getDataTypeConverter(configuration));
+
+ // set the filter to the query model in order to filter blocklet before scan
+ Expression filter = getFilterPredicates(configuration);
+ boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
+ // getAllMeasures returns list of visible and invisible columns
+ boolean[] isFilterMeasures =
+ new boolean[carbonTable.getAllMeasures().size()];
+ CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, isFilterDimensions,
+ isFilterMeasures);
+ queryModel.setIsFilterDimensions(isFilterDimensions);
+ queryModel.setIsFilterMeasures(isFilterMeasures);
+ FilterResolverIntf filterIntf = CarbonInputFormatUtil
+ .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider);
+ queryModel.setFilterExpressionResolverTree(filterIntf);
+
+ // update the file level index store if there are invalid segment
+ if (inputSplit instanceof CarbonMultiBlockSplit) {
+ CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
+ List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
+ if (invalidSegments.size() > 0) {
+ queryModel.setInvalidSegmentIds(invalidSegments);
+ }
+ List<UpdateVO> invalidTimestampRangeList =
+ split.getAllSplits().get(0).getInvalidTimestampRange();
+ if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
+ queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
+ }
+ }
+ return queryModel;
+ }
+
+ private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+ CarbonTable carbonTableTemp;
+ if (carbonTable == null) {
+ // carbon table should be created either from deserialized table info (schema saved in
+ // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
+ TableInfo tableInfo = getTableInfo(configuration);
+ CarbonTable localCarbonTable;
+ if (tableInfo != null) {
+ localCarbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+ } else {
+ String schemaPath = CarbonTablePath
+ .getSchemaFilePath(getAbsoluteTableIdentifier(configuration).getTablePath());
+ if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
+ TableInfo tableInfoInfer =
+ SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration));
+ localCarbonTable = CarbonTable.buildFromTableInfo(tableInfoInfer);
+ } else {
+ localCarbonTable =
+ SchemaReader.readCarbonTableFromStore(getAbsoluteTableIdentifier(configuration));
+ }
+ }
+ this.carbonTable = localCarbonTable;
+ return localCarbonTable;
+ } else {
+ carbonTableTemp = this.carbonTable;
+ return carbonTableTemp;
+ }
+ }
+
+
+ public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
+ String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
+ //By default it uses dictionary decoder read class
+ CarbonReadSupport<T> readSupport = null;
+ if (readSupportClass != null) {
+ try {
+ Class<?> myClass = Class.forName(readSupportClass);
+ Constructor<?> constructor = myClass.getConstructors()[0];
+ Object object = constructor.newInstance();
+ if (object instanceof CarbonReadSupport) {
+ readSupport = (CarbonReadSupport) object;
+ }
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Class " + readSupportClass + "not found", ex);
+ } catch (Exception ex) {
+ LOG.error("Error while creating " + readSupportClass, ex);
+ }
+ } else {
+ readSupport = new DictionaryDecodeReadSupport<>();
+ }
+ return readSupport;
+ }
+
+ @Override
+ protected boolean isSplitable(JobContext context, Path filename) {
+ try {
+ // Don't split the file if it is local file system
+ FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
+ if (fileSystem instanceof LocalFileSystem) {
+ return false;
+ }
+ } catch (Exception e) {
+ return true;
+ }
+ return true;
+ }
+
+ /**
+ * return valid segment to access
+ */
+ private String[] getSegmentsToAccess(JobContext job) {
+ String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
+ if (segmentString.trim().isEmpty()) {
+ return new String[0];
+ }
+ return segmentString.split(",");
+ }
+
+ public static DataTypeConverter getDataTypeConverter(Configuration configuration)
+ throws IOException {
+ String converter = configuration.get(CARBON_CONVERTER);
+ if (converter == null) {
+ return new DataTypeConverterImpl();
+ }
+ return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter);
+ }
+
+ public static void setDatabaseName(Configuration configuration, String databaseName) {
+ if (null != databaseName) {
+ configuration.set(DATABASE_NAME, databaseName);
+ }
+ }
+
+ public static String getDatabaseName(Configuration configuration)
+ throws InvalidConfigurationException {
+ String databseName = configuration.get(DATABASE_NAME);
+ if (null == databseName) {
+ throw new InvalidConfigurationException("Database name is not set.");
+ }
+ return databseName;
+ }
+
+ public static void setTableName(Configuration configuration, String tableName) {
+ if (null != tableName) {
+ configuration.set(TABLE_NAME, tableName);
+ }
+ }
+
+ public static String getTableName(Configuration configuration)
+ throws InvalidConfigurationException {
+ String tableName = configuration.get(TABLE_NAME);
+ if (tableName == null) {
+ throw new InvalidConfigurationException("Table name is not set");
+ }
+ return tableName;
+ }
+
+ public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(
+ org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter)
+ throws IOException {
+ return null;
+ }
+}
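
A hedged sketch of driving the new input format directly through its static configuration setters (all setters are the ones added above; CarbonProjection.addColumn is assumed from the existing hadoop module API, and the path/database/table names are illustrative):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.Job
    import org.apache.carbondata.hadoop.CarbonProjection
    import org.apache.carbondata.hadoop.api.CarbonFileInputFormat

    val conf = new Configuration()
    CarbonFileInputFormat.setTablePath(conf, "/tmp/carbon-external")   // becomes FileInputFormat.INPUT_DIR
    CarbonFileInputFormat.setDatabaseName(conf, "default")
    CarbonFileInputFormat.setTableName(conf, "sdkoutputtable")

    val projection = new CarbonProjection()
    projection.addColumn("name")                                       // assumed existing API
    projection.addColumn("age")
    CarbonFileInputFormat.setColumnProjection(conf, projection)

    // getSplits validates the Segment_null directory and prunes blocks via the datamap
    val splits = new CarbonFileInputFormat[AnyRef]().getSplits(Job.getInstance(conf))
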
http://git-wip-us.apache.org/repos/asf/carbondata/blob/223c25de/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
index dfa8dd1..ab7c333 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
/**
* TODO: It should be removed after store manager implementation.
@@ -59,6 +58,7 @@ public class SchemaReader {
throw new IOException("File does not exist: " + schemaFilePath);
}
}
+
/**
* the method returns the Wrapper TableInfo
*
@@ -79,4 +79,19 @@ public class SchemaReader {
carbonTableIdentifier.getTableName(),
identifier.getTablePath());
}
+
+
+ public static TableInfo inferSchema(AbsoluteTableIdentifier identifier)
+ throws IOException {
+ // This routine is going to infer schema from the carbondata file footer
+ // Convert the ColumnSchema -> TableSchema -> TableInfo.
+ // Return the TableInfo.
+ org.apache.carbondata.format.TableInfo tableInfo =
+ CarbonUtil.inferSchemaFileExternalTable(identifier.getTablePath(), identifier, false);
+ SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+ TableInfo wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(tableInfo, identifier.getDatabaseName(),
+ identifier.getTableName(), identifier.getTablePath());
+ return wrapperTableInfo;
+ }
}
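
For context, the schema inference path ends up being used roughly like this inside CarbonFileInputFormat.getOrCreateCarbonTable (identifier values are illustrative):

    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.hadoop.util.SchemaReader

    val identifier  = AbsoluteTableIdentifier.from("/tmp/carbon-external", "default", "sdkoutputtable")
    val tableInfo   = SchemaReader.inferSchema(identifier)        // wrapper TableInfo rebuilt from the file footer
    val carbonTable = CarbonTable.buildFromTableInfo(tableInfo)   // ready to be used by the reader
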
http://git-wip-us.apache.org/repos/asf/carbondata/blob/223c25de/integration/spark-common-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index b7f19fd..1c6cee9 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -105,6 +105,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-store-sdk</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/223c25de/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
new file mode 100644
index 0000000..8b1f63f
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+
+
+class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with BeforeAndAfterAll {
+
+ var writerPath = new File(this.getClass.getResource("/").getPath
+ +
+ "../." +
+ "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+ .getCanonicalPath
+ // getCanonicalPath gives a path with '\', but the code expects '/'. Should this be handled in the carbon code?
+ writerPath = writerPath.replace("\\", "/");
+
+
+ def buildTestData(persistSchema:Boolean) = {
+
+ FileUtils.deleteDirectory(new File(writerPath))
+
+ val schema = new StringBuilder()
+ .append("[ \n")
+ .append(" {\"name\":\"string\"},\n")
+ .append(" {\"age\":\"int\"},\n")
+ .append(" {\"height\":\"double\"}\n")
+ .append("]")
+ .toString()
+
+ try {
+ val builder = CarbonWriter.builder()
+ val writer =
+ if (persistSchema) {
+ builder.persistSchemaFile(true)
+ builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+ } else {
+ builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+ }
+
+ var i = 0
+ while (i < 100) {
+ writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+ i += 1
+ }
+ writer.close()
+ } catch {
+ case ex: Exception => None
+ case _ => None
+ }
+ }
+
+ def cleanTestData() = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ }
+
+ def deleteIndexFile(path: String, extension: String) : Unit = {
+ val file: CarbonFile = FileFactory
+ .getCarbonFile(path, FileFactory.getFileType(path))
+
+ for (eachDir <- file.listFiles) {
+ if (!eachDir.isDirectory) {
+ if (eachDir.getName.endsWith(extension)) {
+ CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+ }
+ } else {
+ deleteIndexFile(eachDir.getPath, extension)
+ }
+ }
+ }
+
+ override def beforeAll(): Unit = {
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ // create carbon table and insert data
+ }
+
+ override def afterAll(): Unit = {
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ }
+
+ //TO DO, need to remove segment dependency and tableIdentifier Dependency
+ test("read carbondata files (sdk Writer Output) using the Carbonfile ") {
+ buildTestData(false)
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ //new provider Carbonfile
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ sql("Describe formatted sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable limit 3").show(false)
+
+ sql("select name from sdkOutputTable").show(false)
+
+ sql("select age from sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable where age > 2 and age < 8").show(200, false)
+
+ sql("select * from sdkOutputTable where name = 'robot3'").show(200, false)
+
+ sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200, false)
+
+ sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200, false)
+
+ sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200, false)
+
+ sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200, false)
+
+ sql("select count(*) from sdkOutputTable").show(200, false)
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+ test("should not allow to alter datasource carbontable ") {
+ buildTestData(false)
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ //data source file format
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ val exception = intercept[MalformedCarbonCommandException]
+ {
+ sql("Alter table sdkOutputTable change age age BIGINT")
+ }
+ assert(exception.getMessage()
+ .contains("Unsupported alter operation on Carbon external fileformat table"))
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+ test("Read sdk writer output file without index file should fail") {
+ buildTestData(false)
+ deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ //data source file format
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ //org.apache.spark.SparkException: Index file not present to read the carbondata file
+ val exception = intercept[java.lang.RuntimeException]
+ {
+ sql("select * from sdkOutputTable").show(false)
+ }
+ assert(exception.getMessage().contains("Index file not present to read the carbondata file"))
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+
+ test("Read sdk writer output file without Carbondata file should fail") {
+ buildTestData(false)
+ deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ val exception = intercept[Exception] {
+ // data source file format
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+ |'$writerPath' """.stripMargin)
+ }
+ assert(exception.getMessage()
+ .contains("Operation not allowed: Invalid table path provided:"))
+
+
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+
+
+ test("Read sdk writer output file without any file should fail") {
+ buildTestData(false)
+ deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+ deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ val exception = intercept[Exception] {
+ //data source file format
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ sql("select * from sdkOutputTable").show(false)
+ }
+ assert(exception.getMessage()
+ .contains("Operation not allowed: Invalid table path provided:"))
+
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ cleanTestData()
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/223c25de/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
new file mode 100644
index 0000000..d284e50
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+
+class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAndAfterAll {
+
+
+ override def beforeAll(): Unit = {
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ }
+
+ override def afterAll(): Unit = {
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ }
+
+ var writerPath = new File(this.getClass.getResource("/").getPath
+ +
+ "../." +
+ "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+ .getCanonicalPath
+ // getCanonicalPath returns a path with backslashes on Windows, but the code expects forward slashes. TODO: handle this inside the code itself?
+ writerPath = writerPath.replace("\\", "/");
+
+ val filePath = writerPath + "/Fact/Part0/Segment_null/"
+
+ def buildTestData(persistSchema:Boolean) = {
+
+ FileUtils.deleteDirectory(new File(writerPath))
+
+ val schema = new StringBuilder()
+ .append("[ \n")
+ .append(" {\"name\":\"string\"},\n")
+ .append(" {\"age\":\"int\"},\n")
+ .append(" {\"height\":\"double\"}\n")
+ .append("]")
+ .toString()
+
+ try {
+ val builder = CarbonWriter.builder()
+ val writer =
+ if (persistSchema) {
+ builder.persistSchemaFile(true)
+ builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+ } else {
+ builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+ }
+
+ var i = 0
+ while (i < 100) {
+ writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+ i += 1
+ }
+ writer.close()
+ } catch {
+ // ignore writer failures here; the assertions that follow will detect missing output
+ case _: Throwable => None
+ }
+ }
+
+ def cleanTestData() = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ }
+
+ def deleteIndexFile(path: String, extension: String) : Unit = {
+ val file: CarbonFile = FileFactory
+ .getCarbonFile(path, FileFactory.getFileType(path))
+
+ for (eachDir <- file.listFiles) {
+ if (!eachDir.isDirectory) {
+ if (eachDir.getName.endsWith(extension)) {
+ CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+ }
+ } else {
+ deleteIndexFile(eachDir.getPath, extension)
+ }
+ }
+ }
+
+ // TODO: remove the segment and tableIdentifier dependencies
+ test("read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") {
+ buildTestData(false)
+ assert(new File(filePath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ //data source file format
+ if (sqlContext.sparkContext.version.startsWith("2.1")) {
+ //data source file format
+ sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+ //data source file format
+ sql(
+ s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ |'$filePath' """.stripMargin)
+ } else {
+ // TODO: handle other Spark versions
+ }
+
+ sql("Describe formatted sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable limit 3").show(false)
+
+ sql("select name from sdkOutputTable").show(false)
+
+ sql("select age from sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable where age > 2 and age < 8").show(200,false)
+
+ sql("select * from sdkOutputTable where name = 'robot3'").show(200,false)
+
+ sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200,false)
+
+ sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200,false)
+
+ sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200,false)
+
+ sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200,false)
+
+ sql("select count(*) from sdkOutputTable").show(200,false)
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(filePath).exists())
+ cleanTestData()
+ }
+
+
+ test("should not allow to alter datasource carbontable ") {
+ buildTestData(false)
+ assert(new File(filePath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+
+ if (sqlContext.sparkContext.version.startsWith("2.1")) {
+ //data source file format
+ sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+ //data source file format
+ sql(
+ s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ |'$filePath' """.stripMargin)
+ } else {
+ // TODO: handle other Spark versions
+ }
+
+ val exception = intercept[MalformedCarbonCommandException]
+ {
+ sql("Alter table sdkOutputTable change age age BIGINT")
+ }
+ assert(exception.getMessage().contains("Unsupported alter operation on hive table"))
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(filePath).exists())
+ cleanTestData()
+ }
+
+ test("Read sdk writer output file without Carbondata file should fail") {
+ buildTestData(false)
+ deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+ assert(new File(filePath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ val exception = intercept[org.apache.spark.SparkException] {
+ // data source file format
+ if (sqlContext.sparkContext.version.startsWith("2.1")) {
+ //data source file format
+ sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+ //data source file format
+ sql(
+ s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ |'$filePath' """.stripMargin)
+ } else {
+ // TODO: handle other Spark versions
+ }
+ }
+ assert(exception.getMessage()
+ .contains("CarbonData file is not present in the location mentioned in DDL"))
+
+ // drop table should not delete the files
+ assert(new File(filePath).exists())
+ cleanTestData()
+ }
+
+
+ test("Read sdk writer output file without any file should fail") {
+ buildTestData(false)
+ deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+ deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+ assert(new File(filePath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ val exception = intercept[org.apache.spark.SparkException] {
+ //data source file format
+ if (sqlContext.sparkContext.version.startsWith("2.1")) {
+ //data source file format
+ sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+ //data source file format
+ sql(
+ s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ |'$filePath' """.stripMargin)
+ } else {
+ // TODO: handle other Spark versions
+ }
+
+ sql("select * from sdkOutputTable").show(false)
+ }
+ assert(exception.getMessage()
+ .contains("CarbonData file is not present in the location mentioned in DDL"))
+
+ // drop table should not delete the files
+ assert(new File(filePath).exists())
+ cleanTestData()
+ }
+
+ test("Read sdk writer output file withSchema") {
+ buildTestData(true)
+ assert(new File(filePath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ //data source file format
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ if (sqlContext.sparkContext.version.startsWith("2.1")) {
+ //data source file format
+ sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+ //data source file format
+ sql(
+ s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ |'$filePath' """.stripMargin)
+ } else {
+ // TODO: handle other Spark versions
+ }
+
+ sql("Describe formatted sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable limit 3").show(false)
+
+ sql("select name from sdkOutputTable").show(false)
+
+ sql("select age from sdkOutputTable").show(false)
+
+ sql("select * from sdkOutputTable where age > 2 and age < 8").show(200, false)
+
+ sql("select * from sdkOutputTable where name = 'robot3'").show(200, false)
+
+ sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200, false)
+
+ sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200, false)
+
+ sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200, false)
+
+ sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200, false)
+
+ sql("select count(*) from sdkOutputTable").show(200, false)
+
+ sql("DROP TABLE sdkOutputTable")
+
+ // drop table should not delete the files
+ assert(new File(filePath).exists())
+ cleanTestData()
+ }
+
+ test("Read sdk writer output file without index file should fail") {
+ buildTestData(false)
+ deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+ assert(new File(filePath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+ if (sqlContext.sparkContext.version.startsWith("2.1")) {
+ //data source file format
+ sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+ //data source file format
+ sql(
+ s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ |'$filePath' """.stripMargin)
+ } else {
+ // TODO: handle other Spark versions
+ }
+ //org.apache.spark.SparkException: Index file not present to read the carbondata file
+ val exception = intercept[org.apache.spark.SparkException]
+ {
+ sql("select * from sdkOutputTable").show(false)
+ }
+ assert(exception.getMessage().contains("Index file not present to read the carbondata file"))
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(filePath).exists())
+ cleanTestData()
+ }
+}
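
This test file exercises the datasource flavour of the same feature ("USING Carbonfile"); only
the DDL shape differs between Spark 2.1 and 2.2. A small helper condensing the version branching
repeated in the tests above (a sketch only; `spark` and the Segment_null directory are assumed
to exist already):

    import org.apache.spark.sql.SparkSession

    def createCarbonfileTable(spark: SparkSession, tableName: String, segmentPath: String): Unit = {
      if (spark.sparkContext.version.startsWith("2.1")) {
        // Spark 2.1: the path is passed through OPTIONS
        spark.sql(s"CREATE TABLE $tableName USING Carbonfile OPTIONS (PATH '$segmentPath')")
      } else if (spark.sparkContext.version.startsWith("2.2")) {
        // Spark 2.2: the path is passed through LOCATION
        spark.sql(s"CREATE TABLE $tableName USING Carbonfile LOCATION '$segmentPath'")
      } else {
        // other Spark versions are not covered by these tests
      }
    }

    // e.g. createCarbonfileTable(spark, "sdkOutputTable", writerPath + "/Fact/Part0/Segment_null/")
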
http://git-wip-us.apache.org/repos/asf/carbondata/blob/223c25de/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
new file mode 100644
index 0000000..9a46676
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+
+
+object TestSparkCarbonFileFormatWithSparkSession {
+
+ var writerPath = new File(this.getClass.getResource("/").getPath
+ +
+ "../." +
+ "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+ .getCanonicalPath
+ // getCanonicalPath returns a path with backslashes on Windows, but the code expects forward slashes. TODO: handle this inside the code itself?
+ writerPath = writerPath.replace("\\", "/");
+
+ val filePath = writerPath + "/Fact/Part0/Segment_null/"
+
+ def buildTestData(persistSchema:Boolean) = {
+
+ FileUtils.deleteDirectory(new File(writerPath))
+
+ val schema = new StringBuilder()
+ .append("[ \n")
+ .append(" {\"name\":\"string\"},\n")
+ .append(" {\"age\":\"int\"},\n")
+ .append(" {\"height\":\"double\"}\n")
+ .append("]")
+ .toString()
+
+ try {
+ val builder = CarbonWriter.builder()
+ val writer =
+ if (persistSchema) {
+ builder.persistSchemaFile(true)
+ builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+ } else {
+ builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+ }
+
+ var i = 0
+ while (i < 100) {
+ writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+ i += 1
+ }
+ writer.close()
+ } catch {
+ // ignore writer failures here; the assertions that follow will detect missing output
+ case _: Throwable => None
+ }
+ }
+
+ def cleanTestData() = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ }
+
+ def deleteIndexFile(path: String, extension: String) : Unit = {
+ val file: CarbonFile = FileFactory
+ .getCarbonFile(path, FileFactory.getFileType(path))
+
+ for (eachDir <- file.listFiles) {
+ if (!eachDir.isDirectory) {
+ if (eachDir.getName.endsWith(extension)) {
+ CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+ }
+ } else {
+ deleteIndexFile(eachDir.getPath, extension)
+ }
+ }
+ }
+
+ def main(args: Array[String]): Unit = {
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val storeLocation = s"$rootPath/examples/spark2/target/store"
+ val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+ val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
+
+ // clean data folder
+ if (true) {
+ val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
+ clean(storeLocation)
+ clean(warehouse)
+ clean(metastoredb)
+ }
+
+ val spark = SparkSession
+ .builder()
+ .master("local")
+ .appName("TestSparkCarbonFileFormatWithSparkSession")
+ .enableHiveSupport()
+ .config("spark.sql.warehouse.dir", warehouse)
+ .config("javax.jdo.option.ConnectionURL",
+ s"jdbc:derby:;databaseName=$metastoredb;create=true")
+ .getOrCreate()
+
+ CarbonProperties.getInstance()
+ .addProperty("carbon.storelocation", storeLocation)
+
+ spark.sparkContext.setLogLevel("WARN")
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss")
+ .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
+ buildTestData(false)
+ assert(new File(filePath).exists())
+ //data source file format
+ if (spark.sparkContext.version.startsWith("2.1")) {
+ //data source file format
+ spark.sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ } else if (spark.sparkContext.version.startsWith("2.2")) {
+ //data source file format
+ spark.sql(
+ s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ |'$filePath' """.stripMargin)
+ } else {
+ // TODO: handle other Spark versions
+ }
+
+ spark.sql("Describe formatted sdkOutputTable").show(false)
+
+ spark.sql("select * from sdkOutputTable").show(false)
+
+ spark.sql("select * from sdkOutputTable limit 3").show(false)
+
+ spark.sql("select name from sdkOutputTable").show(false)
+
+ spark.sql("select age from sdkOutputTable").show(false)
+
+ spark.sql("select * from sdkOutputTable where age > 2 and age < 8").show(200,false)
+
+ spark.sql("select * from sdkOutputTable where name = 'robot3'").show(200,false)
+
+ spark.sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200,false)
+
+ spark.sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200,false)
+
+ spark.sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200,false)
+
+ spark.sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200,false)
+
+ spark.sql("select count(*) from sdkOutputTable").show(200,false)
+
+ spark.sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(filePath).exists())
+ cleanTestData()
+
+ spark.stop()
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/223c25de/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 49a8023..6afd2c0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -42,7 +42,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.block.Distributable
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.filter.FilterUtil
import org.apache.carbondata.core.scan.model.QueryModel
@@ -50,7 +50,7 @@ import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstant
import org.apache.carbondata.core.statusmanager.FileFormat
import org.apache.carbondata.core.util._
import org.apache.carbondata.hadoop._
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.InitInputMetrics
@@ -90,13 +90,21 @@ class CarbonScanRDD(
val jobConf = new JobConf(conf)
SparkHadoopUtil.get.addCredentials(jobConf)
val job = Job.getInstance(jobConf)
- val format = prepareInputFormatForDriver(job.getConfiguration)
-
+ val fileLevelExternal = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal")
+ val format = if (fileLevelExternal != null && fileLevelExternal.equalsIgnoreCase("true")) {
+ prepareFileInputFormatForDriver(job.getConfiguration)
+ } else {
+ prepareInputFormatForDriver(job.getConfiguration)
+ }
// initialise query_id for job
job.getConfiguration.set("query.id", queryId)
// get splits
val splits = format.getSplits(job)
+ if ((splits == null) && format.isInstanceOf[CarbonFileInputFormat[Object]]) {
+ throw new SparkException(
+ "CarbonData file not exist in the segment_null (SDK writer Output) path")
+ }
// separate split
// 1. for batch splits, invoke distributeSplits method to create partitions
@@ -113,7 +121,7 @@ class CarbonScanRDD(
}
val batchPartitions = distributeColumnarSplits(columnarSplits)
// check and remove InExpression from filterExpression
- checkAndRemoveInExpressinFromFilterExpression(format, batchPartitions)
+ checkAndRemoveInExpressinFromFilterExpression(batchPartitions)
if (streamSplits.isEmpty) {
batchPartitions.toArray
} else {
@@ -354,7 +362,9 @@ class CarbonScanRDD(
case _ =>
// create record reader for CarbonData file format
if (vectorReader) {
- val carbonRecordReader = createVectorizedCarbonRecordReader(model, inputMetricsStats)
+ val carbonRecordReader = createVectorizedCarbonRecordReader(model,
+ inputMetricsStats,
+ "true")
if (carbonRecordReader == null) {
new CarbonRecordReader(model,
format.getReadSupportClass(attemptContext.getConfiguration), inputMetricsStats)
@@ -431,6 +441,16 @@ class CarbonScanRDD(
createInputFormat(conf)
}
+ def prepareFileInputFormatForDriver(conf: Configuration): CarbonFileInputFormat[Object] = {
+ CarbonFileInputFormat.setTableInfo(conf, tableInfo)
+ CarbonFileInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
+ CarbonFileInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
+ if (partitionNames != null) {
+ CarbonFileInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
+ }
+ createFileInputFormat(conf)
+ }
+
private def prepareInputFormatForExecutor(conf: Configuration): CarbonTableInputFormat[Object] = {
CarbonTableInputFormat.setCarbonReadSupport(conf, readSupport)
val tableInfo1 = getTableInfo
@@ -441,6 +461,32 @@ class CarbonScanRDD(
createInputFormat(conf)
}
+ private def createFileInputFormat(conf: Configuration): CarbonFileInputFormat[Object] = {
+ val format = new CarbonFileInputFormat[Object]
+ CarbonFileInputFormat.setTablePath(conf,
+ identifier.appendWithLocalPrefix(identifier.getTablePath))
+ CarbonFileInputFormat.setQuerySegment(conf, identifier)
+ CarbonFileInputFormat.setFilterPredicates(conf, filterExpression)
+ CarbonFileInputFormat.setColumnProjection(conf, columnProjection)
+ CarbonFileInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+ if (CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+ CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
+ CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+ }
+
+ // when segment validation is disabled via the thread-local session params, propagate that setting to the input format
+ val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+ if (carbonSessionInfo != null) {
+ CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
+ .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+ identifier.getCarbonTableIdentifier.getDatabaseName + "." +
+ identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean)
+ }
+ format
+ }
+
+
private def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = {
val format = new CarbonTableInputFormat[Object]
CarbonTableInputFormat.setTablePath(conf,
@@ -485,7 +531,6 @@ class CarbonScanRDD(
* @param identifiedPartitions
*/
private def checkAndRemoveInExpressinFromFilterExpression(
- format: CarbonTableInputFormat[Object],
identifiedPartitions: mutable.Buffer[Partition]) = {
if (null != filterExpression) {
if (identifiedPartitions.nonEmpty &&
@@ -533,12 +578,13 @@ class CarbonScanRDD(
}
def createVectorizedCarbonRecordReader(queryModel: QueryModel,
- inputMetricsStats: InputMetricsStats): RecordReader[Void, Object] = {
+ inputMetricsStats: InputMetricsStats, enableBatch: String): RecordReader[Void, Object] = {
val name = "org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader"
try {
val cons = Class.forName(name).getDeclaredConstructors
cons.head.setAccessible(true)
- cons.head.newInstance(queryModel, inputMetricsStats).asInstanceOf[RecordReader[Void, Object]]
+ cons.head.newInstance(queryModel, inputMetricsStats, enableBatch)
+ .asInstanceOf[RecordReader[Void, Object]]
} catch {
case e: Exception =>
LOGGER.error(e)
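
The CarbonScanRDD change boils down to choosing the input format from the "_filelevelexternal"
table property and failing fast when a file-level table yields no splits. Condensed from the
hunks above (simplified; variable and method names as in the surrounding method, not the full
bodies):

    // pick the input format per table property
    val fileLevelExternal = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal")
    val format =
      if (fileLevelExternal != null && fileLevelExternal.equalsIgnoreCase("true")) {
        prepareFileInputFormatForDriver(job.getConfiguration)   // CarbonFileInputFormat[Object]
      } else {
        prepareInputFormatForDriver(job.getConfiguration)       // CarbonTableInputFormat[Object]
      }

    // SDK output carries no segment metadata, so a null split list from the file input format
    // is reported immediately instead of surfacing later as an empty scan
    if (splits == null && format.isInstanceOf[CarbonFileInputFormat[Object]]) {
      throw new SparkException(
        "CarbonData file does not exist in the segment_null (SDK writer output) path")
    }
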
http://git-wip-us.apache.org/repos/asf/carbondata/blob/223c25de/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 73da878..903bf44 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -91,10 +91,21 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
private InputMetricsStats inputMetricsStats;
- public VectorizedCarbonRecordReader(QueryModel queryModel, InputMetricsStats inputMetricsStats) {
+ public VectorizedCarbonRecordReader(QueryModel queryModel, InputMetricsStats inputMetricsStats,
+ String enableBatch) {
this.queryModel = queryModel;
this.inputMetricsStats = inputMetricsStats;
- enableReturningBatches();
+ if (enableBatch.equals("true")) {
+ enableReturningBatches();
+ }
+ }
+
+
+ /*
+ * Can be called before any rows are returned to enable returning columnar batches directly.
+ */
+ public void enableReturningBatches() {
+ returnColumnarBatch = true;
}
/**
@@ -273,12 +284,7 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
if (columnarBatch == null) initBatch();
}
- /*
- * Can be called before any rows are returned to enable returning columnar batches directly.
- */
- private void enableReturningBatches() {
- returnColumnarBatch = true;
- }
+
/**
* Advances to the next batch of rows. Returns false if there are no more.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/223c25de/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 18c268c..7a6aa53 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
-import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.types._
@@ -1020,7 +1020,7 @@ case class CarbonLoadDataCommand(
partitionSchema = partitionSchema,
dataSchema = dataSchema,
bucketSpec = catalogTable.bucketSpec,
- fileFormat = new CarbonFileFormat,
+ fileFormat = new SparkCarbonTableFormat,
options = options.toMap)(sparkSession = sparkSession)
CarbonReflectionUtils.getLogicalRelation(hdfsRelation,