Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/28 03:19:45 UTC

[4/5] carbondata git commit: [CARBONDATA-2165]Remove spark in carbon-hadoop module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
new file mode 100644
index 0000000..b2c2d39
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop.testutil;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
+import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl;
+import org.apache.carbondata.core.writer.ThriftWriter;
+import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter;
+import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
+import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfo;
+import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfoPreparator;
+import org.apache.carbondata.processing.loading.DataLoadExecutor;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.csvinput.BlockDetails;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
+import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
+import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.util.TableOptionConstant;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+/**
+ * Utility that creates a CarbonData store (schema file, dictionaries and
+ * one loaded segment) based on a hard-coded test schema
+ */
+public class StoreCreator {
+
+  private static final LogService LOG =
+      LogServiceFactory.getLogService(StoreCreator.class.getCanonicalName());
+  private static AbsoluteTableIdentifier absoluteTableIdentifier;
+  private static String storePath = null;
+
+  static {
+    storePath = new File("target/store").getAbsolutePath();
+    String dbName = "testdb";
+    String tableName = "testtable";
+    absoluteTableIdentifier = AbsoluteTableIdentifier.from(storePath + "/" + dbName + "/" + tableName,
+        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+  }
+
+  public static AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
+    return absoluteTableIdentifier;
+  }
+
+  public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String factFilePath,
+      AbsoluteTableIdentifier absoluteTableIdentifier) {
+    CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
+    CarbonLoadModel loadModel = new CarbonLoadModel();
+    loadModel.setCarbonDataLoadSchema(schema);
+    loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
+    loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+    loadModel.setFactFilePath(factFilePath);
+    loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
+    loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
+    loadModel.setDateFormat(null);
+    loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS));
+    loadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
+    loadModel
+        .setSerializationNullFormat(
+            TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," + "\\N");
+    loadModel
+        .setBadRecordsLoggerEnable(
+            TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + "false");
+    loadModel
+        .setBadRecordsAction(
+            TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + "FORCE");
+    loadModel
+        .setIsEmptyDataBadRecord(
+            DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + "false");
+    loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary");
+    loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
+    loadModel.setTaskNo("0");
+    loadModel.setSegmentId("0");
+    loadModel.setFactTimeStamp(System.currentTimeMillis());
+    loadModel.setMaxColumns("10");
+    return loadModel;
+  }
+
+  /**
+   * Create a store from scratch, without any schema restructuring
+   */
+  public static void createCarbonStore() throws Exception {
+    CarbonLoadModel loadModel = createTableAndLoadModel();
+    loadData(loadModel, storePath);
+  }
+
+  /**
+   * Method to clear the data maps
+   */
+  public static void clearDataMaps() {
+    DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier);
+  }
+
+  public static CarbonLoadModel createTableAndLoadModel() throws Exception {
+    String factFilePath =
+        new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
+    File storeDir = new File(storePath);
+    CarbonUtil.deleteFoldersAndFiles(storeDir);
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS,
+        storePath);
+
+    CarbonTable table = createTable(absoluteTableIdentifier);
+    writeDictionary(factFilePath, table);
+    return buildCarbonLoadModel(table, factFilePath, absoluteTableIdentifier);
+  }
+
+  public static CarbonTable createTable(
+      AbsoluteTableIdentifier identifier) throws IOException {
+    TableInfo tableInfo = new TableInfo();
+    tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
+    TableSchema tableSchema = new TableSchema();
+    tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
+    List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
+    ArrayList<Encoding> encodings = new ArrayList<>();
+    encodings.add(Encoding.DICTIONARY);
+    ColumnSchema id = new ColumnSchema();
+    id.setColumnName("ID");
+    id.setColumnar(true);
+    id.setDataType(DataTypes.INT);
+    id.setEncodingList(encodings);
+    id.setColumnUniqueId(UUID.randomUUID().toString());
+    id.setColumnReferenceId(id.getColumnUniqueId());
+    id.setDimensionColumn(true);
+    id.setColumnGroup(1);
+    columnSchemas.add(id);
+
+    ColumnSchema date = new ColumnSchema();
+    date.setColumnName("date");
+    date.setColumnar(true);
+    date.setDataType(DataTypes.STRING);
+    date.setEncodingList(encodings);
+    date.setColumnUniqueId(UUID.randomUUID().toString());
+    date.setDimensionColumn(true);
+    date.setColumnGroup(2);
+    date.setSortColumn(true);
+    date.setColumnReferenceId(id.getColumnUniqueId());
+    columnSchemas.add(date);
+
+    ColumnSchema country = new ColumnSchema();
+    country.setColumnName("country");
+    country.setColumnar(true);
+    country.setDataType(DataTypes.STRING);
+    country.setEncodingList(encodings);
+    country.setColumnUniqueId(UUID.randomUUID().toString());
+    country.setDimensionColumn(true);
+    country.setColumnGroup(3);
+    country.setSortColumn(true);
+    country.setColumnReferenceId(id.getColumnUniqueId());
+    columnSchemas.add(country);
+
+    ColumnSchema name = new ColumnSchema();
+    name.setColumnName("name");
+    name.setColumnar(true);
+    name.setDataType(DataTypes.STRING);
+    name.setEncodingList(encodings);
+    name.setColumnUniqueId(UUID.randomUUID().toString());
+    name.setDimensionColumn(true);
+    name.setColumnGroup(4);
+    name.setSortColumn(true);
+    name.setColumnReferenceId(id.getColumnUniqueId());
+    columnSchemas.add(name);
+
+    ColumnSchema phonetype = new ColumnSchema();
+    phonetype.setColumnName("phonetype");
+    phonetype.setColumnar(true);
+    phonetype.setDataType(DataTypes.STRING);
+    phonetype.setEncodingList(encodings);
+    phonetype.setColumnUniqueId(UUID.randomUUID().toString());
+    phonetype.setDimensionColumn(true);
+    phonetype.setColumnGroup(5);
+    phonetype.setSortColumn(true);
+    phonetype.setColumnReferenceId(id.getColumnUniqueId());
+    columnSchemas.add(phonetype);
+
+    ColumnSchema serialname = new ColumnSchema();
+    serialname.setColumnName("serialname");
+    serialname.setColumnar(true);
+    serialname.setDataType(DataTypes.STRING);
+    serialname.setEncodingList(encodings);
+    serialname.setColumnUniqueId(UUID.randomUUID().toString());
+    serialname.setDimensionColumn(true);
+    serialname.setColumnGroup(6);
+    serialname.setSortColumn(true);
+    serialname.setColumnReferenceId(id.getColumnUniqueId());
+    columnSchemas.add(serialname);
+
+    ColumnSchema salary = new ColumnSchema();
+    salary.setColumnName("salary");
+    salary.setColumnar(true);
+    salary.setDataType(DataTypes.INT);
+    salary.setEncodingList(new ArrayList<Encoding>());
+    salary.setColumnUniqueId(UUID.randomUUID().toString());
+    salary.setDimensionColumn(false);
+    salary.setColumnReferenceId(id.getColumnUniqueId());
+    salary.setColumnGroup(7);
+    columnSchemas.add(salary);
+
+    tableSchema.setListOfColumns(columnSchemas);
+    SchemaEvolution schemaEvol = new SchemaEvolution();
+    schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>());
+    tableSchema.setSchemaEvalution(schemaEvol);
+    tableSchema.setTableId(UUID.randomUUID().toString());
+    tableInfo.setTableUniqueName(
+        identifier.getCarbonTableIdentifier().getTableUniqueName()
+    );
+    tableInfo.setLastUpdatedTime(System.currentTimeMillis());
+    tableInfo.setFactTable(tableSchema);
+    tableInfo.setTablePath(identifier.getTablePath());
+    String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
+    String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
+    CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
+
+    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+    org.apache.carbondata.format.TableInfo thriftTableInfo =
+        schemaConverter.fromWrapperToExternalTableInfo(
+            tableInfo,
+            tableInfo.getDatabaseName(),
+            tableInfo.getFactTable().getTableName());
+    org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry =
+        new org.apache.carbondata.format.SchemaEvolutionEntry(tableInfo.getLastUpdatedTime());
+    thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history()
+        .add(schemaEvolutionEntry);
+
+    FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath);
+    if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
+      FileFactory.mkdirs(schemaMetadataPath, fileType);
+    }
+
+    ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
+    thriftWriter.open();
+    thriftWriter.write(thriftTableInfo);
+    thriftWriter.close();
+    return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName());
+  }
+
+  private static void writeDictionary(String factFilePath, CarbonTable table) throws Exception {
+    BufferedReader reader = new BufferedReader(new InputStreamReader(
+        new FileInputStream(factFilePath), "UTF-8"));
+    List<CarbonDimension> dims = table.getDimensionByTableName(table.getTableName());
+    Set<String>[] set = new HashSet[dims.size()];
+    for (int i = 0; i < set.length; i++) {
+      set[i] = new HashSet<String>();
+    }
+    String line = reader.readLine();
+    while (line != null) {
+      String[] data = line.split(",");
+      for (int i = 0; i < set.length; i++) {
+        set[i].add(data[i]);
+      }
+      line = reader.readLine();
+    }
+
+    Cache dictCache = CacheProvider.getInstance()
+        .createCache(CacheType.REVERSE_DICTIONARY);
+    for (int i = 0; i < set.length; i++) {
+      ColumnIdentifier columnIdentifier =
+          new ColumnIdentifier(dims.get(i).getColumnId(), null, null);
+      DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
+          new DictionaryColumnUniqueIdentifier(
+              table.getAbsoluteTableIdentifier(), columnIdentifier, columnIdentifier.getDataType());
+      CarbonDictionaryWriter writer =
+          new CarbonDictionaryWriterImpl(dictionaryColumnUniqueIdentifier);
+      for (String value : set[i]) {
+        writer.write(value);
+      }
+      writer.close();
+      writer.commit();
+      Dictionary dict = (Dictionary) dictCache.get(
+          new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
+              columnIdentifier, dims.get(i).getDataType()));
+      CarbonDictionarySortInfoPreparator preparator =
+          new CarbonDictionarySortInfoPreparator();
+      List<String> newDistinctValues = new ArrayList<String>();
+      CarbonDictionarySortInfo dictionarySortInfo =
+          preparator.getDictionarySortInfo(newDistinctValues, dict, dims.get(i).getDataType());
+      CarbonDictionarySortIndexWriter carbonDictionaryWriter =
+          new CarbonDictionarySortIndexWriterImpl(dictionaryColumnUniqueIdentifier);
+      try {
+        carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex());
+        carbonDictionaryWriter.writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted());
+      } finally {
+        carbonDictionaryWriter.close();
+      }
+    }
+    reader.close();
+  }
+
+  /**
+   * Execute the data load, reading the fact file and writing one segment
+   *
+   * @param loadModel     load model describing the table and the input CSV
+   * @param storeLocation target store directory
+   * @throws Exception
+   */
+  public static void loadData(CarbonLoadModel loadModel, String storeLocation)
+      throws Exception {
+    if (!new File(storeLocation).mkdirs()) {
+      LOG.warn("failed to create store directory: " + storeLocation);
+    }
+    String outPutLoc = storeLocation + "/etl";
+    String databaseName = loadModel.getDatabaseName();
+    String tableName = loadModel.getTableName();
+    String tempLocationKey = databaseName + '_' + tableName + "_1";
+    CarbonProperties.getInstance().addProperty(
+        tempLocationKey, storeLocation + "/" + databaseName + "/" + tableName);
+    CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
+    CarbonProperties.getInstance().addProperty("send.signal.load", "false");
+    CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true");
+    CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1");
+    CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true");
+    CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true");
+    CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true");
+    CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false");
+    CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000");
+
+    String graphPath =
+        outPutLoc + File.separator + loadModel.getDatabaseName() + File.separator + tableName
+            + File.separator + 0 + File.separator + 1 + File.separator + tableName + ".ktr";
+    File path = new File(graphPath);
+    if (path.exists()) {
+      if (!path.delete()) {
+        LOG.warn("delete " + path + " failed");
+      }
+    }
+
+    BlockDetails blockDetails = new BlockDetails(new Path(loadModel.getFactFilePath()),
+        0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
+    Configuration configuration = new Configuration();
+    CSVInputFormat.setCommentCharacter(configuration, loadModel.getCommentChar());
+    CSVInputFormat.setCSVDelimiter(configuration, loadModel.getCsvDelimiter());
+    CSVInputFormat.setEscapeCharacter(configuration, loadModel.getEscapeChar());
+    CSVInputFormat.setHeaderExtractionEnabled(configuration, true);
+    CSVInputFormat.setQuoteCharacter(configuration, loadModel.getQuoteChar());
+    CSVInputFormat.setReadBufferSize(configuration,
+        CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
+            CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
+    CSVInputFormat.setNumberOfColumns(
+        configuration, String.valueOf(loadModel.getCsvHeaderColumns().length));
+    CSVInputFormat.setMaxColumns(configuration, "10");
+
+    TaskAttemptContextImpl hadoopAttemptContext =
+        new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
+    CSVInputFormat format = new CSVInputFormat();
+
+    RecordReader<NullWritable, StringArrayWritable> recordReader =
+        format.createRecordReader(blockDetails, hadoopAttemptContext);
+
+    CSVRecordReaderIterator readerIterator =
+        new CSVRecordReaderIterator(recordReader, blockDetails, hadoopAttemptContext);
+    new DataLoadExecutor().execute(loadModel,
+        new String[] {storeLocation + "/" + databaseName + "/" + tableName},
+        new CarbonIterator[]{readerIterator});
+
+    writeLoadMetadata(
+        loadModel.getCarbonDataLoadSchema(), loadModel.getDatabaseName(), loadModel.getTableName(),
+        new ArrayList<LoadMetadataDetails>());
+  }
+
+  public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
+      String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
+    LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
+    loadMetadataDetails.setLoadEndTime(System.currentTimeMillis());
+    loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
+    loadMetadataDetails.setLoadName(String.valueOf(0));
+    loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
+    listOfLoadFolderDetails.add(loadMetadataDetails);
+
+    String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator
+        + CarbonTablePath.TABLE_STATUS_FILE;
+
+    DataOutputStream dataOutputStream;
+    Gson gsonObjectToWrite = new Gson();
+    BufferedWriter brWriter = null;
+
+    AtomicFileOperations writeOperation =
+        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
+
+    try {
+
+      dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+      String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
+      brWriter.write(metadataInstance);
+    } finally {
+      if (null != brWriter) {
+        brWriter.flush();
+      }
+      CarbonUtil.closeStreams(brWriter);
+    }
+    writeOperation.close();
+  }
+
+  public static String readCurrentTime() {
+    SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
+    return sdf.format(new Date());
+  }
+
+  public static void main(String[] args) throws Exception {
+    StoreCreator.createCarbonStore();
+  }
+
+}
\ No newline at end of file
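
For reference, the relocated testutil StoreCreator is driven the same way the
old test-scoped copy (deleted further below) was. A minimal sketch of a
standalone caller, using only the public methods defined in the file above
(the wrapper class name StoreCreatorExample is illustrative):

    import org.apache.carbondata.hadoop.testutil.StoreCreator;

    public class StoreCreatorExample {
      public static void main(String[] args) throws Exception {
        // Creates target/store/testdb/testtable: schema file, dictionary
        // files, and one segment loaded from hadoop/src/test/resources/data.csv.
        StoreCreator.createCarbonStore();
        // Drop any cached data maps for the test table between runs.
        StoreCreator.clearDataMaps();
      }
    }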

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java
deleted file mode 100644
index 395015e..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.util;
-
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructField;
-
-public class CarbonTypeUtil {
-
-  public static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
-      DataType carbonDataType) {
-    if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
-      return DataTypes.StringType;
-    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) {
-      return DataTypes.ShortType;
-    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) {
-      return DataTypes.IntegerType;
-    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
-      return DataTypes.LongType;
-    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) {
-      return DataTypes.DoubleType;
-    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) {
-      return DataTypes.BooleanType;
-    } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType)) {
-      return DataTypes.createDecimalType();
-    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
-      return DataTypes.TimestampType;
-    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
-      return DataTypes.DateType;
-    } else {
-      return null;
-    }
-  }
-
-  public static StructField[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) {
-    StructField[] fields = new StructField[carbonColumns.length];
-    for (int i = 0; i < carbonColumns.length; i++) {
-      CarbonColumn carbonColumn = carbonColumns[i];
-      if (carbonColumn.isDimension()) {
-        if (carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-          DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
-              .getDirectDictionaryGenerator(carbonColumn.getDataType());
-          fields[i] = new StructField(carbonColumn.getColName(),
-              CarbonTypeUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, null);
-        } else if (!carbonColumn.hasEncoding(Encoding.DICTIONARY)) {
-          fields[i] = new StructField(carbonColumn.getColName(),
-              CarbonTypeUtil.convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null);
-        } else if (carbonColumn.isComplex()) {
-          fields[i] = new StructField(carbonColumn.getColName(),
-              CarbonTypeUtil.convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null);
-        } else {
-          fields[i] = new StructField(carbonColumn.getColName(), CarbonTypeUtil
-              .convertCarbonToSparkDataType(
-                  org.apache.carbondata.core.metadata.datatype.DataTypes.INT), true, null);
-        }
-      } else if (carbonColumn.isMeasure()) {
-        DataType dataType = carbonColumn.getDataType();
-        if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN
-            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT
-            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT
-            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
-          fields[i] = new StructField(carbonColumn.getColName(),
-              CarbonTypeUtil.convertCarbonToSparkDataType(dataType), true, null);
-        } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) {
-          CarbonMeasure measure = (CarbonMeasure) carbonColumn;
-          fields[i] = new StructField(carbonColumn.getColName(),
-              new DecimalType(measure.getPrecision(), measure.getScale()), true, null);
-        } else {
-          fields[i] = new StructField(carbonColumn.getColName(), CarbonTypeUtil
-              .convertCarbonToSparkDataType(
-                  org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE), true, null);
-        }
-      }
-    }
-    return fields;
-  }
-
-}
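
CarbonTypeUtil is the hadoop module's only bridge to org.apache.spark.sql.types,
so deleting it removes the module's compile-time dependency on spark-sql, in
line with the commit's goal. For readers who need the mapping elsewhere, a
self-contained sketch of two representative branches (the names SparkTypeBridge
and toSparkType are illustrative; the full branch list is in the deleted file
above):

    import org.apache.carbondata.core.metadata.datatype.DataType;
    import org.apache.spark.sql.types.DataTypes;

    public class SparkTypeBridge {
      // Mirrors two branches of the deleted convertCarbonToSparkDataType;
      // the remaining branches follow the same comparison pattern.
      public static org.apache.spark.sql.types.DataType toSparkType(DataType carbonType) {
        if (carbonType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
          return DataTypes.StringType;
        } else if (carbonType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) {
          return DataTypes.IntegerType;
        } else {
          return null;
        }
      }
    }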

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
index 2f029ab..ea242d1 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
@@ -27,8 +27,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 
-import junit.framework.TestCase;
-
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -41,7 +39,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonProjection;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.hadoop.test.util.StoreCreator;
+import org.apache.carbondata.hadoop.testutil.StoreCreator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -52,9 +50,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 public class CarbonTableInputFormatTest {
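
Apart from swapping the StoreCreator import from hadoop.test.util to
hadoop.testutil, this test (and CarbonTableOutputFormatTest below) is
unchanged. The wiring it exercises looks roughly like this (a sketch, assuming
the setColumnProjection and addColumn signatures as they exist in this module
at the time of the commit; the projected columns come from StoreCreator's test
schema):

    import org.apache.carbondata.hadoop.CarbonProjection;
    import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
    import org.apache.carbondata.hadoop.testutil.StoreCreator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class InputFormatWiringExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Read only two of the seven columns created by StoreCreator.
        CarbonProjection projection = new CarbonProjection();
        projection.addColumn("ID");
        projection.addColumn("country");
        CarbonTableInputFormat.setColumnProjection(conf, projection);

        Job job = Job.getInstance(conf, "carbon input format example");
        job.setInputFormatClass(CarbonTableInputFormat.class);
        FileInputFormat.addInputPath(job,
            new Path(StoreCreator.getAbsoluteTableIdentifier().getTablePath()));
      }
    }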

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
index 653a49e..99f69c2 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
-import org.apache.carbondata.hadoop.test.util.StoreCreator;
+import org.apache.carbondata.hadoop.testutil.StoreCreator;
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
 import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
deleted file mode 100644
index 57f488f..0000000
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.statusmanager.FileFormat;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-
-import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class CarbonStreamInputFormatTest extends TestCase {
-
-  private TaskAttemptID taskAttemptId;
-  private TaskAttemptContext taskAttemptContext;
-  private Configuration hadoopConf;
-  private AbsoluteTableIdentifier identifier;
-  private String tablePath;
-
-
-  @Override protected void setUp() throws Exception {
-    tablePath = new File("target/stream_input").getCanonicalPath();
-    String dbName = "default";
-    String tableName = "stream_table_input";
-    identifier = AbsoluteTableIdentifier.from(
-        tablePath,
-        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
-
-    JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
-    TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
-    taskAttemptId = new TaskAttemptID(taskId, 0);
-
-    hadoopConf = new Configuration();
-    taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
-  }
-
-  private InputSplit buildInputSplit() throws IOException {
-    CarbonInputSplit carbonInputSplit = new CarbonInputSplit();
-    List<CarbonInputSplit> splitList = new ArrayList<>();
-    splitList.add(carbonInputSplit);
-    return new CarbonMultiBlockSplit(splitList, new String[] { "localhost" },
-        FileFormat.ROW_V1);
-  }
-
-  @Test public void testCreateRecordReader() {
-    try {
-      InputSplit inputSplit = buildInputSplit();
-      CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
-      RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
-      Assert.assertNotNull("Failed to create record reader", recordReader);
-    } catch (Exception e) {
-      e.printStackTrace();
-      Assert.assertTrue(e.getMessage(), false);
-    }
-  }
-
-  @Override protected void tearDown() throws Exception {
-    super.tearDown();
-    if (tablePath != null) {
-      FileFactory.deleteAllFilesOfDir(new File(tablePath));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java
deleted file mode 100644
index e871c7e..0000000
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Date;
-import java.util.UUID;
-
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.hadoop.test.util.StoreCreator;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-
-import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class CarbonStreamOutputFormatTest extends TestCase {
-
-  private Configuration hadoopConf;
-  private TaskAttemptID taskAttemptId;
-  private CarbonLoadModel carbonLoadModel;
-  private String tablePath;
-
-  @Override protected void setUp() throws Exception {
-    super.setUp();
-    JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
-    TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
-    taskAttemptId = new TaskAttemptID(taskId, 0);
-
-    hadoopConf = new Configuration();
-    hadoopConf.set("mapred.job.id", jobId.toString());
-    hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID().toString());
-    hadoopConf.set("mapred.task.id", taskAttemptId.toString());
-    hadoopConf.setBoolean("mapred.task.is.map", true);
-    hadoopConf.setInt("mapred.task.partition", 0);
-
-    tablePath = new File("target/stream_output").getCanonicalPath();
-    String dbName = "default";
-    String tableName = "stream_table_output";
-    AbsoluteTableIdentifier identifier =
-        AbsoluteTableIdentifier.from(
-            tablePath,
-            new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
-
-    CarbonTable table = StoreCreator.createTable(identifier);
-
-    String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
-    carbonLoadModel = StoreCreator.buildCarbonLoadModel(table, factFilePath, identifier);
-  }
-
-  @Test public void testSetCarbonLoadModel() {
-    try {
-      CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
-    } catch (IOException e) {
-      Assert.assertTrue("Failed to config CarbonLoadModel for CarbonStreamOutputFromat", false);
-    }
-  }
-
-  @Test public void testGetCarbonLoadModel() {
-    try {
-      CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
-      CarbonLoadModel model = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
-
-      Assert.assertNotNull("Failed to get CarbonLoadModel", model);
-      Assert.assertTrue("CarbonLoadModel should be same with previous",
-          carbonLoadModel.getFactTimeStamp() == model.getFactTimeStamp());
-
-    } catch (IOException e) {
-      Assert.assertTrue("Failed to get CarbonLoadModel for CarbonStreamOutputFromat", false);
-    }
-  }
-
-  @Test public void testGetRecordWriter() {
-    CarbonStreamOutputFormat outputFormat = new CarbonStreamOutputFormat();
-    try {
-      CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
-      TaskAttemptContext taskAttemptContext =
-          new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
-      RecordWriter recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
-      Assert.assertNotNull("Failed to get CarbonStreamRecordWriter", recordWriter);
-    } catch (Exception e) {
-      e.printStackTrace();
-      Assert.assertTrue(e.getMessage(), false);
-    }
-  }
-
-  @Override protected void tearDown() throws Exception {
-    super.tearDown();
-    if (tablePath != null) {
-      FileFactory.deleteAllFilesOfDir(new File(tablePath));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
deleted file mode 100644
index 8e8916d..0000000
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.test.util;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.DataMapStoreManager;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.core.fileoperations.FileWriteOperation;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnIdentifier;
-import org.apache.carbondata.core.metadata.converter.SchemaConverter;
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
-import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.metadata.schema.table.TableSchema;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatus;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
-import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl;
-import org.apache.carbondata.core.writer.ThriftWriter;
-import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter;
-import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
-import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfo;
-import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfoPreparator;
-import org.apache.carbondata.processing.loading.DataLoadExecutor;
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
-import org.apache.carbondata.processing.loading.csvinput.BlockDetails;
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
-import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
-import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.processing.util.TableOptionConstant;
-
-import com.google.gson.Gson;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-
-/**
- * This class will create store file based on provided schema
- *
- */
-public class StoreCreator {
-
-  private static AbsoluteTableIdentifier absoluteTableIdentifier;
-  private static String storePath = null;
-
-  static {
-    try {
-      storePath = new File("target/store").getCanonicalPath();
-      String dbName = "testdb";
-      String tableName = "testtable";
-      absoluteTableIdentifier =
-          AbsoluteTableIdentifier.from(
-              storePath +"/testdb/testtable",
-              new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
-    } catch (IOException ex) {
-
-    }
-  }
-
-  public static AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
-    return absoluteTableIdentifier;
-  }
-
-  public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String factFilePath,
-      AbsoluteTableIdentifier absoluteTableIdentifier) {
-    CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
-    CarbonLoadModel loadModel = new CarbonLoadModel();
-    loadModel.setCarbonDataLoadSchema(schema);
-    loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
-    loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
-    loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
-    loadModel.setFactFilePath(factFilePath);
-    loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
-    loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
-    loadModel.setDateFormat(null);
-    loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
-        CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-        CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS));
-    loadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
-        CarbonCommonConstants.CARBON_DATE_FORMAT,
-        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
-    loadModel
-        .setSerializationNullFormat(
-            TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," + "\\N");
-    loadModel
-        .setBadRecordsLoggerEnable(
-            TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + "false");
-    loadModel
-        .setBadRecordsAction(
-            TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + "FORCE");
-    loadModel
-        .setIsEmptyDataBadRecord(
-            DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + "false");
-    loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary");
-    loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
-    loadModel.setTaskNo("0");
-    loadModel.setSegmentId("0");
-    loadModel.setFactTimeStamp(System.currentTimeMillis());
-    loadModel.setMaxColumns("10");
-    return loadModel;
-  }
-
-  /**
-   * Create store without any restructure
-   */
-  public static void createCarbonStore() throws Exception {
-    CarbonLoadModel loadModel = createTableAndLoadModel();
-    loadData(loadModel, storePath);
-  }
-
-  /**
-   * Method to clear the data maps
-   */
-  public static void clearDataMaps() {
-    DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier);
-  }
-
-  public static CarbonLoadModel createTableAndLoadModel() throws Exception {
-    String factFilePath =
-        new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
-    File storeDir = new File(storePath);
-    CarbonUtil.deleteFoldersAndFiles(storeDir);
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS,
-        storePath);
-
-    CarbonTable table = createTable(absoluteTableIdentifier);
-    writeDictionary(factFilePath, table);
-    return buildCarbonLoadModel(table, factFilePath, absoluteTableIdentifier);
-  }
-
-  public static CarbonTable createTable(
-      AbsoluteTableIdentifier identifier) throws IOException {
-    TableInfo tableInfo = new TableInfo();
-    tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
-    TableSchema tableSchema = new TableSchema();
-    tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
-    List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
-    ArrayList<Encoding> encodings = new ArrayList<>();
-    encodings.add(Encoding.DICTIONARY);
-    ColumnSchema id = new ColumnSchema();
-    id.setColumnName("ID");
-    id.setColumnar(true);
-    id.setDataType(DataTypes.INT);
-    id.setEncodingList(encodings);
-    id.setColumnUniqueId(UUID.randomUUID().toString());
-    id.setColumnReferenceId(id.getColumnUniqueId());
-    id.setDimensionColumn(true);
-    id.setColumnGroup(1);
-    columnSchemas.add(id);
-
-    ColumnSchema date = new ColumnSchema();
-    date.setColumnName("date");
-    date.setColumnar(true);
-    date.setDataType(DataTypes.STRING);
-    date.setEncodingList(encodings);
-    date.setColumnUniqueId(UUID.randomUUID().toString());
-    date.setDimensionColumn(true);
-    date.setColumnGroup(2);
-    date.setSortColumn(true);
-    date.setColumnReferenceId(id.getColumnUniqueId());
-    columnSchemas.add(date);
-
-    ColumnSchema country = new ColumnSchema();
-    country.setColumnName("country");
-    country.setColumnar(true);
-    country.setDataType(DataTypes.STRING);
-    country.setEncodingList(encodings);
-    country.setColumnUniqueId(UUID.randomUUID().toString());
-    country.setDimensionColumn(true);
-    country.setColumnGroup(3);
-    country.setSortColumn(true);
-    country.setColumnReferenceId(id.getColumnUniqueId());
-    columnSchemas.add(country);
-
-    ColumnSchema name = new ColumnSchema();
-    name.setColumnName("name");
-    name.setColumnar(true);
-    name.setDataType(DataTypes.STRING);
-    name.setEncodingList(encodings);
-    name.setColumnUniqueId(UUID.randomUUID().toString());
-    name.setDimensionColumn(true);
-    name.setColumnGroup(4);
-    name.setSortColumn(true);
-    name.setColumnReferenceId(id.getColumnUniqueId());
-    columnSchemas.add(name);
-
-    ColumnSchema phonetype = new ColumnSchema();
-    phonetype.setColumnName("phonetype");
-    phonetype.setColumnar(true);
-    phonetype.setDataType(DataTypes.STRING);
-    phonetype.setEncodingList(encodings);
-    phonetype.setColumnUniqueId(UUID.randomUUID().toString());
-    phonetype.setDimensionColumn(true);
-    phonetype.setColumnGroup(5);
-    phonetype.setSortColumn(true);
-    phonetype.setColumnReferenceId(id.getColumnUniqueId());
-    columnSchemas.add(phonetype);
-
-    ColumnSchema serialname = new ColumnSchema();
-    serialname.setColumnName("serialname");
-    serialname.setColumnar(true);
-    serialname.setDataType(DataTypes.STRING);
-    serialname.setEncodingList(encodings);
-    serialname.setColumnUniqueId(UUID.randomUUID().toString());
-    serialname.setDimensionColumn(true);
-    serialname.setColumnGroup(6);
-    serialname.setSortColumn(true);
-    serialname.setColumnReferenceId(id.getColumnUniqueId());
-    columnSchemas.add(serialname);
-
-    ColumnSchema salary = new ColumnSchema();
-    salary.setColumnName("salary");
-    salary.setColumnar(true);
-    salary.setDataType(DataTypes.INT);
-    salary.setEncodingList(new ArrayList<Encoding>());
-    salary.setColumnUniqueId(UUID.randomUUID().toString());
-    salary.setDimensionColumn(false);
-    salary.setColumnReferenceId(id.getColumnUniqueId());
-    salary.setColumnGroup(7);
-    columnSchemas.add(salary);
-
-    tableSchema.setListOfColumns(columnSchemas);
-    SchemaEvolution schemaEvol = new SchemaEvolution();
-    schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>());
-    tableSchema.setSchemaEvalution(schemaEvol);
-    tableSchema.setTableId(UUID.randomUUID().toString());
-    tableInfo.setTableUniqueName(
-        identifier.getCarbonTableIdentifier().getTableUniqueName()
-    );
-    tableInfo.setLastUpdatedTime(System.currentTimeMillis());
-    tableInfo.setFactTable(tableSchema);
-    tableInfo.setTablePath(identifier.getTablePath());
-    String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
-    String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
-    CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
-
-    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-    org.apache.carbondata.format.TableInfo thriftTableInfo =
-        schemaConverter.fromWrapperToExternalTableInfo(
-            tableInfo,
-            tableInfo.getDatabaseName(),
-            tableInfo.getFactTable().getTableName());
-    org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry =
-        new org.apache.carbondata.format.SchemaEvolutionEntry(tableInfo.getLastUpdatedTime());
-    thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history()
-        .add(schemaEvolutionEntry);
-
-    FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath);
-    if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
-      FileFactory.mkdirs(schemaMetadataPath, fileType);
-    }
-
-    ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
-    thriftWriter.open();
-    thriftWriter.write(thriftTableInfo);
-    thriftWriter.close();
-    return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName());
-  }
-
-  private static void writeDictionary(String factFilePath, CarbonTable table) throws Exception {
-    BufferedReader reader = new BufferedReader(new FileReader(factFilePath));
-    String header = reader.readLine();
-    String[] split = header.split(",");
-    List<CarbonColumn> allCols = new ArrayList<CarbonColumn>();
-    List<CarbonDimension> dims = table.getDimensionByTableName(table.getTableName());
-    allCols.addAll(dims);
-    List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getTableName());
-    allCols.addAll(msrs);
-    Set<String>[] set = new HashSet[dims.size()];
-    for (int i = 0; i < set.length; i++) {
-      set[i] = new HashSet<String>();
-    }
-    String line = reader.readLine();
-    while (line != null) {
-      String[] data = line.split(",");
-      for (int i = 0; i < set.length; i++) {
-        set[i].add(data[i]);
-      }
-      line = reader.readLine();
-    }
-
-    Cache dictCache = CacheProvider.getInstance()
-        .createCache(CacheType.REVERSE_DICTIONARY);
-    for (int i = 0; i < set.length; i++) {
-      ColumnIdentifier columnIdentifier = new ColumnIdentifier(dims.get(i).getColumnId(), null, null);
-      DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
-          new DictionaryColumnUniqueIdentifier(table.getAbsoluteTableIdentifier(), columnIdentifier,
-              columnIdentifier.getDataType());
-      CarbonDictionaryWriter writer =
-          new CarbonDictionaryWriterImpl(dictionaryColumnUniqueIdentifier);
-      for (String value : set[i]) {
-        writer.write(value);
-      }
-      writer.close();
-      writer.commit();
-      Dictionary dict = (Dictionary) dictCache.get(
-          new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
-              columnIdentifier, dims.get(i).getDataType()));
-      CarbonDictionarySortInfoPreparator preparator =
-          new CarbonDictionarySortInfoPreparator();
-      List<String> newDistinctValues = new ArrayList<String>();
-      CarbonDictionarySortInfo dictionarySortInfo =
-          preparator.getDictionarySortInfo(newDistinctValues, dict, dims.get(i).getDataType());
-      CarbonDictionarySortIndexWriter carbonDictionaryWriter =
-          new CarbonDictionarySortIndexWriterImpl(dictionaryColumnUniqueIdentifier);
-      try {
-        carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex());
-        carbonDictionaryWriter.writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted());
-      } finally {
-        carbonDictionaryWriter.close();
-      }
-    }
-    reader.close();
-  }
-
-  /**
-   * Executes the data load flow, which loads the given fact data into the store.
-   *
-   * @param loadModel     load model describing the table and the input data file
-   * @param storeLocation target store location for the loaded data
-   * @throws Exception if the data load fails
-   */
-  public static void loadData(CarbonLoadModel loadModel, String storeLocation)
-      throws Exception {
-    new File(storeLocation).mkdirs();
-    String outPutLoc = storeLocation + "/etl";
-    String databaseName = loadModel.getDatabaseName();
-    String tableName = loadModel.getTableName();
-    String tempLocationKey = databaseName + '_' + tableName + "_1";
-    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation + "/" + databaseName + "/" + tableName);
-    CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
-    CarbonProperties.getInstance().addProperty("send.signal.load", "false");
-    CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true");
-    CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1");
-    CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true");
-    CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true");
-    CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true");
-    CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false");
-    CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000");
-
-    String graphPath =
-        outPutLoc + File.separator + loadModel.getDatabaseName() + File.separator + tableName
-            + File.separator + 0 + File.separator + 1 + File.separator + tableName + ".ktr";
-    File path = new File(graphPath);
-    if (path.exists()) {
-      path.delete();
-    }
-
-    BlockDetails blockDetails = new BlockDetails(new Path(loadModel.getFactFilePath()),
-        0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
-    Configuration configuration = new Configuration();
-    CSVInputFormat.setCommentCharacter(configuration, loadModel.getCommentChar());
-    CSVInputFormat.setCSVDelimiter(configuration, loadModel.getCsvDelimiter());
-    CSVInputFormat.setEscapeCharacter(configuration, loadModel.getEscapeChar());
-    CSVInputFormat.setHeaderExtractionEnabled(configuration, true);
-    CSVInputFormat.setQuoteCharacter(configuration, loadModel.getQuoteChar());
-    CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
-            CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
-    CSVInputFormat.setNumberOfColumns(configuration, String.valueOf(loadModel.getCsvHeaderColumns().length));
-    CSVInputFormat.setMaxColumns(configuration, "10");
-
-    TaskAttemptContextImpl hadoopAttemptContext = new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
-    CSVInputFormat format = new CSVInputFormat();
-
-    RecordReader<NullWritable, StringArrayWritable> recordReader =
-        format.createRecordReader(blockDetails, hadoopAttemptContext);
-
-    CSVRecordReaderIterator readerIterator = new CSVRecordReaderIterator(recordReader, blockDetails, hadoopAttemptContext);
-    new DataLoadExecutor().execute(loadModel,
-        new String[] {storeLocation + "/" + databaseName + "/" + tableName},
-        new CarbonIterator[]{readerIterator});
-
-    writeLoadMetadata(loadModel.getCarbonDataLoadSchema(), loadModel.getDatabaseName(), loadModel.getTableName(),
-        new ArrayList<LoadMetadataDetails>());
-  }
-
-  public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
-      String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
-    LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
-    loadMetadataDetails.setLoadEndTime(System.currentTimeMillis());
-    loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
-    loadMetadataDetails.setLoadName(String.valueOf(0));
-    loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
-    listOfLoadFolderDetails.add(loadMetadataDetails);
-
-    String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator
-        + CarbonTablePath.TABLE_STATUS_FILE;
-
-    DataOutputStream dataOutputStream;
-    Gson gsonObjectToWrite = new Gson();
-    BufferedWriter brWriter = null;
-
-    AtomicFileOperations writeOperation =
-        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
-
-    try {
-
-      dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
-      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
-              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
-      String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
-      brWriter.write(metadataInstance);
-    } finally {
-      if (null != brWriter) {
-        brWriter.flush();
-      }
-      CarbonUtil.closeStreams(brWriter);
-    }
-    writeOperation.close();
-
-  }
-
-  public static String readCurrentTime() {
-    SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
-    return sdf.format(new Date());
-  }
-
-  public static void main(String[] args) throws Exception {
-    StoreCreator.createCarbonStore();
-  }
-
-}
\ No newline at end of file
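
The loadData method above drives the CSV input through standard Hadoop
mapreduce plumbing: it wraps the fact file in a split, builds a synthetic
TaskAttemptContext, and pulls rows from the RecordReader through an iterator.
A minimal generic sketch of that driving pattern, using only Hadoop mapreduce
APIs (the countRecords helper is illustrative, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class LocalReaderSketch {

  // Drive any Hadoop RecordReader locally: initialize it against a split and
  // a synthetic task attempt context, then consume records in a loop.
  static <K, V> long countRecords(RecordReader<K, V> reader, InputSplit split,
      Configuration conf) throws Exception {
    TaskAttemptContextImpl context =
        new TaskAttemptContextImpl(conf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
    reader.initialize(split, context);
    long count = 0;
    try {
      while (reader.nextKeyValue()) {
        count++;
      }
    } finally {
      reader.close();
    }
    return count;
  }
}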

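The writeLoadMetadata routine serializes the segment details to a JSON array
with Gson and writes them through an AtomicFileOperations so that concurrent
readers never observe a half-written table status file. A standalone sketch of
the same write-then-atomic-move idea using only the JDK and Gson (the
SegmentDetail class and file names are illustrative, not CarbonData API):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

import com.google.gson.Gson;

public class AtomicStatusWriteSketch {

  // Illustrative stand-in for LoadMetadataDetails.
  static class SegmentDetail {
    String loadName;
    String status;
    long loadEndTime;
  }

  // Serialize to JSON, write to a temp file, then move it into place in one
  // step; ATOMIC_MOVE assumes the underlying file system supports it.
  static void writeStatus(Path statusFile, List<SegmentDetail> details) throws IOException {
    String json = new Gson().toJson(details.toArray());
    Path tmp = statusFile.resolveSibling(statusFile.getFileName() + ".tmp");
    Files.write(tmp, json.getBytes(StandardCharsets.UTF_8));
    Files.move(tmp, statusFile, StandardCopyOption.REPLACE_EXISTING,
        StandardCopyOption.ATOMIC_MOVE);
  }
}
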
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index 16f327d..f011a75 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -36,7 +36,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-hadoop</artifactId>
+      <artifactId>carbondata-streaming</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
index 6e9e0a6..f6dc65b 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
@@ -20,10 +20,19 @@ package org.apache.carbondata.spark.util;
 import java.io.Serializable;
 import java.math.BigDecimal;
 
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.DataTypeConverter;
 
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
@@ -90,4 +99,76 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri
   public Object wrapWithGenericRow(Object[] fields) {
     return new GenericInternalRow(fields);
   }
+
+  private static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
+      DataType carbonDataType) {
+    if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
+      return DataTypes.StringType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) {
+      return DataTypes.ShortType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) {
+      return DataTypes.IntegerType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
+      return DataTypes.LongType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) {
+      return DataTypes.DoubleType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) {
+      return DataTypes.BooleanType;
+    } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType)) {
+      return DataTypes.createDecimalType();
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
+      return DataTypes.TimestampType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
+      return DataTypes.DateType;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * convert from CarbonColumn array to Spark's StructField array
+   */
+  @Override
+  public Object[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) {
+    StructField[] fields = new StructField[carbonColumns.length];
+    for (int i = 0; i < carbonColumns.length; i++) {
+      CarbonColumn carbonColumn = carbonColumns[i];
+      if (carbonColumn.isDimension()) {
+        if (carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
+              .getDirectDictionaryGenerator(carbonColumn.getDataType());
+          fields[i] = new StructField(carbonColumn.getColName(),
+              convertCarbonToSparkDataType(generator.getReturnType()), true, null);
+        } else if (!carbonColumn.hasEncoding(Encoding.DICTIONARY)) {
+          fields[i] = new StructField(carbonColumn.getColName(),
+              convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null);
+        } else if (carbonColumn.isComplex()) {
+          fields[i] = new StructField(carbonColumn.getColName(),
+              convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null);
+        } else {
+          fields[i] = new StructField(carbonColumn.getColName(),
+              convertCarbonToSparkDataType(
+                  org.apache.carbondata.core.metadata.datatype.DataTypes.INT), true, null);
+        }
+      } else if (carbonColumn.isMeasure()) {
+        DataType dataType = carbonColumn.getDataType();
+        if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN
+            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT
+            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT
+            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
+          fields[i] = new StructField(carbonColumn.getColName(),
+              convertCarbonToSparkDataType(dataType), true, null);
+        } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) {
+          CarbonMeasure measure = (CarbonMeasure) carbonColumn;
+          fields[i] = new StructField(carbonColumn.getColName(),
+              new DecimalType(measure.getPrecision(), measure.getScale()), true, null);
+        } else {
+          fields[i] = new StructField(carbonColumn.getColName(),
+              convertCarbonToSparkDataType(
+                  org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE), true, null);
+        }
+      }
+    }
+    return fields;
+  }
 }

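The new convertCarbonSchemaToSparkSchema returns its fields as Object[],
presumably because the DataTypeConverter interface lives in carbon-core, which
cannot depend on Spark types; callers on the Spark side cast the elements back
to StructField. A hedged usage sketch (the toStructType helper is illustrative,
not part of this patch):

import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl;

import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SchemaConversionSketch {

  // Convert a Carbon schema into a Spark StructType for use as a DataFrame
  // schema; per the mapping above, dictionary-encoded dimensions surface as
  // INT surrogate keys and decimals keep their declared precision and scale.
  static StructType toStructType(CarbonColumn[] columns) {
    Object[] converted =
        new SparkDataTypeConverterImpl().convertCarbonSchemaToSparkSchema(columns);
    StructField[] fields = new StructField[converted.length];
    for (int i = 0; i < converted.length; i++) {
      fields[i] = (StructField) converted[i];
    }
    return new StructType(fields);
  }
}
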
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 2be0efc..7e549a6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -53,10 +53,10 @@ import org.apache.carbondata.core.statusmanager.FileFormat
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.hadoop._
 import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, CarbonTableInputFormat}
-import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
 import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
+import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
 
 /**
  * This RDD is used to perform query on CarbonData file. Before sending tasks to scan