You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/11 12:16:16 UTC

[2/3] carbondata git commit: [CARBONDATA-2909] Multi user support for SDK on S3

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index ff2ffdd..809d68b 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -53,8 +54,8 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
     FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer, getDataMapSchema());
     try {
       dataMap.init(new DataMapModel(
-          DataMapWriter.getDefaultDataMapPath(
-              tableIdentifier.getTablePath(), segment.getSegmentNo(), dataMapName)));
+          DataMapWriter.getDefaultDataMapPath(tableIdentifier.getTablePath(),
+              segment.getSegmentNo(), dataMapName), segment.getConfiguration()));
     } catch (MemoryException e) {
       LOGGER.error("failed to get lucene datamap , detail is {}" + e.getMessage());
       return lstDataMap;
@@ -73,7 +74,7 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
     FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer, getDataMapSchema());
     String indexPath = ((LuceneDataMapDistributable) distributable).getIndexPath();
     try {
-      dataMap.init(new DataMapModel(indexPath));
+      dataMap.init(new DataMapModel(indexPath, FileFactory.getConfiguration()));
     } catch (MemoryException e) {
       LOGGER.error(String.format("failed to get lucene datamap , detail is %s", e.getMessage()));
       return lstDataMap;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index c80cc75..9c1e18d 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -383,11 +383,12 @@ public CarbonWriterBuilder withTableProperties(Map<String, String> options);
 * this writer is not thread safe, use buildThreadSafeWriterForCSVInput in multi thread environment
 * Build a {@link CarbonWriter}, which accepts row in CSV format object
 * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
+* @param configuration hadoop configuration object.
 * @return CSVCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema schema) throws IOException, InvalidLoadOptionException;
+public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema schema, Configuration configuration) throws IOException, InvalidLoadOptionException;
 ```
 
 ```
@@ -395,12 +396,13 @@ public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema
 * Can use this writer in multi-thread instance.
 * Build a {@link CarbonWriter}, which accepts row in CSV format
 * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
-* @param numOfThreads number of threads() in which .write will be called.              
+* @param numOfThreads number of threads() in which .write will be called.    
+* @param configuration hadoop configuration object          
 * @return CSVCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads)
+public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads, Configuration configuration)
   throws IOException, InvalidLoadOptionException;
 ```
 
@@ -410,11 +412,12 @@ public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfT
 * this writer is not thread safe, use buildThreadSafeWriterForAvroInput in multi thread environment
 * Build a {@link CarbonWriter}, which accepts Avro format object
 * @param avroSchema avro Schema object {org.apache.avro.Schema}
+* @param configuration hadoop configuration object
 * @return AvroCarbonWriter 
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema) throws IOException, InvalidLoadOptionException;
+public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema, Configuration configuration) throws IOException, InvalidLoadOptionException;
 ```
 
 ```
@@ -423,11 +426,13 @@ public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema) throw
 * Build a {@link CarbonWriter}, which accepts Avro object
 * @param avroSchema avro Schema object {org.apache.avro.Schema}
 * @param numOfThreads number of threads() in which .write will be called.
+* @param configuration hadoop configuration object
 * @return AvroCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema, short numOfThreads)
+public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema, short 
+numOfThreads, Configuration configuration)
   throws IOException, InvalidLoadOptionException
 ```
 
@@ -437,11 +442,12 @@ public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avr
 * this writer is not thread safe, use buildThreadSafeWriterForJsonInput in multi thread environment
 * Build a {@link CarbonWriter}, which accepts Json object
 * @param carbonSchema carbon Schema object
+* @param configuration hadoop configuration object
 * @return JsonCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema);
+public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema, Configuration configuration);
 ```
 
 ```
@@ -450,11 +456,12 @@ public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema);
 * Build a {@link CarbonWriter}, which accepts Json object
 * @param carbonSchema carbon Schema object
 * @param numOfThreads number of threads() in which .write will be called.
+* @param configuration hadoop configuration object.
 * @return JsonCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
 */
-public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads)
+public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads, Configuration configuration)
 ```
 
 ### Class org.apache.carbondata.sdk.file.CarbonWriter

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
index ada1a8c..4eec4bf 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
@@ -23,6 +23,7 @@ import java.sql.Date;
 import java.sql.Timestamp;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
 
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.sdk.file.CarbonReader;
@@ -55,7 +56,7 @@ public class CarbonReaderExample {
 
             CarbonWriter writer = CarbonWriter.builder()
                 .outputPath(path)
-                .buildWriterForCSVInput(new Schema(fields));
+                .buildWriterForCSVInput(new Schema(fields), new Configuration(false));
 
             for (int i = 0; i < 10; i++) {
                 String[] row2 = new String[]{
@@ -98,7 +99,7 @@ public class CarbonReaderExample {
             CarbonReader reader = CarbonReader
                 .builder(path, "_temp")
                 .projection(strings)
-                .build();
+                .build(new Configuration(false));
 
             System.out.println("\nData:");
             long day = 24L * 3600 * 1000;
@@ -116,7 +117,7 @@ public class CarbonReaderExample {
             // Read data
             CarbonReader reader2 = CarbonReader
                 .builder(path, "_temp")
-                .build();
+                .build(new Configuration(false));
 
             System.out.println("\nData:");
             i = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
index 52d51b5..3abc342 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
@@ -25,6 +25,8 @@ import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
 import org.apache.carbondata.sdk.file.*;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Example for testing CarbonWriter on S3
  */
@@ -56,7 +58,7 @@ public class SDKS3Example {
                 .setEndPoint(args[2])
                 .outputPath(path);
 
-        CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+        CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), new Configuration(false));
 
         for (int i = 0; i < num; i++) {
             writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)});
@@ -75,7 +77,7 @@ public class SDKS3Example {
             .setAccessKey(args[0])
             .setSecretKey(args[1])
             .setEndPoint(args[2])
-            .build();
+            .build(new Configuration(false));
 
         System.out.println("\nData:");
         int i = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
index a011d80..86bf854 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.examples
 import java.io.File
 
 import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.examples.util.ExampleUtils
@@ -36,7 +37,7 @@ object DirectSQLExample {
   def buildTestData(
       path: String,
       num: Int = 3,
-      persistSchema: Boolean = false): Any = {
+      persistSchema: Boolean = false, sparkSession: SparkSession): Any = {
 
     // getCanonicalPath gives path with \, but the code expects /.
     val writerPath = path.replace("\\", "/");
@@ -56,7 +57,8 @@ object DirectSQLExample {
       if (persistSchema) {
         builder.persistSchemaFile(true)
       }
-      val writer = builder.buildWriterForCSVInput(new Schema(fields))
+      val writer = builder
+        .buildWriterForCSVInput(new Schema(fields), sparkSession.sparkContext.hadoopConfiguration)
       var i = 0
       while (i < num) {
         writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
@@ -82,7 +84,7 @@ object DirectSQLExample {
     import carbonSession._
     // 1. generate data file
     cleanTestData(path)
-    buildTestData(path, 20)
+    buildTestData(path, 20, sparkSession = carbonSession)
     val readPath = path + "Fact/Part0/Segment_null"
 
     println("Running SQL on carbon files directly")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
index 1795960..c5c9710 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.examples
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
 import org.apache.spark.sql.SparkSession
 import org.slf4j.{Logger, LoggerFactory}
@@ -52,12 +53,12 @@ object S3UsingSDKExample {
           builder.outputPath(writerPath).isTransactionalTable(true)
             .uniqueIdentifier(
               System.currentTimeMillis)
-            .buildWriterForCSVInput(new Schema(fields))
+            .buildWriterForCSVInput(new Schema(fields), new Configuration(false))
         } else {
           builder.outputPath(writerPath).isTransactionalTable(true)
             .uniqueIdentifier(
               System.currentTimeMillis).withBlockSize(2)
-            .buildWriterForCSVInput(new Schema(fields))
+            .buildWriterForCSVInput(new Schema(fields), new Configuration(false))
         }
       var i = 0
       var row = num

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 9e5edc1..fcfb346 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -113,11 +113,14 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
       ReadCommittedScope readCommittedScope = null;
       if (carbonTable.isTransactionalTable()) {
         readCommittedScope = new LatestFilesReadCommittedScope(
-            identifier.getTablePath() + "/Fact/Part0/Segment_null/");
+            identifier.getTablePath() + "/Fact/Part0/Segment_null/", job.getConfiguration());
       } else {
         readCommittedScope = getReadCommittedScope(job.getConfiguration());
         if (readCommittedScope == null) {
-          readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
+          readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath(), job
+              .getConfiguration());
+        } else {
+          readCommittedScope.setConfiguration(job.getConfiguration());
         }
       }
       // this will be null in case of corrupt schema file.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index e5e3165..eb9ff7c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -188,8 +188,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
           context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, "");
       List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
       Set<Segment> segmentSet = new HashSet<>(
-          new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
-              .getValidAndInvalidSegments().getValidSegments());
+          new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
+              context.getConfiguration()).getValidAndInvalidSegments().getValidSegments());
       if (updateTime != null) {
         CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true,
             segmentDeleteList);
@@ -223,8 +223,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
 
     if (partitionSpecs != null && partitionSpecs.size() > 0) {
       List<Segment> validSegments =
-          new SegmentStatusManager(table.getAbsoluteTableIdentifier()).getValidAndInvalidSegments()
-              .getValidSegments();
+          new SegmentStatusManager(table.getAbsoluteTableIdentifier())
+              .getValidAndInvalidSegments().getValidSegments();
       String uniqueId = String.valueOf(System.currentTimeMillis());
       List<String> tobeUpdatedSegs = new ArrayList<>();
       List<String> tobeDeletedSegs = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index ba6e043..ba3accf 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -143,7 +143,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
     List<Segment> streamSegments = null;
     // get all valid segments and set them into the configuration
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier,
+        readCommittedScope.getConfiguration());
     SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
         .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
 
@@ -583,7 +584,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(
         table, loadMetadataDetails);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
-        new SegmentStatusManager(identifier)
+        new SegmentStatusManager(identifier, readCommittedScope.getConfiguration())
             .getValidAndInvalidSegments(loadMetadataDetails, readCommittedScope);
     Map<String, Long> blockRowCountMapping = new HashMap<>();
     Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();
@@ -649,11 +650,12 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     if (readCommittedScope == null) {
       ReadCommittedScope readCommittedScope;
       if (job.getConfiguration().getBoolean(CARBON_TRANSACTIONAL_TABLE, true)) {
-        readCommittedScope = new TableStatusReadCommittedScope(identifier);
+        readCommittedScope = new TableStatusReadCommittedScope(identifier, job.getConfiguration());
       } else {
         readCommittedScope = getReadCommittedScope(job.getConfiguration());
         if (readCommittedScope == null) {
-          readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
+          readCommittedScope =
+              new LatestFilesReadCommittedScope(identifier.getTablePath(), job.getConfiguration());
         }
       }
       this.readCommittedScope = readCommittedScope;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 99d8532..2d4f370 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -270,6 +270,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
           TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
 
           throw new RuntimeException(e);
+        } finally {
+          ThreadLocalSessionInfo.unsetAll();
         }
       }
     });
@@ -444,6 +446,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
       } finally {
         executorService.shutdownNow();
         dataLoadExecutor.close();
+        ThreadLocalSessionInfo.unsetAll();
         // clean up the folders and files created locally for data load operation
         TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
index de0d731..40a0a62 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
@@ -112,7 +112,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
             .outputPath(writerPath)
             .isTransactionalTable(false)
             .uniqueIdentifier(System.currentTimeMillis)
-            .buildWriterForCSVInput(Schema.parseJson(schema))
+            .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration)
         } else {
           if (options != null) {
             builder.outputPath(writerPath)
@@ -120,14 +120,15 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
               .sortBy(sortColumns.toArray)
               .uniqueIdentifier(
                 System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
-              .buildWriterForCSVInput(Schema.parseJson(schema))
+              .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration)
           } else {
             builder.outputPath(writerPath)
               .isTransactionalTable(false)
               .sortBy(sortColumns.toArray)
               .uniqueIdentifier(
                 System.currentTimeMillis).withBlockSize(2)
-              .buildWriterForCSVInput(Schema.parseJson(schema))
+              .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext
+                .hadoopConfiguration)
           }
         }
       var i = 0
@@ -544,7 +545,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
     try {
       val writer = CarbonWriter.builder
         .outputPath(writerPath).isTransactionalTable(false)
-        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
       var i = 0
       while (i < rows) {
         writer.write(record)
@@ -743,7 +744,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
       .toString()
     val builder = CarbonWriter.builder()
     val writer = builder.outputPath(writerPath)
-      .buildWriterForCSVInput(Schema.parseJson(schema))
+      .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration)
 
     for (i <- 0 until 5) {
       writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(33000), i.toString))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index a1d4290..63fb2e6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -21,6 +21,7 @@ import java.util
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.util.QueryTest
@@ -35,6 +36,7 @@ import org.apache.carbondata.core.indexstore.Blocklet
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
 import org.apache.carbondata.core.scan.expression.{ColumnExpression, LiteralExpression}
@@ -304,7 +306,8 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
     val resolveFilter: FilterResolverIntf =
       CarbonTable.resolveFilter(andExpression, carbonTable.getAbsoluteTableIdentifier)
     val exprWrapper = DataMapChooser.getDefaultDataMap(carbonTable, resolveFilter)
-    val segment = new Segment("0")
+    val segment = new Segment("0", new TableStatusReadCommittedScope(carbonTable
+      .getAbsoluteTableIdentifier, new Configuration(false)))
     // get the pruned blocklets
     val prunedBlocklets = exprWrapper.prune(List(segment).asJava, null)
     prunedBlocklets.asScala.foreach { blocklet =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
index 08daa34..1b181bc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -59,9 +59,11 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
       val writer =
       if (persistSchema) {
         builder.persistSchemaFile(true)
-        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema),
+          sqlContext.sparkContext.hadoopConfiguration)
       } else {
-        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema),
+          sqlContext.sparkContext.hadoopConfiguration)
       }
 
       var i = 0

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 0b6813f..a03a5eb 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -142,7 +142,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
             .outputPath(writerPath)
             .isTransactionalTable(false)
             .uniqueIdentifier(System.currentTimeMillis)
-            .buildWriterForCSVInput(Schema.parseJson(schema))
+            .buildWriterForCSVInput(Schema.parseJson(schema),
+              sqlContext.sparkContext.hadoopConfiguration)
         } else {
           if (options != null) {
             builder.outputPath(writerPath)
@@ -150,14 +151,16 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
               .sortBy(sortColumns.toArray)
               .uniqueIdentifier(
                 System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
-              .buildWriterForCSVInput(Schema.parseJson(schema))
+              .buildWriterForCSVInput(Schema.parseJson(schema),
+                sqlContext.sparkContext.hadoopConfiguration)
           } else {
             builder.outputPath(writerPath)
               .isTransactionalTable(false)
               .sortBy(sortColumns.toArray)
               .uniqueIdentifier(
                 System.currentTimeMillis).withBlockSize(2)
-              .buildWriterForCSVInput(Schema.parseJson(schema))
+              .buildWriterForCSVInput(Schema.parseJson(schema),
+                sqlContext.sparkContext.hadoopConfiguration)
           }
         }
       var i = 0
@@ -194,7 +197,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         builder.outputPath(writerPath)
           .isTransactionalTable(false)
           .uniqueIdentifier(System.currentTimeMillis()).withBlockSize(2).sortBy(sortColumns)
-          .buildWriterForCSVInput(new Schema(fields))
+          .buildWriterForCSVInput(new Schema(fields),
+            sqlContext.sparkContext.hadoopConfiguration)
 
       var i = 0
       while (i < rows) {
@@ -228,7 +232,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
           .sortBy(sortColumns.toArray)
           .uniqueIdentifier(
             123).withBlockSize(2)
-          .buildWriterForCSVInput(Schema.parseJson(schema))
+          .buildWriterForCSVInput(Schema.parseJson(schema),
+            sqlContext.sparkContext.hadoopConfiguration)
       var i = 0
       while (i < rows) {
         writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
@@ -992,7 +997,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val builder: CarbonWriterBuilder = CarbonWriter.builder
       .outputPath(writerPath).isTransactionalTable(false).withLoadOptions(options)
 
-    val writer: CarbonWriter = builder.buildWriterForCSVInput(new Schema(fields))
+    val writer: CarbonWriter = builder.buildWriterForCSVInput(new Schema(fields),
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(Array("babu","1","02-01-2002","02-01-2002 01:01:00"));
     writer.close()
 
@@ -1117,7 +1123,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     try {
       val writer = CarbonWriter.builder
         .outputPath(writerPath).isTransactionalTable(false)
-        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
+        sqlContext.sparkContext.hadoopConfiguration)
       var i = 0
       while (i < rows) {
         writer.write(record)
@@ -2091,7 +2098,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
     assert(intercept[RuntimeException] {
       val writer = CarbonWriter.builder.sortBy(Array("name", "id"))
-        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+        sqlContext.sparkContext.hadoopConfiguration)
       writer.write(record)
       writer.close()
     }.getMessage.toLowerCase.contains("column: name specified in sort columns"))
@@ -2131,7 +2139,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
   }
@@ -2169,7 +2178,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder.sortBy(Array("id"))
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
   }
@@ -2213,7 +2223,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
   }
@@ -2253,7 +2264,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -2299,7 +2311,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -2346,7 +2359,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -2366,7 +2380,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val writer: CarbonWriter = CarbonWriter.builder
       .outputPath(writerPath)
       .withTableProperties(options)
-      .buildWriterForCSVInput(new Schema(fields))
+      .buildWriterForCSVInput(new Schema(fields), sqlContext.sparkContext.hadoopConfiguration)
     writer.write(Array("carbon", "1"))
     writer.write(Array("hydrogen", "10"))
     writer.write(Array("boron", "4"))
@@ -2384,7 +2398,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     // write local sort data
     val writer1: CarbonWriter = CarbonWriter.builder
       .outputPath(writerPath)
-      .buildWriterForCSVInput(new Schema(fields))
+      .buildWriterForCSVInput(new Schema(fields), sqlContext.sparkContext.hadoopConfiguration)
     writer1.write(Array("carbon", "1"))
     writer1.write(Array("hydrogen", "10"))
     writer1.write(Array("boron", "4"))
@@ -2493,7 +2507,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     fields(0) = new Field("name", DataTypes.STRING)
     fields(1) = new Field("surname", DataTypes.STRING)
     fields(2) = new Field("age", DataTypes.INT)
-    val carbonWriter = builder.buildWriterForCSVInput(new Schema(fields))
+    val carbonWriter = builder.buildWriterForCSVInput(new Schema(fields),
+      sqlContext.sparkContext.hadoopConfiguration)
     var i = 0
     while (i < 100) {
       {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
index ff5c062..17aae1d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
@@ -98,7 +98,8 @@ class TestNonTransactionalCarbonTableJsonWriter extends QueryTest with BeforeAnd
         .outputPath(writerPath).isTransactionalTable(false)
         .uniqueIdentifier(System.currentTimeMillis())
         .withLoadOptions(options)
-        .buildWriterForJsonInput(carbonSchema)
+        .buildWriterForJsonInput(carbonSchema,
+          sqlContext.sparkContext.hadoopConfiguration)
       writer.write(jsonRow)
       writer.close()
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
index dc13b16..e7fcf95 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
@@ -93,7 +93,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -140,7 +140,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, mySchema)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -184,7 +184,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json, mySchema)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -215,7 +215,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -245,7 +245,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -275,7 +275,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -305,7 +305,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -335,7 +335,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -365,7 +365,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
     val exception1 = intercept[UnsupportedOperationException] {
       val writer = CarbonWriter.builder
-        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
       writer.write(record)
       writer.close()
     }
@@ -402,7 +402,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -438,7 +438,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -476,7 +476,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -509,7 +509,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -568,7 +568,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -663,7 +663,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -716,7 +716,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
 
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -786,7 +786,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     avroRec.put("struct_field_decimal", genericByteArray)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -858,7 +858,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     avroRec.put("dec_fields", genericByteArray)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -905,7 +905,8 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
 
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false)
+      .buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -952,7 +953,8 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
 
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false)
+      .buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -997,7 +999,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val avroRec = new GenericData. Record(nn)
     avroRec.put("id", bytes1)
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1042,7 +1044,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes1)
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1087,7 +1089,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes1)
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1128,7 +1130,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes)
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1170,7 +1172,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     avroRec.put("dec_field", bytes)
     val exception1 = intercept[Exception] {
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     }
@@ -1220,7 +1222,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -1256,7 +1258,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(record)
     writer.close()
     sql(
@@ -1306,7 +1308,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
 
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
     writer.write(avroRec)
     writer.close()
     sql(s"create table sdkOutputTable(union_field struct<union_field0:decimal(10,2),union_field1:int>) " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
index 62ba03e..0421ea8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
@@ -68,11 +68,13 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
         CarbonWriter.builder
           .outputPath(writerPath).isTransactionalTable(false).enableLocalDictionary(true)
           .localDictionaryThreshold(2000)
-          .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+          .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
+          sqlContext.sparkContext.hadoopConfiguration)
       } else {
         CarbonWriter.builder
           .outputPath(writerPath).isTransactionalTable(false)
-          .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+          .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
+          sqlContext.sparkContext.hadoopConfiguration)
       }
       var i = 0
       while (i < rows) {
@@ -268,7 +270,8 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
       """.stripMargin
     val pschema= org.apache.avro.Schema.parse(mySchema)
     val records = testUtil.jsonToAvro(jsonvalue, mySchema)
-    val writer=CarbonWriter.builder().outputPath(writerPath).buildWriterForAvroInput(pschema)
+    val writer=CarbonWriter.builder().outputPath(writerPath).buildWriterForAvroInput(pschema,
+      sqlContext.sparkContext.hadoopConfiguration)
     writer.write(records)
     writer.close()
     sql("DROP TABLE IF EXISTS sdkOutputTable")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
index 4e2197d..a8bdb31 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.testsuite.createTable
 import java.io.File
 
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -58,9 +59,11 @@ object TestSparkCarbonFileFormatWithSparkSession {
       val writer =
         if (persistSchema) {
           builder.persistSchemaFile(true)
-          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), new
+              Configuration(false))
         } else {
-          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), new
+              Configuration(false))
         }
 
       var i = 0

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
index 39785a3..6e8e79b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
@@ -18,9 +18,10 @@
 package org.apache.carbondata.spark.testsuite.dataload
 
 import scala.collection.JavaConverters._
-
 import java.io.{File, FilenameFilter}
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.reader.CarbonIndexFileReader
 import org.apache.carbondata.core.util.CarbonProperties
@@ -64,7 +65,7 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
     } else {
       val segment = Segment.getSegment("0", carbonTable.getTablePath)
       val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
-      store.readIndexFiles()
+      store.readIndexFiles(new Configuration(false))
       store.getIndexCarbonFiles.asScala.map(f => new File(f.getAbsolutePath)).toArray
     }
     for (carbonIndexPath <- carbonIndexPaths) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 0c42264..44bc243 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 import java.io.{File, FileWriter}
 
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
@@ -281,7 +282,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     } else {
       val segment = Segment.getSegment("0", carbonTable.getTablePath)
       val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
-      store.readIndexFiles()
+      store.readIndexFiles(new Configuration(false))
       val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
       assertResult(Math.max(4, defaultParallelism) + 1)(size + store.getIndexFilesMap.size())
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 046a2a6..a4bc6f0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -22,12 +22,13 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
-import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapBuilder, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapModel, DataMapWriter}
 import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
 import org.apache.carbondata.core.datastore.FileReader
 import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -71,7 +72,7 @@ class CGDataMapFactory(
     val files = file.listFiles()
     files.map {f =>
       val dataMap: CoarseGrainDataMap = new CGDataMap()
-      dataMap.init(new DataMapModel(f.getCanonicalPath))
+      dataMap.init(new DataMapModel(f.getCanonicalPath, new Configuration(false)))
       dataMap
     }.toList.asJava
   }
@@ -83,7 +84,8 @@ class CGDataMapFactory(
   override def getDataMaps(distributable: DataMapDistributable): java.util.List[CoarseGrainDataMap] = {
     val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
     val dataMap: CoarseGrainDataMap = new CGDataMap()
-    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+    dataMap.init(new DataMapModel(mapDistributable.getFilePath, new
+        Configuration(false)))
     Seq(dataMap).asJava
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index b13582b..57b3672 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -22,12 +22,14 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
-import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapBuilder, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapModel, DataMapWriter}
 import org.apache.carbondata.core.datamap.dev.fgdatamap.{FineGrainBlocklet, FineGrainDataMap, FineGrainDataMapFactory}
 import org.apache.carbondata.core.datastore.FileReader
 import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -68,7 +70,7 @@ class FGDataMapFactory(carbonTable: CarbonTable,
     val files = file.listFiles()
     files.map { f =>
       val dataMap: FineGrainDataMap = new FGDataMap()
-      dataMap.init(new DataMapModel(f.getCanonicalPath))
+      dataMap.init(new DataMapModel(f.getCanonicalPath, new Configuration(false)))
       dataMap
     }.toList.asJava
   }
@@ -79,7 +81,7 @@ class FGDataMapFactory(carbonTable: CarbonTable,
   override def getDataMaps(distributable: DataMapDistributable): java.util.List[FineGrainDataMap]= {
     val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
     val dataMap: FineGrainDataMap = new FGDataMap()
-    dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+    dataMap.init(new DataMapModel(mapDistributable.getFilePath, new Configuration(false)))
     Seq(dataMap).asJava
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 8ebed1f..edd3e9c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -18,9 +18,9 @@
 package org.apache.carbondata.spark.testsuite.datamap
 
 import scala.collection.JavaConverters._
-
 import java.io.{File, FilenameFilter}
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
@@ -337,7 +337,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
     } else {
       val segment = Segment.getSegment("0", path)
       val store = new SegmentFileStore(path, segment.getSegmentFileName)
-      store.readIndexFiles()
+      store.readIndexFiles(new Configuration(false))
       val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
       assertResult(true)(size > 0)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index 133454a..f4c725e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -18,6 +18,7 @@ package org.apache.carbondata.spark.testsuite.partition
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.BeforeAndAfterAll
@@ -78,7 +79,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
     } else {
       val segment = Segment.getSegment(segmentId, carbonTable.getTablePath)
       val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
-      store.readIndexFiles()
+      store.readIndexFiles(new Configuration(false))
       store.getIndexFilesMap.asScala.flatMap(_._2.asScala).map(f => FileFactory.getCarbonFile(f)).toArray
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index e3e8e68..9a0080c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -21,6 +21,7 @@ import java.util
 import java.util.concurrent.{Callable, ExecutorService, Executors}
 
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
 import org.apache.spark.sql.optimizer.CarbonFilters
@@ -352,7 +353,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_mergeindexpartitionthree")
     val details = SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
     val store = new SegmentFileStore(carbonTable.getTablePath, details(0).getSegmentFile)
-    store.readIndexFiles()
+    store.readIndexFiles(new Configuration(false))
     store.getIndexFiles
     assert(store.getIndexFiles.size() == 10)
     CarbonProperties.getInstance()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
index af05613..3a650ec 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
@@ -117,7 +117,7 @@ class CarbonFileIndex(
       }
       CarbonInputFormat.setReadCommittedScope(
         hadoopConf,
-        new LatestFilesReadCommittedScope(indexFiles))
+        new LatestFilesReadCommittedScope(indexFiles, hadoopConf))
       filter match {
         case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
         case None => None

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index a5e1b39..62d9903 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -78,9 +78,10 @@ class SparkCarbonFileFormat extends FileFormat
   override def inferSchema(sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
+    val conf = sparkSession.sessionState.newHadoopConf()
     val tablePath = options.get("path") match {
       case Some(path) =>
-        FileFactory.checkAndAppendDefaultFs(path, sparkSession.sparkContext.hadoopConfiguration)
+        FileFactory.checkAndAppendDefaultFs(path, conf)
       case _ if files.nonEmpty =>
         FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString)
       case _ =>
@@ -89,7 +90,8 @@ class SparkCarbonFileFormat extends FileFormat
     if (options.get(CarbonCommonConstants.SORT_COLUMNS).isDefined) {
       throw new UnsupportedOperationException("Cannot use sort columns during infer schema")
     }
-    val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""), false)
+    val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""),
+      false, conf)
     val table = CarbonTable.buildFromTableInfo(tableInfo)
     var schema = new StructType
     val fields = tableInfo.getFactTable.getListOfColumns.asScala.map { col =>
@@ -395,7 +397,7 @@ class SparkCarbonFileFormat extends FileFormat
           vectorizedReader
         } else {
           val reader = new CarbonRecordReader(model,
-            new SparkUnsafeRowReadSuport(requiredSchema), null)
+            new SparkUnsafeRowReadSuport(requiredSchema), broadcastedHadoopConf.value.value)
           reader.initialize(split, hadoopAttemptContext)
           reader
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 66c0224..825cdec 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -868,7 +868,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
         builder.outputPath(writerPath)
           .isTransactionalTable(false)
           .uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(sortColumns)
-          .buildWriterForCSVInput(new Schema(fields))
+          .buildWriterForCSVInput(new Schema(fields), spark.sparkContext.hadoopConfiguration)
 
       var i = 0
       while (i < rows) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
index 26f67f8..43f04b8 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -22,8 +22,8 @@ import java.io.File
 import org.apache.commons.io.FileUtils
 import org.apache.commons.lang.RandomStringUtils
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
-import org.apache.spark.sql.carbondata.datasource.TestUtil._
 import org.apache.spark.util.SparkUtil
+import org.apache.spark.sql.carbondata.datasource.TestUtil.{spark, _}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
@@ -70,9 +70,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
       val writer =
         if (persistSchema) {
           builder.persistSchemaFile(true)
-          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), spark
+            .sparkContext.hadoopConfiguration)
         } else {
-          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), spark
+            .sparkContext.hadoopConfiguration)
         }
 
       var i = 0
@@ -333,7 +335,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
       .toString()
     val builder = CarbonWriter.builder()
     val writer = builder.outputPath(writerPath)
-      .buildWriterForCSVInput(Schema.parseJson(schema))
+      .buildWriterForCSVInput(Schema.parseJson(schema), spark.sessionState.newHadoopConf())
     for (i <- 0 until 3) {
       // write a varchar with 75,000 length
       writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString))
@@ -348,15 +350,12 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
         s"""CREATE TABLE sdkOutputTable (name string, address string, age int)
            |USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address") """
           .stripMargin)
-    } else if (spark.sparkContext.version.startsWith("2.2")) {
+    } else {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTable (name string, address string, age int) USING carbon
            |OPTIONS("long_String_columns"="address") LOCATION
            |'$writerPath' """.stripMargin)
-    } else {
-      // TODO. spark2.3 ?
-      assert(false)
     }
     assert(spark.sql("select * from sdkOutputTable where age = 0").count() == 1)
     val op = spark.sql("select address from sdkOutputTable limit 1").collectAsList()
@@ -371,14 +370,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
         .sql(
           s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS (PATH
              |'$writerPath', "long_String_columns" "address") """.stripMargin)
-    } else if (spark.sparkContext.version.startsWith("2.2")) {
+    } else {
       //data source file format
       spark.sql(
         s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS
            |("long_String_columns"="address") LOCATION '$writerPath' """.stripMargin)
-    } else {
-      // TODO. spark2.3 ?
-      assert(false)
     }
     assert(spark.sql("select * from sdkOutputTableWithoutSchema where age = 0").count() == 1)
     val op1 = spark.sql("select address from sdkOutputTableWithoutSchema limit 1").collectAsList()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0fd4e34..57887a7 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -436,7 +436,7 @@ object CarbonDataRDDFactory {
         res.foreach { resultOfSeg =>
           resultSize = resultSize + resultOfSeg.size
           resultOfSeg.foreach { resultOfBlock =>
-            segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName, null))
+            segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName))
           }
         }
         val segmentFiles = updateSegmentFiles(carbonTable, segmentDetails, updateModel.get)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index b77632d..4921b33 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -159,14 +159,13 @@ object DeleteExecution {
         resultOfBlock => {
           if (resultOfBlock._1 == SegmentStatus.SUCCESS) {
             blockUpdateDetailsList.add(resultOfBlock._2._1)
-            segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName, null))
+            segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName))
             // if this block is invalid then decrement block count in map.
             if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) {
               CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
                 blockMappingVO.getSegmentNumberOfBlockMapping)
             }
-          }
-          else {
+          } else {
             // In case of failure , clean all related delete delta files
             CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
             LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index e3da86d..2951283 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1202,7 +1202,7 @@ public final class CarbonDataMergerUtil {
     segmentUpdateStatusManager.setUpdateStatusDetails(segmentUpdateDetails);
 
     CarbonFile[] deleteDeltaFiles =
-        segmentUpdateStatusManager.getDeleteDeltaFilesList(new Segment(seg, null), blockName);
+        segmentUpdateStatusManager.getDeleteDeltaFilesList(new Segment(seg), blockName);
 
     String destFileName =
         blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 19353d1..f6cc485 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -326,7 +326,7 @@ public final class CarbonLoaderUtil {
 
         for (LoadMetadataDetails detail: listOfLoadFolderDetails) {
           // if the segments is in the list of marked for delete then update the status.
-          if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName(), null))) {
+          if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName()))) {
             detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
           } else if (segmentFilesTobeUpdated
               .contains(Segment.toSegment(detail.getLoadName(), null))) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index dd70cc9..a183197 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -74,8 +74,7 @@ public class AvroCarbonWriter extends CarbonWriter {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(CarbonTable.class.getName());
 
-  AvroCarbonWriter(CarbonLoadModel loadModel) throws IOException {
-    Configuration hadoopConf = new Configuration();
+  AvroCarbonWriter(CarbonLoadModel loadModel, Configuration hadoopConf) throws IOException {
     CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel);
     CarbonTableOutputFormat format = new CarbonTableOutputFormat();
     JobID jobId = new JobID(UUID.randomUUID().toString(), 0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
index 627e060..a8899a7 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
@@ -46,8 +46,7 @@ class CSVCarbonWriter extends CarbonWriter {
   private TaskAttemptContext context;
   private ObjectArrayWritable writable;
 
-  CSVCarbonWriter(CarbonLoadModel loadModel) throws IOException {
-    Configuration hadoopConf = new Configuration();
+  CSVCarbonWriter(CarbonLoadModel loadModel, Configuration hadoopConf) throws IOException {
     CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel);
     CarbonTableOutputFormat format = new CarbonTableOutputFormat();
     JobID jobId = new JobID(UUID.randomUUID().toString(), 0);