Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/17 05:31:11 UTC

carbondata git commit: [CARBONDATA-1855][PARTITION] Added outputformat to carbon

Repository: carbondata
Updated Branches:
  refs/heads/master 9f332be24 -> 91e6f6f43


[CARBONDATA-1855][PARTITION] Added outputformat to carbon

Support the standard Hadoop OutputFormat interface for CarbonData. This PR adds a table-level output format, CarbonTableOutputFormat, which will be helpful for integrations with execution engines such as Spark, Hive, and Presto.
It also maintains segment management while writing data, to support the incremental loading feature.
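
For reference, a minimal sketch of how an integration might wire this output format into an
MR job (the job setup below is illustrative and assumes a prepared CarbonLoadModel and
CarbonTable; it mirrors the new CarbonOutputMapperTest):

    Job job = Job.getInstance(new Configuration());
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(StringArrayWritable.class);
    job.setOutputFormatClass(CarbonTableOutputFormat.class);
    // loadModel and carbonTable are assumed to be built by the caller
    CarbonTableOutputFormat.setLoadModel(job.getConfiguration(), loadModel);
    CarbonTableOutputFormat.setCarbonTable(job.getConfiguration(), carbonTable);
    CarbonTableOutputFormat.setOverwrite(job.getConfiguration(), false);
    FileOutputFormat.setOutputPath(job, new Path(tablePath));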

This closes #1642


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/91e6f6f4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/91e6f6f4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/91e6f6f4

Branch: refs/heads/master
Commit: 91e6f6f43b915b7ddc52f36233526aa2f5a77782
Parents: 9f332be
Author: ravipesala <ra...@gmail.com>
Authored: Mon Dec 4 16:07:03 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sun Dec 17 13:30:55 2017 +0800

----------------------------------------------------------------------
 .../core/metadata/datatype/StructType.java      |   2 +-
 .../apache/carbondata/core/util/CarbonUtil.java |  15 -
 .../hadoop/api/CarbonOutputCommitter.java       |  94 +++++
 .../hadoop/api/CarbonTableOutputFormat.java     | 349 ++++++++++++++++++-
 .../hadoop/ft/CarbonOutputMapperTest.java       | 114 ++++++
 .../hadoop/test/util/StoreCreator.java          |  32 +-
 .../carbondata/spark/util/CommonUtil.scala      |  45 ---
 .../spark/rdd/CarbonDataRDDFactory.scala        |   8 +-
 .../management/CarbonLoadDataCommand.scala      |   7 +-
 .../iterator/CarbonOutputIteratorWrapper.java   | 128 +++++++
 .../loading/model/CarbonLoadModel.java          |   9 +
 .../processing/merger/CarbonDataMergerUtil.java |   4 +-
 .../processing/util/CarbonLoaderUtil.java       |  68 +++-
 .../carbondata/streaming/StreamHandoffRDD.scala |   2 +-
 14 files changed, 772 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/91e6f6f4/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java
index 6417f37..97cc4f0 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.core.metadata.datatype;
 
 import java.util.List;
 
-class StructType extends DataType {
+public class StructType extends DataType {
 
   private List<StructField> fields;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/91e6f6f4/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 148098d..910efea 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2156,21 +2156,6 @@ public final class CarbonUtil {
     return parentPath.toString() + CarbonCommonConstants.FILE_SEPARATOR + newTableName;
   }
 
-  /*
-   * This method will add data size and index size into tablestatus for each segment
-   */
-  public static void addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails,
-      String segmentId, CarbonTable carbonTable) throws IOException {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier()));
-    Map<String, Long> dataIndexSize =
-        CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, segmentId);
-    loadMetadataDetails
-        .setDataSize(dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE).toString());
-    loadMetadataDetails
-        .setIndexSize(dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE).toString());
-  }
-
   /**
    * This method will calculate the data size and index size for carbon table
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/91e6f6f4/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
new file mode 100644
index 0000000..9bcb2be
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.api;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+
+/**
+ * OutputCommitter which manages segments during loading. It commits segment information to the
+ * tablestatus file upon success or failure.
+ */
+public class CarbonOutputCommitter extends FileOutputCommitter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonOutputCommitter.class.getName());
+
+  public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+  }
+
+  /**
+   * Update the tablestatus to in-progress while setting up the job.
+   *
+   * @param context
+   * @throws IOException
+   */
+  @Override public void setupJob(JobContext context) throws IOException {
+    super.setupJob(context);
+    boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
+    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
+    CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet);
+    CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), loadModel);
+  }
+
+  /**
+   * Update the tablestatus to success after the job succeeds.
+   *
+   * @param context
+   * @throws IOException
+   */
+  @Override public void commitJob(JobContext context) throws IOException {
+    super.commitJob(context);
+    boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
+    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
+    LoadMetadataDetails newMetaEntry = loadModel.getCurrentLoadMetadataDetail();
+    CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS,
+        loadModel.getFactTimeStamp(), true);
+    CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(),
+        loadModel.getCarbonDataLoadSchema().getCarbonTable());
+    CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet);
+  }
+
+  /**
+   * Update the tablestatus to failure if the job fails.
+   *
+   * @param context
+   * @param state
+   * @throws IOException
+   */
+  @Override public void abortJob(JobContext context, JobStatus.State state) throws IOException {
+    super.abortJob(context, state);
+    CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
+    CarbonLoaderUtil.updateTableStatusForFailure(loadModel);
+    LOGGER.error("Loading failed with job status : " + state);
+  }
+
+}
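
A hedged sketch of the lifecycle the MR framework drives through this committer (the driver
loop below is illustrative; the method names and status transitions are from the class above):

    OutputCommitter committer = outputFormat.getOutputCommitter(taskContext);
    committer.setupJob(jobContext);    // tablestatus entry -> INSERT_IN_PROGRESS
    try {
      // tasks run; each RecordWriter feeds rows into the data load
      committer.commitJob(jobContext); // tablestatus entry -> SUCCESS, plus data/index sizes
    } catch (IOException e) {
      committer.abortJob(jobContext, JobStatus.State.FAILED); // -> MARKED_FOR_DELETE
    }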

http://git-wip-us.apache.org/repos/asf/carbondata/blob/91e6f6f4/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 7c9b3ed..9504502 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -18,22 +18,349 @@
 package org.apache.carbondata.hadoop.api;
 
 import java.io.IOException;
+import java.util.List;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.util.Progressable;
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.datatype.StructType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.processing.loading.DataLoadExecutor;
+import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
+import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
+import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 /**
- * Base class for all output format for CarbonData file.
- * @param <T>
+ * Table-level output format which writes data to the store in a new segment. Each load
+ * creates a new segment folder and manages it through the tablestatus file.
+ * It also generates and writes dictionary data during load if a dictionary server is configured.
  */
-public abstract class CarbonTableOutputFormat<T> extends FileOutputFormat<Void, T> {
+// TODO Move the dictionary generator, currently coded in Spark, to the MR framework.
+public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> {
 
-  @Override
-  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf job, String name,
-      Progressable progress) throws IOException {
+  private static final String LOAD_MODEL = "mapreduce.carbontable.load.model";
+  private static final String DATABASE_NAME = "mapreduce.carbontable.databaseName";
+  private static final String TABLE_NAME = "mapreduce.carbontable.tableName";
+  private static final String TABLE = "mapreduce.carbontable.table";
+  private static final String TABLE_PATH = "mapreduce.carbontable.tablepath";
+  private static final String INPUT_SCHEMA = "mapreduce.carbontable.inputschema";
+  private static final String TEMP_STORE_LOCATIONS = "mapreduce.carbontable.tempstore.locations";
+  private static final String OVERWRITE_SET = "mapreduce.carbontable.set.overwrite";
+  public static final String COMPLEX_DELIMITERS = "mapreduce.carbontable.complex_delimiters";
+  public static final String SERIALIZATION_NULL_FORMAT =
+      "mapreduce.carbontable.serialization.null.format";
+  public static final String BAD_RECORDS_LOGGER_ENABLE =
+      "mapreduce.carbontable.bad.records.logger.enable";
+  public static final String BAD_RECORDS_LOGGER_ACTION =
+      "mapreduce.carbontable.bad.records.logger.action";
+  public static final String IS_EMPTY_DATA_BAD_RECORD =
+      "mapreduce.carbontable.empty.data.bad.record";
+  public static final String SKIP_EMPTY_LINE = "mapreduce.carbontable.skip.empty.line";
+  public static final String SORT_SCOPE = "mapreduce.carbontable.load.sort.scope";
+  public static final String BATCH_SORT_SIZE_INMB =
+      "mapreduce.carbontable.batch.sort.size.inmb";
+  public static final String GLOBAL_SORT_PARTITIONS =
+      "mapreduce.carbontable.global.sort.partitions";
+  public static final String BAD_RECORD_PATH = "mapreduce.carbontable.bad.record.path";
+  public static final String DATE_FORMAT = "mapreduce.carbontable.date.format";
+  public static final String TIMESTAMP_FORMAT = "mapreduce.carbontable.timestamp.format";
+  public static final String IS_ONE_PASS_LOAD = "mapreduce.carbontable.one.pass.load";
+  public static final String DICTIONARY_SERVER_HOST =
+      "mapreduce.carbontable.dict.server.host";
+  public static final String DICTIONARY_SERVER_PORT =
+      "mapreduce.carbontable.dict.server.port";
+
+  private CarbonOutputCommitter committer;
+
+  public static void setDatabaseName(Configuration configuration, String databaseName) {
+    if (null != databaseName) {
+      configuration.set(DATABASE_NAME, databaseName);
+    }
+  }
+
+  public static String getDatabaseName(Configuration configuration) {
+    return configuration.get(DATABASE_NAME);
+  }
+
+  public static void setTableName(Configuration configuration, String tableName) {
+    if (null != tableName) {
+      configuration.set(TABLE_NAME, tableName);
+    }
+  }
+
+  public static String getTableName(Configuration configuration) {
+    return configuration.get(TABLE_NAME);
+  }
+
+  public static void setTablePath(Configuration configuration, String tablePath) {
+    if (null != tablePath) {
+      configuration.set(TABLE_PATH, tablePath);
+    }
+  }
+
+  public static String getTablePath(Configuration configuration) {
+    return configuration.get(TABLE_PATH);
+  }
+
+  public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
+      throws IOException {
+    if (carbonTable != null) {
+      configuration.set(TABLE,
+          ObjectSerializationUtil.convertObjectToString(carbonTable.getTableInfo().serialize()));
+    }
+  }
+
+  public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
+    CarbonTable carbonTable = null;
+    String encodedString = configuration.get(TABLE);
+    if (encodedString != null) {
+      byte[] bytes = (byte[]) ObjectSerializationUtil.convertStringToObject(encodedString);
+      TableInfo tableInfo = TableInfo.deserialize(bytes);
+      carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+    }
+    return carbonTable;
+  }
+
+  public static void setLoadModel(Configuration configuration, CarbonLoadModel loadModel)
+      throws IOException {
+    if (loadModel != null) {
+      configuration.set(LOAD_MODEL, ObjectSerializationUtil.convertObjectToString(loadModel));
+    }
+  }
+
+  public static void setInputSchema(Configuration configuration, StructType inputSchema)
+      throws IOException {
+    if (inputSchema != null && inputSchema.getFields().size() > 0) {
+      configuration.set(INPUT_SCHEMA, ObjectSerializationUtil.convertObjectToString(inputSchema));
+    } else {
+      throw new UnsupportedOperationException("Input schema must be set");
+    }
+  }
+
+  private static StructType getInputSchema(Configuration configuration) throws IOException {
+    String encodedString = configuration.get(INPUT_SCHEMA);
+    if (encodedString != null) {
+      return (StructType) ObjectSerializationUtil.convertStringToObject(encodedString);
+    }
     return null;
   }
+
+  public static boolean isOverwriteSet(Configuration configuration) {
+    String overwrite = configuration.get(OVERWRITE_SET);
+    if (overwrite != null) {
+      return Boolean.parseBoolean(overwrite);
+    }
+    return false;
+  }
+
+  public static void setOverwrite(Configuration configuration, boolean overwrite) {
+    configuration.set(OVERWRITE_SET, String.valueOf(overwrite));
+  }
+
+  public static void setTempStoreLocations(Configuration configuration, String[] tempLocations)
+      throws IOException {
+    if (tempLocations != null && tempLocations.length > 0) {
+      configuration
+          .set(TEMP_STORE_LOCATIONS, ObjectSerializationUtil.convertObjectToString(tempLocations));
+    }
+  }
+
+  private static String[] getTempStoreLocations(TaskAttemptContext taskAttemptContext)
+      throws IOException {
+    String encodedString = taskAttemptContext.getConfiguration().get(TEMP_STORE_LOCATIONS);
+    if (encodedString != null) {
+      return (String[]) ObjectSerializationUtil.convertStringToObject(encodedString);
+    }
+    return new String[] {
+        System.getProperty("java.io.tmpdir") + "/" + System.nanoTime() + "_" + taskAttemptContext
+            .getTaskAttemptID().toString() };
+  }
+
+  @Override
+  public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException {
+    if (this.committer == null) {
+      Path output = getOutputPath(context);
+      this.committer = new CarbonOutputCommitter(output, context);
+    }
+    return this.committer;
+  }
+
+  @Override
+  public RecordWriter<NullWritable, StringArrayWritable> getRecordWriter(
+      TaskAttemptContext taskAttemptContext) throws IOException {
+    final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
+    loadModel.setTaskNo(taskAttemptContext.getTaskAttemptID().getTaskID().getId() + "");
+    final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext);
+    final CarbonOutputIteratorWrapper iteratorWrapper = new CarbonOutputIteratorWrapper();
+    final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
+    CarbonRecordWriter recordWriter = new CarbonRecordWriter(iteratorWrapper, dataLoadExecutor);
+    new Thread() {
+      @Override public void run() {
+        try {
+          dataLoadExecutor.execute(
+              loadModel,
+              tempStoreLocations,
+              new CarbonIterator[] { iteratorWrapper });
+        } catch (Exception e) {
+          dataLoadExecutor.close();
+          throw new RuntimeException(e);
+        }
+      }
+    }.start();
+
+    return recordWriter;
+  }
+
+  public static CarbonLoadModel getLoadModel(Configuration conf) throws IOException {
+    CarbonLoadModel model;
+    String encodedString = conf.get(LOAD_MODEL);
+    if (encodedString != null) {
+      model = (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encodedString);
+      return model;
+    }
+    model = new CarbonLoadModel();
+    CarbonProperties carbonProperty = CarbonProperties.getInstance();
+    model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf));
+    model.setTableName(CarbonTableOutputFormat.getTableName(conf));
+    model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(getCarbonTable(conf)));
+    model.setTablePath(getTablePath(conf));
+
+    setFileHeader(conf, model);
+    model.setSerializationNullFormat(conf.get(SERIALIZATION_NULL_FORMAT, "\\N"));
+    model.setBadRecordsLoggerEnable(
+        conf.get(
+            BAD_RECORDS_LOGGER_ENABLE,
+            carbonProperty.getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+                CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)));
+    model.setBadRecordsAction(
+        conf.get(
+            BAD_RECORDS_LOGGER_ACTION,
+            carbonProperty.getProperty(
+                CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+                CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)));
+
+    model.setIsEmptyDataBadRecord(
+        conf.get(
+            IS_EMPTY_DATA_BAD_RECORD,
+            carbonProperty.getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+                CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)));
+
+    model.setSkipEmptyLine(
+        conf.get(
+            SKIP_EMPTY_LINE,
+            carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE)));
+
+    String complexDelim = conf.get(COMPLEX_DELIMITERS, "\\$" + "," + "\\:");
+    String[] split = complexDelim.split(",");
+    model.setComplexDelimiterLevel1(split[0]);
+    if (split.length > 1) {
+      model.setComplexDelimiterLevel2(split[1]);
+    }
+    model.setDateFormat(
+        conf.get(
+            DATE_FORMAT,
+            carbonProperty.getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+                CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)));
+
+    model.setTimestampformat(
+        conf.get(
+            TIMESTAMP_FORMAT,
+            carbonProperty.getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+                CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT)));
+
+    model.setGlobalSortPartitions(
+        conf.get(
+            GLOBAL_SORT_PARTITIONS,
+            carbonProperty.getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+                null)));
+
+    model.setBatchSortSizeInMb(
+        conf.get(
+            BATCH_SORT_SIZE_INMB,
+            carbonProperty.getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+                carbonProperty.getProperty(
+                    CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+                    CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))));
+
+    model.setBadRecordsLocation(
+        conf.get(BAD_RECORD_PATH,
+            carbonProperty.getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+                carbonProperty.getProperty(
+                    CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+                    CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))));
+
+    model.setUseOnePass(
+        conf.getBoolean(IS_ONE_PASS_LOAD,
+            Boolean.parseBoolean(
+                carbonProperty.getProperty(
+                    CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+                    CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT))));
+    return model;
+  }
+
+  private static void setFileHeader(Configuration configuration, CarbonLoadModel model)
+      throws IOException {
+    StructType inputSchema = getInputSchema(configuration);
+    if (inputSchema == null || inputSchema.getFields().size() == 0) {
+      throw new UnsupportedOperationException("Input schema must be set");
+    }
+    List<StructField> fields = inputSchema.getFields();
+    StringBuilder builder = new StringBuilder();
+    String[] columns = new String[fields.size()];
+    int i = 0;
+    for (StructField field : fields) {
+      builder.append(field.getFieldName());
+      builder.append(",");
+      columns[i++] = field.getFieldName();
+    }
+    String header = builder.toString();
+    model.setCsvHeader(header.substring(0, header.length() - 1));
+    model.setCsvHeaderColumns(columns);
+  }
+
+  private static class CarbonRecordWriter extends RecordWriter<NullWritable, StringArrayWritable> {
+
+    private CarbonOutputIteratorWrapper iteratorWrapper;
+
+    private DataLoadExecutor dataLoadExecutor;
+
+    public CarbonRecordWriter(CarbonOutputIteratorWrapper iteratorWrapper,
+        DataLoadExecutor dataLoadExecutor) {
+      this.iteratorWrapper = iteratorWrapper;
+      this.dataLoadExecutor = dataLoadExecutor;
+    }
+
+    @Override
+    public void write(NullWritable aVoid, StringArrayWritable strings)
+        throws InterruptedException {
+      iteratorWrapper.write(strings.get());
+    }
+
+    @Override
+    public void close(TaskAttemptContext taskAttemptContext) {
+      iteratorWrapper.close();
+      dataLoadExecutor.close();
+    }
+  }
 }
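
Most of the public keys above feed getLoadModel(conf) when no serialized load model is present
in the configuration. A hedged sketch of overriding a few of them through the job configuration
(the values shown are illustrative, not defaults from this patch):

    Configuration conf = job.getConfiguration();
    conf.set(CarbonTableOutputFormat.BAD_RECORDS_LOGGER_ENABLE, "true");
    conf.set(CarbonTableOutputFormat.BAD_RECORDS_LOGGER_ACTION, "REDIRECT");
    conf.set(CarbonTableOutputFormat.BAD_RECORD_PATH, "/tmp/carbon/badrecords");
    conf.set(CarbonTableOutputFormat.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss");
    conf.setBoolean(CarbonTableOutputFormat.IS_ONE_PASS_LOAD, false);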

http://git-wip-us.apache.org/repos/asf/carbondata/blob/91e6f6f4/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java
new file mode 100644
index 0000000..006ffd2
--- /dev/null
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop.ft;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hadoop.test.util.StoreCreator;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+public class CarbonOutputMapperTest extends TestCase {
+
+  CarbonLoadModel carbonLoadModel;
+
+  // use a static init block instead of setUp to avoid unwanted repeated store creation
+  static {
+    CarbonProperties.getInstance().
+        addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
+  }
+
+
+  @Test public void testOutputFormat() throws Exception {
+    runJob("");
+    String segmentPath = CarbonTablePath.getSegmentPath(carbonLoadModel.getTablePath(), "0");
+    File file = new File(segmentPath);
+    assert (file.exists());
+    File[] listFiles = file.listFiles(new FilenameFilter() {
+      @Override public boolean accept(File dir, String name) {
+        return name.endsWith(".carbondata") || name.endsWith(".carbonindex");
+      }
+    });
+
+    assert (listFiles.length == 2);
+
+  }
+
+
+  @Override public void tearDown() throws Exception {
+    super.tearDown();
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true");
+  }
+
+  @Override public void setUp() throws Exception {
+    super.setUp();
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false");
+    carbonLoadModel = StoreCreator.getCarbonLoadModel();
+  }
+
+ public static class Map extends Mapper<NullWritable, StringArrayWritable, NullWritable, StringArrayWritable> {
+
+   @Override protected void map(NullWritable key, StringArrayWritable value, Context context)
+       throws IOException, InterruptedException {
+     context.write(key, value);
+   }
+ }
+
+  private void runJob(String outPath) throws Exception {
+    Configuration configuration = new Configuration();
+    configuration.set("mapreduce.cluster.local.dir", new File(outPath + "1").getCanonicalPath());
+    Job job = Job.getInstance(configuration);
+    job.setJarByClass(CarbonOutputMapperTest.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(StringArrayWritable.class);
+    job.setMapperClass(Map.class);
+    job.setNumReduceTasks(0);
+
+    FileInputFormat.addInputPath(job, new Path(carbonLoadModel.getFactFilePath()));
+    CarbonTableOutputFormat.setLoadModel(job.getConfiguration(), carbonLoadModel);
+    CarbonTableOutputFormat.setCarbonTable(job.getConfiguration(), carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable());
+    CSVInputFormat.setHeaderExtractionEnabled(job.getConfiguration(), true);
+    job.setInputFormatClass(CSVInputFormat.class);
+    job.setOutputFormatClass(CarbonTableOutputFormat.class);
+    CarbonUtil.deleteFoldersAndFiles(new File(carbonLoadModel.getTablePath() + "1"));
+    FileOutputFormat.setOutputPath(job, new Path(carbonLoadModel.getTablePath() + "1"));
+    job.getConfiguration().set("outpath", outPath);
+    job.getConfiguration().set("query.id", String.valueOf(System.nanoTime()));
+    job.waitForCompletion(true);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/91e6f6f4/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 531bed5..d3fd087 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -164,17 +164,7 @@ public class StoreCreator {
    */
   public static void createCarbonStore() {
     try {
-      String factFilePath =
-          new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
-      File storeDir = new File(storePath);
-      CarbonUtil.deleteFoldersAndFiles(storeDir);
-      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS,
-          storePath);
-
-      CarbonTable table = createTable(absoluteTableIdentifier);
-      writeDictionary(factFilePath, table);
-      CarbonLoadModel loadModel =
-          buildCarbonLoadModel(table, factFilePath, absoluteTableIdentifier);
+      CarbonLoadModel loadModel = getCarbonLoadModel();
 
       executeGraph(loadModel, storePath);
 
@@ -183,6 +173,19 @@ public class StoreCreator {
     }
   }
 
+  public static CarbonLoadModel getCarbonLoadModel() throws Exception {
+    String factFilePath =
+        new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
+    File storeDir = new File(storePath);
+    CarbonUtil.deleteFoldersAndFiles(storeDir);
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS,
+        storePath);
+
+    CarbonTable table = createTable(absoluteTableIdentifier);
+    writeDictionary(factFilePath, table);
+    return buildCarbonLoadModel(table, factFilePath, absoluteTableIdentifier);
+  }
+
   public static CarbonTable createTable(
       AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
     TableInfo tableInfo = new TableInfo();
@@ -198,6 +201,7 @@ public class StoreCreator {
     id.setDataType(DataTypes.INT);
     id.setEncodingList(encodings);
     id.setColumnUniqueId(UUID.randomUUID().toString());
+    id.setColumnReferenceId(id.getColumnUniqueId());
     id.setDimensionColumn(true);
     id.setColumnGroup(1);
     columnSchemas.add(id);
@@ -211,6 +215,7 @@ public class StoreCreator {
     date.setDimensionColumn(true);
     date.setColumnGroup(2);
     date.setSortColumn(true);
+    date.setColumnReferenceId(date.getColumnUniqueId());
     columnSchemas.add(date);
 
     ColumnSchema country = new ColumnSchema();
@@ -222,6 +227,7 @@ public class StoreCreator {
     country.setDimensionColumn(true);
     country.setColumnGroup(3);
     country.setSortColumn(true);
+    country.setColumnReferenceId(country.getColumnUniqueId());
     columnSchemas.add(country);
 
     ColumnSchema name = new ColumnSchema();
@@ -233,6 +239,7 @@ public class StoreCreator {
     name.setDimensionColumn(true);
     name.setColumnGroup(4);
     name.setSortColumn(true);
+    name.setColumnReferenceId(name.getColumnUniqueId());
     columnSchemas.add(name);
 
     ColumnSchema phonetype = new ColumnSchema();
@@ -244,6 +251,7 @@ public class StoreCreator {
     phonetype.setDimensionColumn(true);
     phonetype.setColumnGroup(5);
     phonetype.setSortColumn(true);
+    phonetype.setColumnReferenceId(phonetype.getColumnUniqueId());
     columnSchemas.add(phonetype);
 
     ColumnSchema serialname = new ColumnSchema();
@@ -255,6 +263,7 @@ public class StoreCreator {
     serialname.setDimensionColumn(true);
     serialname.setColumnGroup(6);
     serialname.setSortColumn(true);
+    serialname.setColumnReferenceId(serialname.getColumnUniqueId());
     columnSchemas.add(serialname);
 
     ColumnSchema salary = new ColumnSchema();
@@ -264,6 +273,7 @@ public class StoreCreator {
     salary.setEncodingList(new ArrayList<Encoding>());
     salary.setColumnUniqueId(UUID.randomUUID().toString());
     salary.setDimensionColumn(false);
+    salary.setColumnReferenceId(salary.getColumnUniqueId());
     salary.setColumnGroup(7);
     columnSchemas.add(salary);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/91e6f6f4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 943c0a5..ab47532 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -514,51 +514,6 @@ object CommonUtil {
     parsedPropertyValueString
   }
 
-  def readAndUpdateLoadProgressInTableMeta(model: CarbonLoadModel,
-      insertOverwrite: Boolean): Unit = {
-    val newLoadMetaEntry = new LoadMetadataDetails
-    val status = if (insertOverwrite) {
-      SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
-    } else {
-      SegmentStatus.INSERT_IN_PROGRESS
-    }
-
-    // reading the start time of data load.
-    val loadStartTime = CarbonUpdateUtil.readCurrentTime
-    model.setFactTimeStamp(loadStartTime)
-    CarbonLoaderUtil.populateNewLoadMetaEntry(
-      newLoadMetaEntry, status, model.getFactTimeStamp, false)
-    val entryAdded: Boolean =
-      CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite)
-    if (!entryAdded) {
-      sys.error(s"Failed to add entry in table status for " +
-                s"${ model.getDatabaseName }.${model.getTableName}")
-    }
-  }
-
-  /**
-   * This method will update the load failure entry in the table status file
-   *
-   * @param model
-   */
-  def updateTableStatusForFailure(
-      model: CarbonLoadModel): Unit = {
-    // in case if failure the load status should be "Marked for delete" so that it will be taken
-    // care during clean up
-    val loadStatus = SegmentStatus.MARKED_FOR_DELETE
-    // always the last entry in the load metadata details will be the current load entry
-    val loadMetaEntry = model.getLoadMetadataDetails.get(model.getLoadMetadataDetails.size - 1)
-    CarbonLoaderUtil
-      .populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp, true)
-    val updationStatus = CarbonLoaderUtil.recordNewLoadMetadata(loadMetaEntry, model, false, false)
-    if (!updationStatus) {
-      sys
-        .error(s"Failed to update failure entry in table status for ${
-          model
-            .getDatabaseName
-        }.${ model.getTableName }")
-    }
-  }
 
   def readLoadMetadataDetails(model: CarbonLoadModel): Unit = {
     val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/91e6f6f4/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 4933a45..d6360a7 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -473,7 +473,7 @@ object CarbonDataRDDFactory {
     }
     if (loadStatus == SegmentStatus.LOAD_FAILURE) {
       // update the load entry in table status file for changing the status to marked for delete
-      CommonUtil.updateTableStatusForFailure(carbonLoadModel)
+      CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
       LOGGER.info("********starting clean up**********")
       CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
       LOGGER.info("********clean up done**********")
@@ -488,7 +488,7 @@ object CarbonDataRDDFactory {
           status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
           carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
         // update the load entry in table status file for changing the status to marked for delete
-        CommonUtil.updateTableStatusForFailure(carbonLoadModel)
+        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
         LOGGER.info("********starting clean up**********")
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
         LOGGER.info("********clean up done**********")
@@ -527,7 +527,7 @@ object CarbonDataRDDFactory {
           newEntryLoadStatus,
           overwriteTable)
       if (!done) {
-        CommonUtil.updateTableStatusForFailure(carbonLoadModel)
+        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
         LOGGER.info("********starting clean up**********")
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
         LOGGER.info("********clean up done**********")
@@ -800,7 +800,7 @@ object CarbonDataRDDFactory {
       newEntryLoadStatus,
       carbonLoadModel.getFactTimeStamp,
       true)
-    CarbonUtil
+    CarbonLoaderUtil
       .addDataIndexSizeIntoMetaEntry(metadataDetails, carbonLoadModel.getSegmentId, carbonTable)
     val done = CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
       overwriteTable)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/91e6f6f4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 0c6aeda..58671b7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -46,6 +46,7 @@ import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
@@ -153,7 +154,7 @@ case class CarbonLoadDataCommand(
         GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
         // add the start entry for the new load in the table status file
         if (updateModel.isEmpty) {
-          CommonUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
+          CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
         }
         if (isOverwriteTable) {
           LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
@@ -197,12 +198,12 @@ case class CarbonLoadDataCommand(
       } catch {
         case CausedBy(ex: NoRetryException) =>
           // update the load entry in table status file for changing the status to marked for delete
-          CommonUtil.updateTableStatusForFailure(carbonLoadModel)
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
           LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
           throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
         case ex: Exception =>
           // update the load entry in table status file for changing the status to marked for delete
-          CommonUtil.updateTableStatusForFailure(carbonLoadModel)
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
           LOGGER.error(ex)
           LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
           throw ex

http://git-wip-us.apache.org/repos/asf/carbondata/blob/91e6f6f4/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
new file mode 100644
index 0000000..abe90f1
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.iterator;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.carbondata.common.CarbonIterator;
+
+/**
+ * Wrapper class that holds rows in batches while the record writer writes data, and allows
+ * iterating over them during data load. It uses a blocking queue to coordinate reads and writes.
+ */
+public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
+
+  private boolean close = false;
+
+  /**
+   * At most batchSize * queue size rows are kept in memory
+   */
+  private int batchSize = 1000;
+
+  private RowBatch loadBatch = new RowBatch(batchSize);
+
+  private RowBatch readBatch;
+
+  private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10);
+
+  public void write(String[] row) throws InterruptedException {
+    if (!loadBatch.addRow(row)) {
+      loadBatch.readyRead();
+      queue.put(loadBatch);
+      loadBatch = new RowBatch(batchSize);
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return !queue.isEmpty() || !close || (readBatch != null && readBatch.hasNext());
+  }
+
+  @Override
+  public String[] next() {
+    if (readBatch == null || !readBatch.hasNext()) {
+      try {
+        readBatch = queue.take();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return readBatch.next();
+  }
+
+  @Override
+  public void close() {
+    if (loadBatch.isLoading()) {
+      try {
+        loadBatch.readyRead();
+        queue.put(loadBatch);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    close = true;
+  }
+
+  private static class RowBatch extends CarbonIterator<String[]> {
+
+    private int counter;
+
+    private String[][] batch;
+
+    private int size;
+
+    private boolean isLoading = true;
+
+    private RowBatch(int size) {
+      batch = new String[size][];
+      this.size = size;
+    }
+
+    /**
+     * Add a row to the batch; it can hold rows up to the batch size.
+     * @param row
+     * @return false if the batch is full after adding this row
+     */
+    public boolean addRow(String[] row) {
+      batch[counter++] = row;
+      return counter < size;
+    }
+
+    public void readyRead() {
+      size = counter;
+      counter = 0;
+      isLoading = false;
+    }
+
+    public boolean isLoading() {
+      return isLoading;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return counter < size;
+    }
+
+    @Override
+    public String[] next() {
+      assert (counter < size);
+      return batch[counter++];
+    }
+  }
+
+}
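
The wrapper above decouples the RecordWriter (producer) from the DataLoadExecutor (consumer).
A minimal hedged sketch of that handshake outside the MR flow (to be run from a method that
declares InterruptedException; the row values are illustrative):

    final CarbonOutputIteratorWrapper wrapper = new CarbonOutputIteratorWrapper();
    Thread consumer = new Thread() {
      @Override public void run() {
        while (wrapper.hasNext()) {
          // next() blocks until a full (or final) batch is queued
          String[] row = wrapper.next();
          System.out.println(java.util.Arrays.toString(row));
        }
      }
    };
    consumer.start();
    wrapper.write(new String[] { "1", "2015-07-23", "china" }); // buffered into the current batch
    wrapper.close();  // flushes the partial batch and marks end of input
    consumer.join();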

http://git-wip-us.apache.org/repos/asf/carbondata/blob/91e6f6f4/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index c85d50f..7b952e3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -525,6 +525,15 @@ public class CarbonLoadModel implements Serializable {
   }
 
   /**
+   * Get the current load metadata.
+   *
+   * @return the last entry in loadMetadataDetails, which is the current load
+   */
+  public LoadMetadataDetails getCurrentLoadMetadataDetail() {
+    return loadMetadataDetails.get(loadMetadataDetails.size() - 1);
+  }
+
+  /**
    * setLoadMetadataDetails.
    *
    * @param loadMetadataDetails

http://git-wip-us.apache.org/repos/asf/carbondata/blob/91e6f6f4/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index d6f2d9a..3729b1d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -50,11 +50,11 @@ import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
 
 /**
  * utility class for load merging.
@@ -327,7 +327,7 @@ public final class CarbonDataMergerUtil {
         loadMetadataDetails.setLoadEndTime(loadEnddate);
         CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
         loadMetadataDetails.setLoadName(mergedLoadNumber);
-        CarbonUtil
+        CarbonLoaderUtil
             .addDataIndexSizeIntoMetaEntry(loadMetadataDetails, mergedLoadNumber, carbonTable);
         loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
         loadMetadataDetails.setPartitionCount("0");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/91e6f6f4/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index b0c690b..10fdd31 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -24,17 +24,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -158,7 +148,7 @@ public final class CarbonLoaderUtil {
    */
   public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
       CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite)
-      throws IOException, InterruptedException {
+      throws IOException {
     boolean status = false;
     AbsoluteTableIdentifier absoluteTableIdentifier =
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
@@ -373,6 +363,46 @@ public final class CarbonLoaderUtil {
   }
 
 
+  public static void readAndUpdateLoadProgressInTableMeta(CarbonLoadModel model,
+      boolean insertOverwrite) throws IOException {
+    LoadMetadataDetails newLoadMetaEntry = new LoadMetadataDetails();
+    SegmentStatus status = SegmentStatus.INSERT_IN_PROGRESS;
+    if (insertOverwrite) {
+      status = SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS;
+    }
+
+    // reading the start time of data load.
+    long loadStartTime = CarbonUpdateUtil.readCurrentTime();
+    model.setFactTimeStamp(loadStartTime);
+    CarbonLoaderUtil
+        .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp(), false);
+    boolean entryAdded =
+        CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite);
+    if (!entryAdded) {
+      throw new IOException("Failed to add entry in table status for " + model.getTableName());
+    }
+  }
+
+  /**
+   * This method will update the load failure entry in the table status file
+   */
+  public static void updateTableStatusForFailure(CarbonLoadModel model)
+      throws IOException {
+    // in case of failure the load status should be "Marked for delete" so that it will be
+    // taken care of during clean up
+    SegmentStatus loadStatus = SegmentStatus.MARKED_FOR_DELETE;
+    // always the last entry in the load metadata details will be the current load entry
+    LoadMetadataDetails loadMetaEntry = model.getCurrentLoadMetadataDetail();
+    CarbonLoaderUtil
+        .populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp(), true);
+    boolean entryAdded =
+        CarbonLoaderUtil.recordNewLoadMetadata(loadMetaEntry, model, false, false);
+    if (!entryAdded) {
+      throw new IOException(
+          "Failed to update failure entry in table status for " + model.getTableName());
+    }
+  }
+
   public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier)
       throws IOException {
     Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache =
@@ -827,4 +857,18 @@ public final class CarbonLoaderUtil {
     return newListMetadata;
   }
 
+  /**
+   * This method will add data size and index size into tablestatus for each segment
+   */
+  public static void addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails,
+      String segmentId, CarbonTable carbonTable) throws IOException {
+    CarbonTablePath carbonTablePath =
+        CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier()));
+    Map<String, Long> dataIndexSize =
+        CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, segmentId);
+    loadMetadataDetails
+        .setDataSize(dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE).toString());
+    loadMetadataDetails
+        .setIndexSize(dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE).toString());
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/91e6f6f4/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 8c4d5ba..37aaea5 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -315,7 +315,7 @@ object StreamHandoffRDD {
     }
 
     if (loadStatus == SegmentStatus.LOAD_FAILURE) {
-      CommonUtil.updateTableStatusForFailure(carbonLoadModel)
+      CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
       LOGGER.info("********starting clean up**********")
       CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
       LOGGER.info("********clean up done**********")