You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/08/26 17:36:12 UTC

carbondata git commit: [CARBONDATA-2874] Support SDK writer as thread safe api

Repository: carbondata
Updated Branches:
  refs/heads/master 347b8e1db -> 17a4b485c


[CARBONDATA-2874] Support SDK writer as thread safe api

Problem: CarbonWriter.write() is currently not thread safe. If multiple threads call .write() on one writer,
a data count inconsistency is observed.

Root cause: all threads write to the same batch of the blocking queue without synchronization, so one thread's data overwrites another thread's data.

Solution:
a) DataLoadExecutor used only one iterator. It now takes the number of threads as input and internally creates that many iterators to loop over the data. This reduces queue blocking time, as each iterator has its own queue.
b) The InputProcessor step used only the default 2 cores (2 threads) for data load in the SDK flow; it now uses the number of threads specified by the user.
c) The writer step used only 2 cores (2 threads); it now uses the number of threads specified by the user.

This closes #2653


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/17a4b485
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/17a4b485
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/17a4b485

Branch: refs/heads/master
Commit: 17a4b485cd4e87aaeb0b39a02939da0aa719b210
Parents: 347b8e1
Author: ajantha-bhat <aj...@gmail.com>
Authored: Tue Aug 21 10:52:35 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sun Aug 26 23:05:59 2018 +0530

----------------------------------------------------------------------
 docs/sdk-guide.md                               |  49 +++++++-
 .../hadoop/api/CarbonTableOutputFormat.java     |  64 +++++++++--
 .../loading/DataLoadProcessBuilder.java         |   3 +
 .../loading/model/CarbonLoadModel.java          |  20 +++-
 .../loading/steps/InputProcessorStepImpl.java   |   5 +-
 .../steps/JsonInputProcessorStepImpl.java       |   5 +-
 .../util/CarbonDataProcessorUtil.java           |   9 +-
 .../sdk/file/CarbonWriterBuilder.java           |  82 ++++++++++++++
 .../sdk/file/ConcurrentSdkWriterTest.java       | 111 +++++++++++++++++++
 9 files changed, 329 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/17a4b485/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index 8120efa..7ed8fc2 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -369,6 +369,7 @@ public CarbonWriterBuilder withTableProperties(Map<String, String> options);
 
 ```
 /**
+* This writer is not thread safe; use buildThreadSafeWriterForCSVInput in a multi-threaded environment.
 * Build a {@link CarbonWriter}, which accepts row in CSV format object
 * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
 * @return CSVCarbonWriter
@@ -378,8 +379,24 @@ public CarbonWriterBuilder withTableProperties(Map<String, String> options);
 public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema schema) throws IOException, InvalidLoadOptionException;
 ```
 
+```
+/**
+* This writer can be used from multiple threads.
+* Build a {@link CarbonWriter}, which accepts rows in CSV format
+* @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
+* @param numOfThreads number of threads in which .write() will be called.
+* @return CSVCarbonWriter
+* @throws IOException
+* @throws InvalidLoadOptionException
+*/
+public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads)
+  throws IOException, InvalidLoadOptionException;
+```
+
+
 ```  
 /**
+* This writer is not thread safe; use buildThreadSafeWriterForAvroInput in a multi-threaded environment.
 * Build a {@link CarbonWriter}, which accepts Avro format object
 * @param avroSchema avro Schema object {org.apache.avro.Schema}
 * @return AvroCarbonWriter 
@@ -391,6 +408,22 @@ public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema) throw
 
 ```
 /**
+* This writer can be used from multiple threads.
+* Build a {@link CarbonWriter}, which accepts Avro objects
+* @param avroSchema avro Schema object {org.apache.avro.Schema}
+* @param numOfThreads number of threads in which .write() will be called.
+* @return AvroCarbonWriter
+* @throws IOException
+* @throws InvalidLoadOptionException
+*/
+public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema, short numOfThreads)
+  throws IOException, InvalidLoadOptionException;
+```
+
+
+```
+/**
+* This writer is not thread safe; use buildThreadSafeWriterForJsonInput in a multi-threaded environment.
 * Build a {@link CarbonWriter}, which accepts Json object
 * @param carbonSchema carbon Schema object
 * @return JsonCarbonWriter
@@ -400,6 +433,19 @@ public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema) throw
 public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema);
 ```
 
+```
+/**
+* This writer can be used from multiple threads.
+* Build a {@link CarbonWriter}, which accepts Json objects
+* @param carbonSchema carbon Schema object
+* @param numOfThreads number of threads in which .write() will be called.
+* @return JsonCarbonWriter
+* @throws IOException
+* @throws InvalidLoadOptionException
+*/
+public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads) throws IOException, InvalidLoadOptionException;
+```
+
 ### Class org.apache.carbondata.sdk.file.CarbonWriter
 ```
 /**
@@ -408,7 +454,7 @@ public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema);
 *                      which is one row of data.
 * If CSVCarbonWriter, object is of type String[], which is one row of data
 * If JsonCarbonWriter, object is of type String, which is one row of json
-* Note: This API is not thread safe
+* Note: This API is thread safe only if the writer was built with a numOfThreads argument (buildThreadSafeWriterFor*Input).
 * @param object
 * @throws IOException
 */
@@ -696,7 +742,6 @@ Find example code at [CarbonReaderExample](https://github.com/apache/carbondata/
    *
    * @param dataFilePath complete path including carbondata file name
    * @return Schema object
-   * @throws IOException
    */
   public static Schema readSchemaInDataFile(String dataFilePath);
 ```

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17a4b485/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 78c670a..5938c20 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.metadata.datatype.StructField;
@@ -235,6 +234,12 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
       TaskAttemptContext taskAttemptContext) throws IOException {
     final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
     //if loadModel having taskNo already(like in SDK) then no need to overwrite
+    short sdkUserCore = loadModel.getSdkUserCores();
+    int itrSize = (sdkUserCore > 0) ? sdkUserCore : 1;
+    final CarbonOutputIteratorWrapper[] iterators = new CarbonOutputIteratorWrapper[itrSize];
+    for (int i = 0; i < itrSize; i++) {
+      iterators[i] = new CarbonOutputIteratorWrapper();
+    }
     if (null == loadModel.getTaskNo() || loadModel.getTaskNo().isEmpty()) {
       loadModel.setTaskNo(taskAttemptContext.getConfiguration()
           .get("carbon.outputformat.taskno", String.valueOf(System.nanoTime())));
@@ -242,7 +247,6 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     loadModel.setDataWritePath(
         taskAttemptContext.getConfiguration().get("carbon.outputformat.writepath"));
     final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext);
-    final CarbonOutputIteratorWrapper iteratorWrapper = new CarbonOutputIteratorWrapper();
     final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
     final ExecutorService executorService = Executors.newFixedThreadPool(1,
         new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));;
@@ -251,10 +255,12 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
       @Override public void run() {
         try {
           dataLoadExecutor
-              .execute(loadModel, tempStoreLocations, new CarbonIterator[] { iteratorWrapper });
+              .execute(loadModel, tempStoreLocations, iterators);
         } catch (Exception e) {
           executorService.shutdownNow();
-          iteratorWrapper.closeWriter(true);
+          for (CarbonOutputIteratorWrapper iterator : iterators) {
+            iterator.closeWriter(true);
+          }
           dataLoadExecutor.close();
           // clean up the folders and files created locally for data load operation
           TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
@@ -264,8 +270,14 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
       }
     });
 
-    return new CarbonRecordWriter(iteratorWrapper, dataLoadExecutor, loadModel, future,
-        executorService);
+    if (sdkUserCore > 0) {
+      // CarbonMultiRecordWriter handles the load balancing of the write rows in round robin.
+      return new CarbonMultiRecordWriter(iterators, dataLoadExecutor, loadModel, future,
+          executorService);
+    } else {
+      return new CarbonRecordWriter(iterators[0], dataLoadExecutor, loadModel, future,
+          executorService);
+    }
   }
 
   public static CarbonLoadModel getLoadModel(Configuration conf) throws IOException {
@@ -411,11 +423,15 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
 
     @Override public void write(NullWritable aVoid, ObjectArrayWritable objects)
         throws InterruptedException {
-      iteratorWrapper.write(objects.get());
+      if (iteratorWrapper != null) { // null in CarbonMultiRecordWriter, which overrides write()
+        iteratorWrapper.write(objects.get());
+      }
     }
 
     @Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException {
-      iteratorWrapper.closeWriter(false);
+      if (iteratorWrapper != null) {
+        iteratorWrapper.closeWriter(false);
+      }
       try {
         future.get();
       } catch (ExecutionException e) {
@@ -434,4 +450,36 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
       return loadModel;
     }
   }
+
+  /** Record writer that spreads rows over several input iterators in round-robin order.
+   * {@code write} is synchronized, so one instance can be shared by several calling threads. */
+  public static class CarbonMultiRecordWriter extends CarbonRecordWriter {
+
+    private final CarbonOutputIteratorWrapper[] iterators;
+
+    // index of the iterator that receives the next row; only touched inside synchronized write
+    private int counter;
+
+    CarbonMultiRecordWriter(CarbonOutputIteratorWrapper[] iterators,
+        DataLoadExecutor dataLoadExecutor, CarbonLoadModel loadModel, Future future,
+        ExecutorService executorService) {
+      super(null, dataLoadExecutor, loadModel, future, executorService);
+      this.iterators = iterators;
+    }
+
+    @Override public synchronized void write(NullWritable aVoid, ObjectArrayWritable objects)
+        throws InterruptedException {
+      iterators[counter].write(objects.get());
+      // advance only after a successful write, wrapping back to the first iterator
+      counter = (counter + 1) % iterators.length;
+    }
+
+    @Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException {
+      // close every iterator's writer side, then let the base class wait on the load future
+      for (CarbonOutputIteratorWrapper iterator : iterators) {
+        iterator.closeWriter(false);
+      }
+      super.close(taskAttemptContext);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17a4b485/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index aa9aa01..666c598 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -313,6 +313,9 @@ public final class DataLoadProcessBuilder {
     }
     TableSpec tableSpec = new TableSpec(carbonTable);
     configuration.setTableSpec(tableSpec);
+    if (loadModel.getSdkUserCores() > 0) {
+      configuration.setWritingCoresCount(loadModel.getSdkUserCores());
+    }
     return configuration;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17a4b485/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 146d9af..c985952 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -59,6 +59,9 @@ public class CarbonLoadModel implements Serializable {
    */
   private boolean carbonTransactionalTable = true;
 
+  /** Number of threads from which the SDK writer is used; values <= 0 mean not set. */
+  private short sdkUserCores;
+
   private String csvHeader;
   private String[] csvHeaderColumns;
   private String csvDelimiter;
@@ -392,7 +395,6 @@ public class CarbonLoadModel implements Serializable {
     this.colDictFilePath = colDictFilePath;
   }
 
-
   public DictionaryServiceProvider getDictionaryServiceProvider() {
     return dictionaryServiceProvider;
   }
@@ -470,6 +472,7 @@ public class CarbonLoadModel implements Serializable {
     copy.sortColumnsBoundsStr = sortColumnsBoundsStr;
     copy.loadMinSize = loadMinSize;
     copy.parentTablePath = parentTablePath;
+    copy.sdkUserCores = sdkUserCores;
     return copy;
   }
 
@@ -525,9 +528,11 @@ public class CarbonLoadModel implements Serializable {
     copyObj.sortColumnsBoundsStr = sortColumnsBoundsStr;
     copyObj.loadMinSize = loadMinSize;
     copyObj.parentTablePath = parentTablePath;
+    copyObj.sdkUserCores = sdkUserCores;
     return copyObj;
   }
 
+
   /**
    * @param tablePath The tablePath to set.
    */
@@ -541,7 +546,6 @@ public class CarbonLoadModel implements Serializable {
   public String getTablePath() {
     return tablePath;
   }
-
   /**
    * getLoadMetadataDetails.
    *
@@ -551,6 +555,7 @@ public class CarbonLoadModel implements Serializable {
     return loadMetadataDetails;
   }
 
+
   /**
    * Get the current load metadata.
    *
@@ -572,7 +577,6 @@ public class CarbonLoadModel implements Serializable {
   public void setLoadMetadataDetails(List<LoadMetadataDetails> loadMetadataDetails) {
     this.loadMetadataDetails = loadMetadataDetails;
   }
-
   /**
    * getSegmentUpdateStatusManager
    *
@@ -850,6 +854,7 @@ public class CarbonLoadModel implements Serializable {
   public void setTimestampformat(String timestampformat) {
     this.timestampformat = timestampformat;
   }
+
   public String getSkipEmptyLine() {
     return skipEmptyLine;
   }
@@ -858,7 +863,6 @@ public class CarbonLoadModel implements Serializable {
     this.skipEmptyLine = skipEmptyLine;
   }
 
-
   public boolean isLoadWithoutConverterStep() {
     return isLoadWithoutConverterStep;
   }
@@ -903,7 +907,6 @@ public class CarbonLoadModel implements Serializable {
   public void setMergedSegmentIds(List<String> mergedSegmentIds) {
     this.mergedSegmentIds = mergedSegmentIds;
   }
-
   public List<String> getMergedSegmentIds() {
     if (null == mergedSegmentIds) {
       mergedSegmentIds = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -911,4 +914,11 @@ public class CarbonLoadModel implements Serializable {
     return mergedSegmentIds;
   }
 
+  /** @return number of SDK writer threads; values <= 0 mean the default single-writer flow */
+  public short getSdkUserCores() { return sdkUserCores; }
+
+  /** @param sdkUserCores number of threads in which the SDK writer's write() will be called */
+  public void setSdkUserCores(short sdkUserCores) {
+    this.sdkUserCores = sdkUserCores;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17a4b485/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
index 50c0215..5cccd4f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
@@ -47,6 +47,8 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
   private CarbonIterator<Object[]>[] inputIterators;
 
+  private short sdkUserCore;
+
   /**
    * executor service to execute the query
    */
@@ -56,6 +58,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
       CarbonIterator<Object[]>[] inputIterators) {
     super(configuration, null);
     this.inputIterators = inputIterators;
+    this.sdkUserCore = configuration.getWritingCoresCount();
   }
 
   @Override public DataField[] getOutput() {
@@ -75,7 +78,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
   @Override public Iterator<CarbonRowBatch>[] execute() {
     int batchSize = CarbonProperties.getInstance().getBatchSize();
     List<CarbonIterator<Object[]>>[] readerIterators =
-        CarbonDataProcessorUtil.partitionInputReaderIterators(inputIterators);
+        CarbonDataProcessorUtil.partitionInputReaderIterators(inputIterators, sdkUserCore);
     Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
     for (int i = 0; i < outIterators.length; i++) {
       outIterators[i] =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17a4b485/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java
index 13877f9..f78c224 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java
@@ -41,10 +41,13 @@ public class JsonInputProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
   boolean isRawDataRequired = false;
 
+  short sdkUserCore;
+
   public JsonInputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       CarbonIterator<Object[]>[] inputIterators) {
     super(configuration, null);
     this.inputIterators = inputIterators;
+    sdkUserCore = configuration.getWritingCoresCount(); // presumably the SDK thread count, if set
   }
 
   @Override public DataField[] getOutput() {
@@ -61,7 +64,7 @@ public class JsonInputProcessorStepImpl extends AbstractDataLoadProcessorStep {
   @Override public Iterator<CarbonRowBatch>[] execute() {
     int batchSize = CarbonProperties.getInstance().getBatchSize();
     List<CarbonIterator<Object[]>>[] readerIterators =
-        CarbonDataProcessorUtil.partitionInputReaderIterators(inputIterators);
+        CarbonDataProcessorUtil.partitionInputReaderIterators(inputIterators, sdkUserCore);
     Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
     for (int i = 0; i < outIterators.length; i++) {
       outIterators[i] =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17a4b485/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index dfe0e24..f497da3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -659,9 +659,14 @@ public final class CarbonDataProcessorUtil {
    * @return
    */
   public static List<CarbonIterator<Object[]>>[] partitionInputReaderIterators(
-      CarbonIterator<Object[]>[] inputIterators) {
+      CarbonIterator<Object[]>[] inputIterators, short sdkUserCores) {
     // Get the number of cores configured in property.
-    int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+    int numberOfCores;
+    if (sdkUserCores > 0) {
+      numberOfCores = sdkUserCores;
+    } else {
+      numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+    }
     // Get the minimum of number of cores and iterators size to get the number of parallel threads
     // to be launched.
     int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17a4b485/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 1489435..d41e3d0 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -374,6 +374,8 @@ public class CarbonWriterBuilder {
   }
 
   /**
+   * This writer is not thread safe; in a multi-threaded environment use
+   * {@link #buildThreadSafeWriterForCSVInput(Schema, short)}.
    * Build a {@link CarbonWriter}, which accepts row in CSV format
    * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
    * @return CSVCarbonWriter
@@ -390,6 +392,31 @@ public class CarbonWriterBuilder {
   }
 
   /**
+   * Builds a {@link CarbonWriter} for CSV rows whose {@code write} may run concurrently.
+   * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
+   * @param numOfThreads number of threads in which .write() will be called; must be positive
+   * @return CSVCarbonWriter
+   * @throws IOException
+   * @throws InvalidLoadOptionException
+   * @throws IllegalArgumentException if {@code numOfThreads} is not positive
+   */
+  public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads)
+      throws IOException, InvalidLoadOptionException {
+    Objects.requireNonNull(schema, "schema should not be null");
+    Objects.requireNonNull(path, "path should not be null");
+    if (numOfThreads <= 0) {
+      throw new IllegalArgumentException(" numOfThreads must be greater than 0");
+    }
+    // check the arguments before touching builder state
+    this.schema = schema;
+    CarbonLoadModel loadModel = buildLoadModel(schema);
+    loadModel.setSdkUserCores(numOfThreads);
+    return new CSVCarbonWriter(loadModel);
+  }
+
+  /**
+   * This writer is not thread safe; in a multi-threaded environment use
+   * {@link #buildThreadSafeWriterForAvroInput(org.apache.avro.Schema, short)}.
    * Build a {@link CarbonWriter}, which accepts Avro object
    * @param avroSchema avro Schema object {org.apache.avro.Schema}
    * @return AvroCarbonWriter
@@ -411,6 +438,36 @@ public class CarbonWriterBuilder {
   }
 
   /**
+   * Builds a {@link CarbonWriter} for Avro records whose {@code write} may run concurrently.
+   * @param avroSchema avro Schema object {org.apache.avro.Schema}
+   * @param numOfThreads number of threads in which .write() will be called; must be positive
+   * @return AvroCarbonWriter
+   * @throws IOException
+   * @throws InvalidLoadOptionException
+   * @throws IllegalArgumentException if {@code numOfThreads} is not positive
+   */
+  public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema,
+      short numOfThreads)
+      throws IOException, InvalidLoadOptionException {
+    Objects.requireNonNull(avroSchema, "schema should not be null");
+    Objects.requireNonNull(path, "path should not be null");
+    if (numOfThreads <= 0) {
+      throw new IllegalArgumentException(" numOfThreads must be greater than 0");
+    }
+    // check the arguments before touching builder state
+    this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema);
+    Objects.requireNonNull(schema, "schema should not be null");
+    CarbonLoadModel loadModel = buildLoadModel(schema);
+    // Avro records are pushed as Objects, not Strings, to support multi-level complex types;
+    // nothing needs converting, so the flag makes the load builder skip the conversion step.
+    loadModel.setLoadWithoutConverterStep(true);
+    loadModel.setSdkUserCores(numOfThreads);
+    return new AvroCarbonWriter(loadModel);
+  }
+
+  /**
+   * This writer is not thread safe; in a multi-threaded environment use
+   * {@link #buildThreadSafeWriterForJsonInput(Schema, short)}.
    * Build a {@link CarbonWriter}, which accepts Json object
    * @param carbonSchema carbon Schema object
    * @return JsonCarbonWriter
@@ -427,6 +484,31 @@ public class CarbonWriterBuilder {
     return new JsonCarbonWriter(loadModel);
   }
 
+  /**
+   * Builds a {@link CarbonWriter} for Json objects whose {@code write} may run concurrently.
+   *
+   * @param carbonSchema carbon Schema object
+   * @param numOfThreads number of threads in which .write() will be called; must be positive
+   * @return JsonCarbonWriter
+   * @throws IOException
+   * @throws InvalidLoadOptionException
+   * @throws IllegalArgumentException if {@code numOfThreads} is not positive
+   */
+  public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads)
+      throws IOException, InvalidLoadOptionException {
+    Objects.requireNonNull(carbonSchema, "schema should not be null");
+    Objects.requireNonNull(path, "path should not be null");
+    if (numOfThreads <= 0) {
+      throw new IllegalArgumentException(" numOfThreads must be greater than 0");
+    }
+    // check the arguments before touching builder state
+    this.schema = carbonSchema;
+    CarbonLoadModel loadModel = buildLoadModel(schema);
+    loadModel.setJsonFileLoad(true);
+    loadModel.setSdkUserCores(numOfThreads);
+    return new JsonCarbonWriter(loadModel);
+  }
+
   private void setCsvHeader(CarbonLoadModel model) {
     Field[] fields = schema.getFields();
     StringBuilder builder = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17a4b485/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java
new file mode 100644
index 0000000..8ce1ef1
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * multi-thread Test suite for {@link CSVCarbonWriter}
+ */
+public class ConcurrentSdkWriterTest {
+
+  // rows written by each thread
+  private static final int recordsPerItr = 10;
+  // number of concurrent writing threads; also passed to the writer builder
+  private static final short numOfThreads = 4;
+
+  @Test
+  public void testWriteFiles() throws Exception {
+    String path = "./testWriteFiles";
+    // start from a clean output directory
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    try {
+      // one writer shared by all threads, built for numOfThreads concurrent callers
+      CarbonWriter writer = CarbonWriter.builder()
+          .outputPath(path)
+          .buildThreadSafeWriterForCSVInput(new Schema(fields), numOfThreads);
+      ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
+      java.util.List<java.util.concurrent.Future<?>> results = new java.util.ArrayList<>();
+      // submit one write task per thread; each writes recordsPerItr rows
+      for (int i = 0; i < numOfThreads; i++) {
+        results.add(executorService.submit(new WriteLogic(writer)));
+      }
+      executorService.shutdown();
+      // Future.get() rethrows a worker's failure; discarding the futures would hide it
+      for (java.util.concurrent.Future<?> result : results) {
+        result.get(2, TimeUnit.HOURS);
+      }
+      writer.close();
+
+      // read the files back and verify that every row from every thread is present
+      CarbonReader reader = CarbonReader
+          .builder(path, "_temp")
+          .projection(new String[]{"name", "age"})
+          .build();
+      int rowCount = 0;
+      try {
+        while (reader.hasNext()) {
+          reader.readNextRow();
+          rowCount++;
+        }
+      } finally {
+        reader.close();
+      }
+      Assert.assertEquals(numOfThreads * recordsPerItr, rowCount);
+    } finally {
+      FileUtils.deleteDirectory(new File(path));
+    }
+  }
+
+  /** Writes {@code recordsPerItr} CSV rows through the shared writer. */
+  static class WriteLogic implements Runnable {
+    private final CarbonWriter writer;
+
+    WriteLogic(CarbonWriter writer) {
+      this.writer = writer;
+    }
+
+    @Override public void run() {
+      try {
+        for (int i = 0; i < recordsPerItr; i++) {
+          // one value per schema column: name, age
+          writer.write(new String[] { "robot" + (i % 10), String.valueOf(i) });
+        }
+      } catch (IOException e) {
+        // rethrow unchecked so the failure surfaces through Future.get() in the test thread
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+}