You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/11 12:16:15 UTC

[1/3] carbondata git commit: [CARBONDATA-2909] Multi user support for SDK on S3

Repository: carbondata
Updated Branches:
  refs/heads/master 7c827c0a9 -> 8f1a029b9


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 4859dd2..58bf3ab 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -179,7 +179,8 @@ public class CarbonReaderBuilder {
    * @throws IOException
    * @throws InterruptedException
    */
-  public <T> CarbonReader<T> build() throws IOException, InterruptedException {
+  public <T> CarbonReader<T> build(Configuration configuration)
+      throws IOException, InterruptedException {
     // DB name is not applicable for SDK reader as, table will be never registered.
     CarbonTable table;
     if (isTransactionalTable) {
@@ -193,7 +194,7 @@ public class CarbonReaderBuilder {
       }
     }
     final CarbonFileInputFormat format = new CarbonFileInputFormat();
-    final Job job = new Job(new Configuration());
+    final Job job = new Job(configuration);
     format.setTableInfo(job.getConfiguration(), table.getTableInfo());
     format.setTablePath(job.getConfiguration(), table.getTablePath());
     format.setTableName(job.getConfiguration(), table.getTableName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 89e69fb..28a0dde 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -45,14 +45,13 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.CarbonSessionInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.ThriftWriter;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.s3a.Constants;
 
 /**
@@ -74,10 +73,6 @@ public class CarbonWriterBuilder {
   private int localDictionaryThreshold;
   private boolean isLocalDictionaryEnabled;
 
-  public CarbonWriterBuilder() {
-    ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo());
-  }
-
   /**
    * Sets the output path of the writer builder
    * @param path is the absolute path where output files are written
@@ -398,13 +393,13 @@ public class CarbonWriterBuilder {
    * @throws IOException
    * @throws InvalidLoadOptionException
    */
-  public CarbonWriter buildWriterForCSVInput(Schema schema)
+  public CarbonWriter buildWriterForCSVInput(Schema schema, Configuration configuration)
       throws IOException, InvalidLoadOptionException {
     Objects.requireNonNull(schema, "schema should not be null");
     Objects.requireNonNull(path, "path should not be null");
     this.schema = schema;
     CarbonLoadModel loadModel = buildLoadModel(schema);
-    return new CSVCarbonWriter(loadModel);
+    return new CSVCarbonWriter(loadModel, configuration);
   }
 
   /**
@@ -416,8 +411,8 @@ public class CarbonWriterBuilder {
    * @throws IOException
    * @throws InvalidLoadOptionException
    */
-  public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads)
-      throws IOException, InvalidLoadOptionException {
+  public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads,
+      Configuration configuration) throws IOException, InvalidLoadOptionException {
     Objects.requireNonNull(schema, "schema should not be null");
     Objects.requireNonNull(numOfThreads, "numOfThreads should not be null");
     Objects.requireNonNull(path, "path should not be null");
@@ -427,7 +422,7 @@ public class CarbonWriterBuilder {
     }
     CarbonLoadModel loadModel = buildLoadModel(schema);
     loadModel.setSdkWriterCores(numOfThreads);
-    return new CSVCarbonWriter(loadModel);
+    return new CSVCarbonWriter(loadModel, configuration);
   }
 
   /**
@@ -439,8 +434,8 @@ public class CarbonWriterBuilder {
    * @throws IOException
    * @throws InvalidLoadOptionException
    */
-  public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema avroSchema)
-      throws IOException, InvalidLoadOptionException {
+  public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema avroSchema,
+      Configuration configuration) throws IOException, InvalidLoadOptionException {
     this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema);
     Objects.requireNonNull(schema, "schema should not be null");
     Objects.requireNonNull(path, "path should not be null");
@@ -450,7 +445,7 @@ public class CarbonWriterBuilder {
     // removed from the load. LoadWithoutConverter flag is going to point to the Loader Builder
     // which will skip Conversion Step.
     loadModel.setLoadWithoutConverterStep(true);
-    return new AvroCarbonWriter(loadModel);
+    return new AvroCarbonWriter(loadModel, configuration);
   }
 
   /**
@@ -462,7 +457,7 @@ public class CarbonWriterBuilder {
    * @throws InvalidLoadOptionException
    */
   public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema,
-      short numOfThreads)
+      short numOfThreads, Configuration configuration)
       throws IOException, InvalidLoadOptionException {
     this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema);
     Objects.requireNonNull(schema, "schema should not be null");
@@ -478,7 +473,7 @@ public class CarbonWriterBuilder {
     // which will skip Conversion Step.
     loadModel.setLoadWithoutConverterStep(true);
     loadModel.setSdkWriterCores(numOfThreads);
-    return new AvroCarbonWriter(loadModel);
+    return new AvroCarbonWriter(loadModel, configuration);
   }
 
   /**
@@ -490,14 +485,14 @@ public class CarbonWriterBuilder {
    * @throws IOException
    * @throws InvalidLoadOptionException
    */
-  public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema)
+  public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema, Configuration configuration)
       throws IOException, InvalidLoadOptionException {
     Objects.requireNonNull(carbonSchema, "schema should not be null");
     Objects.requireNonNull(path, "path should not be null");
     this.schema = carbonSchema;
     CarbonLoadModel loadModel = buildLoadModel(carbonSchema);
     loadModel.setJsonFileLoad(true);
-    return new JsonCarbonWriter(loadModel);
+    return new JsonCarbonWriter(loadModel, configuration);
   }
 
   /**
@@ -510,11 +505,10 @@ public class CarbonWriterBuilder {
    * @throws IOException
    * @throws InvalidLoadOptionException
    */
-  public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads)
-      throws IOException, InvalidLoadOptionException {
+  public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads,
+      Configuration configuration) throws IOException, InvalidLoadOptionException {
     Objects.requireNonNull(carbonSchema, "schema should not be null");
     Objects.requireNonNull(path, "path should not be null");
-    Objects.requireNonNull(numOfThreads, "numOfThreads should not be null");
     if (numOfThreads <= 0) {
       throw new IllegalArgumentException(" numOfThreads must be greater than 0");
     }
@@ -522,7 +516,7 @@ public class CarbonWriterBuilder {
     CarbonLoadModel loadModel = buildLoadModel(schema);
     loadModel.setJsonFileLoad(true);
     loadModel.setSdkWriterCores(numOfThreads);
-    return new JsonCarbonWriter(loadModel);
+    return new JsonCarbonWriter(loadModel, configuration);
   }
 
   private void setCsvHeader(CarbonLoadModel model) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
index b6e7ad5..5f65539 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
@@ -23,7 +23,6 @@ import java.util.Random;
 import java.util.UUID;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -47,15 +46,14 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
   private TaskAttemptContext context;
   private ObjectArrayWritable writable;
 
-  JsonCarbonWriter(CarbonLoadModel loadModel) throws IOException {
-    Configuration OutputHadoopConf = FileFactory.getConfiguration();
-    CarbonTableOutputFormat.setLoadModel(OutputHadoopConf, loadModel);
+  JsonCarbonWriter(CarbonLoadModel loadModel, Configuration configuration) throws IOException {
+    CarbonTableOutputFormat.setLoadModel(configuration, loadModel);
     CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat();
     JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
     Random random = new Random();
     TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
     TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
-    TaskAttemptContextImpl context = new TaskAttemptContextImpl(OutputHadoopConf, attemptID);
+    TaskAttemptContextImpl context = new TaskAttemptContextImpl(configuration, attemptID);
     this.recordWriter = outputFormat.getRecordWriter(context);
     this.context = context;
     this.writable = new ObjectArrayWritable();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
index 4320edc..8f53125 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -83,7 +83,7 @@ public class AvroCarbonWriterTest {
     GenericData.Record record = TestUtil.jsonToAvro(json, avroSchema);
     try {
       CarbonWriter writer = CarbonWriter.builder().outputPath(path).isTransactionalTable(true)
-          .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema));
+          .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema), TestUtil.configuration);
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
@@ -151,7 +151,7 @@ public class AvroCarbonWriterTest {
       CarbonWriter writer = CarbonWriter.builder()
           .outputPath(path)
           .isTransactionalTable(true)
-          .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema));
+          .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema), TestUtil.configuration);
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
@@ -243,7 +243,7 @@ public class AvroCarbonWriterTest {
       CarbonWriter writer = CarbonWriter.builder()
           .outputPath(path)
           .isTransactionalTable(true)
-          .buildWriterForAvroInput(nn);
+          .buildWriterForAvroInput(nn, TestUtil.configuration);
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
@@ -304,7 +304,7 @@ public class AvroCarbonWriterTest {
       CarbonWriter writer = CarbonWriter.builder()
           .outputPath(path)
           .isTransactionalTable(true)
-          .buildWriterForAvroInput(nn);
+          .buildWriterForAvroInput(nn, TestUtil.configuration);
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
@@ -340,7 +340,7 @@ public class AvroCarbonWriterTest {
       CarbonWriter writer = CarbonWriter.builder()
           .outputPath(path)
           .isTransactionalTable(true).sortBy(sortColumns)
-          .buildWriterForAvroInput(nn);
+          .buildWriterForAvroInput(nn, TestUtil.configuration);
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
@@ -458,7 +458,7 @@ public class AvroCarbonWriterTest {
         .uniqueIdentifier(System.currentTimeMillis()).outputPath(path);
 
     try {
-      writer.buildWriterForCSVInput(new org.apache.carbondata.sdk.file.Schema(field));
+      writer.buildWriterForCSVInput(new org.apache.carbondata.sdk.file.Schema(field), TestUtil.configuration);
       Assert.fail();
     } catch (Exception e) {
       assert(e.getMessage().contains("Duplicate column name found in table schema"));
@@ -478,7 +478,7 @@ public class AvroCarbonWriterTest {
       Map<String, String> loadOptions = new HashMap<String, String>();
       loadOptions.put("bad_records_action", "fail");
       CarbonWriter carbonWriter =
-          writer.isTransactionalTable(false).withLoadOptions(loadOptions).buildWriterForCSVInput(new org.apache.carbondata.sdk.file.Schema(field));
+          writer.isTransactionalTable(false).withLoadOptions(loadOptions).buildWriterForCSVInput(new org.apache.carbondata.sdk.file.Schema(field), TestUtil.configuration);
       carbonWriter.write(new String[] { "k", "20-02-2233" });
       carbonWriter.close();
       Assert.fail();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index e71d061..3d59724 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -120,7 +120,7 @@ public class CSVCarbonWriterTest {
           .isTransactionalTable(true)
           .outputPath(path);
 
-      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
 
       for (int i = 0; i < 100; i++) {
         String[] row = new String[]{
@@ -243,7 +243,7 @@ public class CSVCarbonWriterTest {
     fields[1] = new Field("age", DataTypes.INT);
     try {
       carbonWriter = CarbonWriter.builder().isTransactionalTable(false).
-          outputPath(path).buildWriterForCSVInput(new Schema(fields));
+          outputPath(path).buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
       Assert.assertTrue(false);
@@ -263,7 +263,7 @@ public class CSVCarbonWriterTest {
     fields[1] = new Field("age", DataTypes.INT);
     try {
       carbonWriter = CarbonWriter.builder().isTransactionalTable(false).
-          outputPath(path).buildWriterForCSVInput(new Schema(fields));
+          outputPath(path).buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
       Assert.assertTrue(false);
@@ -289,7 +289,7 @@ public class CSVCarbonWriterTest {
           .isTransactionalTable(true).taskNo(5)
           .outputPath(path);
 
-      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
 
       for (int i = 0; i < 2; i++) {
         String[] row = new String[]{

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
index c1d5f88..a7ec6cd 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
@@ -132,7 +132,7 @@ public class CSVNonTransactionalCarbonWriterTest {
         builder = builder.withBlockSize(blockSize);
       }
 
-      CarbonWriter writer = builder.buildWriterForCSVInput(schema);
+      CarbonWriter writer = builder.buildWriterForCSVInput(schema, TestUtil.configuration);
 
       for (int i = 0; i < rows; i++) {
         writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
@@ -183,7 +183,7 @@ public class CSVNonTransactionalCarbonWriterTest {
           .taskNo(System.nanoTime())
           .outputPath(path);
 
-      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
 
       for (int i = 0; i < 100; i++) {
         String[] row = new String[]{

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 54b3e9e..dedf30e 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -40,10 +40,13 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import junit.framework.TestCase;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.CharEncoding;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.*;
 
 public class CarbonReaderTest extends TestCase {
 
+  private Configuration conf = new Configuration(false);
+  
   @Before
   public void cleanFile() {
     assert (TestUtil.cleanMdtFile());
@@ -72,7 +75,7 @@ public class CarbonReaderTest extends TestCase {
     TestUtil.writeFilesAndVerify(200, new Schema(fields), path, true);
 
     CarbonReader reader = CarbonReader.builder(path, "_temp").isTransactionalTable(true)
-        .projection(new String[]{"name", "age"}).build();
+        .projection(new String[]{"name", "age"}).build(conf);
 
     // expected output after sorting
     String[] name = new String[200];
@@ -99,7 +102,7 @@ public class CarbonReaderTest extends TestCase {
         .builder(path, "_temp")
         .isTransactionalTable(true)
         .projection(new String[]{"name", "age"})
-        .build();
+        .build(conf);
 
     i = 0;
     while (reader2.hasNext()) {
@@ -134,7 +137,7 @@ public class CarbonReaderTest extends TestCase {
         .isTransactionalTable(true)
         .projection(new String[]{"name", "age"})
         .filter(equalToExpression)
-        .build();
+        .build(conf);
 
     int i = 0;
     while (reader.hasNext()) {
@@ -176,7 +179,7 @@ public class CarbonReaderTest extends TestCase {
         .isTransactionalTable(true)
         .projection(new String[]{"name", "age", "doubleField"})
         .filter(andExpression)
-        .build();
+        .build(conf);
 
     int i = 0;
     while (reader.hasNext()) {
@@ -213,7 +216,7 @@ public class CarbonReaderTest extends TestCase {
         .isTransactionalTable(false)
         .projection(new String[]{"name", "age"})
         .filter(equalToExpression)
-        .build();
+        .build(conf);
 
     int i = 0;
     while (reader.hasNext()) {
@@ -249,7 +252,7 @@ public class CarbonReaderTest extends TestCase {
         .isTransactionalTable(false)
         .projection(new String[]{"name", "age"})
         .filter(equalToExpression)
-        .build();
+        .build(conf);
 
     int i = 0;
     while (reader.hasNext()) {
@@ -292,7 +295,7 @@ public class CarbonReaderTest extends TestCase {
         .isTransactionalTable(false)
         .projection(new String[]{"name", "age", "doubleField"})
         .filter(andExpression)
-        .build();
+        .build(conf);
 
     int i = 0;
     while (reader.hasNext()) {
@@ -335,7 +338,7 @@ public class CarbonReaderTest extends TestCase {
         .isTransactionalTable(false)
         .projection(new String[]{"name", "age", "doubleField"})
         .filter(orExpression)
-        .build();
+        .build(conf);
 
     int i = 0;
     while (reader.hasNext()) {
@@ -378,7 +381,7 @@ public class CarbonReaderTest extends TestCase {
         .isTransactionalTable(false)
         .projection(new String[]{"name", "age", "doubleField"})
         .filter(andExpression)
-        .build();
+        .build(conf);
 
     int i = 0;
     while (reader.hasNext()) {
@@ -421,7 +424,7 @@ public class CarbonReaderTest extends TestCase {
         .isTransactionalTable(false)
         .projection(new String[]{"name", "age", "doubleField"})
         .filter(andExpression)
-        .build();
+        .build(conf);
 
     int i = 0;
     while (reader.hasNext()) {
@@ -464,7 +467,7 @@ public class CarbonReaderTest extends TestCase {
         .isTransactionalTable(false)
         .projection(new String[]{"name", "age", "doubleField"})
         .filter(andExpression)
-        .build();
+        .build(conf);
 
     int i = 0;
     while (reader.hasNext()) {
@@ -507,7 +510,7 @@ public class CarbonReaderTest extends TestCase {
         .isTransactionalTable(false)
         .projection(new String[]{"name", "age", "doubleField"})
         .filter(andExpression)
-        .build();
+        .build(conf);
 
     int i = 0;
     while (reader.hasNext()) {
@@ -550,7 +553,7 @@ public class CarbonReaderTest extends TestCase {
         .isTransactionalTable(false)
         .projection(new String[]{"name", "age", "doubleField"})
         .filter(andExpression)
-        .build();
+        .build(conf);
 
     int i = 0;
     while (reader.hasNext()) {
@@ -582,7 +585,7 @@ public class CarbonReaderTest extends TestCase {
     CarbonWriter carbonWriter = null;
     try {
       carbonWriter = builder.outputPath(path1).isTransactionalTable(false).uniqueIdentifier(12345)
-  .buildWriterForCSVInput(schema);
+  .buildWriterForCSVInput(schema, TestUtil.configuration);
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
     }
@@ -597,7 +600,7 @@ public class CarbonReaderTest extends TestCase {
     CarbonWriter carbonWriter1 = null;
     try {
       carbonWriter1 = builder1.outputPath(path2).isTransactionalTable(false).uniqueIdentifier(12345)
-   .buildWriterForCSVInput(schema1);
+   .buildWriterForCSVInput(schema1, TestUtil.configuration);
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
     }
@@ -608,14 +611,14 @@ public class CarbonReaderTest extends TestCase {
        CarbonReader reader =
        CarbonReader.builder(path1, "_temp").
        projection(new String[] { "c1", "c3" })
-       .isTransactionalTable(false).build();
+       .isTransactionalTable(false).build(conf);
     } catch (Exception e){
        System.out.println("Success");
     }
     CarbonReader reader1 =
          CarbonReader.builder(path2, "_temp1")
      .projection(new String[] { "p1", "p2" })
-     .isTransactionalTable(false).build();
+     .isTransactionalTable(false).build(conf);
 
     while (reader1.hasNext()) {
        Object[] row1 = (Object[]) reader1.readNextRow();
@@ -643,7 +646,7 @@ public class CarbonReaderTest extends TestCase {
         .builder(path, "_temp")
         .projection(new String[]{"name", "name", "age", "name"})
         .isTransactionalTable(true)
-        .build();
+        .build(conf);
 
     // expected output after sorting
     String[] name = new String[100];
@@ -685,13 +688,13 @@ public class CarbonReaderTest extends TestCase {
         .builder(path, "_temp")
         .projection(new String[]{"name", "age"})
         .isTransactionalTable(true)
-        .build();
+        .build(conf);
     // Reader 2
     CarbonReader reader2 = CarbonReader
         .builder(path, "_temp")
         .projection(new String[]{"name", "age"})
         .isTransactionalTable(true)
-        .build();
+        .build(conf);
 
     while (reader.hasNext()) {
       Object[] row = (Object[]) reader.readNextRow();
@@ -719,7 +722,7 @@ public class CarbonReaderTest extends TestCase {
     TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
 
     CarbonReader reader = CarbonReader.builder(path, "_temp").isTransactionalTable(true)
-        .projection(new String[]{"name", "age"}).build();
+        .projection(new String[]{"name", "age"}).build(conf);
 
     reader.close();
     String msg = "CarbonReader not initialise, please create it first.";
@@ -762,7 +765,7 @@ public class CarbonReaderTest extends TestCase {
         .builder(path)
         .projection(new String[]{"name", "age"})
         .isTransactionalTable(true)
-        .build();
+        .build(conf);
 
     // expected output after sorting
     String[] name = new String[100];
@@ -799,7 +802,7 @@ public class CarbonReaderTest extends TestCase {
 
     CarbonReader reader = CarbonReader
         .builder(path)
-        .build();
+        .build(conf);
 
     // expected output after sorting
     String[] name = new String[100];
@@ -910,7 +913,7 @@ public class CarbonReaderTest extends TestCase {
 
     CarbonReader reader = CarbonReader.builder(path, "_temp")
         .projection(new String[]{"name", "age"})
-        .build();
+        .build(conf);
 
     // expected output after sorting
     String[] name = new String[100];
@@ -987,7 +990,7 @@ public class CarbonReaderTest extends TestCase {
           .persistSchemaFile(true)
           .outputPath(path);
 
-      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
 
       for (int i = 0; i < 100; i++) {
         String[] row = new String[]{
@@ -1045,7 +1048,7 @@ public class CarbonReaderTest extends TestCase {
             , "dateField"
             , "timeField"
             , "decimalField"})
-        .build();
+        .build(conf);
 
     int i = 0;
     while (reader.hasNext()) {
@@ -1108,7 +1111,7 @@ public class CarbonReaderTest extends TestCase {
           .persistSchemaFile(true)
           .outputPath(path);
 
-      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
 
       for (int i = 0; i < 100; i++) {
         String[] row2 = new String[]{
@@ -1161,7 +1164,7 @@ public class CarbonReaderTest extends TestCase {
         .builder(path, "_temp")
         .isTransactionalTable(true)
         .projection(strings)
-        .build();
+        .build(conf);
 
     int i = 0;
     while (reader.hasNext()) {
@@ -1224,7 +1227,7 @@ public class CarbonReaderTest extends TestCase {
           .persistSchemaFile(true)
           .outputPath(path);
 
-      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
 
       for (int i = 0; i < 100; i++) {
         String[] row2 = new String[]{
@@ -1275,7 +1278,7 @@ public class CarbonReaderTest extends TestCase {
         .builder(path, "_temp")
         .projection(strings)
         .isTransactionalTable(true)
-        .build();
+        .build(conf);
 
     int i = 0;
     while (reader.hasNext()) {
@@ -1338,7 +1341,7 @@ public class CarbonReaderTest extends TestCase {
           .persistSchemaFile(true)
           .outputPath(path);
 
-      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
 
       for (int i = 0; i < 100; i++) {
         String[] row2 = new String[]{
@@ -1381,7 +1384,7 @@ public class CarbonReaderTest extends TestCase {
         .builder(path, "_temp")
         .isTransactionalTable(true)
         .projection(strings)
-        .build();
+        .build(conf);
 
     int i = 0;
     while (reader.hasNext()) {
@@ -1423,7 +1426,7 @@ public class CarbonReaderTest extends TestCase {
     CarbonReader reader = CarbonReader
         .builder(path, "_temp")
         .isTransactionalTable(true)
-        .build();
+        .build(conf);
 
     // expected output after sorting
     String[] name = new String[100];
@@ -1461,7 +1464,7 @@ public class CarbonReaderTest extends TestCase {
     CarbonReader reader = CarbonReader
         .builder(path, "_temp")
         .isTransactionalTable(true)
-        .build();
+        .build(conf);
 
     // expected output after sorting
     String[] name = new String[100];
@@ -1498,7 +1501,7 @@ public class CarbonReaderTest extends TestCase {
           .builder(path, "_temp")
           .projection(new String[]{})
           .isTransactionalTable(true)
-          .build();
+          .build(conf);
       assert (false);
     } catch (RuntimeException e) {
       assert (e.getMessage().equalsIgnoreCase("Projection can't be empty"));
@@ -1517,7 +1520,7 @@ public class CarbonReaderTest extends TestCase {
       CarbonWriter writer = CarbonWriter.builder()
           .outputPath(path)
           .isTransactionalTable(isTransactionalTable)
-          .buildWriterForAvroInput(nn);
+          .buildWriterForAvroInput(nn, TestUtil.configuration);
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
@@ -1645,7 +1648,7 @@ public class CarbonReaderTest extends TestCase {
     CarbonReader reader = CarbonReader
         .builder(path, "_temp")
         .isTransactionalTable(false)
-        .build();
+        .build(conf);
 
     // expected output
     String name = "bob";
@@ -1688,7 +1691,7 @@ public class CarbonReaderTest extends TestCase {
         .isTransactionalTable(false)
         .projection(new String[]{"name", "age"})
         .filter(equalToExpression)
-        .build();
+        .build(conf);
 
     int i = 0;
     while (reader.hasNext()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java
index 9a6e8e0..4cb816b 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.avro.generic.GenericData;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -59,7 +60,8 @@ public class ConcurrentAvroSdkWriterTest {
     ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path);
-      CarbonWriter writer = builder.buildThreadSafeWriterForAvroInput(avroSchema, numOfThreads);
+      CarbonWriter writer = builder.buildThreadSafeWriterForAvroInput(avroSchema, numOfThreads,
+          TestUtil.configuration);
       // write in multi-thread
       for (int i = 0; i < numOfThreads; i++) {
         executorService.submit(new WriteLogic(writer, record));
@@ -76,7 +78,7 @@ public class ConcurrentAvroSdkWriterTest {
     CarbonReader reader;
     try {
       reader =
-          CarbonReader.builder(path, "_temp").projection(new String[] { "name", "age" }).build();
+          CarbonReader.builder(path, "_temp").projection(new String[] { "name", "age" }).build(new Configuration(false));
       int i = 0;
       while (reader.hasNext()) {
         Object[] row = (Object[]) reader.readNextRow();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java
index 8ce1ef1..9bb3f29 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -53,7 +54,7 @@ public class ConcurrentSdkWriterTest {
       CarbonWriterBuilder builder = CarbonWriter.builder()
           .outputPath(path);
       CarbonWriter writer =
-          builder.buildThreadSafeWriterForCSVInput(new Schema(fields), numOfThreads);
+          builder.buildThreadSafeWriterForCSVInput(new Schema(fields), numOfThreads, TestUtil.configuration);
       // write in multi-thread
       for (int i = 0; i < numOfThreads; i++) {
         executorService.submit(new WriteLogic(writer));
@@ -72,7 +73,7 @@ public class ConcurrentSdkWriterTest {
       reader = CarbonReader
           .builder(path, "_temp")
           .projection(new String[]{"name", "age"})
-          .build();
+          .build(new Configuration(false));
       int i = 0;
       while (reader.hasNext()) {
         Object[] row = (Object[]) reader.readNextRow();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
index 83026a2..2d5dbcd 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
@@ -38,10 +38,13 @@ import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.JsonDecoder;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
 
 public class TestUtil {
 
+  public static Configuration configuration = new Configuration();
+
   public static GenericData.Record jsonToAvro(String json, String avroSchema) throws IOException {
     InputStream input = null;
     DataFileWriter writer = null;
@@ -130,7 +133,7 @@ public class TestUtil {
         builder = builder.withBlockSize(blockSize);
       }
 
-      CarbonWriter writer = builder.buildWriterForCSVInput(schema);
+      CarbonWriter writer = builder.buildWriterForCSVInput(schema, configuration);
 
       for (int i = 0; i < rows; i++) {
         writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index 30aa415..7a764dd 100644
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -176,7 +177,7 @@ public class SearchRequestHandler {
       if (uniqueSegments.get(segmentId) == null) {
         segments.add(Segment.toSegment(segmentId,
             new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
-                loadMetadataDetails)));
+                loadMetadataDetails, FileFactory.getConfiguration())));
         uniqueSegments.put(segmentId, 1);
       } else {
         uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);


[2/3] carbondata git commit: [CARBONDATA-2909] Multi user support for SDK on S3

Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index ff2ffdd..809d68b 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -53,8 +54,8 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
     FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer, getDataMapSchema());
     try {
       dataMap.init(new DataMapModel(
-          DataMapWriter.getDefaultDataMapPath(
-              tableIdentifier.getTablePath(), segment.getSegmentNo(), dataMapName)));
+          DataMapWriter.getDefaultDataMapPath(tableIdentifier.getTablePath(),
+              segment.getSegmentNo(), dataMapName), segment.getConfiguration()));
     } catch (MemoryException e) {
       LOGGER.error("failed to get lucene datamap , detail is {}" + e.getMessage());
       return lstDataMap;
@@ -73,7 +74,7 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
     FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer, getDataMapSchema());
     String indexPath = ((LuceneDataMapDistributable) distributable).getIndexPath();
     try {
-      dataMap.init(new DataMapModel(indexPath));
+      dataMap.init(new DataMapModel(indexPath, FileFactory.getConfiguration()));
     } catch (MemoryException e) {
       LOGGER.error(String.format("failed to get lucene datamap , detail is %s", e.getMessage()));
       return lstDataMap;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index c80cc75..9c1e18d 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -383,11 +383,12 @@ public CarbonWriterBuilder withTableProperties(Map<String, String> options);
 * this writer is not thread safe, use buildThreadSafeWriterForCSVInput in multi thread environment
 * Build a {@link CarbonWriter}, which accepts row in CSV format object
 * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
+* @param configuration hadoop configuration object.
 * @return CSVCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema schema) throws IOException, InvalidLoadOptionException;
+public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema schema, Configuration configuration) throws IOException, InvalidLoadOptionException;
 ```
 
 ```
@@ -395,12 +396,13 @@ public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema
 * Can use this writer in multi-thread instance.
 * Build a {@link CarbonWriter}, which accepts row in CSV format
 * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
-* @param numOfThreads number of threads() in which .write will be called.              
+* @param numOfThreads number of threads() in which .write will be called.    
+* @param configuration hadoop configuration object          
 * @return CSVCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads)
+public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads, Configuration configuration)
   throws IOException, InvalidLoadOptionException;
 ```
 
@@ -410,11 +412,12 @@ public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfT
 * this writer is not thread safe, use buildThreadSafeWriterForAvroInput in multi thread environment
 * Build a {@link CarbonWriter}, which accepts Avro format object
 * @param avroSchema avro Schema object {org.apache.avro.Schema}
+* @param configuration hadoop configuration object
 * @return AvroCarbonWriter 
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema) throws IOException, InvalidLoadOptionException;
+public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema, Configuration configuration) throws IOException, InvalidLoadOptionException;
 ```
 
 ```
@@ -423,11 +426,13 @@ public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema) throw
 * Build a {@link CarbonWriter}, which accepts Avro object
 * @param avroSchema avro Schema object {org.apache.avro.Schema}
 * @param numOfThreads number of threads() in which .write will be called.
+* @param configuration hadoop configuration object
 * @return AvroCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema, short numOfThreads)
+public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema, short 
+numOfThreads, Configuration configuration)
   throws IOException, InvalidLoadOptionException
 ```
 
@@ -437,11 +442,12 @@ public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avr
 * this writer is not thread safe, use buildThreadSafeWriterForJsonInput in multi thread environment
 * Build a {@link CarbonWriter}, which accepts Json object
 * @param carbonSchema carbon Schema object
+* @param configuration hadoop configuration object
 * @return JsonCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema);
+public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema, Configuration configuration);
 ```
 
 ```
@@ -450,11 +456,12 @@ public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema);
 * Build a {@link CarbonWriter}, which accepts Json object
 * @param carbonSchema carbon Schema object
 * @param numOfThreads number of threads() in which .write will be called.
+* @param configuration hadoop configuraiton object.
 * @return JsonCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads)
+public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads, Configuration configuration)
 ```
 
 ### Class org.apache.carbondata.sdk.file.CarbonWriter

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
index ada1a8c..4eec4bf 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
@@ -23,6 +23,7 @@ import java.sql.Date;
 import java.sql.Timestamp;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
 
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.sdk.file.CarbonReader;
@@ -55,7 +56,7 @@ public class CarbonReaderExample {
 
             CarbonWriter writer = CarbonWriter.builder()
                 .outputPath(path)
-                .buildWriterForCSVInput(new Schema(fields));
+                .buildWriterForCSVInput(new Schema(fields), new Configuration(false));
 
             for (int i = 0; i < 10; i++) {
                 String[] row2 = new String[]{
@@ -98,7 +99,7 @@ public class CarbonReaderExample {
             CarbonReader reader = CarbonReader
                 .builder(path, "_temp")
                 .projection(strings)
-                .build();
+                .build(new Configuration(false));
 
             System.out.println("\nData:");
             long day = 24L * 3600 * 1000;
@@ -116,7 +117,7 @@ public class CarbonReaderExample {
             // Read data
             CarbonReader reader2 = CarbonReader
                 .builder(path, "_temp")
-                .build();
+                .build(new Configuration(false));
 
             System.out.println("\nData:");
             i = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
index 52d51b5..3abc342 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
@@ -25,6 +25,8 @@ import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
 import org.apache.carbondata.sdk.file.*;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Example for testing CarbonWriter on S3
  */
@@ -56,7 +58,7 @@ public class SDKS3Example {
                 .setEndPoint(args[2])
                 .outputPath(path);
 
-        CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+        CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), new Configuration(false));
 
         for (int i = 0; i < num; i++) {
             writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)});
@@ -75,7 +77,7 @@ public class SDKS3Example {
             .setAccessKey(args[0])
             .setSecretKey(args[1])
             .setEndPoint(args[2])
-            .build();
+            .build(new Configuration(false));
 
         System.out.println("\nData:");
         int i = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
index a011d80..86bf854 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.examples
 import java.io.File
 
 import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.examples.util.ExampleUtils
@@ -36,7 +37,7 @@ object DirectSQLExample {
   def buildTestData(
       path: String,
       num: Int = 3,
-      persistSchema: Boolean = false): Any = {
+      persistSchema: Boolean = false, sparkSession: SparkSession): Any = {
 
     // getCanonicalPath gives path with \, but the code expects /.
     val writerPath = path.replace("\\", "/");
@@ -56,7 +57,8 @@ object DirectSQLExample {
       if (persistSchema) {
         builder.persistSchemaFile(true)
       }
-      val writer = builder.buildWriterForCSVInput(new Schema(fields))
+      val writer = builder
+        .buildWriterForCSVInput(new Schema(fields), sparkSession.sparkContext.hadoopConfiguration)
       var i = 0
       while (i < num) {
         writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
@@ -82,7 +84,7 @@ object DirectSQLExample {
     import carbonSession._
     // 1. generate data file
     cleanTestData(path)
-    buildTestData(path, 20)
+    buildTestData(path, 20, sparkSession = carbonSession)
     val readPath = path + "Fact/Part0/Segment_null"
 
     println("Running SQL on carbon files directly")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
index 1795960..c5c9710 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.examples
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
 import org.apache.spark.sql.SparkSession
 import org.slf4j.{Logger, LoggerFactory}
@@ -52,12 +53,12 @@ object S3UsingSDKExample {
           builder.outputPath(writerPath).isTransactionalTable(true)
             .uniqueIdentifier(
               System.currentTimeMillis)
-            .buildWriterForCSVInput(new Schema(fields))
+            .buildWriterForCSVInput(new Schema(fields), new Configuration(false))
         } else {
           builder.outputPath(writerPath).isTransactionalTable(true)
             .uniqueIdentifier(
               System.currentTimeMillis).withBlockSize(2)
-            .buildWriterForCSVInput(new Schema(fields))
+            .buildWriterForCSVInput(new Schema(fields), new Configuration(false))
         }
       var i = 0
       var row = num

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 9e5edc1..fcfb346 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -113,11 +113,14 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
       ReadCommittedScope readCommittedScope = null;
       if (carbonTable.isTransactionalTable()) {
         readCommittedScope = new LatestFilesReadCommittedScope(
-            identifier.getTablePath() + "/Fact/Part0/Segment_null/");
+            identifier.getTablePath() + "/Fact/Part0/Segment_null/", job.getConfiguration());
       } else {
         readCommittedScope = getReadCommittedScope(job.getConfiguration());
         if (readCommittedScope == null) {
-          readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
+          readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath(), job
+              .getConfiguration());
+        } else {
+          readCommittedScope.setConfiguration(job.getConfiguration());
         }
       }
       // this will be null in case of corrupt schema file.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index e5e3165..eb9ff7c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -188,8 +188,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
           context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, "");
       List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
       Set<Segment> segmentSet = new HashSet<>(
-          new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
-              .getValidAndInvalidSegments().getValidSegments());
+          new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
+              context.getConfiguration()).getValidAndInvalidSegments().getValidSegments());
       if (updateTime != null) {
         CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true,
             segmentDeleteList);
@@ -223,8 +223,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
 
     if (partitionSpecs != null && partitionSpecs.size() > 0) {
       List<Segment> validSegments =
-          new SegmentStatusManager(table.getAbsoluteTableIdentifier()).getValidAndInvalidSegments()
-              .getValidSegments();
+          new SegmentStatusManager(table.getAbsoluteTableIdentifier())
+              .getValidAndInvalidSegments().getValidSegments();
       String uniqueId = String.valueOf(System.currentTimeMillis());
       List<String> tobeUpdatedSegs = new ArrayList<>();
       List<String> tobeDeletedSegs = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index ba6e043..ba3accf 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -143,7 +143,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
     List<Segment> streamSegments = null;
     // get all valid segments and set them into the configuration
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier,
+        readCommittedScope.getConfiguration());
     SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
         .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
 
@@ -583,7 +584,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(
         table, loadMetadataDetails);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
-        new SegmentStatusManager(identifier)
+        new SegmentStatusManager(identifier, readCommittedScope.getConfiguration())
             .getValidAndInvalidSegments(loadMetadataDetails, readCommittedScope);
     Map<String, Long> blockRowCountMapping = new HashMap<>();
     Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();
@@ -649,11 +650,12 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     if (readCommittedScope == null) {
       ReadCommittedScope readCommittedScope;
       if (job.getConfiguration().getBoolean(CARBON_TRANSACTIONAL_TABLE, true)) {
-        readCommittedScope = new TableStatusReadCommittedScope(identifier);
+        readCommittedScope = new TableStatusReadCommittedScope(identifier, job.getConfiguration());
       } else {
         readCommittedScope = getReadCommittedScope(job.getConfiguration());
         if (readCommittedScope == null) {
-          readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
+          readCommittedScope =
+              new LatestFilesReadCommittedScope(identifier.getTablePath(), job.getConfiguration());
         }
       }
       this.readCommittedScope = readCommittedScope;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 99d8532..2d4f370 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -270,6 +270,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
           TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
 
           throw new RuntimeException(e);
+        } finally {
+          ThreadLocalSessionInfo.unsetAll();
         }
       }
     });
@@ -444,6 +446,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
       } finally {
         executorService.shutdownNow();
         dataLoadExecutor.close();
+        ThreadLocalSessionInfo.unsetAll();
         // clean up the folders and files created locally for data load operation
         TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
index de0d731..40a0a62 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
@@ -112,7 +112,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
             .outputPath(writerPath)
             .isTransactionalTable(false)
             .uniqueIdentifier(System.currentTimeMillis)
-            .buildWriterForCSVInput(Schema.parseJson(schema))
+            .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration)
         } else {
           if (options != null) {
             builder.outputPath(writerPath)
@@ -120,14 +120,15 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
               .sortBy(sortColumns.toArray)
               .uniqueIdentifier(
                 System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
-              .buildWriterForCSVInput(Schema.parseJson(schema))
+              .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration)
           } else {
             builder.outputPath(writerPath)
               .isTransactionalTable(false)
               .sortBy(sortColumns.toArray)
               .uniqueIdentifier(
                 System.currentTimeMillis).withBlockSize(2)
-              .buildWriterForCSVInput(Schema.parseJson(schema))
+              .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext
+                .hadoopConfiguration)
           }
         }
       var i = 0
@@ -544,7 +545,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
     try {
       val writer = CarbonWriter.builder
         .outputPath(writerPath).isTransactionalTable(false)
-        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
       var i = 0
       while (i < rows) {
         writer.write(record)
@@ -743,7 +744,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
       .toString()
     val builder = CarbonWriter.builder()
     val writer = builder.outputPath(writerPath)
-      .buildWriterForCSVInput(Schema.parseJson(schema))
+      .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration)
 
     for (i <- 0 until 5) {
       writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(33000), i.toString))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index a1d4290..63fb2e6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -21,6 +21,7 @@ import java.util
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.util.QueryTest
@@ -35,6 +36,7 @@ import org.apache.carbondata.core.indexstore.Blocklet
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
 import org.apache.carbondata.core.scan.expression.{ColumnExpression, LiteralExpression}
@@ -304,7 +306,8 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
     val resolveFilter: FilterResolverIntf =
       CarbonTable.resolveFilter(andExpression, carbonTable.getAbsoluteTableIdentifier)
     val exprWrapper = DataMapChooser.getDefaultDataMap(carbonTable, resolveFilter)
-    val segment = new Segment("0")
+    val segment = new Segment("0", new TableStatusReadCommittedScope(carbonTable
+      .getAbsoluteTableIdentifier, new Configuration(false)))
     // get the pruned blocklets
     val prunedBlocklets = exprWrapper.prune(List(segment).asJava, null)
     prunedBlocklets.asScala.foreach { blocklet =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
index 08daa34..1b181bc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -59,9 +59,11 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
       val writer =
       if (persistSchema) {
         builder.persistSchemaFile(true)
-        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema),
+          sqlContext.sparkContext.hadoopConfiguration)
       } else {
-        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema),
+          sqlContext.sparkContext.hadoopConfiguration)
       }
 
       var i = 0

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 0b6813f..a03a5eb 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -142,7 +142,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
             .outputPath(writerPath)
             .isTransactionalTable(false)
             .uniqueIdentifier(System.currentTimeMillis)
-            .buildWriterForCSVInput(Schema.parseJson(schema))
+            .buildWriterForCSVInput(Schema.parseJson(schema),
+              sqlContext.sparkContext.hadoopConfiguration)
         } else {
           if (options != null) {
             builder.outputPath(writerPath)
@@ -150,14 +151,16 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
               .sortBy(sortColumns.toArray)
               .uniqueIdentifier(
                 System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
-              .buildWriterForCSVInput(Schema.parseJson(schema))
+              .buildWriterForCSVInput(Schema.parseJson(schema),
+                sqlContext.sparkContext.hadoopConfiguration)
           } else {
             builder.outputPath(writerPath)
               .isTransactionalTable(false)
               .sortBy(sortColumns.toArray)
               .uniqueIdentifier(
                 System.currentTimeMillis).withBlockSize(2)
-              .buildWriterForCSVInput(Schema.parseJson(schema))
+              .buildWriterForCSVInput(Schema.parseJson(schema),
+                sqlContext.sparkContext.hadoopConfiguration)
           }
         }
       var i = 0
@@ -194,7 +197,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         builder.outputPath(writerPath)
           .isTransactionalTable(false)
           .uniqueIdentifier(System.currentTimeMillis()).withBlockSize(2).sortBy(sortColumns)
-          .buildWriterForCSVInput(new Schema(fields))
+          .buildWriterForCSVInput(new Schema(fields),
+            sqlContext.sparkContext.hadoopConfiguration)
 
       var i = 0
       while (i < rows) {
@@ -228,7 +232,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
           .sortBy(sortColumns.toArray)
           .uniqueIdentifier(
             123).withBlockSize(2)
-          .buildWriterForCSVInput(Schema.parseJson(schema))
+          .buildWriterForCSVInput(Schema.parseJson(schema),
+            sqlContext.sparkContext.hadoopConfiguration)
       var i = 0
       while (i < rows) {
         writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
@@ -992,7 +997,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val builder: CarbonWriterBuilder = CarbonWriter.builder
       .outputPath(writerPath).isTransactionalTable(false).withLoadOptions(options)
 
-    val writer: CarbonWriter = builder.buildWriterForCSVInput(new Schema(fields))
+    val writer: CarbonWriter = builder.buildWriterForCSVInput(new Schema(fields),
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(Array("babu","1","02-01-2002","02-01-2002 01:01:00"));
     writer.close()
 
@@ -1117,7 +1123,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     try {
       val writer = CarbonWriter.builder
         .outputPath(writerPath).isTransactionalTable(false)
-        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
+        sqlContext.sparkContext.hadoopConfiguration)
       var i = 0
       while (i < rows) {
         writer.write(record)
@@ -2091,7 +2098,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
     assert(intercept[RuntimeException] {
       val writer = CarbonWriter.builder.sortBy(Array("name", "id"))
-        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+        sqlContext.sparkContext.hadoopConfiguration)
       writer.write(record)
       writer.close()
     }.getMessage.toLowerCase.contains("column: name specified in sort columns"))
@@ -2131,7 +2139,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
   }
@@ -2169,7 +2178,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder.sortBy(Array("id"))
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
   }
@@ -2213,7 +2223,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
   }
@@ -2253,7 +2264,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -2299,7 +2311,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -2346,7 +2359,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -2366,7 +2380,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val writer: CarbonWriter = CarbonWriter.builder
       .outputPath(writerPath)
       .withTableProperties(options)
-      .buildWriterForCSVInput(new Schema(fields))
+      .buildWriterForCSVInput(new Schema(fields), sqlContext.sparkContext.hadoopConfiguration)
     writer.write(Array("carbon", "1"))
     writer.write(Array("hydrogen", "10"))
     writer.write(Array("boron", "4"))
@@ -2384,7 +2398,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     // write local sort data
     val writer1: CarbonWriter = CarbonWriter.builder
       .outputPath(writerPath)
-      .buildWriterForCSVInput(new Schema(fields))
+      .buildWriterForCSVInput(new Schema(fields), sqlContext.sparkContext.hadoopConfiguration)
     writer1.write(Array("carbon", "1"))
     writer1.write(Array("hydrogen", "10"))
     writer1.write(Array("boron", "4"))
@@ -2493,7 +2507,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     fields(0) = new Field("name", DataTypes.STRING)
     fields(1) = new Field("surname", DataTypes.STRING)
     fields(2) = new Field("age", DataTypes.INT)
-    val carbonWriter = builder.buildWriterForCSVInput(new Schema(fields))
+    val carbonWriter = builder.buildWriterForCSVInput(new Schema(fields),
+      sqlContext.sparkContext.hadoopConfiguration)
     var i = 0
     while (i < 100) {
       {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
index ff5c062..17aae1d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
@@ -98,7 +98,8 @@ class TestNonTransactionalCarbonTableJsonWriter extends QueryTest with BeforeAnd
         .outputPath(writerPath).isTransactionalTable(false)
         .uniqueIdentifier(System.currentTimeMillis())
         .withLoadOptions(options)
-        .buildWriterForJsonInput(carbonSchema)
+        .buildWriterForJsonInput(carbonSchema,
+          sqlContext.sparkContext.hadoopConfiguration)
       writer.write(jsonRow)
       writer.close()
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
index dc13b16..e7fcf95 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
@@ -93,7 +93,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -140,7 +140,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, mySchema)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -184,7 +184,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json, mySchema)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -215,7 +215,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -245,7 +245,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -275,7 +275,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -305,7 +305,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -335,7 +335,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -365,7 +365,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
     val exception1 = intercept[UnsupportedOperationException] {
       val writer = CarbonWriter.builder
-        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
       writer.write(record)
       writer.close()
     }
@@ -402,7 +402,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -438,7 +438,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -476,7 +476,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -509,7 +509,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -568,7 +568,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -663,7 +663,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -716,7 +716,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
 
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -786,7 +786,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     avroRec.put("struct_field_decimal", genericByteArray)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -858,7 +858,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     avroRec.put("dec_fields", genericByteArray)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -905,7 +905,8 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
 
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false)
+      .buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -952,7 +953,8 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
 
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false)
+      .buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -997,7 +999,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val avroRec = new GenericData. Record(nn)
     avroRec.put("id", bytes1)
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1042,7 +1044,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes1)
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1087,7 +1089,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes1)
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1128,7 +1130,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes)
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1170,7 +1172,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     avroRec.put("dec_field", bytes)
     val exception1 = intercept[Exception] {
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     }
@@ -1220,7 +1222,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -1256,7 +1258,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -1306,7 +1308,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
 
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(s"create table sdkOutputTable(union_field struct<union_field0:decimal(10,2),union_field1:int>) " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
index 62ba03e..0421ea8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
@@ -68,11 +68,13 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
         CarbonWriter.builder
           .outputPath(writerPath).isTransactionalTable(false).enableLocalDictionary(true)
           .localDictionaryThreshold(2000)
-          .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+          .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
+          sqlContext.sparkContext.hadoopConfiguration)
       } else {
         CarbonWriter.builder
           .outputPath(writerPath).isTransactionalTable(false)
-          .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+          .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
+          sqlContext.sparkContext.hadoopConfiguration)
       }
       var i = 0
       while (i < rows) {
@@ -268,7 +270,8 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
       """.stripMargin
     val pschema= org.apache.avro.Schema.parse(mySchema)
     val records = testUtil.jsonToAvro(jsonvalue, mySchema)
-    val writer=CarbonWriter.builder().outputPath(writerPath).buildWriterForAvroInput(pschema)
+    val writer=CarbonWriter.builder().outputPath(writerPath).buildWriterForAvroInput(pschema,
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(records)
     writer.close()
     sql("DROP TABLE IF EXISTS sdkOutputTable")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
index 4e2197d..a8bdb31 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.testsuite.createTable
 import java.io.File
 
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -58,9 +59,11 @@ object TestSparkCarbonFileFormatWithSparkSession {
       val writer =
         if (persistSchema) {
           builder.persistSchemaFile(true)
-          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), new
+              Configuration(false))
         } else {
-          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), new
+              Configuration(false))
         }
 
       var i = 0

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
index 39785a3..6e8e79b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
@@ -18,9 +18,10 @@
 package org.apache.carbondata.spark.testsuite.dataload
 
 import scala.collection.JavaConverters._
-
 import java.io.{File, FilenameFilter}
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.reader.CarbonIndexFileReader
 import org.apache.carbondata.core.util.CarbonProperties
@@ -64,7 +65,7 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
     } else {
       val segment = Segment.getSegment("0", carbonTable.getTablePath)
       val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
-      store.readIndexFiles()
+      store.readIndexFiles(new Configuration(false))
       store.getIndexCarbonFiles.asScala.map(f => new File(f.getAbsolutePath)).toArray
     }
     for (carbonIndexPath <- carbonIndexPaths) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 0c42264..44bc243 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 import java.io.{File, FileWriter}
 
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
@@ -281,7 +282,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     } else {
       val segment = Segment.getSegment("0", carbonTable.getTablePath)
       val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
-      store.readIndexFiles()
+      store.readIndexFiles(new Configuration(false))
       val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
       assertResult(Math.max(4, defaultParallelism) + 1)(size + store.getIndexFilesMap.size())
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 046a2a6..a4bc6f0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -22,12 +22,13 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
-import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapBuilder, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapModel, DataMapWriter}
 import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
 import org.apache.carbondata.core.datastore.FileReader
 import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -71,7 +72,7 @@ class CGDataMapFactory(
     val files = file.listFiles()
     files.map {f =>
       val dataMap: CoarseGrainDataMap = new CGDataMap()
-      dataMap.init(new DataMapModel(f.getCanonicalPath))
+      dataMap.init(new DataMapModel(f.getCanonicalPath, new Configuration(false)))
       dataMap
     }.toList.asJava
   }
@@ -83,7 +84,8 @@ class CGDataMapFactory(
   override def getDataMaps(distributable: DataMapDistributable): java.util.List[CoarseGrainDataMap] = {
     val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
     val dataMap: CoarseGrainDataMap = new CGDataMap()
-    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+    dataMap.init(new DataMapModel(mapDistributable.getFilePath, new
+        Configuration(false)))
     Seq(dataMap).asJava
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index b13582b..57b3672 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -22,12 +22,14 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
-import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapBuilder, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapModel, DataMapWriter}
 import org.apache.carbondata.core.datamap.dev.fgdatamap.{FineGrainBlocklet, FineGrainDataMap, FineGrainDataMapFactory}
 import org.apache.carbondata.core.datastore.FileReader
 import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -68,7 +70,7 @@ class FGDataMapFactory(carbonTable: CarbonTable,
     val files = file.listFiles()
     files.map { f =>
       val dataMap: FineGrainDataMap = new FGDataMap()
-      dataMap.init(new DataMapModel(f.getCanonicalPath))
+      dataMap.init(new DataMapModel(f.getCanonicalPath, new Configuration(false)))
       dataMap
     }.toList.asJava
   }
@@ -79,7 +81,7 @@ class FGDataMapFactory(carbonTable: CarbonTable,
   override def getDataMaps(distributable: DataMapDistributable): java.util.List[FineGrainDataMap]= {
     val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
     val dataMap: FineGrainDataMap = new FGDataMap()
-    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+    dataMap.init(new DataMapModel(mapDistributable.getFilePath, new Configuration(false)))
     Seq(dataMap).asJava
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 8ebed1f..edd3e9c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -18,9 +18,9 @@
 package org.apache.carbondata.spark.testsuite.datamap
 
 import scala.collection.JavaConverters._
-
 import java.io.{File, FilenameFilter}
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
@@ -337,7 +337,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     } else {
       val segment = Segment.getSegment("0", path)
       val store = new SegmentFileStore(path, segment.getSegmentFileName)
-      store.readIndexFiles()
+      store.readIndexFiles(new Configuration(false))
       val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
       assertResult(true)(size > 0)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index 133454a..f4c725e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -18,6 +18,7 @@ package org.apache.carbondata.spark.testsuite.partition
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.BeforeAndAfterAll
@@ -78,7 +79,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
     } else {
       val segment = Segment.getSegment(segmentId, carbonTable.getTablePath)
       val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
-      store.readIndexFiles()
+      store.readIndexFiles(new Configuration(false))
       store.getIndexFilesMap.asScala.flatMap(_._2.asScala).map(f => FileFactory.getCarbonFile(f)).toArray
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index e3e8e68..9a0080c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -21,6 +21,7 @@ import java.util
 import java.util.concurrent.{Callable, ExecutorService, Executors}
 
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
 import org.apache.spark.sql.optimizer.CarbonFilters
@@ -352,7 +353,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_mergeindexpartitionthree")
     val details = SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
     val store = new SegmentFileStore(carbonTable.getTablePath, details(0).getSegmentFile)
-    store.readIndexFiles()
+    store.readIndexFiles(new Configuration(false))
     store.getIndexFiles
     assert(store.getIndexFiles.size() == 10)
     CarbonProperties.getInstance()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
index af05613..3a650ec 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
@@ -117,7 +117,7 @@ class CarbonFileIndex(
       }
       CarbonInputFormat.setReadCommittedScope(
         hadoopConf,
-        new LatestFilesReadCommittedScope(indexFiles))
+        new LatestFilesReadCommittedScope(indexFiles, hadoopConf))
       filter match {
         case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
         case None => None

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index a5e1b39..62d9903 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -78,9 +78,10 @@ class SparkCarbonFileFormat extends FileFormat
   override def inferSchema(sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
+    val conf = sparkSession.sessionState.newHadoopConf()
     val tablePath = options.get("path") match {
       case Some(path) =>
-        FileFactory.checkAndAppendDefaultFs(path, sparkSession.sparkContext.hadoopConfiguration)
+        FileFactory.checkAndAppendDefaultFs(path, conf)
       case _ if files.nonEmpty =>
         FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString)
       case _ =>
@@ -89,7 +90,8 @@ class SparkCarbonFileFormat extends FileFormat
     if (options.get(CarbonCommonConstants.SORT_COLUMNS).isDefined) {
       throw new UnsupportedOperationException("Cannot use sort columns during infer schema")
     }
-    val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""), false)
+    val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""),
+      false, conf)
     val table = CarbonTable.buildFromTableInfo(tableInfo)
     var schema = new StructType
     val fields = tableInfo.getFactTable.getListOfColumns.asScala.map { col =>
@@ -395,7 +397,7 @@ class SparkCarbonFileFormat extends FileFormat
           vectorizedReader
         } else {
           val reader = new CarbonRecordReader(model,
-            new SparkUnsafeRowReadSuport(requiredSchema), null)
+            new SparkUnsafeRowReadSuport(requiredSchema), broadcastedHadoopConf.value.value)
           reader.initialize(split, hadoopAttemptContext)
           reader
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 66c0224..825cdec 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -868,7 +868,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
         builder.outputPath(writerPath)
           .isTransactionalTable(false)
           .uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(sortColumns)
-          .buildWriterForCSVInput(new Schema(fields))
+          .buildWriterForCSVInput(new Schema(fields), spark.sparkContext.hadoopConfiguration)
 
       var i = 0
       while (i < rows) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
index 26f67f8..43f04b8 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -22,8 +22,8 @@ import java.io.File
 import org.apache.commons.io.FileUtils
 import org.apache.commons.lang.RandomStringUtils
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
-import org.apache.spark.sql.carbondata.datasource.TestUtil._
 import org.apache.spark.util.SparkUtil
+import org.apache.spark.sql.carbondata.datasource.TestUtil.{spark, _}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
@@ -70,9 +70,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
       val writer =
         if (persistSchema) {
           builder.persistSchemaFile(true)
-          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), spark
+            .sparkContext.hadoopConfiguration)
         } else {
-          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), spark
+            .sparkContext.hadoopConfiguration)
         }
 
       var i = 0
@@ -333,7 +335,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
       .toString()
     val builder = CarbonWriter.builder()
     val writer = builder.outputPath(writerPath)
-      .buildWriterForCSVInput(Schema.parseJson(schema))
+      .buildWriterForCSVInput(Schema.parseJson(schema), spark.sessionState.newHadoopConf())
     for (i <- 0 until 3) {
       // write a varchar with 75,000 length
       writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString))
@@ -348,15 +350,12 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
         s"""CREATE TABLE sdkOutputTable (name string, address string, age int)
            |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address") """
           .stripMargin)
-    } else if (spark.sparkContext.version.startsWith("2.2")) {
+    } else {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTable (name string, address string, age int) USING carbon
            |OPTIONS("long_String_columns"="address") LOCATION
            |'$writerPath' """.stripMargin)
-    } else {
-      // TODO. spark2.3 ?
-      assert(false)
     }
     assert(spark.sql("select * from sdkOutputTable where age = 0").count() == 1)
     val op = spark.sql("select address from sdkOutputTable limit 1").collectAsList()
@@ -371,14 +370,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
         .sql(
           s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS (PATH
              |'$writerPath', "long_String_columns" "address") """.stripMargin)
-    } else if (spark.sparkContext.version.startsWith("2.2")) {
+    } else {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS
            |("long_String_columns"="address") LOCATION '$writerPath' """.stripMargin)
-    } else {
-      // TODO. spark2.3 ?
-      assert(false)
     }
     assert(spark.sql("select * from sdkOutputTableWithoutSchema where age = 0").count() == 1)
     val op1 = spark.sql("select address from sdkOutputTableWithoutSchema limit 1").collectAsList()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0fd4e34..57887a7 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -436,7 +436,7 @@ object CarbonDataRDDFactory {
         res.foreach { resultOfSeg =>
           resultSize = resultSize + resultOfSeg.size
           resultOfSeg.foreach { resultOfBlock =>
-            segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName, null))
+            segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName))
           }
         }
         val segmentFiles = updateSegmentFiles(carbonTable, segmentDetails, updateModel.get)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index b77632d..4921b33 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -159,14 +159,13 @@ object DeleteExecution {
         resultOfBlock => {
           if (resultOfBlock._1 == SegmentStatus.SUCCESS) {
             blockUpdateDetailsList.add(resultOfBlock._2._1)
-            segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName, null))
+            segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName))
             // if this block is invalid then decrement block count in map.
             if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) {
               CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
                 blockMappingVO.getSegmentNumberOfBlockMapping)
             }
-          }
-          else {
+          } else {
             // In case of failure , clean all related delete delta files
             CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
             LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index e3da86d..2951283 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1202,7 +1202,7 @@ public final class CarbonDataMergerUtil {
     segmentUpdateStatusManager.setUpdateStatusDetails(segmentUpdateDetails);
 
     CarbonFile[] deleteDeltaFiles =
-        segmentUpdateStatusManager.getDeleteDeltaFilesList(new Segment(seg, null), blockName);
+        segmentUpdateStatusManager.getDeleteDeltaFilesList(new Segment(seg), blockName);
 
     String destFileName =
         blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 19353d1..f6cc485 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -326,7 +326,7 @@ public final class CarbonLoaderUtil {
 
         for (LoadMetadataDetails detail: listOfLoadFolderDetails) {
           // if the segments is in the list of marked for delete then update the status.
-          if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName(), null))) {
+          if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName()))) {
             detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
           } else if (segmentFilesTobeUpdated
               .contains(Segment.toSegment(detail.getLoadName(), null))) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index dd70cc9..a183197 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -74,8 +74,7 @@ public class AvroCarbonWriter extends CarbonWriter {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(CarbonTable.class.getName());
 
-  AvroCarbonWriter(CarbonLoadModel loadModel) throws IOException {
-    Configuration hadoopConf = new Configuration();
+  AvroCarbonWriter(CarbonLoadModel loadModel, Configuration hadoopConf) throws IOException {
     CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel);
     CarbonTableOutputFormat format = new CarbonTableOutputFormat();
     JobID jobId = new JobID(UUID.randomUUID().toString(), 0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
index 627e060..a8899a7 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
@@ -46,8 +46,7 @@ class CSVCarbonWriter extends CarbonWriter {
   private TaskAttemptContext context;
   private ObjectArrayWritable writable;
 
-  CSVCarbonWriter(CarbonLoadModel loadModel) throws IOException {
-    Configuration hadoopConf = new Configuration();
+  CSVCarbonWriter(CarbonLoadModel loadModel, Configuration hadoopConf) throws IOException {
     CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel);
     CarbonTableOutputFormat format = new CarbonTableOutputFormat();
     JobID jobId = new JobID(UUID.randomUUID().toString(), 0);


[3/3] carbondata git commit: [CARBONDATA-2909] Multi user support for SDK on S3

Posted by ra...@apache.org.
[CARBONDATA-2909] Multi user support for SDK on S3

Added support for multiple users with different SK/AK to write concurrently to S3.
Make it mandatory for user to give Hadoop configuration while creating SDK writer/reader.
Passed hadoop configuration to core layer so that FileFactory can access it.
Fixed various SK/AK not found exceptions in CarbonSparkFileFormat.

This closes #2678


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8f1a029b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8f1a029b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8f1a029b

Branch: refs/heads/master
Commit: 8f1a029b9ad82cb3d1972e63be2055c84895661b
Parents: 7c827c0
Author: kunal642 <ku...@gmail.com>
Authored: Fri Aug 31 11:12:30 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Sep 11 17:46:05 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/datamap/DataMapUtil.java    | 10 ++-
 .../apache/carbondata/core/datamap/Segment.java | 11 +++
 .../carbondata/core/datamap/dev/DataMap.java    |  3 +-
 .../core/datamap/dev/DataMapModel.java          | 12 ++-
 .../filesystem/AbstractDFSCarbonFile.java       | 18 +++--
 .../core/datastore/impl/FileFactory.java        |  5 ++
 .../indexstore/BlockletDataMapIndexStore.java   | 24 ++++--
 .../indexstore/BlockletDataMapIndexWrapper.java | 14 +++-
 .../TableBlockIndexUniqueIdentifierWrapper.java | 19 +++++
 .../indexstore/blockletindex/BlockDataMap.java  |  6 +-
 .../blockletindex/BlockletDataMapFactory.java   |  2 +-
 .../blockletindex/BlockletDataMapModel.java     | 12 +--
 .../blockletindex/SegmentIndexFileStore.java    | 37 ++++++---
 .../core/metadata/SegmentFileStore.java         | 22 +++---
 .../core/metadata/schema/SchemaReader.java      | 19 ++++-
 .../core/mutate/CarbonUpdateUtil.java           |  6 +-
 .../LatestFilesReadCommittedScope.java          | 25 +++++--
 .../core/readcommitter/ReadCommittedScope.java  |  6 ++
 .../TableStatusReadCommittedScope.java          | 19 ++++-
 .../core/reader/CarbonHeaderReader.java         | 13 +++-
 .../core/reader/CarbonIndexFileReader.java      | 12 ++-
 .../carbondata/core/reader/ThriftReader.java    | 20 ++++-
 .../statusmanager/SegmentStatusManager.java     | 23 +++---
 .../util/AbstractDataFileFooterConverter.java   | 15 +++-
 .../core/util/BlockletDataMapUtil.java          | 12 +--
 .../apache/carbondata/core/util/CarbonUtil.java | 47 +++---------
 .../core/util/DataFileFooterConverter.java      | 10 +++
 .../core/util/DataFileFooterConverter2.java     | 13 +++-
 .../core/util/DataFileFooterConverterV3.java    | 11 +++
 .../core/writer/CarbonIndexFileMergeWriter.java |  4 +-
 .../TestBlockletDataMapFactory.java             |  8 +-
 .../carbondata/core/util/CarbonUtilTest.java    |  4 +-
 .../core/util/DataFileFooterConverterTest.java  |  3 +-
 .../bloom/BloomCoarseGrainDataMapFactory.java   |  5 +-
 .../datamap/bloom/BloomDataMapModel.java        |  7 +-
 .../lucene/LuceneFineGrainDataMapFactory.java   |  7 +-
 docs/sdk-guide.md                               | 21 ++++--
 .../examples/sdk/CarbonReaderExample.java       |  7 +-
 .../carbondata/examples/sdk/SDKS3Example.java   |  6 +-
 .../carbondata/examples/DirectSQLExample.scala  |  8 +-
 .../carbondata/examples/S3UsingSDkExample.scala |  5 +-
 .../hadoop/api/CarbonFileInputFormat.java       |  7 +-
 .../hadoop/api/CarbonOutputCommitter.java       |  8 +-
 .../hadoop/api/CarbonTableInputFormat.java      | 10 ++-
 .../hadoop/api/CarbonTableOutputFormat.java     |  3 +
 .../sdv/generated/SDKwriterTestCase.scala       | 11 +--
 ...ithColumnMetCacheAndCacheLevelProperty.scala |  5 +-
 ...FileInputFormatWithExternalCarbonTable.scala |  6 +-
 .../TestNonTransactionalCarbonTable.scala       | 49 +++++++-----
 ...tNonTransactionalCarbonTableJsonWriter.scala |  3 +-
 ...ansactionalCarbonTableWithAvroDataType.scala | 58 +++++++-------
 ...ransactionalCarbonTableWithComplexType.scala |  9 ++-
 ...tSparkCarbonFileFormatWithSparkSession.scala |  7 +-
 .../dataload/TestDataLoadWithFileName.scala     |  5 +-
 .../dataload/TestGlobalSortDataLoad.scala       |  3 +-
 .../testsuite/datamap/CGDataMapTestCase.scala   |  8 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |  8 +-
 .../testsuite/datamap/TestDataMapCommand.scala  |  4 +-
 .../TestDataLoadingForPartitionTable.scala      |  3 +-
 .../StandardPartitionTableLoadingTestCase.scala |  3 +-
 .../execution/datasources/CarbonFileIndex.scala |  2 +-
 .../datasources/SparkCarbonFileFormat.scala     |  8 +-
 .../datasource/SparkCarbonDataSourceTest.scala  |  2 +-
 ...tCreateTableUsingSparkCarbonFileFormat.scala | 20 ++---
 .../spark/rdd/CarbonDataRDDFactory.scala        |  2 +-
 .../command/mutation/DeleteExecution.scala      |  5 +-
 .../processing/merger/CarbonDataMergerUtil.java |  2 +-
 .../processing/util/CarbonLoaderUtil.java       |  2 +-
 .../carbondata/sdk/file/AvroCarbonWriter.java   |  3 +-
 .../carbondata/sdk/file/CSVCarbonWriter.java    |  3 +-
 .../sdk/file/CarbonReaderBuilder.java           |  5 +-
 .../sdk/file/CarbonWriterBuilder.java           | 38 ++++------
 .../carbondata/sdk/file/JsonCarbonWriter.java   |  8 +-
 .../sdk/file/AvroCarbonWriterTest.java          | 14 ++--
 .../sdk/file/CSVCarbonWriterTest.java           |  8 +-
 .../CSVNonTransactionalCarbonWriterTest.java    |  4 +-
 .../carbondata/sdk/file/CarbonReaderTest.java   | 79 ++++++++++----------
 .../sdk/file/ConcurrentAvroSdkWriterTest.java   |  6 +-
 .../sdk/file/ConcurrentSdkWriterTest.java       |  5 +-
 .../apache/carbondata/sdk/file/TestUtil.java    |  5 +-
 .../store/worker/SearchRequestHandler.java      |  3 +-
 81 files changed, 616 insertions(+), 349 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index e015052..60c5233 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -99,7 +100,7 @@ public class DataMapUtil {
     }
     String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
     SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
-        getValidAndInvalidSegments(carbonTable);
+        getValidAndInvalidSegments(carbonTable, FileFactory.getConfiguration());
     List<Segment> validSegments = validAndInvalidSegmentsInfo.getValidSegments();
     List<Segment> invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments();
     DataMapExprWrapper dataMapExprWrapper = null;
@@ -140,7 +141,7 @@ public class DataMapUtil {
       List<PartitionSpec> partitionsToPrune) throws IOException {
     String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
     SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
-        getValidAndInvalidSegments(carbonTable);
+        getValidAndInvalidSegments(carbonTable, validSegments.get(0).getConfiguration());
     List<Segment> invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments();
     DistributableDataMapFormat dataMapFormat =
         createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, invalidSegments,
@@ -152,8 +153,9 @@ public class DataMapUtil {
   }
 
   private static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
-      CarbonTable carbonTable) throws IOException {
-    SegmentStatusManager ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
+      CarbonTable carbonTable, Configuration configuration) throws IOException {
+    SegmentStatusManager ssm =
+        new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(), configuration);
     return ssm.getValidAndInvalidSegments();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 30e811a..85445eb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -32,6 +32,8 @@ import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Represents one load of carbondata
  */
@@ -64,6 +66,11 @@ public class Segment implements Serializable {
     this.segmentNo = segmentNo;
   }
 
+  public Segment(String segmentNo, ReadCommittedScope readCommittedScope) {
+    this.segmentNo = segmentNo;
+    this.readCommittedScope = readCommittedScope;
+  }
+
   /**
    * ReadCommittedScope will be null. So getCommittedIndexFile will not work and will throw
    * a NullPointerException. In case getCommittedIndexFile is need to be accessed then
@@ -202,6 +209,10 @@ public class Segment implements Serializable {
     return null;
   }
 
+  public Configuration getConfiguration() {
+    return readCommittedScope.getConfiguration();
+  }
+
   public Set<String> getFilteredIndexShardNames() {
     return filteredIndexShardNames;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index 456776b..47eeafe 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -37,7 +37,8 @@ public interface DataMap<T extends Blocklet> {
   /**
    * It is called to load the data map to memory or to initialize it.
    */
-  void init(DataMapModel dataMapModel) throws MemoryException, IOException;
+  void init(DataMapModel dataMapModel)
+      throws MemoryException, IOException;
 
   /**
    * Prune the datamap with resolved filter expression and partition information.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java
index 76bbeee..5f4d1dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.datamap.dev;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Information required to build datamap
  */
@@ -24,11 +26,19 @@ public class DataMapModel {
 
   private String filePath;
 
-  public DataMapModel(String filePath) {
+  private Configuration configuration;
+
+  public DataMapModel(String filePath, Configuration configuration) {
     this.filePath = filePath;
+    this.configuration = configuration;
   }
 
   public String getFilePath() {
     return filePath;
   }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 5128022..41215a3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -282,7 +282,12 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
   @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
       int bufferSize, Configuration hadoopConf) throws IOException {
     return getDataInputStream(path, fileType, bufferSize,
-        CarbonUtil.inferCompressorFromFileName(path));
+        CarbonUtil.inferCompressorFromFileName(path), hadoopConf);
+  }
+
+  @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, String compressor) throws IOException {
+    return getDataInputStream(path, fileType, bufferSize, FileFactory.getConfiguration());
   }
 
   /**
@@ -305,12 +310,12 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
     return new DataInputStream(new BufferedInputStream(stream));
   }
 
-  @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
-      int bufferSize, String compressor) throws IOException {
+  private DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, String compressor, Configuration configuration) throws IOException {
     path = path.replace("\\", "/");
     Path pt = new Path(path);
     InputStream inputStream;
-    FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+    FileSystem fs = pt.getFileSystem(configuration);
     if (bufferSize <= 0) {
       inputStream = fs.open(pt);
     } else {
@@ -509,7 +514,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
     RemoteIterator<LocatedFileStatus> listStatus = null;
     if (null != fileStatus && fileStatus.isDirectory()) {
       Path path = fileStatus.getPath();
-      listStatus = path.getFileSystem(FileFactory.getConfiguration()).listFiles(path, recursive);
+      listStatus = fs.listFiles(path, recursive);
     } else {
       return new ArrayList<CarbonFile>();
     }
@@ -521,8 +526,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
     if (null != fileStatus && fileStatus.isDirectory()) {
       List<FileStatus> listStatus = new ArrayList<>();
       Path path = fileStatus.getPath();
-      RemoteIterator<LocatedFileStatus> iter =
-          path.getFileSystem(FileFactory.getConfiguration()).listLocatedStatus(path);
+      RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
       while (iter.hasNext()) {
         LocatedFileStatus fileStatus = iter.next();
         if (pathFilter.accept(fileStatus.getPath()) && fileStatus.getLen() > 0) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index b462e0c..e8f6cfb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -111,6 +111,11 @@ public final class FileFactory {
     return getDataInputStream(path, fileType, -1);
   }
 
+  public static DataInputStream getDataInputStream(String path, FileType fileType,
+      Configuration configuration) throws IOException {
+    return getDataInputStream(path, fileType, -1, configuration);
+  }
+
   public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize)
       throws IOException {
     return getDataInputStream(path, fileType, bufferSize, getConfiguration());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index fa84f30..323899e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -40,6 +40,8 @@ import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.BlockletDataMapUtil;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Class to handle loading, unloading,clearing,storing of the table
  * blocks
@@ -87,7 +89,8 @@ public class BlockletDataMapIndexStore
     List<BlockDataMap> dataMaps = new ArrayList<>();
     if (blockletDataMapIndexWrapper == null) {
       try {
-        SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+        SegmentIndexFileStore indexFileStore =
+            new SegmentIndexFileStore(identifierWrapper.getConfiguration());
         Set<String> filesRead = new HashSet<>();
         String segmentFilePath = identifier.getIndexFilePath();
         if (segInfoCache == null) {
@@ -97,7 +100,8 @@ public class BlockletDataMapIndexStore
             segInfoCache.get(segmentFilePath);
         if (carbonDataFileBlockMetaInfoMapping == null) {
           carbonDataFileBlockMetaInfoMapping =
-              BlockletDataMapUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath);
+              BlockletDataMapUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath,
+                  identifierWrapper.getConfiguration());
           segInfoCache.put(segmentFilePath, carbonDataFileBlockMetaInfoMapping);
         }
         // if the identifier is not a merge file we can directly load the datamaps
@@ -107,10 +111,12 @@ public class BlockletDataMapIndexStore
                   carbonDataFileBlockMetaInfoMapping);
           BlockDataMap blockletDataMap =
               loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap,
-                  identifierWrapper.getCarbonTable(), identifierWrapper.isAddTableBlockToUnsafe());
+                  identifierWrapper.getCarbonTable(), identifierWrapper.isAddTableBlockToUnsafe(),
+                  identifierWrapper.getConfiguration());
           dataMaps.add(blockletDataMap);
           blockletDataMapIndexWrapper =
-              new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps);
+              new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps,
+                  identifierWrapper.getConfiguration());
         } else {
           // if the identifier is a merge file then collect the index files and load the datamaps
           List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
@@ -125,12 +131,14 @@ public class BlockletDataMapIndexStore
               BlockDataMap blockletDataMap =
                   loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
                       identifierWrapper.getCarbonTable(),
-                      identifierWrapper.isAddTableBlockToUnsafe());
+                      identifierWrapper.isAddTableBlockToUnsafe(),
+                      identifierWrapper.getConfiguration());
               dataMaps.add(blockletDataMap);
             }
           }
           blockletDataMapIndexWrapper =
-              new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps);
+              new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps,
+                  identifierWrapper.getConfiguration());
         }
         lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletDataMapIndexWrapper,
             blockletDataMapIndexWrapper.getMemorySize());
@@ -265,7 +273,7 @@ public class BlockletDataMapIndexStore
    */
   private BlockDataMap loadAndGetDataMap(TableBlockIndexUniqueIdentifier identifier,
       SegmentIndexFileStore indexFileStore, Map<String, BlockMetaInfo> blockMetaInfoMap,
-      CarbonTable carbonTable, boolean addTableBlockToUnsafe)
+      CarbonTable carbonTable, boolean addTableBlockToUnsafe, Configuration configuration)
       throws IOException, MemoryException {
     String uniqueTableSegmentIdentifier =
         identifier.getUniqueTableSegmentIdentifier();
@@ -279,7 +287,7 @@ public class BlockletDataMapIndexStore
       dataMap.init(new BlockletDataMapModel(carbonTable,
           identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
               .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
-          blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe));
+          blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe, configuration));
     }
     return dataMap;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
index 2cf0259..7b8a13b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
@@ -24,19 +24,27 @@ import org.apache.carbondata.core.cache.Cacheable;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * A cacheable wrapper of datamaps
  */
 public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {
 
+  private static final long serialVersionUID = -2859075086955465810L;
+
   private List<BlockDataMap> dataMaps;
 
   private String segmentId;
 
+  private transient Configuration configuration;
+
   // size of the wrapper. basically the total size of the datamaps this wrapper is holding
   private long wrapperSize;
 
-  public BlockletDataMapIndexWrapper(String segmentId,List<BlockDataMap> dataMaps) {
+  public BlockletDataMapIndexWrapper(String segmentId,List<BlockDataMap> dataMaps, Configuration
+      configuration) {
+    this.configuration = configuration;
     this.dataMaps = dataMaps;
     this.wrapperSize = 0L;
     this.segmentId = segmentId;
@@ -72,4 +80,8 @@ public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {
   public String getSegmentId() {
     return segmentId;
   }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
index 0924f1f..77756de 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
@@ -19,8 +19,11 @@ package org.apache.carbondata.core.indexstore;
 
 import java.io.Serializable;
 
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Class holds reference to TableBlockIndexUniqueIdentifier and carbonTable related info
  * This is just a wrapper passed between methods like a context, This object must never be cached.
@@ -35,6 +38,8 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {
 
   // holds the reference to CarbonTable
   private CarbonTable carbonTable;
+
+  private transient Configuration configuration;
   /**
    * flag to specify whether to load table block metadata in unsafe or safe. Default value is true
    */
@@ -44,6 +49,15 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {
       TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable) {
     this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier;
     this.carbonTable = carbonTable;
+    this.configuration = FileFactory.getConfiguration();
+  }
+
+  public TableBlockIndexUniqueIdentifierWrapper(
+      TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable,
+      Configuration configuration) {
+    this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier;
+    this.carbonTable = carbonTable;
+    this.configuration = configuration;
   }
 
   // Note: The constructor is getting used in extensions with other functionalities.
@@ -53,6 +67,7 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {
       boolean addTableBlockToUnsafe) {
     this(tableBlockIndexUniqueIdentifier, carbonTable);
     this.addTableBlockToUnsafe = addTableBlockToUnsafe;
+    this.configuration = FileFactory.getConfiguration();
   }
 
 
@@ -67,4 +82,8 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {
   public boolean isAddTableBlockToUnsafe() {
     return addTableBlockToUnsafe;
   }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 2dbf6a0..57c92c6 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -103,11 +103,13 @@ public class BlockDataMap extends CoarseGrainDataMap
    */
   protected boolean isFilePathStored;
 
-  @Override public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
+  @Override public void init(DataMapModel dataMapModel)
+      throws IOException, MemoryException {
     long startTime = System.currentTimeMillis();
     assert (dataMapModel instanceof BlockletDataMapModel);
     BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) dataMapModel;
-    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    DataFileFooterConverter fileFooterConverter =
+        new DataFileFooterConverter(dataMapModel.getConfiguration());
     List<DataFileFooter> indexInfo = fileFooterConverter
         .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData(),
             blockletDataMapInfo.getCarbonTable().isTransactionalTable());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index da2fa39..e16c3cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -137,7 +137,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
       for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
         tableBlockIndexUniqueIdentifierWrappers.add(
             new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
-                this.getCarbonTable()));
+                this.getCarbonTable(), segment.getConfiguration()));
       }
     }
     List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
index 180c812..0a75d59 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
@@ -22,6 +22,8 @@ import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.indexstore.BlockMetaInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * It is the model object to keep the information to build or initialize BlockletDataMap.
  */
@@ -37,9 +39,9 @@ public class BlockletDataMapModel extends DataMapModel {
 
   private boolean addToUnsafe = true;
 
-  public BlockletDataMapModel(CarbonTable carbonTable, String filePath,
-      byte[] fileData, Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId) {
-    super(filePath);
+  public BlockletDataMapModel(CarbonTable carbonTable, String filePath, byte[] fileData,
+      Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId, Configuration configuration) {
+    super(filePath, configuration);
     this.fileData = fileData;
     this.blockMetaInfoMap = blockMetaInfoMap;
     this.segmentId = segmentId;
@@ -48,8 +50,8 @@ public class BlockletDataMapModel extends DataMapModel {
 
   public BlockletDataMapModel(CarbonTable carbonTable, String filePath,
       byte[] fileData, Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId,
-      boolean addToUnsafe) {
-    this(carbonTable, filePath, fileData, blockMetaInfoMap, segmentId);
+      boolean addToUnsafe, Configuration configuration) {
+    this(carbonTable, filePath, fileData, blockMetaInfoMap, segmentId, configuration);
     this.addToUnsafe = addToUnsafe;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 25cfc26..f19c9c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -48,6 +48,7 @@ import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.MergedBlockIndex;
 import org.apache.carbondata.format.MergedBlockIndexHeader;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TBase;
 
 /**
@@ -76,10 +77,20 @@ public class SegmentIndexFileStore {
    */
   private Map<String, List<String>> carbonMergeFileToIndexFilesMap;
 
+  private Configuration configuration;
+
   public SegmentIndexFileStore() {
     carbonIndexMap = new HashMap<>();
     carbonIndexMapWithFullPath = new TreeMap<>();
     carbonMergeFileToIndexFilesMap = new HashMap<>();
+    configuration = FileFactory.getConfiguration();
+  }
+
+  public SegmentIndexFileStore(Configuration configuration) {
+    carbonIndexMap = new HashMap<>();
+    carbonIndexMapWithFullPath = new TreeMap<>();
+    carbonMergeFileToIndexFilesMap = new HashMap<>();
+    this.configuration = configuration;
   }
 
   /**
@@ -89,7 +100,7 @@ public class SegmentIndexFileStore {
    * @throws IOException
    */
   public void readAllIIndexOfSegment(String segmentPath) throws IOException {
-    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath, configuration);
     for (int i = 0; i < carbonIndexFiles.length; i++) {
       if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
         readMergeFile(carbonIndexFiles[i].getCanonicalPath());
@@ -155,7 +166,8 @@ public class SegmentIndexFileStore {
    * @throws IOException
    */
   public void readAllIndexAndFillBolckletInfo(String segmentPath) throws IOException {
-    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+    CarbonFile[] carbonIndexFiles =
+        getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
     for (int i = 0; i < carbonIndexFiles.length; i++) {
       if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
         readMergeFile(carbonIndexFiles[i].getCanonicalPath());
@@ -190,7 +202,8 @@ public class SegmentIndexFileStore {
    * @throws IOException
    */
   public Map<String, String> getIndexFilesFromSegment(String segmentPath) throws IOException {
-    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+    CarbonFile[] carbonIndexFiles =
+        getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
     Map<String, String> indexFiles = new HashMap<>();
     for (int i = 0; i < carbonIndexFiles.length; i++) {
       if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
@@ -216,7 +229,8 @@ public class SegmentIndexFileStore {
    */
   public Map<String, String> getMergeOrIndexFilesFromSegment(String segmentPath)
       throws IOException {
-    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+    CarbonFile[] carbonIndexFiles =
+        getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
     Map<String, String> indexFiles = new HashMap<>();
     for (int i = 0; i < carbonIndexFiles.length; i++) {
       if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
@@ -251,7 +265,7 @@ public class SegmentIndexFileStore {
    * @throws IOException
    */
   public void readMergeFile(String mergeFilePath) throws IOException {
-    ThriftReader thriftReader = new ThriftReader(mergeFilePath);
+    ThriftReader thriftReader = new ThriftReader(mergeFilePath, configuration);
     try {
       thriftReader.open();
       MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
@@ -259,7 +273,7 @@ public class SegmentIndexFileStore {
       List<String> file_names = indexHeader.getFile_names();
       carbonMergeFileToIndexFilesMap.put(mergeFilePath, file_names);
       List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
-      CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath);
+      CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath, configuration);
       String mergeFileAbsolutePath = mergeFile.getParentFile().getAbsolutePath();
       assert (file_names.size() == fileData.size());
       for (int i = 0; i < file_names.size(); i++) {
@@ -282,8 +296,8 @@ public class SegmentIndexFileStore {
    */
   private void readIndexFile(CarbonFile indexFile) throws IOException {
     String indexFilePath = indexFile.getCanonicalPath();
-    DataInputStream dataInputStream =
-        FileFactory.getDataInputStream(indexFilePath, FileFactory.getFileType(indexFilePath));
+    DataInputStream dataInputStream = FileFactory
+        .getDataInputStream(indexFilePath, FileFactory.getFileType(indexFilePath), configuration);
     byte[] bytes = new byte[(int) indexFile.getSize()];
     try {
       dataInputStream.readFully(bytes);
@@ -362,8 +376,8 @@ public class SegmentIndexFileStore {
    * @param segmentPath
    * @return
    */
-  public static CarbonFile[] getCarbonIndexFiles(String segmentPath) {
-    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+  public static CarbonFile[] getCarbonIndexFiles(String segmentPath, Configuration configuration) {
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath, configuration);
     return carbonFile.listFiles(new CarbonFileFilter() {
       @Override public boolean accept(CarbonFile file) {
         return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
@@ -422,7 +436,8 @@ public class SegmentIndexFileStore {
       indexReader.openThriftReader(indexFile.getCanonicalPath());
       // get the index header
       org.apache.carbondata.format.IndexHeader indexHeader = indexReader.readIndexHeader();
-      DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+      DataFileFooterConverter fileFooterConverter =
+          new DataFileFooterConverter(FileFactory.getConfiguration());
       String filePath = FileFactory.getUpdatedFilePath(indexFile.getCanonicalPath());
       String parentPath =
           filePath.substring(0, filePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 1acf0ea..44a2f7e 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -49,6 +49,7 @@ import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -469,8 +470,8 @@ public class SegmentFileStore {
    * @readSegment method before calling it.
    * @throws IOException
    */
-  public void readIndexFiles() throws IOException {
-    readIndexFiles(SegmentStatus.SUCCESS, false);
+  public void readIndexFiles(Configuration configuration) throws IOException {
+    readIndexFiles(SegmentStatus.SUCCESS, false, configuration);
   }
 
   public SegmentFile getSegmentFile() {
@@ -484,8 +485,8 @@ public class SegmentFileStore {
    * @param ignoreStatus
    * @throws IOException
    */
-  private List<String> readIndexFiles(SegmentStatus status, boolean ignoreStatus)
-      throws IOException {
+  private List<String> readIndexFiles(SegmentStatus status, boolean ignoreStatus,
+      Configuration configuration) throws IOException {
     if (indexFilesMap != null) {
       return new ArrayList<>();
     }
@@ -494,7 +495,7 @@ public class SegmentFileStore {
     indexFilesMap = new HashMap<>();
     indexFileStore.readAllIIndexOfSegment(this.segmentFile, tablePath, status, ignoreStatus);
     Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
-    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(configuration);
     for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
       List<DataFileFooter> indexInfo =
           fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
@@ -538,8 +539,8 @@ public class SegmentFileStore {
     Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
     DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
     for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
-      List<DataFileFooter> indexInfo =
-          fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
+      List<DataFileFooter> indexInfo = fileFooterConverter
+          .getIndexInfo(entry.getKey(), entry.getValue());
       if (indexInfo.size() > 0) {
         schemaMap.put(entry.getKey(), indexInfo.get(0).getColumnInTable());
       }
@@ -733,8 +734,8 @@ public class SegmentFileStore {
         // take the list of files from this segment.
         SegmentFileStore fileStore =
             new SegmentFileStore(table.getTablePath(), segment.getSegmentFile());
-        List<String> indexOrMergeFiles =
-            fileStore.readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false);
+        List<String> indexOrMergeFiles = fileStore
+            .readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false, FileFactory.getConfiguration());
         if (forceDelete) {
           deletePhysicalPartition(
               partitionSpecs,
@@ -791,7 +792,8 @@ public class SegmentFileStore {
       List<PartitionSpec> partitionSpecs,
       SegmentUpdateStatusManager updateStatusManager) throws Exception {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
-    List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true);
+    List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+        FileFactory.getConfiguration());
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
       FileFactory.deleteFile(entry.getKey(), FileFactory.getFileType(entry.getKey()));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
index 57370f6..d0bc976 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
@@ -29,6 +29,8 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * TODO: It should be removed after store manager implementation.
  */
@@ -81,10 +83,25 @@ public class SchemaReader {
   }
 
   public static TableInfo inferSchema(AbsoluteTableIdentifier identifier,
+      boolean isCarbonFileProvider, Configuration configuration) throws IOException {
+
+    org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
+        .inferSchema(identifier.getTablePath(), identifier.getTableName(), isCarbonFileProvider,
+            configuration);
+    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+    TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+        tableInfo, identifier.getDatabaseName(), identifier.getTableName(),
+        identifier.getTablePath());
+    wrapperTableInfo.setTransactionalTable(false);
+    return wrapperTableInfo;
+  }
+
+  public static TableInfo inferSchema(AbsoluteTableIdentifier identifier,
       boolean isCarbonFileProvider) throws IOException {
 
     org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
-        .inferSchema(identifier.getTablePath(), identifier.getTableName(), isCarbonFileProvider);
+        .inferSchema(identifier.getTablePath(), identifier.getTableName(), isCarbonFileProvider,
+            FileFactory.getConfiguration());
     SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
     TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
         tableInfo, identifier.getDatabaseName(), identifier.getTableName(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 7df3937..d52eeb2 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -228,7 +228,7 @@ public class CarbonUpdateUtil {
             }
 
             // if the segments is in the list of marked for delete then update the status.
-            if (segmentsToBeDeleted.contains(new Segment(loadMetadata.getLoadName(), null))) {
+            if (segmentsToBeDeleted.contains(new Segment(loadMetadata.getLoadName()))) {
               loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
               loadMetadata.setModificationOrdeletionTimesStamp(Long.parseLong(updatedTimeStamp));
             }
@@ -391,7 +391,7 @@ public class CarbonUpdateUtil {
     List<String> dataFiles = new ArrayList<>();
     if (segment.getSegmentFileName() != null) {
       SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
-      fileStore.readIndexFiles();
+      fileStore.readIndexFiles(FileFactory.getConfiguration());
       Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
       List<String> dataFilePaths = new ArrayList<>();
       for (List<String> paths : indexFilesMap.values()) {
@@ -737,7 +737,7 @@ public class CarbonUpdateUtil {
     for (Map.Entry<String, Long> eachSeg : segmentBlockCount.entrySet()) {
 
       if (eachSeg.getValue() == 0) {
-        segmentsToBeDeleted.add(new Segment(eachSeg.getKey(), null));
+        segmentsToBeDeleted.add(new Segment(eachSeg.getKey(), ""));
       }
 
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index abd9c2c..9dafed9 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -36,6 +36,8 @@ import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * This is a readCommittedScope for non transactional carbon table
  */
@@ -43,10 +45,12 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 @InterfaceStability.Stable
 public class LatestFilesReadCommittedScope implements ReadCommittedScope {
 
+  private static final long serialVersionUID = -839970494288861816L;
   private String carbonFilePath;
   private String segmentId;
   private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
   private LoadMetadataDetails[] loadMetadataDetails;
+  private transient Configuration configuration;
 
   /**
    * a new constructor of this class
@@ -54,7 +58,9 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
    * @param path      carbon file path
    * @param segmentId segment id
    */
-  public LatestFilesReadCommittedScope(String path, String segmentId) throws IOException {
+  public LatestFilesReadCommittedScope(String path, String segmentId, Configuration configuration)
+      throws IOException {
+    this.configuration = configuration;
     Objects.requireNonNull(path);
     this.carbonFilePath = path;
     this.segmentId = segmentId;
@@ -66,8 +72,9 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
    *
    * @param path carbon file path
    */
-  public LatestFilesReadCommittedScope(String path) throws IOException {
-    this(path, null);
+  public LatestFilesReadCommittedScope(String path, Configuration configuration)
+      throws IOException {
+    this(path, null, configuration);
   }
 
   /**
@@ -75,7 +82,8 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
    *
    * @param indexFiles carbon index files
    */
-  public LatestFilesReadCommittedScope(CarbonFile[] indexFiles) {
+  public LatestFilesReadCommittedScope(CarbonFile[] indexFiles, Configuration configuration) {
+    this.configuration = configuration;
     takeCarbonIndexFileSnapShot(indexFiles);
   }
 
@@ -180,7 +188,7 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
         carbonIndexFiles = indexFiles.toArray(new CarbonFile[0]);
       } else {
         String segmentPath = CarbonTablePath.getSegmentPath(carbonFilePath, segmentId);
-        carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
+        carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, configuration);
       }
       if (carbonIndexFiles.length == 0) {
         throw new IOException(
@@ -232,4 +240,11 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
     prepareLoadMetadata();
   }
 
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  @Override public void setConfiguration(Configuration configuration) {
+    this.configuration = configuration;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
index cbcf173..aea7e97 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
@@ -27,6 +27,8 @@ import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * ReadCommitted interface that defines a read scope.
  */
@@ -48,4 +50,8 @@ import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
       throws IOException;
 
   void takeCarbonIndexFileSnapShot() throws IOException;
+
+  Configuration getConfiguration();
+
+  void setConfiguration(Configuration configuration);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index 1f61aab..ac0d156 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -31,6 +31,8 @@ import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * ReadCommittedScope for the managed carbon table
  */
@@ -38,18 +40,24 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 @InterfaceStability.Stable
 public class TableStatusReadCommittedScope implements ReadCommittedScope {
 
+  private static final long serialVersionUID = 2324397174595872738L;
   private LoadMetadataDetails[] loadMetadataDetails;
 
   private AbsoluteTableIdentifier identifier;
 
-  public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier) throws IOException {
+  private transient Configuration configuration;
+
+  public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
+      Configuration configuration) throws IOException {
     this.identifier = identifier;
+    this.configuration = configuration;
     takeCarbonIndexFileSnapShot();
   }
 
   public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
-      LoadMetadataDetails[] loadMetadataDetails) throws IOException {
+      LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) throws IOException {
     this.identifier = identifier;
+    this.configuration = configuration;
     this.loadMetadataDetails = loadMetadataDetails;
   }
 
@@ -97,4 +105,11 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
         .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
   }
 
+  @Override public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  @Override public void setConfiguration(Configuration configuration) {
+    this.configuration = configuration;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java
index dfd5815..d3d9177 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java
@@ -20,11 +20,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.format.FileHeader;
 
 import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TBase;
 
 /**
@@ -36,10 +38,17 @@ public class CarbonHeaderReader {
   //Fact file path
   private String filePath;
 
+  private Configuration configuration;
+
   public CarbonHeaderReader(String filePath) {
     this.filePath = filePath;
   }
 
+  public CarbonHeaderReader(String filePath, Configuration configuration) {
+    this.filePath = filePath;
+    this.configuration = configuration;
+  }
+
   /**
    * It reads the metadata in FileFooter thrift object format.
    *
@@ -62,12 +71,12 @@ public class CarbonHeaderReader {
    * @throws IOException
    */
   private ThriftReader openThriftReader(String filePath) {
-
+    Configuration conf = configuration != null ? configuration : FileFactory.getConfiguration();
     return new ThriftReader(filePath, new ThriftReader.TBaseCreator() {
       @Override public TBase create() {
         return new FileHeader();
       }
-    });
+    }, conf);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
index 4617a12..27eaa32 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.IndexHeader;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TBase;
 
 /**
@@ -28,6 +29,15 @@ import org.apache.thrift.TBase;
  */
 public class CarbonIndexFileReader {
 
+  private Configuration configuration;
+
+  public CarbonIndexFileReader() {
+
+  }
+
+  public CarbonIndexFileReader(Configuration configuration) {
+    this.configuration = configuration;
+  }
   /**
    * reader
    */
@@ -75,7 +85,7 @@ public class CarbonIndexFileReader {
    * @throws IOException
    */
   public void openThriftReader(String filePath) throws IOException {
-    thriftReader = new ThriftReader(filePath);
+    thriftReader = new ThriftReader(filePath, configuration);
     thriftReader.open();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
index 221a285..48d8345 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -55,6 +56,8 @@ public class ThriftReader {
    */
   private TProtocol binaryIn;
 
+  private Configuration configuration;
+
   /**
    * Constructor.
    */
@@ -63,6 +66,12 @@ public class ThriftReader {
     this.creator = creator;
   }
 
+  public ThriftReader(String fileName, TBaseCreator creator, Configuration configuration) {
+    this.fileName = fileName;
+    this.configuration = configuration;
+    this.creator = creator;
+  }
+
   /**
    * Constructor.
    */
@@ -73,6 +82,14 @@ public class ThriftReader {
   /**
    * Constructor.
    */
+  public ThriftReader(String fileName, Configuration configuration) {
+    this.fileName = fileName;
+    this.configuration = configuration;
+  }
+
+  /**
+   * Constructor.
+   */
   public ThriftReader(byte[] fileData) {
     dataInputStream = new DataInputStream(new ByteArrayInputStream(fileData));
     binaryIn = new TCompactProtocol(new TIOStreamTransport(dataInputStream));
@@ -82,8 +99,9 @@ public class ThriftReader {
    * Opens the fileName for reading.
    */
   public void open() throws IOException {
+    Configuration conf = configuration != null ? configuration : FileFactory.getConfiguration();
     FileFactory.FileType fileType = FileFactory.getFileType(fileName);
-    dataInputStream = FileFactory.getDataInputStream(fileName, fileType, bufferSize);
+    dataInputStream = FileFactory.getDataInputStream(fileName, fileType, bufferSize, conf);
     binaryIn = new TCompactProtocol(new TIOStreamTransport(dataInputStream));
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index fdce76b..f1ee877 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -55,6 +55,7 @@ import org.apache.carbondata.core.util.DeleteLoadFolders;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Manages Load/Segment status
@@ -66,8 +67,16 @@ public class SegmentStatusManager {
 
   private AbsoluteTableIdentifier identifier;
 
+  private Configuration configuration;
+
   public SegmentStatusManager(AbsoluteTableIdentifier identifier) {
     this.identifier = identifier;
+    configuration = FileFactory.getConfiguration();
+  }
+
+  public SegmentStatusManager(AbsoluteTableIdentifier identifier, Configuration configuration) {
+    this.identifier = identifier;
+    this.configuration = configuration;
   }
 
   /**
@@ -93,21 +102,10 @@ public class SegmentStatusManager {
     }
   }
 
-  /**
-   * get valid segment for given table
-   *
-   * @return
-   * @throws IOException
-   */
   public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
     return getValidAndInvalidSegments(null, null);
   }
 
-  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
-      LoadMetadataDetails[] loadMetadataDetails) throws IOException {
-    return getValidAndInvalidSegments(loadMetadataDetails, null);
-  }
-
   /**
    * get valid segment for given load status details.
    */
@@ -129,7 +127,8 @@ public class SegmentStatusManager {
       }
 
       if (readCommittedScope == null) {
-        readCommittedScope = new TableStatusReadCommittedScope(identifier, loadMetadataDetails);
+        readCommittedScope = new TableStatusReadCommittedScope(identifier, loadMetadataDetails,
+            configuration);
       }
       //just directly iterate Array
       for (LoadMetadataDetails segment : loadMetadataDetails) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 168a526..27bc620 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -47,11 +47,19 @@ import org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.BlockIndex;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Footer reader class
  */
 public abstract class AbstractDataFileFooterConverter {
 
+  protected Configuration configuration;
+
+  AbstractDataFileFooterConverter(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
   /**
    * Below method will be used to convert the thrift presence meta to wrapper
    * presence meta
@@ -77,7 +85,8 @@ public abstract class AbstractDataFileFooterConverter {
    * @return list of index info
    * @throws IOException problem while reading the index file
    */
-  public List<DataFileFooter> getIndexInfo(String filePath, List<TableBlockInfo> tableBlockInfoList)
+  public List<DataFileFooter> getIndexInfo(String filePath, List<TableBlockInfo>
+      tableBlockInfoList)
       throws IOException {
     CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
     List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
@@ -144,7 +153,7 @@ public abstract class AbstractDataFileFooterConverter {
    */
   public List<DataFileFooter> getIndexInfo(String filePath, byte[] fileData,
       boolean isTransactionalTable) throws IOException {
-    CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+    CarbonIndexFileReader indexReader = new CarbonIndexFileReader(configuration);
     List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
     String parentPath = filePath.substring(0, filePath.lastIndexOf("/"));
     try {
@@ -188,7 +197,7 @@ public abstract class AbstractDataFileFooterConverter {
         }
         if (readBlockIndexInfo.isSetBlocklet_info()) {
           List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>();
-          BlockletInfo blockletInfo = new DataFileFooterConverterV3()
+          BlockletInfo blockletInfo = new DataFileFooterConverterV3(configuration)
               .getBlockletInfo(readBlockIndexInfo.getBlocklet_info(),
                   CarbonUtil.getNumberOfDimensionColumns(columnSchemaList));
           blockletInfo.setBlockletIndex(blockletIndex);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 8e8b075..f14610c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -61,6 +61,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 
@@ -80,7 +81,7 @@ public class BlockletDataMapUtil {
         && indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
       CarbonFile indexMergeFile = FileFactory.getCarbonFile(
           identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-              .getMergeIndexFileName());
+              .getMergeIndexFileName(), identifierWrapper.getConfiguration());
       if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
         indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
         filesRead.add(indexMergeFile.getPath());
@@ -89,7 +90,7 @@ public class BlockletDataMapUtil {
     if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
       indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
           identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-              .getIndexFileName()) });
+              .getIndexFileName(), identifierWrapper.getConfiguration()) });
     }
     Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
     CarbonTable carbonTable = identifierWrapper.getCarbonTable();
@@ -98,7 +99,8 @@ public class BlockletDataMapUtil {
       tableColumnList =
           carbonTable.getTableInfo().getFactTable().getListOfColumns();
     }
-    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    DataFileFooterConverter fileFooterConverter =
+        new DataFileFooterConverter(identifierWrapper.getConfiguration());
     List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
         identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
             .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
@@ -139,9 +141,9 @@ public class BlockletDataMapUtil {
    * @throws IOException
    */
   public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
-      String segmentFilePath) throws IOException {
+      String segmentFilePath, Configuration configuration) throws IOException {
     Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap();
-    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath);
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath, configuration);
     if (carbonFile instanceof AbstractDFSCarbonFile) {
       PathFilter pathFilter = new PathFilter() {
         @Override public boolean accept(Path path) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index c9601c0..fa982be 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1277,34 +1277,6 @@ public final class CarbonUtil {
   }
 
   /**
-   * Below method will be used to get all the block index info from index file
-   *
-   * @param taskId                  task id of the file
-   * @param tableBlockInfoList      list of table block
-   * @param identifier absolute table identifier
-   * @return list of block info
-   * @throws IOException if any problem while reading
-   */
-  public static List<DataFileFooter> readCarbonIndexFile(String taskId, String bucketNumber,
-      List<TableBlockInfo> tableBlockInfoList, AbsoluteTableIdentifier identifier)
-      throws IOException {
-    // need to sort the  block info list based for task in ascending  order so
-    // it will be sinkup with block index read from file
-    Collections.sort(tableBlockInfoList);
-    // geting the index file path
-    //TODO need to pass proper partition number when partiton will be supported
-    String carbonIndexFilePath = CarbonTablePath
-        .getCarbonIndexFilePath(identifier.getTablePath(), taskId,
-            tableBlockInfoList.get(0).getSegmentId(),
-            bucketNumber, CarbonTablePath.DataFileUtil
-                .getTimeStampFromFileName(tableBlockInfoList.get(0).getFilePath()),
-            tableBlockInfoList.get(0).getVersion());
-    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
-    // read the index info and return
-    return fileFooterConverter.getIndexInfo(carbonIndexFilePath, tableBlockInfoList);
-  }
-
-  /**
    * initialize the value of dictionary chunk that can be kept in memory at a time
    *
    * @return
@@ -2243,17 +2215,17 @@ public final class CarbonUtil {
     }
   }
 
-  public static String getFilePathExternalFilePath(String path) {
+  public static String getFilePathExternalFilePath(String path, Configuration configuration) {
 
     // return the list of carbondata files in the given path.
-    CarbonFile segment = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+    CarbonFile segment = FileFactory.getCarbonFile(path, configuration);
 
     CarbonFile[] dataFiles = segment.listFiles();
     for (CarbonFile dataFile : dataFiles) {
       if (dataFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
         return dataFile.getAbsolutePath();
       } else if (dataFile.isDirectory()) {
-        return getFilePathExternalFilePath(dataFile.getAbsolutePath());
+        return getFilePathExternalFilePath(dataFile.getAbsolutePath(), configuration);
       }
     }
     return null;
@@ -2265,12 +2237,14 @@ public final class CarbonUtil {
    * @return table info containing the schema
    */
   public static org.apache.carbondata.format.TableInfo inferSchema(String carbonDataFilePath,
-      String tableName, boolean isCarbonFileProvider) throws IOException {
+      String tableName, boolean isCarbonFileProvider, Configuration configuration)
+      throws IOException {
     String fistFilePath = null;
     if (isCarbonFileProvider) {
-      fistFilePath = getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null");
+      fistFilePath = getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null",
+          configuration);
     } else {
-      fistFilePath = getFilePathExternalFilePath(carbonDataFilePath);
+      fistFilePath = getFilePathExternalFilePath(carbonDataFilePath, configuration);
     }
     if (fistFilePath == null) {
       // Check if we can infer the schema from the hive metastore.
@@ -2645,7 +2619,7 @@ public final class CarbonUtil {
     HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>();
     Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getLocationMap();
     if (locationMap != null) {
-      fileStore.readIndexFiles();
+      fileStore.readIndexFiles(FileFactory.getConfiguration());
       Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
       // get the size of carbonindex file
       carbonIndexSize = getCarbonIndexSize(fileStore, locationMap);
@@ -3192,8 +3166,7 @@ public final class CarbonUtil {
    * @param carbonTable
    * carbon Table
    */
-  public static ColumnarFormatVersion getFormatVersion(CarbonTable carbonTable)
-      throws IOException {
+  public static ColumnarFormatVersion getFormatVersion(CarbonTable carbonTable) throws IOException {
     String segmentPath = null;
     boolean supportFlatFolder = carbonTable.isSupportFlatFolder();
     CarbonIndexFileReader indexReader = new CarbonIndexFileReader();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
index 670536e..61b4f37 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -33,12 +33,22 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.reader.CarbonFooterReader;
 import org.apache.carbondata.format.FileFooter;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Below class will be used to convert the thrift object of data file
  * meta data to wrapper object
  */
 public class DataFileFooterConverter extends AbstractDataFileFooterConverter {
 
+  public DataFileFooterConverter(Configuration configuration) {
+    super(configuration);
+  }
+
+  public DataFileFooterConverter() {
+    super(FileFactory.getConfiguration());
+  }
+
   /**
    * Below method will be used to convert thrift file meta to wrapper file meta
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
index b20a336..db52991 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
@@ -29,6 +30,8 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.reader.CarbonFooterReader;
 import org.apache.carbondata.format.FileFooter;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Below class will be used to convert the thrift object of data file
  * meta data to wrapper object for version 2 data file
@@ -36,6 +39,14 @@ import org.apache.carbondata.format.FileFooter;
 
 public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {
 
+  public DataFileFooterConverter2(Configuration configuration) {
+    super(configuration);
+  }
+
+  public DataFileFooterConverter2() {
+    super(FileFactory.getConfiguration());
+  }
+
   /**
    * Below method will be used to convert thrift file meta to wrapper file meta
    */
@@ -136,6 +147,6 @@ public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {
   }
 
   @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
-    return new DataFileFooterConverter().getSchema(tableBlockInfo);
+    return new DataFileFooterConverter(configuration).getSchema(tableBlockInfo);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
index 6a968b4..41e22fd 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
@@ -31,8 +32,18 @@ import org.apache.carbondata.core.reader.CarbonHeaderReader;
 import org.apache.carbondata.format.FileFooter3;
 import org.apache.carbondata.format.FileHeader;
 
+import org.apache.hadoop.conf.Configuration;
+
 public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
 
+  public DataFileFooterConverterV3(Configuration configuration) {
+    super(configuration);
+  }
+
+  public DataFileFooterConverterV3() {
+    super(FileFactory.getConfiguration());
+  }
+
   /**
    * Below method will be used to convert thrift file meta to wrapper file meta
    * This method will read the footer from footer offset present in the data file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 1634091..9dde6b7 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
@@ -77,7 +78,8 @@ public class CarbonIndexFileMergeWriter {
       List<CarbonFile> indexCarbonFiles = sfs.getIndexCarbonFiles();
       indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
     } else {
-      indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
+      indexFiles =
+          SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
     }
     if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
       if (sfs == null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
index a3acfab..34dca0b 100644
--- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
@@ -40,10 +40,12 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 
 import mockit.Deencapsulation;
 import mockit.Mock;
 import mockit.MockUp;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -103,7 +105,8 @@ public class TestBlockletDataMapFactory {
             BlockletDataMapIndexWrapper.class);
     method.setAccessible(true);
     method.invoke(blockletDataMapFactory, tableBlockIndexUniqueIdentifierWrapper,
-        new BlockletDataMapIndexWrapper(tableBlockIndexUniqueIdentifier.getSegmentId(), dataMaps));
+        new BlockletDataMapIndexWrapper(tableBlockIndexUniqueIdentifier.getSegmentId(), dataMaps,
+            tableBlockIndexUniqueIdentifierWrapper.getConfiguration()));
     BlockletDataMapIndexWrapper result = cache.getIfPresent(tableBlockIndexUniqueIdentifierWrapper);
     assert null != result;
   }
@@ -111,7 +114,8 @@ public class TestBlockletDataMapFactory {
   @Test public void getValidDistributables() throws IOException {
     BlockletDataMapDistributable blockletDataMapDistributable = new BlockletDataMapDistributable(
         "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756709.carbonindex");
-    Segment segment = new Segment("0", null);
+    Segment segment = new Segment("0", null, new TableStatusReadCommittedScope(carbonTable
+        .getAbsoluteTableIdentifier(), new Configuration(false)));
     blockletDataMapDistributable.setSegment(segment);
     BlockletDataMapDistributable blockletDataMapDistributable1 = new BlockletDataMapDistributable(
         "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756701.carbonindex");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index 5520bfb..a4abc61 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -684,8 +684,8 @@ public class CarbonUtilTest {
 
   @Test public void testToGetSegmentString() {
     List<Segment> list = new ArrayList<>();
-    list.add(new Segment("1", null));
-    list.add(new Segment("2", null));
+    list.add(new Segment("1", null, null));
+    list.add(new Segment("2", null, null));
     String segments = CarbonUtil.convertToString(list);
     assertEquals(segments, "1,2");
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
index e506994..2705b63 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
@@ -46,6 +46,7 @@ import org.apache.carbondata.format.IndexHeader;
 
 import mockit.Mock;
 import mockit.MockUp;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
 import static junit.framework.TestCase.assertEquals;
@@ -143,7 +144,7 @@ public class DataFileFooterConverterTest {
     new MockUp<FileFactory>() {
       @SuppressWarnings("unused") @Mock
       public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
-          int bufferSize) {
+          int bufferSize, Configuration configuration) {
         return dataInputStream;
       }
     };

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 0d240c4..1e5b79c 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -237,7 +237,7 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
       }
       for (String shard : shardPaths) {
         BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
-        bloomDM.init(new BloomDataMapModel(shard, cache));
+        bloomDM.init(new BloomDataMapModel(shard, cache, segment.getConfiguration()));
         bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
         dataMaps.add(bloomDM);
       }
@@ -253,7 +253,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
     List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
     BloomCoarseGrainDataMap bloomCoarseGrainDataMap = new BloomCoarseGrainDataMap();
     String indexPath = ((BloomDataMapDistributable) distributable).getIndexPath();
-    bloomCoarseGrainDataMap.init(new BloomDataMapModel(indexPath, cache));
+    bloomCoarseGrainDataMap
+        .init(new BloomDataMapModel(indexPath, cache, FileFactory.getConfiguration()));
     bloomCoarseGrainDataMap.initIndexColumnConverters(getCarbonTable(),
         dataMapMeta.getIndexedColumns());
     coarseGrainDataMaps.add(bloomCoarseGrainDataMap);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
index 9d5d741..7ae4906 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
@@ -19,13 +19,16 @@ package org.apache.carbondata.datamap.bloom;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 
+import org.apache.hadoop.conf.Configuration;
+
 public class BloomDataMapModel extends DataMapModel {
 
   private Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache;
 
   public BloomDataMapModel(String filePath,
-      Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache) {
-    super(filePath);
+      Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache,
+      Configuration configuration) {
+    super(filePath, configuration);
     this.cache = cache;
   }