Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/28 03:19:45 UTC
[4/5] carbondata git commit: [CARBONDATA-2165] Remove spark in carbon-hadoop module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
new file mode 100644
index 0000000..b2c2d39
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop.testutil;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
+import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl;
+import org.apache.carbondata.core.writer.ThriftWriter;
+import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter;
+import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
+import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfo;
+import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfoPreparator;
+import org.apache.carbondata.processing.loading.DataLoadExecutor;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.csvinput.BlockDetails;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
+import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
+import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.util.TableOptionConstant;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+/**
+ * This class creates a carbon store (schema file, dictionary files and one
+ * loaded data segment) from a hard-coded test schema, for use in tests.
+ */
+public class StoreCreator {
+
+ private static final LogService LOG =
+ LogServiceFactory.getLogService(StoreCreator.class.getCanonicalName());
+ private static AbsoluteTableIdentifier absoluteTableIdentifier;
+ private static String storePath = null;
+
+ static {
+ storePath = new File("target/store").getAbsolutePath();
+ String dbName = "testdb";
+ String tableName = "testtable";
+ absoluteTableIdentifier = AbsoluteTableIdentifier.from(
+ storePath + "/" + dbName + "/" + tableName,
+ new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+ }
+
+ public static AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
+ return absoluteTableIdentifier;
+ }
+
+ public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String factFilePath,
+ AbsoluteTableIdentifier absoluteTableIdentifier) {
+ CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
+ CarbonLoadModel loadModel = new CarbonLoadModel();
+ loadModel.setCarbonDataLoadSchema(schema);
+ loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
+ loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+ loadModel.setFactFilePath(factFilePath);
+ loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
+ loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
+ loadModel.setDateFormat(null);
+ loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS));
+ loadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_DATE_FORMAT,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
+ loadModel.setSerializationNullFormat(
+ TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + ",\\N");
+ loadModel.setBadRecordsLoggerEnable(
+ TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + ",false");
+ loadModel.setBadRecordsAction(
+ TableOptionConstant.BAD_RECORDS_ACTION.getName() + ",FORCE");
+ loadModel.setIsEmptyDataBadRecord(
+ DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + ",false");
+ loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary");
+ loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
+ loadModel.setTaskNo("0");
+ loadModel.setSegmentId("0");
+ loadModel.setFactTimeStamp(System.currentTimeMillis());
+ loadModel.setMaxColumns("10");
+ return loadModel;
+ }
+
+ /**
+ * Create a complete store, without any restructure (schema evolution) step
+ */
+ public static void createCarbonStore() throws Exception {
+ CarbonLoadModel loadModel = createTableAndLoadModel();
+ loadData(loadModel, storePath);
+ }
+
+ /**
+ * Method to clear the data maps
+ */
+ public static void clearDataMaps() {
+ DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier);
+ }
+
+ public static CarbonLoadModel createTableAndLoadModel() throws Exception {
+ String factFilePath =
+ new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
+ File storeDir = new File(storePath);
+ CarbonUtil.deleteFoldersAndFiles(storeDir);
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS,
+ storePath);
+
+ CarbonTable table = createTable(absoluteTableIdentifier);
+ writeDictionary(factFilePath, table);
+ return buildCarbonLoadModel(table, factFilePath, absoluteTableIdentifier);
+ }
+
+ public static CarbonTable createTable(
+ AbsoluteTableIdentifier identifier) throws IOException {
+ TableInfo tableInfo = new TableInfo();
+ tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
+ List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
+ ArrayList<Encoding> encodings = new ArrayList<>();
+ encodings.add(Encoding.DICTIONARY);
+ ColumnSchema id = new ColumnSchema();
+ id.setColumnName("ID");
+ id.setColumnar(true);
+ id.setDataType(DataTypes.INT);
+ id.setEncodingList(encodings);
+ id.setColumnUniqueId(UUID.randomUUID().toString());
+ id.setColumnReferenceId(id.getColumnUniqueId());
+ id.setDimensionColumn(true);
+ id.setColumnGroup(1);
+ columnSchemas.add(id);
+
+ ColumnSchema date = new ColumnSchema();
+ date.setColumnName("date");
+ date.setColumnar(true);
+ date.setDataType(DataTypes.STRING);
+ date.setEncodingList(encodings);
+ date.setColumnUniqueId(UUID.randomUUID().toString());
+ date.setDimensionColumn(true);
+ date.setColumnGroup(2);
+ date.setSortColumn(true);
+ date.setColumnReferenceId(id.getColumnUniqueId());
+ columnSchemas.add(date);
+
+ ColumnSchema country = new ColumnSchema();
+ country.setColumnName("country");
+ country.setColumnar(true);
+ country.setDataType(DataTypes.STRING);
+ country.setEncodingList(encodings);
+ country.setColumnUniqueId(UUID.randomUUID().toString());
+ country.setDimensionColumn(true);
+ country.setColumnGroup(3);
+ country.setSortColumn(true);
+ country.setColumnReferenceId(id.getColumnUniqueId());
+ columnSchemas.add(country);
+
+ ColumnSchema name = new ColumnSchema();
+ name.setColumnName("name");
+ name.setColumnar(true);
+ name.setDataType(DataTypes.STRING);
+ name.setEncodingList(encodings);
+ name.setColumnUniqueId(UUID.randomUUID().toString());
+ name.setDimensionColumn(true);
+ name.setColumnGroup(4);
+ name.setSortColumn(true);
+ name.setColumnReferenceId(id.getColumnUniqueId());
+ columnSchemas.add(name);
+
+ ColumnSchema phonetype = new ColumnSchema();
+ phonetype.setColumnName("phonetype");
+ phonetype.setColumnar(true);
+ phonetype.setDataType(DataTypes.STRING);
+ phonetype.setEncodingList(encodings);
+ phonetype.setColumnUniqueId(UUID.randomUUID().toString());
+ phonetype.setDimensionColumn(true);
+ phonetype.setColumnGroup(5);
+ phonetype.setSortColumn(true);
+ phonetype.setColumnReferenceId(id.getColumnUniqueId());
+ columnSchemas.add(phonetype);
+
+ ColumnSchema serialname = new ColumnSchema();
+ serialname.setColumnName("serialname");
+ serialname.setColumnar(true);
+ serialname.setDataType(DataTypes.STRING);
+ serialname.setEncodingList(encodings);
+ serialname.setColumnUniqueId(UUID.randomUUID().toString());
+ serialname.setDimensionColumn(true);
+ serialname.setColumnGroup(6);
+ serialname.setSortColumn(true);
+ serialname.setColumnReferenceId(id.getColumnUniqueId());
+ columnSchemas.add(serialname);
+
+ ColumnSchema salary = new ColumnSchema();
+ salary.setColumnName("salary");
+ salary.setColumnar(true);
+ salary.setDataType(DataTypes.INT);
+ salary.setEncodingList(new ArrayList<Encoding>());
+ salary.setColumnUniqueId(UUID.randomUUID().toString());
+ salary.setDimensionColumn(false);
+ salary.setColumnReferenceId(id.getColumnUniqueId());
+ salary.setColumnGroup(7);
+ columnSchemas.add(salary);
+
+ tableSchema.setListOfColumns(columnSchemas);
+ SchemaEvolution schemaEvol = new SchemaEvolution();
+ schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>());
+ tableSchema.setSchemaEvalution(schemaEvol);
+ tableSchema.setTableId(UUID.randomUUID().toString());
+ tableInfo.setTableUniqueName(
+ identifier.getCarbonTableIdentifier().getTableUniqueName()
+ );
+ tableInfo.setLastUpdatedTime(System.currentTimeMillis());
+ tableInfo.setFactTable(tableSchema);
+ tableInfo.setTablePath(identifier.getTablePath());
+ String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
+ String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
+ CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
+
+ SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+ org.apache.carbondata.format.TableInfo thriftTableInfo =
+ schemaConverter.fromWrapperToExternalTableInfo(
+ tableInfo,
+ tableInfo.getDatabaseName(),
+ tableInfo.getFactTable().getTableName());
+ org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry =
+ new org.apache.carbondata.format.SchemaEvolutionEntry(tableInfo.getLastUpdatedTime());
+ thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history()
+ .add(schemaEvolutionEntry);
+
+ FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath);
+ if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
+ FileFactory.mkdirs(schemaMetadataPath, fileType);
+ }
+
+ ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
+ thriftWriter.open();
+ thriftWriter.write(thriftTableInfo);
+ thriftWriter.close();
+ return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName());
+ }
+
+ private static void writeDictionary(String factFilePath, CarbonTable table) throws Exception {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(
+ new FileInputStream(factFilePath), "UTF-8"));
+ List<CarbonDimension> dims = table.getDimensionByTableName(table.getTableName());
+ Set<String>[] set = new HashSet[dims.size()];
+ for (int i = 0; i < set.length; i++) {
+ set[i] = new HashSet<String>();
+ }
+ String line = reader.readLine();
+ while (line != null) {
+ String[] data = line.split(",");
+ for (int i = 0; i < set.length; i++) {
+ set[i].add(data[i]);
+ }
+ line = reader.readLine();
+ }
+
+ Cache dictCache = CacheProvider.getInstance()
+ .createCache(CacheType.REVERSE_DICTIONARY);
+ for (int i = 0; i < set.length; i++) {
+ ColumnIdentifier columnIdentifier =
+ new ColumnIdentifier(dims.get(i).getColumnId(), null, null);
+ DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
+ new DictionaryColumnUniqueIdentifier(
+ table.getAbsoluteTableIdentifier(), columnIdentifier, columnIdentifier.getDataType());
+ CarbonDictionaryWriter writer =
+ new CarbonDictionaryWriterImpl(dictionaryColumnUniqueIdentifier);
+ for (String value : set[i]) {
+ writer.write(value);
+ }
+ writer.close();
+ writer.commit();
+ Dictionary dict = (Dictionary) dictCache.get(
+ new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
+ columnIdentifier, dims.get(i).getDataType()));
+ CarbonDictionarySortInfoPreparator preparator =
+ new CarbonDictionarySortInfoPreparator();
+ List<String> newDistinctValues = new ArrayList<String>();
+ CarbonDictionarySortInfo dictionarySortInfo =
+ preparator.getDictionarySortInfo(newDistinctValues, dict, dims.get(i).getDataType());
+ CarbonDictionarySortIndexWriter carbonDictionaryWriter =
+ new CarbonDictionarySortIndexWriterImpl(dictionaryColumnUniqueIdentifier);
+ try {
+ carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex());
+ carbonDictionaryWriter.writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted());
+ } finally {
+ carbonDictionaryWriter.close();
+ }
+ }
+ reader.close();
+ }
+
+ /**
+ * Load the fact file into the store using the given load model
+ *
+ * @param loadModel load model describing the table and input file
+ * @param storeLocation target store location
+ * @throws Exception
+ */
+ public static void loadData(CarbonLoadModel loadModel, String storeLocation)
+ throws Exception {
+ if (!new File(storeLocation).mkdirs()) {
+ LOG.warn("failed to create store directory " + storeLocation);
+ }
+ String outPutLoc = storeLocation + "/etl";
+ String databaseName = loadModel.getDatabaseName();
+ String tableName = loadModel.getTableName();
+ String tempLocationKey = databaseName + '_' + tableName + "_1";
+ CarbonProperties.getInstance().addProperty(
+ tempLocationKey, storeLocation + "/" + databaseName + "/" + tableName);
+ CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
+ CarbonProperties.getInstance().addProperty("send.signal.load", "false");
+ CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true");
+ CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1");
+ CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true");
+ CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true");
+ CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true");
+ CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false");
+ CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000");
+
+ String graphPath =
+ outPutLoc + File.separator + loadModel.getDatabaseName() + File.separator + tableName
+ + File.separator + 0 + File.separator + 1 + File.separator + tableName + ".ktr";
+ File path = new File(graphPath);
+ if (path.exists()) {
+ if (!path.delete()) {
+ LOG.warn("delete " + path + " failed");
+ }
+ }
+
+ BlockDetails blockDetails = new BlockDetails(new Path(loadModel.getFactFilePath()),
+ 0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
+ Configuration configuration = new Configuration();
+ CSVInputFormat.setCommentCharacter(configuration, loadModel.getCommentChar());
+ CSVInputFormat.setCSVDelimiter(configuration, loadModel.getCsvDelimiter());
+ CSVInputFormat.setEscapeCharacter(configuration, loadModel.getEscapeChar());
+ CSVInputFormat.setHeaderExtractionEnabled(configuration, true);
+ CSVInputFormat.setQuoteCharacter(configuration, loadModel.getQuoteChar());
+ CSVInputFormat.setReadBufferSize(configuration,
+ CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
+ CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
+ CSVInputFormat.setNumberOfColumns(
+ configuration, String.valueOf(loadModel.getCsvHeaderColumns().length));
+ CSVInputFormat.setMaxColumns(configuration, "10");
+
+ TaskAttemptContextImpl hadoopAttemptContext =
+ new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
+ CSVInputFormat format = new CSVInputFormat();
+
+ RecordReader<NullWritable, StringArrayWritable> recordReader =
+ format.createRecordReader(blockDetails, hadoopAttemptContext);
+
+ CSVRecordReaderIterator readerIterator =
+ new CSVRecordReaderIterator(recordReader, blockDetails, hadoopAttemptContext);
+ new DataLoadExecutor().execute(loadModel,
+ new String[] {storeLocation + "/" + databaseName + "/" + tableName},
+ new CarbonIterator[]{readerIterator});
+
+ writeLoadMetadata(
+ loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(),
+ new ArrayList<LoadMetadataDetails>());
+ }
+
+ public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
+ String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
+ LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
+ loadMetadataDetails.setLoadEndTime(System.currentTimeMillis());
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
+ loadMetadataDetails.setLoadName(String.valueOf(0));
+ loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
+ listOfLoadFolderDetails.add(loadMetadataDetails);
+
+ String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator
+ + CarbonTablePath.TABLE_STATUS_FILE;
+
+ DataOutputStream dataOutputStream;
+ Gson gsonObjectToWrite = new Gson();
+ BufferedWriter brWriter = null;
+
+ AtomicFileOperations writeOperation =
+ new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
+
+ try {
+ dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+ String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
+ brWriter.write(metadataInstance);
+ } finally {
+ if (null != brWriter) {
+ brWriter.flush();
+ }
+ CarbonUtil.closeStreams(brWriter);
+ }
+ writeOperation.close();
+
+ }
+
+ public static String readCurrentTime() {
+ SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
+ return sdf.format(new Date());
+ }
+
+ public static void main(String[] args) throws Exception {
+ StoreCreator.createCarbonStore();
+ }
+
+}
\ No newline at end of file
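
For reference, a minimal usage sketch of the relocated utility (illustrative only, not part of the diff); it mirrors the main() method above and the calls made by the updated tests below:

// Usage sketch: assumes the data.csv fixture at
// hadoop/src/test/resources/data.csv is present, as
// createTableAndLoadModel() requires; the store is written under
// target/store/testdb/testtable per the static initializer.
import org.apache.carbondata.hadoop.testutil.StoreCreator;

public class StoreCreatorUsage {
  public static void main(String[] args) throws Exception {
    // Builds the schema file, writes dictionaries and loads one segment,
    // exactly what StoreCreator.main() above does.
    StoreCreator.createCarbonStore();
    // Drop any cached data maps so a following run starts clean.
    StoreCreator.clearDataMaps();
  }
}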
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java
deleted file mode 100644
index 395015e..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.util;
-
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructField;
-
-public class CarbonTypeUtil {
-
- public static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
- DataType carbonDataType) {
- if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
- return DataTypes.StringType;
- } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) {
- return DataTypes.ShortType;
- } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) {
- return DataTypes.IntegerType;
- } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
- return DataTypes.LongType;
- } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) {
- return DataTypes.DoubleType;
- } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) {
- return DataTypes.BooleanType;
- } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType)) {
- return DataTypes.createDecimalType();
- } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
- return DataTypes.TimestampType;
- } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
- return DataTypes.DateType;
- } else {
- return null;
- }
- }
-
- public static StructField[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) {
- StructField[] fields = new StructField[carbonColumns.length];
- for (int i = 0; i < carbonColumns.length; i++) {
- CarbonColumn carbonColumn = carbonColumns[i];
- if (carbonColumn.isDimension()) {
- if (carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
- DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
- .getDirectDictionaryGenerator(carbonColumn.getDataType());
- fields[i] = new StructField(carbonColumn.getColName(),
- CarbonTypeUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, null);
- } else if (!carbonColumn.hasEncoding(Encoding.DICTIONARY)) {
- fields[i] = new StructField(carbonColumn.getColName(),
- CarbonTypeUtil.convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null);
- } else if (carbonColumn.isComplex()) {
- fields[i] = new StructField(carbonColumn.getColName(),
- CarbonTypeUtil.convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null);
- } else {
- fields[i] = new StructField(carbonColumn.getColName(), CarbonTypeUtil
- .convertCarbonToSparkDataType(
- org.apache.carbondata.core.metadata.datatype.DataTypes.INT), true, null);
- }
- } else if (carbonColumn.isMeasure()) {
- DataType dataType = carbonColumn.getDataType();
- if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN
- || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT
- || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT
- || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
- fields[i] = new StructField(carbonColumn.getColName(),
- CarbonTypeUtil.convertCarbonToSparkDataType(dataType), true, null);
- } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) {
- CarbonMeasure measure = (CarbonMeasure) carbonColumn;
- fields[i] = new StructField(carbonColumn.getColName(),
- new DecimalType(measure.getPrecision(), measure.getScale()), true, null);
- } else {
- fields[i] = new StructField(carbonColumn.getColName(), CarbonTypeUtil
- .convertCarbonToSparkDataType(
- org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE), true, null);
- }
- }
- }
- return fields;
- }
-
-}
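
For context, the class above depended on spark-sql, which is why it leaves carbon-hadoop under this change. A self-contained sketch restating two branches of the deleted convertCarbonToSparkDataType() (the class name and placement here are illustrative; compiling it still requires spark-sql on the classpath):

import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;

public class TypeMappingSketch {
  // Restates two branches of the deleted convertCarbonToSparkDataType():
  // carbon STRING -> Spark StringType, carbon INT -> Spark IntegerType.
  static org.apache.spark.sql.types.DataType toSpark(DataType carbonType) {
    if (carbonType == DataTypes.STRING) {
      return org.apache.spark.sql.types.DataTypes.StringType;
    } else if (carbonType == DataTypes.INT) {
      return org.apache.spark.sql.types.DataTypes.IntegerType;
    }
    return null; // the deleted method covered the remaining types too
  }

  public static void main(String[] args) {
    System.out.println(toSpark(DataTypes.STRING)); // StringType
  }
}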
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
index 2f029ab..ea242d1 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
@@ -27,8 +27,6 @@ import java.io.IOException;
import java.util.List;
import java.util.UUID;
-import junit.framework.TestCase;
-
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -41,7 +39,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CarbonProjection;
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.hadoop.test.util.StoreCreator;
+import org.apache.carbondata.hadoop.testutil.StoreCreator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
@@ -52,9 +50,7 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
public class CarbonTableInputFormatTest {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
index 653a49e..99f69c2 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
-import org.apache.carbondata.hadoop.test.util.StoreCreator;
+import org.apache.carbondata.hadoop.testutil.StoreCreator;
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
deleted file mode 100644
index 57f488f..0000000
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.statusmanager.FileFormat;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-
-import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class CarbonStreamInputFormatTest extends TestCase {
-
- private TaskAttemptID taskAttemptId;
- private TaskAttemptContext taskAttemptContext;
- private Configuration hadoopConf;
- private AbsoluteTableIdentifier identifier;
- private String tablePath;
-
-
- @Override protected void setUp() throws Exception {
- tablePath = new File("target/stream_input").getCanonicalPath();
- String dbName = "default";
- String tableName = "stream_table_input";
- identifier = AbsoluteTableIdentifier.from(
- tablePath,
- new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
-
- JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
- TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
- taskAttemptId = new TaskAttemptID(taskId, 0);
-
- hadoopConf = new Configuration();
- taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
- }
-
- private InputSplit buildInputSplit() throws IOException {
- CarbonInputSplit carbonInputSplit = new CarbonInputSplit();
- List<CarbonInputSplit> splitList = new ArrayList<>();
- splitList.add(carbonInputSplit);
- return new CarbonMultiBlockSplit(splitList, new String[] { "localhost" },
- FileFormat.ROW_V1);
- }
-
- @Test public void testCreateRecordReader() {
- try {
- InputSplit inputSplit = buildInputSplit();
- CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
- RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
- Assert.assertNotNull("Failed to create record reader", recordReader);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.assertTrue(e.getMessage(), false);
- }
- }
-
- @Override protected void tearDown() throws Exception {
- super.tearDown();
- if (tablePath != null) {
- FileFactory.deleteAllFilesOfDir(new File(tablePath));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java
deleted file mode 100644
index e871c7e..0000000
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Date;
-import java.util.UUID;
-
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.hadoop.test.util.StoreCreator;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-
-import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class CarbonStreamOutputFormatTest extends TestCase {
-
- private Configuration hadoopConf;
- private TaskAttemptID taskAttemptId;
- private CarbonLoadModel carbonLoadModel;
- private String tablePath;
-
- @Override protected void setUp() throws Exception {
- super.setUp();
- JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
- TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
- taskAttemptId = new TaskAttemptID(taskId, 0);
-
- hadoopConf = new Configuration();
- hadoopConf.set("mapred.job.id", jobId.toString());
- hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID().toString());
- hadoopConf.set("mapred.task.id", taskAttemptId.toString());
- hadoopConf.setBoolean("mapred.task.is.map", true);
- hadoopConf.setInt("mapred.task.partition", 0);
-
- tablePath = new File("target/stream_output").getCanonicalPath();
- String dbName = "default";
- String tableName = "stream_table_output";
- AbsoluteTableIdentifier identifier =
- AbsoluteTableIdentifier.from(
- tablePath,
- new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
-
- CarbonTable table = StoreCreator.createTable(identifier);
-
- String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
- carbonLoadModel = StoreCreator.buildCarbonLoadModel(table, factFilePath, identifier);
- }
-
- @Test public void testSetCarbonLoadModel() {
- try {
- CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
- } catch (IOException e) {
- Assert.assertTrue("Failed to config CarbonLoadModel for CarbonStreamOutputFromat", false);
- }
- }
-
- @Test public void testGetCarbonLoadModel() {
- try {
- CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
- CarbonLoadModel model = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
-
- Assert.assertNotNull("Failed to get CarbonLoadModel", model);
- Assert.assertTrue("CarbonLoadModel should be same with previous",
- carbonLoadModel.getFactTimeStamp() == model.getFactTimeStamp());
-
- } catch (IOException e) {
- Assert.assertTrue("Failed to get CarbonLoadModel for CarbonStreamOutputFromat", false);
- }
- }
-
- @Test public void testGetRecordWriter() {
- CarbonStreamOutputFormat outputFormat = new CarbonStreamOutputFormat();
- try {
- CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
- TaskAttemptContext taskAttemptContext =
- new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
- RecordWriter recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
- Assert.assertNotNull("Failed to get CarbonStreamRecordWriter", recordWriter);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.assertTrue(e.getMessage(), false);
- }
- }
-
- @Override protected void tearDown() throws Exception {
- super.tearDown();
- if (tablePath != null) {
- FileFactory.deleteAllFilesOfDir(new File(tablePath));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
deleted file mode 100644
index 8e8916d..0000000
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.test.util;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.DataMapStoreManager;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.core.fileoperations.FileWriteOperation;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnIdentifier;
-import org.apache.carbondata.core.metadata.converter.SchemaConverter;
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
-import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.metadata.schema.table.TableSchema;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatus;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
-import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl;
-import org.apache.carbondata.core.writer.ThriftWriter;
-import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter;
-import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
-import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfo;
-import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfoPreparator;
-import org.apache.carbondata.processing.loading.DataLoadExecutor;
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
-import org.apache.carbondata.processing.loading.csvinput.BlockDetails;
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
-import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
-import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.processing.util.TableOptionConstant;
-
-import com.google.gson.Gson;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-
-/**
- * This class will create store file based on provided schema
- *
- */
-public class StoreCreator {
-
- private static AbsoluteTableIdentifier absoluteTableIdentifier;
- private static String storePath = null;
-
- static {
- try {
- storePath = new File("target/store").getCanonicalPath();
- String dbName = "testdb";
- String tableName = "testtable";
- absoluteTableIdentifier =
- AbsoluteTableIdentifier.from(
- storePath +"/testdb/testtable",
- new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
- } catch (IOException ex) {
-
- }
- }
-
- public static AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
- return absoluteTableIdentifier;
- }
-
- public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String factFilePath,
- AbsoluteTableIdentifier absoluteTableIdentifier) {
- CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
- CarbonLoadModel loadModel = new CarbonLoadModel();
- loadModel.setCarbonDataLoadSchema(schema);
- loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
- loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
- loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
- loadModel.setFactFilePath(factFilePath);
- loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
- loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
- loadModel.setDateFormat(null);
- loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
- CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS));
- loadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_DATE_FORMAT,
- CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
- loadModel
- .setSerializationNullFormat(
- TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," + "\\N");
- loadModel
- .setBadRecordsLoggerEnable(
- TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + "false");
- loadModel
- .setBadRecordsAction(
- TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + "FORCE");
- loadModel
- .setIsEmptyDataBadRecord(
- DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + "false");
- loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary");
- loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
- loadModel.setTaskNo("0");
- loadModel.setSegmentId("0");
- loadModel.setFactTimeStamp(System.currentTimeMillis());
- loadModel.setMaxColumns("10");
- return loadModel;
- }
-
- /**
- * Create store without any restructure
- */
- public static void createCarbonStore() throws Exception {
- CarbonLoadModel loadModel = createTableAndLoadModel();
- loadData(loadModel, storePath);
- }
-
- /**
- * Method to clear the data maps
- */
- public static void clearDataMaps() {
- DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier);
- }
-
- public static CarbonLoadModel createTableAndLoadModel() throws Exception {
- String factFilePath =
- new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
- File storeDir = new File(storePath);
- CarbonUtil.deleteFoldersAndFiles(storeDir);
- CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS,
- storePath);
-
- CarbonTable table = createTable(absoluteTableIdentifier);
- writeDictionary(factFilePath, table);
- return buildCarbonLoadModel(table, factFilePath, absoluteTableIdentifier);
- }
-
- public static CarbonTable createTable(
- AbsoluteTableIdentifier identifier) throws IOException {
- TableInfo tableInfo = new TableInfo();
- tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
- TableSchema tableSchema = new TableSchema();
- tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
- List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
- ArrayList<Encoding> encodings = new ArrayList<>();
- encodings.add(Encoding.DICTIONARY);
- ColumnSchema id = new ColumnSchema();
- id.setColumnName("ID");
- id.setColumnar(true);
- id.setDataType(DataTypes.INT);
- id.setEncodingList(encodings);
- id.setColumnUniqueId(UUID.randomUUID().toString());
- id.setColumnReferenceId(id.getColumnUniqueId());
- id.setDimensionColumn(true);
- id.setColumnGroup(1);
- columnSchemas.add(id);
-
- ColumnSchema date = new ColumnSchema();
- date.setColumnName("date");
- date.setColumnar(true);
- date.setDataType(DataTypes.STRING);
- date.setEncodingList(encodings);
- date.setColumnUniqueId(UUID.randomUUID().toString());
- date.setDimensionColumn(true);
- date.setColumnGroup(2);
- date.setSortColumn(true);
- date.setColumnReferenceId(id.getColumnUniqueId());
- columnSchemas.add(date);
-
- ColumnSchema country = new ColumnSchema();
- country.setColumnName("country");
- country.setColumnar(true);
- country.setDataType(DataTypes.STRING);
- country.setEncodingList(encodings);
- country.setColumnUniqueId(UUID.randomUUID().toString());
- country.setDimensionColumn(true);
- country.setColumnGroup(3);
- country.setSortColumn(true);
- country.setColumnReferenceId(id.getColumnUniqueId());
- columnSchemas.add(country);
-
- ColumnSchema name = new ColumnSchema();
- name.setColumnName("name");
- name.setColumnar(true);
- name.setDataType(DataTypes.STRING);
- name.setEncodingList(encodings);
- name.setColumnUniqueId(UUID.randomUUID().toString());
- name.setDimensionColumn(true);
- name.setColumnGroup(4);
- name.setSortColumn(true);
- name.setColumnReferenceId(id.getColumnUniqueId());
- columnSchemas.add(name);
-
- ColumnSchema phonetype = new ColumnSchema();
- phonetype.setColumnName("phonetype");
- phonetype.setColumnar(true);
- phonetype.setDataType(DataTypes.STRING);
- phonetype.setEncodingList(encodings);
- phonetype.setColumnUniqueId(UUID.randomUUID().toString());
- phonetype.setDimensionColumn(true);
- phonetype.setColumnGroup(5);
- phonetype.setSortColumn(true);
- phonetype.setColumnReferenceId(id.getColumnUniqueId());
- columnSchemas.add(phonetype);
-
- ColumnSchema serialname = new ColumnSchema();
- serialname.setColumnName("serialname");
- serialname.setColumnar(true);
- serialname.setDataType(DataTypes.STRING);
- serialname.setEncodingList(encodings);
- serialname.setColumnUniqueId(UUID.randomUUID().toString());
- serialname.setDimensionColumn(true);
- serialname.setColumnGroup(6);
- serialname.setSortColumn(true);
- serialname.setColumnReferenceId(id.getColumnUniqueId());
- columnSchemas.add(serialname);
-
- ColumnSchema salary = new ColumnSchema();
- salary.setColumnName("salary");
- salary.setColumnar(true);
- salary.setDataType(DataTypes.INT);
- salary.setEncodingList(new ArrayList<Encoding>());
- salary.setColumnUniqueId(UUID.randomUUID().toString());
- salary.setDimensionColumn(false);
- salary.setColumnReferenceId(id.getColumnUniqueId());
- salary.setColumnGroup(7);
- columnSchemas.add(salary);
-
- tableSchema.setListOfColumns(columnSchemas);
- SchemaEvolution schemaEvol = new SchemaEvolution();
- schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>());
- tableSchema.setSchemaEvalution(schemaEvol);
- tableSchema.setTableId(UUID.randomUUID().toString());
- tableInfo.setTableUniqueName(
- identifier.getCarbonTableIdentifier().getTableUniqueName()
- );
- tableInfo.setLastUpdatedTime(System.currentTimeMillis());
- tableInfo.setFactTable(tableSchema);
- tableInfo.setTablePath(identifier.getTablePath());
- String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
- String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
- CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
-
- SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
- org.apache.carbondata.format.TableInfo thriftTableInfo =
- schemaConverter.fromWrapperToExternalTableInfo(
- tableInfo,
- tableInfo.getDatabaseName(),
- tableInfo.getFactTable().getTableName());
- org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry =
- new org.apache.carbondata.format.SchemaEvolutionEntry(tableInfo.getLastUpdatedTime());
- thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history()
- .add(schemaEvolutionEntry);
-
- FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath);
- if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
- FileFactory.mkdirs(schemaMetadataPath, fileType);
- }
-
- ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
- thriftWriter.open();
- thriftWriter.write(thriftTableInfo);
- thriftWriter.close();
- return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName());
- }
-
- private static void writeDictionary(String factFilePath, CarbonTable table) throws Exception {
- BufferedReader reader = new BufferedReader(new FileReader(factFilePath));
- String header = reader.readLine();
- String[] split = header.split(",");
- List<CarbonColumn> allCols = new ArrayList<CarbonColumn>();
- List<CarbonDimension> dims = table.getDimensionByTableName(table.getTableName());
- allCols.addAll(dims);
- List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getTableName());
- allCols.addAll(msrs);
- Set<String>[] set = new HashSet[dims.size()];
- for (int i = 0; i < set.length; i++) {
- set[i] = new HashSet<String>();
- }
- String line = reader.readLine();
- while (line != null) {
- String[] data = line.split(",");
- for (int i = 0; i < set.length; i++) {
- set[i].add(data[i]);
- }
- line = reader.readLine();
- }
-
- Cache dictCache = CacheProvider.getInstance()
- .createCache(CacheType.REVERSE_DICTIONARY);
- for (int i = 0; i < set.length; i++) {
- ColumnIdentifier columnIdentifier = new ColumnIdentifier(dims.get(i).getColumnId(), null, null);
- DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
- new DictionaryColumnUniqueIdentifier(table.getAbsoluteTableIdentifier(), columnIdentifier,
- columnIdentifier.getDataType());
- CarbonDictionaryWriter writer =
- new CarbonDictionaryWriterImpl(dictionaryColumnUniqueIdentifier);
- for (String value : set[i]) {
- writer.write(value);
- }
- writer.close();
- writer.commit();
- Dictionary dict = (Dictionary) dictCache.get(
- new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
- columnIdentifier, dims.get(i).getDataType()));
- CarbonDictionarySortInfoPreparator preparator =
- new CarbonDictionarySortInfoPreparator();
- List<String> newDistinctValues = new ArrayList<String>();
- CarbonDictionarySortInfo dictionarySortInfo =
- preparator.getDictionarySortInfo(newDistinctValues, dict, dims.get(i).getDataType());
- CarbonDictionarySortIndexWriter carbonDictionaryWriter =
- new CarbonDictionarySortIndexWriterImpl(dictionaryColumnUniqueIdentifier);
- try {
- carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex());
- carbonDictionaryWriter.writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted());
- } finally {
- carbonDictionaryWriter.close();
- }
- }
- reader.close();
- }
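
[Sketch] writeDictionary above derives each dimension's dictionary from the distinct values in its CSV column, and it only builds sets for the first dims.size() columns, so the fact file is assumed to list all dimensions before any measures. A minimal, dependency-free sketch of just that collection step (the file name and delimiter are illustrative, not from the source):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    public final class DistinctValueSketch {
      public static void main(String[] args) throws IOException {
        // try-with-resources closes the reader even on failure,
        // unlike the plain reader.close() at the end of writeDictionary
        try (BufferedReader reader = new BufferedReader(new FileReader("data.csv"))) {
          String[] header = reader.readLine().split(",");   // assumes a header row exists
          // one distinct-value set per column, mirroring the Set<String>[] above
          Set<String>[] distinct = new HashSet[header.length];
          for (int i = 0; i < distinct.length; i++) {
            distinct[i] = new HashSet<>();
          }
          String line;
          while ((line = reader.readLine()) != null) {
            String[] cells = line.split(",");
            for (int i = 0; i < distinct.length && i < cells.length; i++) {
              distinct[i].add(cells[i]);
            }
          }
          for (int i = 0; i < distinct.length; i++) {
            System.out.println(header[i] + " -> " + distinct[i].size() + " distinct values");
          }
        }
      }
    }
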
-
- /**
- * Execute the data load graph, which reads the fact CSV file and loads it into the store
- *
- * @param loadModel load model describing the input CSV and target table
- * @param storeLocation store path used for temporary and output files
- * @throws Exception
- */
- public static void loadData(CarbonLoadModel loadModel, String storeLocation)
- throws Exception {
- new File(storeLocation).mkdirs();
- String outPutLoc = storeLocation + "/etl";
- String databaseName = loadModel.getDatabaseName();
- String tableName = loadModel.getTableName();
- String tempLocationKey = databaseName + '_' + tableName + "_1";
- CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation + "/" + databaseName + "/" + tableName);
- CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
- CarbonProperties.getInstance().addProperty("send.signal.load", "false");
- CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true");
- CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1");
- CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true");
- CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true");
- CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true");
- CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false");
- CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000");
-
- String graphPath =
- outPutLoc + File.separator + loadModel.getDatabaseName() + File.separator + tableName
- + File.separator + 0 + File.separator + 1 + File.separator + tableName + ".ktr";
- File path = new File(graphPath);
- if (path.exists()) {
- path.delete();
- }
-
- BlockDetails blockDetails = new BlockDetails(new Path(loadModel.getFactFilePath()),
- 0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
- Configuration configuration = new Configuration();
- CSVInputFormat.setCommentCharacter(configuration, loadModel.getCommentChar());
- CSVInputFormat.setCSVDelimiter(configuration, loadModel.getCsvDelimiter());
- CSVInputFormat.setEscapeCharacter(configuration, loadModel.getEscapeChar());
- CSVInputFormat.setHeaderExtractionEnabled(configuration, true);
- CSVInputFormat.setQuoteCharacter(configuration, loadModel.getQuoteChar());
- CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
- CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
- CSVInputFormat.setNumberOfColumns(configuration, String.valueOf(loadModel.getCsvHeaderColumns().length));
- CSVInputFormat.setMaxColumns(configuration, "10");
-
- TaskAttemptContextImpl hadoopAttemptContext = new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
- CSVInputFormat format = new CSVInputFormat();
-
- RecordReader<NullWritable, StringArrayWritable> recordReader =
- format.createRecordReader(blockDetails, hadoopAttemptContext);
-
- CSVRecordReaderIterator readerIterator = new CSVRecordReaderIterator(recordReader, blockDetails, hadoopAttemptContext);
- new DataLoadExecutor().execute(loadModel,
- new String[] {storeLocation + "/" + databaseName + "/" + tableName},
- new CarbonIterator[]{readerIterator});
-
- writeLoadMetadata(loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(),
- new ArrayList<LoadMetadataDetails>());
- }
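
[Sketch] loadData above wires a single-split reader stack (BlockDetails -> CSVInputFormat -> CSVRecordReaderIterator) into DataLoadExecutor. The same stack can be driven by hand when debugging an input file; this hedged sketch reuses only classes and setters that appear in the method above, while "sample.csv" and the assumption that StringArrayWritable.get() yields the parsed String[] row are mine:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.TaskType;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
    // BlockDetails, CSVInputFormat and StringArrayWritable: same imports as StoreCreator above

    public final class CsvReaderSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        CSVInputFormat.setHeaderExtractionEnabled(conf, true);
        CSVInputFormat.setCSVDelimiter(conf, ",");

        java.io.File csv = new java.io.File("sample.csv");   // hypothetical input file
        BlockDetails split = new BlockDetails(new Path(csv.getPath()), 0, csv.length(),
            new String[] { "localhost" });
        TaskAttemptContextImpl context =
            new TaskAttemptContextImpl(conf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));

        RecordReader<NullWritable, StringArrayWritable> reader =
            new CSVInputFormat().createRecordReader(split, context);
        reader.initialize(split, context);
        while (reader.nextKeyValue()) {
          // assumption: get() exposes the parsed row as String[]
          System.out.println(String.join(",", reader.getCurrentValue().get()));
        }
        reader.close();
      }
    }
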
-
- public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
- String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
- LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
- loadMetadataDetails.setLoadEndTime(System.currentTimeMillis());
- loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
- loadMetadataDetails.setLoadName(String.valueOf(0));
- loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
- listOfLoadFolderDetails.add(loadMetadataDetails);
-
- String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator
- + CarbonTablePath.TABLE_STATUS_FILE;
-
- DataOutputStream dataOutputStream;
- Gson gsonObjectToWrite = new Gson();
- BufferedWriter brWriter = null;
-
- AtomicFileOperations writeOperation =
- new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
-
- try {
-
- dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
- brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
- Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
- String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
- brWriter.write(metadataInstance);
- } finally {
- if (null != brWriter) {
- brWriter.flush();
- }
- CarbonUtil.closeStreams(brWriter);
- }
- writeOperation.close();
-
- }
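
[Sketch] writeLoadMetadata serializes the segment list with Gson and replaces the table status file through AtomicFileOperationsImpl, which stages the content in a temporary file and swaps it in on close(), so readers never see a half-written status file. The serialization half in isolation, with an invented stand-in POJO (field names are illustrative, not the real tablestatus schema):

    import com.google.gson.Gson;

    public final class TableStatusSketch {
      // stand-in for LoadMetadataDetails; the real class has different fields
      static final class SegmentDetail {
        String loadName = "0";
        String segmentStatus = "Success";
        long loadStartTime = System.currentTimeMillis();
      }

      public static void main(String[] args) {
        // toJson on the array mirrors gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray())
        String json = new Gson().toJson(new SegmentDetail[] { new SegmentDetail() });
        System.out.println(json);  // e.g. [{"loadName":"0","segmentStatus":"Success",...}]
      }
    }
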
-
- public static String readCurrentTime() {
- SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
- return sdf.format(new Date());
- }
-
- public static void main(String[] args) throws Exception {
- StoreCreator.createCarbonStore();
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index 16f327d..f011a75 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -36,7 +36,7 @@
<dependencies>
<dependency>
<groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-hadoop</artifactId>
+ <artifactId>carbondata-streaming</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
index 6e9e0a6..f6dc65b 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
@@ -20,10 +20,19 @@ package org.apache.carbondata.spark.util;
import java.io.Serializable;
import java.math.BigDecimal;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.util.DataTypeConverter;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
import org.apache.spark.unsafe.types.UTF8String;
/**
@@ -90,4 +99,76 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri
public Object wrapWithGenericRow(Object[] fields) {
return new GenericInternalRow(fields);
}
+
+ private static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
+ DataType carbonDataType) {
+ if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
+ return DataTypes.StringType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) {
+ return DataTypes.ShortType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) {
+ return DataTypes.IntegerType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
+ return DataTypes.LongType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) {
+ return DataTypes.DoubleType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) {
+ return DataTypes.BooleanType;
+ } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType)) {
+ return DataTypes.createDecimalType();
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
+ return DataTypes.TimestampType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
+ return DataTypes.DateType;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Convert a CarbonColumn array to Spark's StructField array
+ */
+ @Override
+ public Object[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) {
+ StructField[] fields = new StructField[carbonColumns.length];
+ for (int i = 0; i < carbonColumns.length; i++) {
+ CarbonColumn carbonColumn = carbonColumns[i];
+ if (carbonColumn.isDimension()) {
+ if (carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(carbonColumn.getDataType());
+ fields[i] = new StructField(carbonColumn.getColName(),
+ convertCarbonToSparkDataType(generator.getReturnType()), true, null);
+ } else if (!carbonColumn.hasEncoding(Encoding.DICTIONARY)) {
+ fields[i] = new StructField(carbonColumn.getColName(),
+ convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null);
+ } else if (carbonColumn.isComplex()) {
+ fields[i] = new StructField(carbonColumn.getColName(),
+ convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null);
+ } else {
+ fields[i] = new StructField(carbonColumn.getColName(),
+ convertCarbonToSparkDataType(
+ org.apache.carbondata.core.metadata.datatype.DataTypes.INT), true, null);
+ }
+ } else if (carbonColumn.isMeasure()) {
+ DataType dataType = carbonColumn.getDataType();
+ if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN
+ || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT
+ || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT
+ || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
+ fields[i] = new StructField(carbonColumn.getColName(),
+ convertCarbonToSparkDataType(dataType), true, null);
+ } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) {
+ CarbonMeasure measure = (CarbonMeasure) carbonColumn;
+ fields[i] = new StructField(carbonColumn.getColName(),
+ new DecimalType(measure.getPrecision(), measure.getScale()), true, null);
+ } else {
+ fields[i] = new StructField(carbonColumn.getColName(),
+ convertCarbonToSparkDataType(
+ org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE), true, null);
+ }
+ }
+ }
+ return fields;
+ }
}
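
[Sketch] One detail worth noting in the converter above: decimal measures keep their declared precision and scale via new DecimalType(precision, scale), while the generic mapping falls back to DataTypes.createDecimalType(), i.e. Spark's default decimal(10,0). A standalone illustration (class and column names are invented):

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.DecimalType;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    public final class DecimalMappingSketch {
      public static void main(String[] args) {
        DecimalType fallback = DataTypes.createDecimalType();   // decimal(10,0): declared precision lost
        DecimalType preserved = new DecimalType(18, 4);         // decimal(18,4): as declared

        StructType schema = new StructType(new StructField[] {
            new StructField("amount_fallback", fallback, true, Metadata.empty()),
            new StructField("amount_preserved", preserved, true, Metadata.empty())
        });
        System.out.println(schema.treeString());
      }
    }

The converter passes null for the StructField metadata argument; Metadata.empty() may be the safer default if downstream Spark code dereferences it.
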
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 2be0efc..7e549a6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -53,10 +53,11 @@ import org.apache.carbondata.core.statusmanager.FileFormat
import org.apache.carbondata.core.util._
import org.apache.carbondata.hadoop._
import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, CarbonTableInputFormat}
-import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.InitInputMetrics
import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
+import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
/**
* This RDD is used to perform query on CarbonData file. Before sending tasks to scan