Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/08/28 09:00:31 UTC

[1/3] carbondata git commit: [CARBONDATA-2844] [CARBONDATA-2865] Pass SK/AK to executor by serializing hadoop configuration from driver.

Repository: carbondata
Updated Branches:
  refs/heads/master 1fb1f19f2 -> 2a9604cd8


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index b93d80f..30aa415 100644
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -54,6 +54,7 @@ import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
 import org.apache.carbondata.hadoop.CarbonRecordReader;
 import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.search.SearchRequest;
 import org.apache.spark.search.SearchResult;
 import org.apache.spark.search.ShutdownRequest;
@@ -135,7 +136,7 @@ public class SearchRequestHandler {
 
     // In search mode, reader will read multiple blocks by using a thread pool
     CarbonRecordReader<CarbonRow> reader =
-        new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport());
+        new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport(), new Configuration());
 
     // read all rows by the reader
     List<CarbonRow> rows = new LinkedList<>();


[3/3] carbondata git commit: [CARBONDATA-2844] [CARBONDATA-2865] Pass SK/AK to executor by serializing hadoop configuration from driver.

Posted by ra...@apache.org.
[CARBONDATA-2844] [CARBONDATA-2865] Pass SK/AK to executor by serializing hadoop configuration from driver.

Add SK/AK to a thread local so that on each query the new SK/AK can be passed to FileFactory.
Refactor FileFactory to accept the configuration from the thread local.
Fix a compatibility issue from 1.3.x to 1.5.x [CARBONDATA-2865].

This closes #2623
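
A minimal sketch (not part of the patch) of the intended flow, using only APIs this patch adds or touches: ThreadLocalSessionInfo.setConfigurationToCurrentThread and the thread-local lookup inside FileFactory.getConfiguration(). The fs.s3a.* key names mirror the S3 handling removed from CarbonInputFormatUtil below; the values are placeholders.

import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.hadoop.conf.Configuration;

public class ConfigPropagationSketch {
  public static void main(String[] args) {
    // Driver side: the Hadoop configuration carrying SK/AK is what gets
    // serialized and shipped to the executor along with the task.
    Configuration conf = new Configuration();
    conf.set("fs.s3a.access.key", "<access-key>"); // placeholder values
    conf.set("fs.s3a.secret.key", "<secret-key>");

    // Executor side: bind the deserialized configuration to the current
    // thread so FileFactory calls made while running this query can see it.
    ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf);

    // FileFactory now resolves the configuration from the thread local
    // ("carbonConf") instead of the static default.
    Configuration effective = FileFactory.getConfiguration();
    System.out.println(effective.get("fs.s3a.access.key"));
  }
}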


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2a9604cd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2a9604cd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2a9604cd

Branch: refs/heads/master
Commit: 2a9604cd840dc1a552afcff23059ac9bf624e161
Parents: 1fb1f19
Author: kunal642 <ku...@gmail.com>
Authored: Wed Aug 8 21:50:44 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Aug 28 14:30:19 2018 +0530

----------------------------------------------------------------------
 .../core/datastore/impl/DFSFileReaderImpl.java  |  8 ++-
 .../datastore/impl/DefaultFileTypeProvider.java |  4 +-
 .../core/datastore/impl/FileFactory.java        | 27 +++++++---
 .../core/datastore/impl/FileTypeInterface.java  |  2 +-
 .../scan/executor/QueryExecutorFactory.java     | 12 +++--
 .../executor/impl/AbstractQueryExecutor.java    |  5 +-
 .../scan/executor/impl/DetailQueryExecutor.java |  6 +++
 .../impl/SearchModeDetailQueryExecutor.java     |  4 +-
 .../SearchModeVectorDetailQueryExecutor.java    |  5 +-
 .../impl/VectorDetailQueryExecutor.java         |  6 +++
 .../carbondata/core/util/CarbonProperties.java  |  6 +--
 .../apache/carbondata/core/util/CarbonUtil.java |  6 +--
 .../carbondata/core/util/SessionParams.java     |  5 +-
 .../core/util/ThreadLocalSessionInfo.java       | 20 +++++++
 .../store/impl/DFSFileReaderImplUnitTest.java   |  3 +-
 .../datamap/lucene/LuceneDataMapWriter.java     |  2 +-
 .../carbondata/hadoop/CarbonRecordReader.java   | 10 ++--
 .../hadoop/api/CarbonInputFormat.java           |  3 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  1 -
 .../hadoop/api/CarbonTableOutputFormat.java     |  7 ++-
 .../hadoop/util/CarbonInputFormatUtil.java      | 17 ------
 .../carbondata/hive/CarbonHiveRecordReader.java |  2 +-
 .../presto/CarbondataPageSourceProvider.java    |  3 +-
 .../PrestoCarbonVectorizedRecordReader.java     |  3 +-
 .../presto/impl/CarbonTableReader.java          |  2 +-
 .../carbondata/presto/server/PrestoServer.scala |  1 +
 ...eneFineGrainDataMapWithSearchModeSuite.scala |  3 +-
 .../createTable/TestCreateTableAsSelect.scala   | 10 ++--
 .../carbondata/spark/load/CsvRDDHelper.scala    |  9 ++--
 .../load/DataLoadProcessBuilderOnSpark.scala    |  8 ++-
 .../load/DataLoadProcessorStepOnSpark.scala     | 10 ++--
 .../spark/rdd/AlterTableAddColumnRDD.scala      |  9 ++--
 .../spark/rdd/AlterTableDropColumnRDD.scala     | 11 ++--
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  |  7 +--
 .../spark/rdd/CarbonDropPartitionRDD.scala      | 11 ++--
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 28 +++++-----
 .../spark/rdd/CarbonIUDMergerRDD.scala          | 15 +++---
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 15 +++---
 .../apache/carbondata/spark/rdd/CarbonRDD.scala | 55 +++++++++-----------
 .../spark/rdd/CarbonScanPartitionRDD.scala      | 15 +++---
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 15 +++---
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 53 ++++++-------------
 .../carbondata/spark/rdd/SparkDataMapJob.scala  | 21 ++++----
 .../carbondata/spark/rdd/StreamHandoffRDD.scala | 14 +++--
 .../carbondata/spark/util/CommonUtil.scala      | 10 ++--
 .../spark/util/GlobalDictionaryUtil.scala       | 20 ++++---
 .../apache/spark/rdd/CarbonMergeFilesRDD.scala  | 11 ++--
 .../apache/spark/rdd/DataLoadCoalescedRDD.scala |  7 +--
 .../command/carbonTableSchemaCommon.scala       |  2 +-
 .../apache/spark/sql/util/SparkSQLUtil.scala    |  4 ++
 .../org/apache/spark/util/PartitionUtils.scala  |  2 +-
 .../VectorizedCarbonRecordReader.java           |  3 +-
 .../datasources/SparkCarbonFileFormat.scala     |  3 +-
 .../datamap/IndexDataMapRebuildRDD.scala        | 16 +++---
 .../spark/rdd/CarbonDataRDDFactory.scala        | 26 +++++----
 .../spark/rdd/CarbonTableCompactor.scala        |  4 +-
 .../org/apache/spark/sql/CarbonCountStar.scala  |  6 ++-
 .../sql/CarbonDatasourceHadoopRelation.scala    |  1 -
 .../spark/sql/CarbonDictionaryDecoder.scala     | 15 ++++--
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  6 +--
 .../org/apache/spark/sql/CarbonSession.scala    |  3 +-
 .../org/apache/spark/sql/CarbonSource.scala     |  1 -
 .../sql/events/MergeIndexEventListener.scala    | 14 ++---
 .../management/CarbonInsertIntoCommand.scala    |  7 +--
 .../management/CarbonLoadDataCommand.scala      | 19 ++++---
 .../command/mutation/DeleteExecution.scala      | 16 +++---
 .../command/mutation/HorizontalCompaction.scala |  6 +++
 ...rbonAlterTableDropHivePartitionCommand.scala |  2 +-
 .../CarbonAlterTableAddColumnCommand.scala      |  4 +-
 .../CarbonAlterTableDropColumnCommand.scala     |  2 +-
 .../table/CarbonCreateTableCommand.scala        |  6 ++-
 .../sql/execution/strategy/DDLStrategy.scala    |  4 +-
 .../org/apache/spark/util/TableLoader.scala     |  3 +-
 .../merger/CarbonCompactionExecutor.java        | 12 +++--
 .../spliter/AbstractCarbonQueryExecutor.java    |  7 ++-
 .../partition/spliter/CarbonSplitExecutor.java  |  6 ++-
 .../sdk/file/CarbonReaderBuilder.java           |  3 ++
 .../sdk/file/CarbonWriterBuilder.java           |  6 +++
 .../carbondata/sdk/file/JsonCarbonWriter.java   |  3 +-
 .../store/worker/SearchRequestHandler.java      |  3 +-
 80 files changed, 428 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
index 1a0cd41..e86fa12 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.FileReader;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,7 +38,10 @@ public class DFSFileReaderImpl implements FileReader {
 
   private boolean readPageByPage;
 
-  public DFSFileReaderImpl() {
+  private Configuration configuration;
+
+  public DFSFileReaderImpl(Configuration configuration) {
+    this.configuration = configuration;
     this.fileNameAndStreamCache =
         new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
   }
@@ -60,7 +64,7 @@ public class DFSFileReaderImpl implements FileReader {
     FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath);
     if (null == fileChannel) {
       Path pt = new Path(filePath);
-      FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+      FileSystem fs = pt.getFileSystem(configuration);
       fileChannel = fs.open(pt);
       fileNameAndStreamCache.put(filePath, fileChannel);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
index c4761c9..937b5b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 
 public class DefaultFileTypeProvider implements FileTypeInterface {
 
-  public FileReader getFileHolder(FileFactory.FileType fileType) {
+  public FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration) {
     switch (fileType) {
       case LOCAL:
         return new FileReaderImpl();
@@ -37,7 +37,7 @@ public class DefaultFileTypeProvider implements FileTypeInterface {
       case ALLUXIO:
       case VIEWFS:
       case S3:
-        return new DFSFileReaderImpl();
+        return new DFSFileReaderImpl(configuration);
       default:
         return new FileReaderImpl();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index e353623..b07d11b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -59,11 +60,23 @@ public final class FileFactory {
   }
 
   public static Configuration getConfiguration() {
-    return configuration;
+    Configuration conf;
+    Object confObject = ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo()
+        .getNonSerializableExtraInfo().get("carbonConf");
+    if (confObject == null) {
+      conf = configuration;
+    } else {
+      conf = (Configuration) confObject;
+    }
+    return conf;
   }
 
   public static FileReader getFileHolder(FileType fileType) {
-    return fileFileTypeInterface.getFileHolder(fileType);
+    return fileFileTypeInterface.getFileHolder(fileType, getConfiguration());
+  }
+
+  public static FileReader getFileHolder(FileType fileType, Configuration configuration) {
+    return fileFileTypeInterface.getFileHolder(fileType, configuration);
   }
 
   public static FileType getFileType(String path) {
@@ -100,7 +113,7 @@ public final class FileFactory {
 
   public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize)
       throws IOException {
-    return getDataInputStream(path, fileType, bufferSize, configuration);
+    return getDataInputStream(path, fileType, bufferSize, getConfiguration());
   }
   public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize,
       Configuration configuration) throws IOException {
@@ -306,7 +319,7 @@ public final class FileFactory {
         // this method was new in hadoop 2.7, otherwise use CarbonFile.truncate to do this.
         try {
           Path pt = new Path(path);
-          FileSystem fs = pt.getFileSystem(configuration);
+          FileSystem fs = pt.getFileSystem(getConfiguration());
           Method truncateMethod = fs.getClass().getDeclaredMethod("truncate",
               new Class[]{Path.class, long.class});
           truncateMethod.invoke(fs, new Object[]{pt, newSize});
@@ -414,7 +427,7 @@ public final class FileFactory {
       case VIEWFS:
       case S3:
         Path path = new Path(filePath);
-        FileSystem fs = path.getFileSystem(configuration);
+        FileSystem fs = path.getFileSystem(getConfiguration());
         return fs.getContentSummary(path).getLength();
       case LOCAL:
       default:
@@ -442,7 +455,7 @@ public final class FileFactory {
    * @throws IOException
    */
   public static FileSystem getFileSystem(Path path) throws IOException {
-    return path.getFileSystem(configuration);
+    return path.getFileSystem(getConfiguration());
   }
 
 
@@ -455,7 +468,7 @@ public final class FileFactory {
       case VIEWFS:
         try {
           Path path = new Path(directoryPath);
-          FileSystem fs = path.getFileSystem(FileFactory.configuration);
+          FileSystem fs = path.getFileSystem(getConfiguration());
           if (!fs.exists(path)) {
             fs.mkdirs(path);
             fs.setPermission(path, permission);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
index 358d2ef..8b0fcc4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 
 public interface FileTypeInterface {
 
-  FileReader getFileHolder(FileFactory.FileType fileType);
+  FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration);
   CarbonFile getCarbonFile(String path, FileFactory.FileType fileType);
   CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Configuration configuration);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
index b790f1c..2a9c7f4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
@@ -23,24 +23,26 @@ import org.apache.carbondata.core.scan.executor.impl.VectorDetailQueryExecutor;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.util.CarbonProperties;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Factory class to get the query executor from RDD
  * This will return the executor based on query type
  */
 public class QueryExecutorFactory {
 
-  public static QueryExecutor getQueryExecutor(QueryModel queryModel) {
+  public static QueryExecutor getQueryExecutor(QueryModel queryModel, Configuration configuration) {
     if (CarbonProperties.isSearchModeEnabled()) {
       if (queryModel.isVectorReader()) {
-        return new SearchModeVectorDetailQueryExecutor();
+        return new SearchModeVectorDetailQueryExecutor(configuration);
       } else {
-        return new SearchModeDetailQueryExecutor();
+        return new SearchModeDetailQueryExecutor(configuration);
       }
     } else {
       if (queryModel.isVectorReader()) {
-        return new VectorDetailQueryExecutor();
+        return new VectorDetailQueryExecutor(configuration);
       } else {
-        return new DetailQueryExecutor();
+        return new DetailQueryExecutor(configuration);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 259889b..ece2f8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -68,10 +68,12 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * This class provides a skeletal implementation of the {@link QueryExecutor}
@@ -96,7 +98,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
    */
   protected CarbonIterator queryIterator;
 
-  public AbstractQueryExecutor() {
+  public AbstractQueryExecutor(Configuration configuration) {
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration);
     queryProperties = new QueryExecutorProperties();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
index 46ef43d..e11c576 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
@@ -27,6 +27,8 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.result.iterator.DetailQueryResultIterator;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Below class will be used to execute the detail query
  * For executing the detail query it will pass all the block execution
@@ -34,6 +36,10 @@ import org.apache.carbondata.core.scan.result.iterator.DetailQueryResultIterator
  */
 public class DetailQueryExecutor extends AbstractQueryExecutor<RowBatch> {
 
+  public DetailQueryExecutor(Configuration configuration) {
+    super(configuration);
+  }
+
   @Override
   public CarbonIterator<RowBatch> execute(QueryModel queryModel)
       throws QueryExecutionException, IOException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
index ae14327..6d03540 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
@@ -31,13 +31,15 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator;
 import org.apache.carbondata.core.util.CarbonProperties;
 
+import org.apache.hadoop.conf.Configuration;
 
 public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object> {
   private static final LogService LOGGER =
           LogServiceFactory.getLogService(SearchModeDetailQueryExecutor.class.getName());
   private static ExecutorService executorService = null;
 
-  public SearchModeDetailQueryExecutor() {
+  public SearchModeDetailQueryExecutor(Configuration configuration) {
+    super(configuration);
     if (executorService == null) {
       initThreadPool();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
index 705c451..418ef42 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
@@ -32,6 +32,8 @@ import org.apache.carbondata.core.util.CarbonProperties;
 
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Below class will be used to execute the detail query and returns columnar vectors.
  */
@@ -40,7 +42,8 @@ public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<O
           LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
   private static ExecutorService executorService = null;
 
-  public SearchModeVectorDetailQueryExecutor() {
+  public SearchModeVectorDetailQueryExecutor(Configuration configuration) {
+    super(configuration);
     if (executorService == null) {
       initThreadPool();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
index 7787e4c..46397c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
@@ -26,11 +26,17 @@ import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.iterator.VectorDetailQueryResultIterator;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Below class will be used to execute the detail query and returns columnar vectors.
  */
 public class VectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
 
+  public VectorDetailQueryExecutor(Configuration configuration) {
+    super(configuration);
+  }
+
   @Override
   public CarbonIterator<Object> execute(QueryModel queryModel)
       throws QueryExecutionException, IOException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index c3a4934..58fef17 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -23,11 +23,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.text.SimpleDateFormat;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -94,7 +94,7 @@ public final class CarbonProperties {
   /**
    * It is purely for testing
    */
-  private Map<String, String> addedProperty = new HashMap<>();
+  private Map<String, String> addedProperty = new ConcurrentHashMap<>();
 
   /**
    * Private constructor this will call load properties method to load all the
@@ -407,7 +407,7 @@ public final class CarbonProperties {
    * @param lockTypeConfigured
    */
   private void validateAndConfigureLockType(String lockTypeConfigured) {
-    Configuration configuration = new Configuration(true);
+    Configuration configuration = FileFactory.getConfiguration();
     String defaultFs = configuration.get("fs.defaultFS");
     if (null != defaultFs && (defaultFs.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)
         || defaultFs.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || defaultFs

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 9aaa58c..c5e2e8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -134,8 +134,6 @@ public final class CarbonUtil {
    */
   private static final int CONST_HUNDRED = 100;
 
-  private static final Configuration conf = new Configuration(true);
-
   /**
    * dfs.bytes-per-checksum
    * HDFS checksum length, block size for a file should be exactly divisible
@@ -662,7 +660,7 @@ public final class CarbonUtil {
    */
   public static String checkAndAppendHDFSUrl(String filePath) {
     String currentPath = filePath;
-    String defaultFsUrl = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
+    String defaultFsUrl = FileFactory.getConfiguration().get(CarbonCommonConstants.FS_DEFAULT_FS);
     String baseDFSUrl = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_DDL_BASE_HDFS_URL, "");
     if (checkIfPrefixExists(filePath)) {
@@ -699,7 +697,7 @@ public final class CarbonUtil {
       filePath = "/" + filePath;
     }
     currentPath = filePath;
-    String defaultFsUrl = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
+    String defaultFsUrl = FileFactory.getConfiguration().get(CarbonCommonConstants.FS_DEFAULT_FS);
     if (defaultFsUrl == null) {
       return currentPath;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 169c003..51b157f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.util;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.common.logging.LogService;
@@ -57,12 +58,12 @@ public class SessionParams implements Serializable, Cloneable {
   private static final long serialVersionUID = -7801994600594915264L;
 
   private Map<String, String> sProps;
-  private Map<String, String> addedProps;
+  private ConcurrentHashMap<String, String> addedProps;
   // below field to be used when we want the objects to be serialized
   private Map<String, Object> extraInfo;
   public SessionParams() {
     sProps = new HashMap<>();
-    addedProps = new HashMap<>();
+    addedProps = new ConcurrentHashMap<>();
     extraInfo = new HashMap<>();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
index df525bc..f85a350 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.util;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * This class maintains ThreadLocal session params
  */
@@ -31,4 +33,22 @@ public class ThreadLocalSessionInfo {
   public static CarbonSessionInfo getCarbonSessionInfo() {
     return threadLocal.get();
   }
+
+  public static synchronized CarbonSessionInfo getOrCreateCarbonSessionInfo() {
+    CarbonSessionInfo info = threadLocal.get();
+    if (info == null || info.getSessionParams() == null) {
+      info = new CarbonSessionInfo();
+      info.setSessionParams(new SessionParams());
+      threadLocal.set(info);
+    }
+    return info;
+  }
+
+  public static void setConfigurationToCurrentThread(Configuration configuration) {
+    getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo().put("carbonConf", configuration);
+  }
+
+  public static void unsetAll() {
+    threadLocal.remove();
+  }
 }
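
A short usage sketch of the ThreadLocalSessionInfo helpers added above. The worker-thread wrapper is illustrative only; the patch itself does the equivalent binding inside CarbonTableOutputFormat.getRecordWriter, and unsetAll() is shown here as a defensive cleanup step.

import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.hadoop.conf.Configuration;

public class ThreadLocalConfSketch {
  public static void main(String[] args) throws Exception {
    // In practice this would be the configuration deserialized from the driver.
    final Configuration taskConf = new Configuration();
    Thread worker = new Thread() {
      @Override public void run() {
        try {
          // setConfigurationToCurrentThread() goes through
          // getOrCreateCarbonSessionInfo(), so it is safe on a brand-new thread.
          ThreadLocalSessionInfo.setConfigurationToCurrentThread(taskConf);
          // ... query / load work here; FileFactory.getConfiguration() now
          // returns taskConf on this thread instead of the static default ...
        } finally {
          // unsetAll() clears the thread local so a reused thread
          // does not keep a stale configuration.
          ThreadLocalSessionInfo.unsetAll();
        }
      }
    };
    worker.start();
    worker.join();
  }
}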

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
index 30144c1..5033713 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.impl.DFSFileReaderImpl;
 
 import mockit.Mock;
 import mockit.MockUp;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,7 +46,7 @@ public class DFSFileReaderImplUnitTest {
   private static File fileWithEmptyContent;
 
   @BeforeClass public static void setup() {
-    dfsFileHolder = new DFSFileReaderImpl();
+    dfsFileHolder = new DFSFileReaderImpl(new Configuration());
     file = new File("Test.carbondata");
     fileWithEmptyContent = new File("TestEXception.carbondata");
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index 605ec89..bdb17ed 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -165,7 +165,7 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
     // the indexWriter closes the FileSystem on closing the writer, so for a new configuration
     // and disable the cache for the index writer, it will be closed on closing the writer
-    Configuration conf = new Configuration();
+    Configuration conf = FileFactory.getConfiguration();
     conf.set("fs.hdfs.impl.disable.cache", "true");
 
     // create a index writer

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index a54e7a4..0d38906 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -57,15 +58,16 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
   private boolean skipClearDataMapAtClose = false;
 
   public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport,
-      InputMetricsStats inputMetricsStats) {
-    this(queryModel, readSupport);
+      InputMetricsStats inputMetricsStats, Configuration configuration) {
+    this(queryModel, readSupport, configuration);
     this.inputMetricsStats = inputMetricsStats;
   }
 
-  public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport) {
+  public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport,
+      Configuration configuration) {
     this.queryModel = queryModel;
     this.readSupport = readSupport;
-    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel, configuration);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 21ef6cf..3ebd6d6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -583,7 +583,8 @@ m filterExpression
     Configuration configuration = taskAttemptContext.getConfiguration();
     QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
     CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
-    return new CarbonRecordReader<T>(queryModel, readSupport);
+    return new CarbonRecordReader<T>(queryModel, readSupport,
+        taskAttemptContext.getConfiguration());
   }
 
   public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index af2cf83..ec201b9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -132,7 +132,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
-
     CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 5938c20..5cc275b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
 import org.apache.carbondata.processing.loading.DataLoadExecutor;
 import org.apache.carbondata.processing.loading.TableProcessingOperations;
@@ -231,7 +232,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
 
   @Override
   public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter(
-      TaskAttemptContext taskAttemptContext) throws IOException {
+      final TaskAttemptContext taskAttemptContext) throws IOException {
     final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
     //if loadModel having taskNo already(like in SDK) then no need to overwrite
     short sdkUserCore = loadModel.getSdkUserCores();
@@ -249,10 +250,12 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext);
     final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
     final ExecutorService executorService = Executors.newFixedThreadPool(1,
-        new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));;
+        new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));
     // It should be started in new thread as the underlying iterator uses blocking queue.
     Future future = executorService.submit(new Thread() {
       @Override public void run() {
+        ThreadLocalSessionInfo.setConfigurationToCurrentThread(taskAttemptContext
+            .getConfiguration());
         try {
           dataLoadExecutor
               .execute(loadModel, tempStoreLocations, iterators);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index af7397b..7641427 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datamap.DataMapJob;
 import org.apache.carbondata.core.datamap.DataMapUtil;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -177,20 +176,4 @@ public class CarbonInputFormatUtil {
     return new JobID(jobtrackerID, batch);
   }
 
-  public static void setS3Configurations(Configuration hadoopConf) {
-    FileFactory.getConfiguration()
-        .set("fs.s3a.access.key", hadoopConf.get("fs.s3a.access.key", ""));
-    FileFactory.getConfiguration()
-        .set("fs.s3a.secret.key", hadoopConf.get("fs.s3a.secret.key", ""));
-    FileFactory.getConfiguration()
-        .set("fs.s3a.endpoint", hadoopConf.get("fs.s3a.endpoint", ""));
-    FileFactory.getConfiguration().set(CarbonCommonConstants.S3_ACCESS_KEY,
-        hadoopConf.get(CarbonCommonConstants.S3_ACCESS_KEY, ""));
-    FileFactory.getConfiguration().set(CarbonCommonConstants.S3_SECRET_KEY,
-        hadoopConf.get(CarbonCommonConstants.S3_SECRET_KEY, ""));
-    FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_ACCESS_KEY,
-        hadoopConf.get(CarbonCommonConstants.S3N_ACCESS_KEY, ""));
-    FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_SECRET_KEY,
-        hadoopConf.get(CarbonCommonConstants.S3N_SECRET_KEY, ""));
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
index 57bcca3..4ed2b91 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
@@ -63,7 +63,7 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
 
   public CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport<ArrayWritable> readSupport,
       InputSplit inputSplit, JobConf jobConf) throws IOException {
-    super(queryModel, readSupport);
+    super(queryModel, readSupport, jobConf);
     initialize(inputSplit, jobConf);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index 5b15b22..3ec815d 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -100,7 +100,8 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
     checkArgument(carbondataSplit.getConnectorId().equals(connectorId),
         "split is not for this connector");
     QueryModel queryModel = createQueryModel(carbondataSplit, columns);
-    QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+    QueryExecutor queryExecutor =
+        QueryExecutorFactory.getQueryExecutor(queryModel, new Configuration());
     try {
       CarbonIterator iterator = queryExecutor.execute(queryModel);
       readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
index 32e163a..9935b54 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
@@ -114,7 +114,8 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
     queryModel.setTableBlockInfos(tableBlockInfoList);
     queryModel.setVectorReader(true);
     try {
-      queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+      queryExecutor =
+          QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration());
       iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
     } catch (QueryExecutionException e) {
       throw new InterruptedException(e.getMessage());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 7916932..5a1e140 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -414,7 +414,7 @@ public class CarbonTableReader {
     List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
     CarbonTable carbonTable = tableCacheModel.carbonTable;
     TableInfo tableInfo = tableCacheModel.carbonTable.getTableInfo();
-    Configuration config = new Configuration();
+    Configuration config = FileFactory.getConfiguration();
     config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
     String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
     config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
index 127c4c9..2f3b8f4 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
@@ -32,6 +32,7 @@ import com.facebook.presto.tests.DistributedQueryRunner
 import com.google.common.collect.ImmutableMap
 import org.slf4j.{Logger, LoggerFactory}
 
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.presto.CarbondataPlugin
 
 object PrestoServer {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
index 0ac6e72..6cbe747 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
@@ -41,8 +41,7 @@ class LuceneFineGrainDataMapWithSearchModeSuite extends QueryTest with BeforeAnd
     val n = 500000
     sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode()
     CarbonProperties
-      .getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "100s")
+      .getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "100s")
     LuceneFineGrainDataMapSuite.createFile(file2, n)
     sql("create database if not exists lucene")
     sql("use lucene")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
index 062e5ba..c95e5a4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
@@ -146,13 +146,13 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test create table as select with TBLPROPERTIES") {
-    sql("DROP TABLE IF EXISTS ctas_tblproperties_test")
+    sql("DROP TABLE IF EXISTS ctas_tblproperties_testt")
     sql(
-      "create table ctas_tblproperties_test stored by 'carbondata' TBLPROPERTIES" +
+      "create table ctas_tblproperties_testt stored by 'carbondata' TBLPROPERTIES" +
         "('DICTIONARY_INCLUDE'='key', 'sort_scope'='global_sort') as select * from carbon_ctas_test")
-    checkAnswer(sql("select * from ctas_tblproperties_test"), sql("select * from carbon_ctas_test"))
+    checkAnswer(sql("select * from ctas_tblproperties_testt"), sql("select * from carbon_ctas_test"))
     val carbonTable = CarbonEnv.getInstance(Spark2TestQueryExecutor.spark).carbonMetastore
-      .lookupRelation(Option("default"), "ctas_tblproperties_test")(Spark2TestQueryExecutor.spark)
+      .lookupRelation(Option("default"), "ctas_tblproperties_testt")(Spark2TestQueryExecutor.spark)
       .asInstanceOf[CarbonRelation].carbonTable
     val metadataFolderPath: CarbonFile = FileFactory.getCarbonFile(carbonTable.getMetadataPath)
     assert(metadataFolderPath.exists())
@@ -419,7 +419,7 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
     sql("DROP TABLE IF EXISTS ctas_select_where_parquet")
     sql("DROP TABLE IF EXISTS ctas_select_where_orc")
-    sql("DROP TABLE IF EXISTS ctas_tblproperties_test")
+    sql("DROP TABLE IF EXISTS ctas_tblproperties_testt")
     sql("DROP TABLE IF EXISTS ctas_if_table_name")
     sql("DROP TABLE IF EXISTS source_table")
     sql("DROP TABLE IF EXISTS target_table")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
index 36d8c51..8d6dd32 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -37,6 +37,8 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P
 import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.rdd.SerializableConfiguration
@@ -108,17 +110,18 @@ object CsvRDDHelper {
     closePartition()
 
     // 2. read function
-    val serializableConfiguration = new SerializableConfiguration(jobConf)
+    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
     val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
       override def apply(file: PartitionedFile): Iterator[InternalRow] = {
         new Iterator[InternalRow] {
-          val hadoopConf = serializableConfiguration.value
           val jobTrackerId: String = {
             val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
             formatter.format(new Date())
           }
+          ThreadLocalSessionInfo.setConfigurationToCurrentThread(serializableConfiguration.value)
           val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
-          val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+          val hadoopAttemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration,
+            attemptId)
           val inputSplit =
             new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
           var finished = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index dc238fb..be40b13 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -29,11 +29,13 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
 
 /**
  * Use sortBy operator in spark to load the data
@@ -64,6 +66,7 @@ object DataLoadProcessBuilderOnSpark {
     val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
     val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
 
+    val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     // 1. Input
     val inputRDD = originRDD
       .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
@@ -73,6 +76,7 @@ object DataLoadProcessBuilderOnSpark {
 
     // 2. Convert
     val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
       DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
         convertStepRowCounter)
     }.filter(_ != null)// Filter the bad record
@@ -116,7 +120,7 @@ object DataLoadProcessBuilderOnSpark {
     // 4. Write
     sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
       DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, modelBroadcast,
-        writeStepRowCounter))
+        writeStepRowCounter, conf))
 
     // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
     // not have any functional impact as spark automatically monitors the cache usage on each node
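
The SerializableConfiguration broadcast above exists because Hadoop's Configuration implements Writable but not java.io.Serializable, so it cannot be put into a Spark broadcast directly. A rough sketch of such a wrapper, assuming the CarbonData class has the same shape as Spark's own SerializableConfiguration helper (the actual implementation may differ in detail):

import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration

// Sketch of a serializable wrapper around Hadoop Configuration; the real
// org.apache.carbondata.spark.rdd.SerializableConfiguration may differ.
class SerializableConfigurationSketch(@transient var value: Configuration) extends Serializable {

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)        // delegate to Hadoop's Writable serialization
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    value = new Configuration(false)
    value.readFields(in)    // rebuild the conf, including any SK/AK entries
  }
}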

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 73ed769..f17bd91 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.load
 import scala.util.Random
 
 import com.univocity.parsers.common.TextParsingException
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{Accumulator, SparkEnv, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.Row
@@ -29,7 +30,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
 import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
@@ -40,7 +42,7 @@ import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImp
 import org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
 import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil}
-import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
+import org.apache.carbondata.spark.rdd.{NewRddIterator, SerializableConfiguration, StringArrayRow}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
 
 object DataLoadProcessorStepOnSpark {
@@ -227,7 +229,9 @@ object DataLoadProcessorStepOnSpark {
       rows: Iterator[CarbonRow],
       index: Int,
       modelBroadcast: Broadcast[CarbonLoadModel],
-      rowCounter: Accumulator[Int]) {
+      rowCounter: Accumulator[Int],
+      conf: Broadcast[SerializableConfiguration]) {
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
     var model: CarbonLoadModel = null
     var tableName: String = null
     var rowConverter: RowConverterImpl = null

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index 7c1edea..f7aa623 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -17,7 +17,8 @@
 
 package org.apache.carbondata.spark.rdd
 
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -47,15 +48,15 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
 /**
  * This class is aimed at generating dictionary file for the newly added columns
  */
-class AlterTableAddColumnRDD[K, V](sc: SparkContext,
+class AlterTableAddColumnRDD[K, V](@transient sparkSession: SparkSession,
     @transient newColumns: Seq[ColumnSchema],
     identifier: AbsoluteTableIdentifier)
-  extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) {
+  extends CarbonRDD[(Int, SegmentStatus)](sparkSession, Nil) {
 
   val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
     CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
       new AddColumnPartition(id, column._2, column._1)
     }.toArray

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index 0dbb4f0..a0d06b8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -17,11 +17,12 @@
 
 package org.apache.carbondata.spark.rdd
 
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.statusmanager.SegmentStatus
@@ -44,12 +45,12 @@ class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Pa
 /**
  * This class is aimed at generating dictionary file for the newly added columns
  */
-class AlterTableDropColumnRDD[K, V](sc: SparkContext,
+class AlterTableDropColumnRDD[K, V](@transient ss: SparkSession,
     @transient newColumns: Seq[ColumnSchema],
     carbonTableIdentifier: AbsoluteTableIdentifier)
-  extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) {
+  extends CarbonRDD[(Int, SegmentStatus)](ss, Nil) {
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
       new DropColumnPartition(id, column._2, column._1)
     }.toArray

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 85a6f41..86a5043 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -39,7 +39,8 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
     partitionIds: Seq[String],
     bucketId: Int,
     identifier: AbsoluteTableIdentifier,
-    prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {
+    prev: RDD[Array[AnyRef]])
+  extends CarbonRDD[(K, V)](alterPartitionModel.sqlContext.sparkSession, prev) {
 
   var storeLocation: String = null
   val carbonLoadModel = alterPartitionModel.carbonLoadModel
@@ -50,14 +51,14 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
   val factTableName = carbonTable.getTableName
   val partitionInfo = carbonTable.getPartitionInfo(factTableName)
 
-  override protected def getPartitions: Array[Partition] = {
+  override protected def internalGetPartitions: Array[Partition] = {
     val sc = alterPartitionModel.sqlContext.sparkContext
     sc.setLocalProperty("spark.scheduler.pool", "DDL")
     sc.setLocalProperty("spark.job.interruptOnCancel", "true")
     firstParent[Array[AnyRef]].partitions
   }
 
-  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
+  override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava
     val iter = new Iterator[(K, V)] {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
index d56e1c2..e2d1eff 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
@@ -21,7 +21,8 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.indexstore.PartitionSpec
@@ -37,19 +38,19 @@ case class CarbonDropPartition(rddId: Int, val idx: Int, segment: Segment)
 
 /**
  * RDD to drop the partitions from segment files of all segments.
- * @param sc
+ * @param ss
  * @param tablePath
  * @param segments segments to be cleaned
  */
 class CarbonDropPartitionRDD(
-    sc: SparkContext,
+    @transient ss: SparkSession,
     tablePath: String,
     segments: Seq[Segment],
     partitions: util.List[PartitionSpec],
     uniqueId: String)
-  extends CarbonRDD[(String, String)](sc, Nil, sc.hadoopConfiguration) {
+  extends CarbonRDD[(String, String)](ss, Nil) {
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     segments.zipWithIndex.map {s =>
       CarbonDropPartition(id, s._2, s._1)
     }.toArray

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 2ec8b9c..9265c7f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -31,7 +31,7 @@ import com.univocity.parsers.common.TextParsingException
 import org.apache.commons.lang3.{ArrayUtils, StringUtils}
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SparkSession}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
@@ -174,11 +174,12 @@ case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends S
  * @param model a model package load info
  */
 class CarbonAllDictionaryCombineRDD(
+    @transient sparkSession: SparkSession,
     prev: RDD[(String, Iterable[String])],
     model: DictionaryLoadModel)
-  extends CarbonRDD[(Int, ColumnDistinctValues)](prev) {
+  extends CarbonRDD[(Int, ColumnDistinctValues)](sparkSession, prev) {
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     firstParent[(String, Iterable[String])].partitions
   }
 
@@ -267,11 +268,12 @@ class StringArrayRow(var values: Array[String]) extends Row {
  * @param model a model package load info
  */
 class CarbonBlockDistinctValuesCombineRDD(
+    @transient ss: SparkSession,
     prev: RDD[Row],
     model: DictionaryLoadModel)
-  extends CarbonRDD[(Int, ColumnDistinctValues)](prev) {
+  extends CarbonRDD[(Int, ColumnDistinctValues)](ss, prev) {
 
-  override def getPartitions: Array[Partition] = firstParent[Row].partitions
+  override def internalGetPartitions: Array[Partition] = firstParent[Row].partitions
   override def internalCompute(split: Partition,
       context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -325,11 +327,14 @@ class CarbonBlockDistinctValuesCombineRDD(
  * @param model a model package load info
  */
 class CarbonGlobalDictionaryGenerateRDD(
+    @transient sparkSession: SparkSession,
     prev: RDD[(Int, ColumnDistinctValues)],
     model: DictionaryLoadModel)
-  extends CarbonRDD[(Int, SegmentStatus)](prev) {
+  extends CarbonRDD[(Int, SegmentStatus)](sparkSession, prev) {
+
+  override def internalGetPartitions: Array[Partition] =
+    firstParent[(Int, ColumnDistinctValues)].partitions
 
-  override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
 
   override def internalCompute(split: Partition,
       context: TaskContext): Iterator[(Int, SegmentStatus)] = {
@@ -492,21 +497,20 @@ class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
  * Use external column dict to generate global dictionary
  *
  * @param carbonLoadModel carbon load model
- * @param sparkContext    spark context
+ * @param sparkSession    spark context
  * @param table           carbon table identifier
  * @param dimensions      carbon dimenisons having predefined dict
  * @param dictFolderPath  path of dictionary folder
  */
 class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     dictionaryLoadModel: DictionaryLoadModel,
-    sparkContext: SparkContext,
+    @transient ss: SparkSession,
     table: CarbonTableIdentifier,
     dimensions: Array[CarbonDimension],
     dictFolderPath: String)
-  extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil,
-    sparkContext.hadoopConfiguration) {
+  extends CarbonRDD[(Int, ColumnDistinctValues)](ss, Nil) {
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     val primDimensions = dictionaryLoadModel.primDimensions
     val primDimLength = primDimensions.length
     val result = new Array[Partition](primDimLength)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 3aaf0ae..762b920 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -24,14 +24,15 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.Partition
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.command.CarbonMergerMapping
 
-import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
-import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.api.CarbonInputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -41,23 +42,23 @@ import org.apache.carbondata.spark.MergeResult
  * IUD carbon merger RDD
  * */
 class CarbonIUDMergerRDD[K, V](
-    sc: SparkContext,
+    @transient ss: SparkSession,
     result: MergeResult[K, V],
     carbonLoadModel: CarbonLoadModel,
     carbonMergerMapping: CarbonMergerMapping,
     confExecutorsTemp: String)
-  extends CarbonMergerRDD[K, V](sc,
+  extends CarbonMergerRDD[K, V](ss,
     result,
     carbonLoadModel,
     carbonMergerMapping,
     confExecutorsTemp) {
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
     val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
       tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
     )
-    val jobConf: JobConf = new JobConf(new Configuration)
+    val jobConf: JobConf = new JobConf(FileFactory.getConfiguration)
     SparkHadoopUtil.get.addCredentials(jobConf)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index f9f65a7..a0425b7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block._
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
@@ -60,15 +61,15 @@ import org.apache.carbondata.spark.MergeResult
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
 class CarbonMergerRDD[K, V](
-    sc: SparkContext,
+    @transient ss: SparkSession,
     result: MergeResult[K, V],
     carbonLoadModel: CarbonLoadModel,
     carbonMergerMapping: CarbonMergerMapping,
     confExecutorsTemp: String)
-  extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
+  extends CarbonRDD[(K, V)](ss, Nil) {
 
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-  sc.setLocalProperty("spark.job.interruptOnCancel", "true")
+  ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL")
+  ss.sparkContext.setLocalProperty("spark.job.interruptOnCancel", "true")
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
   var storeLocation: String = null
@@ -183,7 +184,7 @@ class CarbonMergerRDD[K, V](
         }
         try {
           // fire a query and get the results.
-          rawResultIteratorList = exec.processTableBlocks()
+          rawResultIteratorList = exec.processTableBlocks(FileFactory.getConfiguration)
         } catch {
           case e: Throwable =>
             LOGGER.error(e)
@@ -269,7 +270,7 @@ class CarbonMergerRDD[K, V](
     iter
   }
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
     val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
       tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
@@ -277,7 +278,7 @@ class CarbonMergerRDD[K, V](
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
       carbonTable)
-    val jobConf: JobConf = new JobConf(new Configuration)
+    val jobConf: JobConf = new JobConf(getConf)
     SparkHadoopUtil.get.addCredentials(jobConf)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 54a7530..04f20b1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -17,28 +17,24 @@
 
 package org.apache.carbondata.spark.rdd
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
-
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.util.SparkSQLUtil
 
-import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util._
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 
 /**
  * This RDD maintains session level ThreadLocal
  */
-abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
-    @transient private var deps: Seq[Dependency[_]],
-    @transient hadoopConf: Configuration) extends RDD[T](sc, deps) {
+abstract class CarbonRDD[T: ClassTag](@transient ss: SparkSession,
+    @transient private var deps: Seq[Dependency[_]]) extends RDD[T](ss.sparkContext, deps) {
 
   val carbonSessionInfo: CarbonSessionInfo = {
     var info = ThreadLocalSessionInfo.getCarbonSessionInfo
@@ -50,24 +46,27 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
     info
   }
 
-  private val confBytes = {
-    val bao = new ByteArrayOutputStream()
-    val oos = new ObjectOutputStream(bao)
-    hadoopConf.write(oos)
-    oos.close()
-    CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
-  }
+  val config: Broadcast[SerializableConfiguration] = sparkContext
+    .broadcast(new SerializableConfiguration(SparkSQLUtil.sessionState(ss).newHadoopConf()))
 
   /** Construct an RDD with just a one-to-one dependency on one parent */
-  def this(@transient oneParent: RDD[_]) =
-    this (oneParent.context, List(new OneToOneDependency(oneParent)),
-      oneParent.sparkContext.hadoopConfiguration)
+  def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_]) =
+    this (sparkSession, List(new OneToOneDependency(oneParent)))
+
+  protected def internalGetPartitions: Array[Partition]
+
+  override def getPartitions: Array[Partition] = {
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(config.value.value)
+    internalGetPartitions
+  }
 
   // RDD compute logic should be here
   def internalCompute(split: Partition, context: TaskContext): Iterator[T]
 
   final def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    CarbonInputFormatUtil.setS3Configurations(getConf)
+    TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll())
+    carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", config
+      .value.value)
     ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
     TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
     val carbonTaskInfo = new CarbonTaskInfo
@@ -79,13 +78,7 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
   }
 
   def getConf: Configuration = {
-    val configuration = new Configuration(false)
-    val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor
-      .unCompressByte(confBytes))
-    val ois = new ObjectInputStream(bai)
-    configuration.readFields(ois)
-    ois.close()
-    configuration
+    config.value.value
   }
 }
 
@@ -93,12 +86,14 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
  * This RDD contains TableInfo object which is serialized and deserialized in driver and executor
  */
 abstract class CarbonRDDWithTableInfo[T: ClassTag](
-    @transient sc: SparkContext,
+    @transient ss: SparkSession,
     @transient private var deps: Seq[Dependency[_]],
-    serializedTableInfo: Array[Byte]) extends CarbonRDD[T](sc, deps, sc.hadoopConfiguration) {
+    serializedTableInfo: Array[Byte]) extends CarbonRDD[T](ss, deps) {
 
-  def this(@transient oneParent: RDD[_], serializedTableInfo: Array[Byte]) =
-    this (oneParent.context, List(new OneToOneDependency(oneParent)), serializedTableInfo)
+  def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_],
+      serializedTableInfo: Array[Byte]) = {
+    this (sparkSession, List(new OneToOneDependency(oneParent)), serializedTableInfo)
+  }
 
   def getTableInfo: TableInfo = TableInfo.deserialize(serializedTableInfo)
 }
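
Taken together, this is the central refactor of the commit: CarbonRDD now follows a template-method contract in which the base class broadcasts the driver's Hadoop configuration once and does the thread-local bookkeeping, while subclasses implement only internalGetPartitions and internalCompute. A condensed sketch of that contract (the real compute additionally routes the conf through CarbonSessionInfo; this is not the full class):

import scala.reflect.ClassTag

import org.apache.spark.{Dependency, Partition, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.util.SparkSQLUtil

import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.spark.rdd.SerializableConfiguration

// Condensed sketch of the contract introduced above, not the full class.
abstract class CarbonRDDSketch[T: ClassTag](@transient ss: SparkSession,
    @transient deps: Seq[Dependency[_]]) extends RDD[T](ss.sparkContext, deps) {

  // driver-side Hadoop conf (with SK/AK), shipped to executors once per RDD
  val config: Broadcast[SerializableConfiguration] = sparkContext
    .broadcast(new SerializableConfiguration(SparkSQLUtil.sessionState(ss).newHadoopConf()))

  protected def internalGetPartitions: Array[Partition]

  def internalCompute(split: Partition, context: TaskContext): Iterator[T]

  override def getPartitions: Array[Partition] = {
    // driver side: make the conf visible to FileFactory while planning splits
    ThreadLocalSessionInfo.setConfigurationToCurrentThread(config.value.value)
    internalGetPartitions
  }

  final def compute(split: Partition, context: TaskContext): Iterator[T] = {
    // executor side: clear the thread local when the task ends, then register
    // the conf (the real class carries it inside CarbonSessionInfo instead)
    TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll())
    ThreadLocalSessionInfo.setConfigurationToCurrentThread(config.value.value)
    internalCompute(split, context)
  }
}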

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
index 9452777..241720a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.command.AlterPartitionModel
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.util.CarbonException
@@ -36,6 +37,7 @@ import org.apache.spark.util.PartitionUtils
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo}
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataTypes
@@ -65,7 +67,7 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
     absoluteTableIdentifier: AbsoluteTableIdentifier,
     partitionIds: Seq[String],
     bucketId: Int)
-  extends RDD[(AnyRef, Array[AnyRef])](alterPartitionModel.sqlContext.sparkContext, Nil) {
+  extends CarbonRDD[(AnyRef, Array[AnyRef])](alterPartitionModel.sqlContext.sparkSession, Nil) {
 
   private val queryId = alterPartitionModel.sqlContext.sparkContext.getConf
     .get("queryId", System.nanoTime() + "")
@@ -91,9 +93,9 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
   val dictionaryIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]()
   val measureIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]()
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     val parallelism = sparkContext.defaultParallelism
-    val jobConf = new JobConf(new Configuration)
+    val jobConf = new JobConf(FileFactory.getConfiguration)
     val job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonTableInputFormat(absoluteTableIdentifier,
       partitionIds.toList.asJava, job)
@@ -127,8 +129,8 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
     result.toArray(new Array[Partition](result.size()))
   }
 
-  override def compute(split: Partition, context: TaskContext):
-    Iterator[(AnyRef, Array[AnyRef])] = {
+  override def internalCompute(split: Partition, context: TaskContext):
+  Iterator[(AnyRef, Array[AnyRef])] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
       var exec : CarbonSplitExecutor = null
       val rows : java.util.List[(AnyRef, Array[AnyRef])] = new ArrayList[(AnyRef, Array[AnyRef])]()
@@ -142,7 +144,8 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
         var result : java.util.List[PartitionSpliterRawResultIterator] = null
         try {
           exec = new CarbonSplitExecutor(segmentMapping, carbonTable)
-          result = exec.processDataBlocks(segmentId, new SparkDataTypeConverterImpl())
+          result = exec.processDataBlocks(segmentId, new SparkDataTypeConverterImpl(),
+            FileFactory.getConfiguration)
         } catch {
           case e: Throwable =>
             LOGGER.error(e)


[2/3] carbondata git commit: [CARBONDATA-2844] [CARBONDATA-2865] Pass SK/AK to executor by serializing hadoop configuration from driver.

Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index e60d5b8..f5d96fc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -44,6 +44,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
 import org.apache.carbondata.core.datastore.block.Distributable
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
@@ -79,7 +80,7 @@ class CarbonScanRDD[T: ClassTag](
     @transient val partitionNames: Seq[PartitionSpec],
     val dataTypeConverterClz: Class[_ <: DataTypeConverter] = classOf[SparkDataTypeConverterImpl],
     val readSupportClz: Class[_ <: CarbonReadSupport[_]] = SparkReadSupport.readSupportClass)
-  extends CarbonRDDWithTableInfo[T](spark.sparkContext, Nil, serializedTableInfo) {
+  extends CarbonRDDWithTableInfo[T](spark, Nil, serializedTableInfo) {
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
   private val jobTrackerId: String = {
@@ -92,7 +93,7 @@ class CarbonScanRDD[T: ClassTag](
 
   @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
     var partitions: Array[Partition] = Array.empty[Partition]
     var getSplitsStartTime: Long = -1
@@ -105,7 +106,7 @@ class CarbonScanRDD[T: ClassTag](
     var numBlocks = 0
 
     try {
-      val conf = new Configuration()
+      val conf = FileFactory.getConfiguration
       val jobConf = new JobConf(conf)
       SparkHadoopUtil.get.addCredentials(jobConf)
       val job = Job.getInstance(jobConf)
@@ -405,7 +406,7 @@ class CarbonScanRDD[T: ClassTag](
     val executionId = context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     val taskId = split.index
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
-    val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
+    val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
     val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
     val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
     TaskMetricsMap.getInstance().registerThreadCallback()
@@ -436,14 +437,16 @@ class CarbonScanRDD[T: ClassTag](
               "true")
             if (carbonRecordReader == null) {
               new CarbonRecordReader(model,
-                format.getReadSupportClass(attemptContext.getConfiguration), inputMetricsStats)
+                format.getReadSupportClass(attemptContext.getConfiguration),
+                inputMetricsStats,
+                attemptContext.getConfiguration)
             } else {
               carbonRecordReader
             }
           } else {
             new CarbonRecordReader(model,
               format.getReadSupportClass(attemptContext.getConfiguration),
-              inputMetricsStats)
+              inputMetricsStats, attemptContext.getConfiguration)
           }
       }
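
Here, as in the other RDDs touched by this commit, new Configuration() is replaced with FileFactory.getConfiguration so the attempt context and the CarbonRecordReader see the configuration registered for the current thread rather than an empty one. The accessor behind that call can be pictured roughly as follows (a sketch of the idea only, not the actual FileFactory code):

import org.apache.hadoop.conf.Configuration

import org.apache.carbondata.core.util.ThreadLocalSessionInfo

// Sketch only: prefer the configuration registered for the current thread
// (which carries the driver's SK/AK), fall back to a fresh Configuration.
object ConfigurationAccessorSketch {
  def getConfiguration: Configuration = {
    val sessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
    val registered =
      if (sessionInfo == null) {
        null
      } else {
        sessionInfo.getNonSerializableExtraInfo.get("carbonConf").asInstanceOf[Configuration]
      }
    if (registered == null) new Configuration() else registered
  }
}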
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 3848bad..1ada51b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.util.SparkUtil
 
@@ -41,11 +41,11 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.common.logging.impl.StandardLogService
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalSessionInfo, ThreadLocalTaskInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -160,29 +160,20 @@ class SparkPartitionLoader(model: CarbonLoadModel,
  * It loads the data to carbon using @AbstractDataLoadProcessorStep
  */
 class NewCarbonDataLoadRDD[K, V](
-    sc: SparkContext,
+    @transient ss: SparkSession,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    blocksGroupBy: Array[(String, Array[BlockDetails])],
-    @transient hadoopConf: Configuration)
-  extends CarbonRDD[(K, V)](sc, Nil, hadoopConf) {
+    blocksGroupBy: Array[(String, Array[BlockDetails])])
+  extends CarbonRDD[(K, V)](ss, Nil) {
 
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+  ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL")
 
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     formatter.format(new Date())
   }
 
-  private val confBytes = {
-    val bao = new ByteArrayOutputStream()
-    val oos = new ObjectOutputStream(bao)
-    hadoopConf.write(oos)
-    oos.close()
-    CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
-  }
-
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     blocksGroupBy.zipWithIndex.map { b =>
       new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
     }
@@ -245,10 +236,7 @@ class NewCarbonDataLoadRDD[K, V](
 
       def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = {
         val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, theSplit.index, 0)
-        var configuration: Configuration = getConf
-        if (configuration == null) {
-          configuration = new Configuration()
-        }
+        val configuration: Configuration = FileFactory.getConfiguration
         CommonUtil.configureCSVInputFormat(configuration, carbonLoadModel)
         val hadoopAttemptContext = new TaskAttemptContextImpl(configuration, attemptId)
         val format = new CSVInputFormat
@@ -319,24 +307,13 @@ class NewCarbonDataLoadRDD[K, V](
  *  @see org.apache.carbondata.processing.newflow.DataLoadExecutor
  */
 class NewDataFrameLoaderRDD[K, V](
-    sc: SparkContext,
+    @transient ss: SparkSession,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    prev: DataLoadCoalescedRDD[Row],
-    @transient hadoopConf: Configuration) extends CarbonRDD[(K, V)](prev) {
-
-  private val confBytes = {
-    val bao = new ByteArrayOutputStream()
-    val oos = new ObjectOutputStream(bao)
-    hadoopConf.write(oos)
-    oos.close()
-    CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
-  }
+    prev: DataLoadCoalescedRDD[Row]) extends CarbonRDD[(K, V)](ss, prev) {
 
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val hadoopConf = getConf
-    CarbonInputFormatUtil.setS3Configurations(hadoopConf)
     val iter = new Iterator[(K, V)] {
       val loadMetadataDetails = new LoadMetadataDetails()
       val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
@@ -404,7 +381,7 @@ class NewDataFrameLoaderRDD[K, V](
     }
     iter
   }
-  override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+  override protected def internalGetPartitions: Array[Partition] = firstParent[Row].partitions
 }
 
 /**
@@ -528,10 +505,10 @@ class LazyRddIterator(serializer: SerializerInstance,
  *  @see org.apache.carbondata.processing.newflow.DataLoadExecutor
  */
 class PartitionTableDataLoaderRDD[K, V](
-    sc: SparkContext,
+    @transient ss: SparkSession,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    prev: RDD[Row]) extends CarbonRDD[(K, V)](prev) {
+    prev: RDD[Row]) extends CarbonRDD[(K, V)](ss, prev) {
 
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -596,6 +573,6 @@ class PartitionTableDataLoaderRDD[K, V](
     iter
   }
 
-  override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+  override protected def internalGetPartitions: Array[Partition] = firstParent[Row].partitions
 
 }
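
Under the new contract a load RDD such as the ones above only has to supply the two internal hooks; the broadcast configuration and thread-local handling come from CarbonRDD. A minimal illustrative subclass (UpperCaseRDD and its logic are hypothetical, not part of this patch):

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import org.apache.carbondata.spark.rdd.CarbonRDD

// Hypothetical example of the subclass contract: only the internal hooks are
// implemented, the base class handles the broadcast conf and thread locals.
class UpperCaseRDD(@transient ss: SparkSession, @transient prev: RDD[String])
  extends CarbonRDD[String](ss, prev) {

  override def internalGetPartitions: Array[Partition] =
    firstParent[String].partitions

  override def internalCompute(split: Partition, context: TaskContext): Iterator[String] =
    firstParent[String].iterator(split, context).map(_.toUpperCase)
}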

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index 43ee31b..b8e73d5 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -25,9 +25,12 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskAttemptID, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledException}
+import org.apache.spark.{Partition, TaskContext, TaskKilledException}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.core.datamap.{AbstractDataMapJob, DistributableDataMapFormat}
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
 
@@ -38,8 +41,8 @@ class SparkDataMapJob extends AbstractDataMapJob {
 
   override def execute(dataMapFormat: DistributableDataMapFormat,
       filter: FilterResolverIntf): util.List[ExtendedBlocklet] = {
-    new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, filter).collect().toList
-      .asJava
+    new DataMapPruneRDD(SparkSQLUtil.getSparkSession, dataMapFormat, filter).collect()
+      .toList.asJava
   }
 }
 
@@ -51,13 +54,13 @@ class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit) exte
 
 /**
  * RDD to prune the datamaps across spark cluster
- * @param sc
+ * @param ss
  * @param dataMapFormat
  */
-class DataMapPruneRDD(sc: SparkContext,
+class DataMapPruneRDD(@transient ss: SparkSession,
     dataMapFormat: DistributableDataMapFormat,
     resolverIntf: FilterResolverIntf)
-  extends CarbonRDD[(ExtendedBlocklet)](sc, Nil, sc.hadoopConfiguration) {
+  extends CarbonRDD[(ExtendedBlocklet)](ss, Nil) {
 
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -67,7 +70,7 @@ class DataMapPruneRDD(sc: SparkContext,
   override def internalCompute(split: Partition,
       context: TaskContext): Iterator[ExtendedBlocklet] = {
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
-    val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
+    val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
     val inputSplit = split.asInstanceOf[DataMapRDDPartition].inputSplit
     val reader = dataMapFormat.createRecordReader(inputSplit, attemptContext)
     reader.initialize(inputSplit, attemptContext)
@@ -102,8 +105,8 @@ class DataMapPruneRDD(sc: SparkContext,
     iter
   }
 
-  override protected def getPartitions: Array[Partition] = {
-    val job = Job.getInstance(new Configuration())
+  override protected def internalGetPartitions: Array[Partition] = {
+    val job = Job.getInstance(FileFactory.getConfiguration)
     val splits = dataMapFormat.getSplits(job)
     splits.asScala.zipWithIndex.map(f => new DataMapRDDPartition(id, f._2, f._1)).toArray
   }
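
The same switch matters on the driver for datamap pruning: the Job used for getSplits is now created from FileFactory.getConfiguration, so S3 credentials set on the session reach split planning instead of being dropped by a fresh Configuration. A small illustration of the end-to-end effect, using placeholder credential values:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.ThreadLocalSessionInfo

// Illustrative only: once a conf carrying S3A credentials is registered on the
// current thread, a Job built from FileFactory.getConfiguration will see them.
val confWithCredentials = new Configuration()
confWithCredentials.set("fs.s3a.access.key", "<access-key>")   // placeholder
confWithCredentials.set("fs.s3a.secret.key", "<secret-key>")   // placeholder
ThreadLocalSessionInfo.setConfigurationToCurrentThread(confWithCredentials)

val job = Job.getInstance(FileFactory.getConfiguration)   // credentials flow into getSplits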

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index f7bbf06..39e1875 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -98,11 +98,10 @@ class StreamingRawResultIterator(
  * execute streaming segment handoff
  */
 class StreamHandoffRDD[K, V](
-    sc: SparkContext,
+    @transient ss: SparkSession,
     result: HandoffResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    handOffSegmentId: String
-) extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
+    handOffSegmentId: String) extends CarbonRDD[(K, V)](ss, Nil) {
 
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -111,8 +110,7 @@ class StreamHandoffRDD[K, V](
 
   override def internalCompute(
       split: Partition,
-      context: TaskContext
-  ): Iterator[(K, V)] = {
+      context: TaskContext): Iterator[(K, V)] = {
     carbonLoadModel.setTaskNo("" + split.index)
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
@@ -148,7 +146,7 @@ class StreamHandoffRDD[K, V](
   ): util.ArrayList[RawResultIterator] = {
     val inputSplit = split.asInstanceOf[HandoffPartition].split.value
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
-    val hadoopConf = new Configuration()
+    val hadoopConf = getConf
     CarbonInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName)
     CarbonInputFormat.setTableName(hadoopConf, carbonTable.getTableName)
     CarbonInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath)
@@ -200,7 +198,7 @@ class StreamHandoffRDD[K, V](
   /**
    * get the partitions of the handoff segment
    */
-  override protected def getPartitions: Array[Partition] = {
+  override protected def internalGetPartitions: Array[Partition] = {
     val job = Job.getInstance(FileFactory.getConfiguration)
     val inputFormat = new CarbonTableInputFormat[Array[Object]]()
     val segmentList = new util.ArrayList[Segment](1)
@@ -323,7 +321,7 @@ object StreamHandoffRDD {
       // convert a streaming segment to columnar segment
 
       val status = new StreamHandoffRDD(
-        sparkSession.sparkContext,
+        sparkSession,
         new HandoffResultImpl(),
         carbonLoadModel,
         handoffSegmenId).collect()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 1cd4d77..e79c63b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.rdd.CarbonMergeFilesRDD
-import org.apache.spark.sql.{Row, RowFactory}
+import org.apache.spark.sql.{Row, RowFactory, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
@@ -839,7 +839,7 @@ object CommonUtil {
    *                                         which do not store the blocklet info to current
    *                                         version
    */
-  def mergeIndexFiles(sparkContext: SparkContext,
+  def mergeIndexFiles(sparkSession: SparkSession,
     segmentIds: Seq[String],
     segmentFileNameToSegmentIdMap: java.util.Map[String, String],
     tablePath: String,
@@ -848,7 +848,7 @@ object CommonUtil {
     readFileFooterFromCarbonDataFile: Boolean = false): Unit = {
     if (mergeIndexProperty) {
       new CarbonMergeFilesRDD(
-        sparkContext,
+        sparkSession,
         carbonTable,
         segmentIds,
         segmentFileNameToSegmentIdMap,
@@ -860,7 +860,7 @@ object CommonUtil {
           CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
           CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
           new CarbonMergeFilesRDD(
-            sparkContext,
+            sparkSession,
             carbonTable,
             segmentIds,
             segmentFileNameToSegmentIdMap,
@@ -871,7 +871,7 @@ object CommonUtil {
         case _: Exception =>
           if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
             new CarbonMergeFilesRDD(
-              sparkContext,
+              sparkSession,
               carbonTable,
               segmentIds,
               segmentFileNameToSegmentIdMap,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 1bb3912..67c4c9b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -461,9 +461,11 @@ object GlobalDictionaryUtil {
       dictFolderPath, forPreDefDict = true)
     // new RDD to achieve distributed column dict generation
     val extInputRDD = new CarbonColumnDictGenerateRDD(carbonLoadModel, dictLoadModel,
-      sqlContext.sparkContext, table, dimensions, dictFolderPath)
+      sqlContext.sparkSession, table, dimensions, dictFolderPath)
       .partitionBy(new ColumnPartitioner(dictLoadModel.primDimensions.length))
-    val statusList = new CarbonGlobalDictionaryGenerateRDD(extInputRDD, dictLoadModel).collect()
+    val statusList = new CarbonGlobalDictionaryGenerateRDD(sqlContext.sparkSession, extInputRDD,
+      dictLoadModel)
+      .collect()
     // check result status
     checkStatus(carbonLoadModel, sqlContext, dictLoadModel, statusList)
   }
@@ -670,10 +672,13 @@ object GlobalDictionaryUtil {
           val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
             requireDimension, dictfolderPath, false)
           // combine distinct value in a block and partition by column
-          val inputRDD = new CarbonBlockDistinctValuesCombineRDD(dictRdd, model)
+          val inputRDD = new CarbonBlockDistinctValuesCombineRDD(sqlContext.sparkSession, dictRdd,
+            model)
             .partitionBy(new ColumnPartitioner(model.primDimensions.length))
           // generate global dictionary files
-          val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
+          val statusList = new CarbonGlobalDictionaryGenerateRDD(sqlContext.sparkSession,
+            inputRDD, model)
+            .collect()
           // check result status
           checkStatus(carbonLoadModel, sqlContext, model, statusList)
         } else {
@@ -731,10 +736,13 @@ object GlobalDictionaryUtil {
         val allDictionaryRdd = readAllDictionaryFiles(sqlContext, headers,
           requireColumnNames, allDictionaryPathAppended, accumulator)
         // read exist dictionary and combine
-        val inputRDD = new CarbonAllDictionaryCombineRDD(allDictionaryRdd, model)
+        val inputRDD = new CarbonAllDictionaryCombineRDD(sqlContext.sparkSession,
+          allDictionaryRdd, model)
           .partitionBy(new ColumnPartitioner(model.primDimensions.length))
         // generate global dictionary files
-        val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
+        val statusList = new CarbonGlobalDictionaryGenerateRDD(sqlContext.sparkSession, inputRDD,
+          model)
+          .collect()
         // check result status
         checkStatus(carbonLoadModel, sqlContext, model, statusList)
         // if the dictionary contains wrong format record, throw ex

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
index 1acdf7e..b5147f0 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.rdd
 
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -35,20 +36,20 @@ case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentId: String)
 
 /**
  * RDD to merge all carbonindex files of each segment to carbonindex file into the same segment.
- * @param sc
+ * @param ss
  * @param carbonTable
  * @param segments segments to be merged
  */
 class CarbonMergeFilesRDD(
-  sc: SparkContext,
+  @transient ss: SparkSession,
   carbonTable: CarbonTable,
   segments: Seq[String],
   segmentFileNameToSegmentIdMap: java.util.Map[String, String],
   isHivePartitionedTable: Boolean,
   readFileFooterFromCarbonDataFile: Boolean)
-  extends CarbonRDD[String](sc, Nil, sc.hadoopConfiguration) {
+  extends CarbonRDD[String](ss, Nil) {
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     segments.zipWithIndex.map {s =>
       CarbonMergeFilePartition(id, s._2, s._1)
     }.toArray
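
Here, as in several other RDDs in this patch, the override is renamed from getPartitions to internalGetPartitions, which suggests the CarbonRDD base class now keeps getPartitions to itself so it can do configuration bookkeeping before delegating. A rough template-method sketch of that idea; the class and parameter names are invented, and only ThreadLocalSessionInfo.setConfigurationToCurrentThread appears in the patch itself.

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.Partition

    import org.apache.carbondata.core.util.ThreadLocalSessionInfo

    // Hypothetical template method, not the actual CarbonRDD code: restore the
    // captured configuration on the calling thread, then delegate to the hook
    // that the concrete RDDs now override.
    abstract class PartitionTemplate(capturedConf: Configuration) {

      protected def internalGetPartitions: Array[Partition]

      final def getPartitions: Array[Partition] = {
        ThreadLocalSessionInfo.setConfigurationToCurrentThread(capturedConf)
        internalGetPartitions
      }
    }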

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
index 6a97477..2854c91 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
 import scala.reflect.ClassTag
 
 import org.apache.spark._
+import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.spark.rdd.CarbonRDD
 
@@ -27,12 +28,12 @@ import org.apache.carbondata.spark.rdd.CarbonRDD
 case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
 
 class DataLoadCoalescedRDD[T: ClassTag](
+    @transient sparkSession: SparkSession,
     @transient var prev: RDD[T],
     nodeList: Array[String])
-  extends CarbonRDD[DataLoadPartitionWrap[T]](prev.context, Nil,
-    prev.sparkContext.hadoopConfiguration) {
+  extends CarbonRDD[DataLoadPartitionWrap[T]](sparkSession, Nil) {
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     new DataLoadPartitionCoalescer(prev, nodeList).run
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 1b48c08..da22658 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -117,7 +117,7 @@ case class CarbonMergerMapping(
     var maxSegmentColCardinality: Array[Int],
     // maxSegmentColumnSchemaList is list of column schema of last segment of compaction
     var maxSegmentColumnSchemaList: List[ColumnSchema],
-    currentPartitions: Option[Seq[PartitionSpec]])
+    @transient currentPartitions: Option[Seq[PartitionSpec]])
 
 case class NodeInfo(TaskId: String, noOfBlocks: Int)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 2e31a82..b5fda85 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -27,4 +27,8 @@ object SparkSQLUtil {
   def execute(logicalPlan: LogicalPlan, sparkSession: SparkSession): DataFrame = {
     Dataset.ofRows(sparkSession, logicalPlan)
   }
+
+  def getSparkSession: SparkSession = {
+    SparkSession.getDefaultSession.get
+  }
 }
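
The new getSparkSession helper simply unwraps SparkSession.getDefaultSession, so it only works once a session exists on the JVM; otherwise the .get inside throws NoSuchElementException. A small usage sketch; the master, app name and assertion are illustrative.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.util.SparkSQLUtil

    object GetSessionExample {
      def main(args: Array[String]): Unit = {
        // getOrCreate registers the session as the default one, so the helper can find it.
        val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
        assert(SparkSQLUtil.getSparkSession eq spark)
        spark.stop()
      }
    }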

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index fdbf400..e2aa9ae 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -159,7 +159,7 @@ object PartitionUtils {
       partitionIds: List[String], oldPartitionIdList: List[Int],
       partitionInfo: PartitionInfo,
       carbonTable: CarbonTable): java.util.List[TableBlockInfo] = {
-    val jobConf = new JobConf(new Configuration)
+    val jobConf = new JobConf(FileFactory.getConfiguration)
     val job = new Job(jobConf)
     val format = CarbonInputFormatUtil
       .createCarbonTableInputFormat(identifier, partitionIds.asJava, job)
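
Replacing new Configuration with FileFactory.getConfiguration is one of the recurring moves in this patch. The intent, as far as the diff shows, is that FileFactory hands back the configuration registered for the current thread (carrying the serialized SK/AK) and only falls back to a plain Configuration when nothing was registered. The stand-in below sketches that behaviour; it is not the real FileFactory code.

    import org.apache.hadoop.conf.Configuration

    // Illustrative stand-in for the FileFactory/ThreadLocalSessionInfo pairing:
    // a per-thread Configuration holder with a plain-Configuration fallback,
    // which is what `new Configuration` used to give unconditionally.
    object ThreadConf {
      private val local = new ThreadLocal[Configuration]()

      def set(conf: Configuration): Unit = local.set(conf)

      def get: Configuration = Option(local.get()).getOrElse(new Configuration())
    }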

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 2c53d20..f237552 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -136,7 +136,8 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
     queryModel.setTableBlockInfos(tableBlockInfoList);
     queryModel.setVectorReader(true);
     try {
-      queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+      queryExecutor =
+          QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration());
       iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
     } catch (QueryExecutionException e) {
       if (ExceptionUtils.indexOfThrowable(e, FileNotFoundException.class) > 0) {
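
The vectorized reader now builds its query executor from taskAttemptContext.getConfiguration instead of an internally created Configuration. The motivation can be shown with plain Hadoop API; the property value below is illustrative, and the second assertion assumes fs.s3a.access.key is not also defined in a *-site.xml on the classpath.

    import org.apache.hadoop.conf.Configuration

    object FreshConfDemo {
      def main(args: Array[String]): Unit = {
        // Keys set programmatically on the driver's conf live only in that object,
        // so a Configuration freshly created on the executor does not have them.
        val driverConf = new Configuration()
        driverConf.set("fs.s3a.access.key", "AK-set-on-driver")   // illustrative value only

        val freshExecutorConf = new Configuration()
        assert(driverConf.get("fs.s3a.access.key") == "AK-set-on-driver")
        assert(freshExecutorConf.get("fs.s3a.access.key") == null)
      }
    }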

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index d321cab..81b395e 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -341,7 +341,8 @@ class SparkCarbonFileFormat extends FileFormat
         split.setDetailInfo(info)
         info.setBlockSize(file.length)
         // Read the footer offset and set.
-        val reader = FileFactory.getFileHolder(FileFactory.getFileType(file.filePath))
+        val reader = FileFactory.getFileHolder(FileFactory.getFileType(file.filePath),
+          broadcastedHadoopConf.value.value)
         val buffer = reader
           .readByteBuffer(FileFactory.getUpdatedFilePath(file.filePath), file.length - 8, 8)
         info.setBlockFooterOffset(buffer.getLong)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 8309065..82c64a4 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -292,14 +292,13 @@ class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Ar
 }
 
 class IndexDataMapRebuildRDD[K, V](
-    session: SparkSession,
+    @transient session: SparkSession,
     result: RefreshResult[K, V],
     @transient tableInfo: TableInfo,
     dataMapName: String,
     indexColumns: Array[CarbonColumn],
-    segments: Set[Segment]
-) extends CarbonRDDWithTableInfo[(K, V)](
-  session.sparkContext, Nil, tableInfo.serialize()) {
+    segments: Set[Segment])
+  extends CarbonRDDWithTableInfo[(K, V)](session, Nil, tableInfo.serialize()) {
 
   private val dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
@@ -323,7 +322,7 @@ class IndexDataMapRebuildRDD[K, V](
       inputMetrics.initBytesReadCallback(context, inputSplit)
 
       val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
-      val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
+      val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
       val format = createInputFormat(segment.get, attemptContext)
 
       val model = format.createQueryModel(inputSplit, attemptContext)
@@ -351,7 +350,8 @@ class IndexDataMapRebuildRDD[K, V](
         } else {
           new OriginalReadSupport(indexColumns.map(_.getDataType))
         }
-        reader = new CarbonRecordReader[Array[Object]](model, readSupport, inputMetrics)
+        reader = new CarbonRecordReader[Array[Object]](model, readSupport, inputMetrics,
+          attemptContext.getConfiguration)
         reader.initialize(inputSplit, attemptContext)
         // skip clear datamap and we will do this after rebuild
         reader.setSkipClearDataMapAtClose(true)
@@ -439,11 +439,11 @@ class IndexDataMapRebuildRDD[K, V](
     format
   }
 
-  override protected def getPartitions = {
+  override protected def internalGetPartitions = {
     if (!dataMapSchema.isIndexDataMap) {
       throw new UnsupportedOperationException
     }
-    val conf = new Configuration()
+    val conf = FileFactory.getConfiguration
     val jobConf = new JobConf(conf)
     SparkHadoopUtil.get.addCredentials(jobConf)
     val job = Job.getInstance(jobConf)
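
On the executor side this RDD now builds its TaskAttemptContext and CarbonRecordReader from FileFactory.getConfiguration rather than new Configuration(). A minimal sketch of that wiring; the object and method names are invented, and propagatedConf stands in for the thread-local configuration restored from the driver.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

    object AttemptContextSketch {
      // Build the task attempt context from the propagated configuration so every
      // reader created from it carries the driver's credentials.
      def attemptContextFor(propagatedConf: Configuration, jobTrackerId: String,
          rddId: Int, splitIndex: Int): TaskAttemptContextImpl = {
        val attemptId = new TaskAttemptID(jobTrackerId, rddId, TaskType.MAP, splitIndex, 0)
        new TaskAttemptContextImpl(propagatedConf, attemptId)
      }
    }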

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 6e322b3..0fd4e34 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -62,9 +62,10 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable}
@@ -329,7 +330,8 @@ object CarbonDataRDDFactory {
             dataFrame,
             carbonLoadModel,
             updateModel,
-            carbonTable)
+            carbonTable,
+            hadoopConf)
           res.foreach { resultOfSeg =>
             resultOfSeg.foreach { resultOfBlock =>
               if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) {
@@ -680,7 +682,8 @@ object CarbonDataRDDFactory {
       dataFrame: Option[DataFrame],
       carbonLoadModel: CarbonLoadModel,
       updateModel: Option[UpdateTableModel],
-      carbonTable: CarbonTable): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = {
+      carbonTable: CarbonTable,
+      hadoopConf: Configuration): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = {
     val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate
 
     val updateRdd = dataFrame.get.rdd
@@ -720,7 +723,9 @@ object CarbonDataRDDFactory {
 
       // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism,
       // so segmentIdIndex=partitionId/parallelism, this has been verified.
+      val conf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
       partitionByRdd.map(_._2).mapPartitions { partition =>
+        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
         val partitionId = TaskContext.getPartitionId()
         val segIdIndex = partitionId / segmentUpdateParallelism
         val randomPart = partitionId - segIdIndex * segmentUpdateParallelism
@@ -1070,7 +1075,7 @@ object CarbonDataRDDFactory {
     try {
       val rdd = repartitionInputData(sqlContext, dataFrame, carbonLoadModel, hadoopConf)
       new PartitionTableDataLoaderRDD(
-        sqlContext.sparkContext,
+        sqlContext.sparkSession,
         new DataLoadResultImpl(),
         carbonLoadModel,
         rdd
@@ -1099,14 +1104,14 @@ object CarbonDataRDDFactory {
       val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
         nodeNumOfData,
         sqlContext.sparkContext)
-      val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
+      val newRdd = new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, rdd, nodes.toArray
+        .distinct)
 
       new NewDataFrameLoaderRDD(
-        sqlContext.sparkContext,
+        sqlContext.sparkSession,
         new DataLoadResultImpl(),
         carbonLoadModel,
-        newRdd,
-        sqlContext.sparkContext.hadoopConfiguration
+        newRdd
       ).collect()
     } catch {
       case ex: Exception =>
@@ -1207,11 +1212,10 @@ object CarbonDataRDDFactory {
     }.toArray
 
     new NewCarbonDataLoadRDD(
-      sqlContext.sparkContext,
+      sqlContext.sparkSession,
       new DataLoadResultImpl(),
       carbonLoadModel,
-      blocksGroupBy,
-      hadoopConf
+      blocksGroupBy
     ).collect()
   }
 }
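
Broadcasting a SerializableConfiguration and calling ThreadLocalSessionInfo.setConfigurationToCurrentThread inside mapPartitions is the central pattern of this commit: serialize the driver's Hadoop configuration once, then re-register it on every executor thread before any FileFactory-based I/O runs. A condensed sketch under those assumptions; withPropagatedConf and doWork are invented names, while SerializableConfiguration and ThreadLocalSessionInfo are the classes used in the hunks above.

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession

    import org.apache.carbondata.core.util.ThreadLocalSessionInfo
    import org.apache.carbondata.spark.rdd.SerializableConfiguration

    object PropagateConfSketch {
      // `doWork` is a placeholder for the real per-partition load/update logic.
      def withPropagatedConf[T](spark: SparkSession, data: RDD[T])
          (doWork: Iterator[T] => Iterator[String]): Array[String] = {
        val hadoopConf: Configuration = spark.sparkContext.hadoopConfiguration
        val broadcastConf = spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
        data.mapPartitions { rows =>
          // Executor side: register SK/AK and the rest of the conf for this thread
          // before any FileFactory-based I/O happens inside doWork.
          ThreadLocalSessionInfo.setConfigurationToCurrentThread(broadcastConf.value.value)
          doWork(rows)
        }.collect()
      }
    }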

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index fcc649e..d9884e1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -186,7 +186,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     val mergeStatus =
       if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
         new CarbonIUDMergerRDD(
-          sc.sparkContext,
+          sc.sparkSession,
           new MergeResultImpl(),
           carbonLoadModel,
           carbonMergerMapping,
@@ -194,7 +194,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         ).collect
       } else {
         new CarbonMergerRDD(
-          sc.sparkContext,
+          sc.sparkSession,
           new MergeResultImpl(),
           carbonLoadModel,
           carbonMergerMapping,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 6622246..9b78db0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -31,9 +31,11 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.optimizer.CarbonFilters
 
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 
@@ -44,6 +46,8 @@ case class CarbonCountStar(
     outUnsafeRows: Boolean = true) extends LeafExecNode {
 
   override def doExecute(): RDD[InternalRow] = {
+    ThreadLocalSessionInfo
+      .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val (job, tableInputFormat) = createCarbonInputFormat(absoluteTableIdentifier)
     CarbonInputFormat.setQuerySegment(job.getConfiguration, absoluteTableIdentifier)
@@ -73,7 +77,7 @@ case class CarbonCountStar(
   private def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier
   ): (Job, CarbonTableInputFormat[Array[Object]]) = {
     val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
-    val jobConf: JobConf = new JobConf(new Configuration)
+    val jobConf: JobConf = new JobConf(FileFactory.getConfiguration)
     SparkHadoopUtil.get.addCredentials(jobConf)
     CarbonInputFormat.setTableInfo(jobConf, carbonTable.getTableInfo)
     val job = new Job(jobConf)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index b5842a9..8a0404c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -56,7 +56,6 @@ case class CarbonDatasourceHadoopRelation(
     caseInsensitiveMap("tablename"))
   lazy val databaseName: String = carbonTable.getDatabaseName
   lazy val tableName: String = carbonTable.getTableName
-  CarbonInputFormatUtil.setS3Configurations(sparkSession.sessionState.newHadoopConf())
   CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
 
   @transient lazy val carbonRelation: CarbonRelation =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 34a8dce..d0ed56e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -41,9 +41,10 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.scan.executor.util.QueryUtil
-import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.core.util.{DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
-import org.apache.carbondata.spark.rdd.CarbonRDDWithTableInfo
+import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, SerializableConfiguration}
 
 /**
  * It decodes the data.
@@ -75,9 +76,12 @@ case class CarbonDictionaryDecoder(
         (carbonTable.getTableName, carbonTable)
       }.toMap
 
+      val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession
+        .sessionState.newHadoopConf()))
       if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
         val dataTypes = child.output.map { attr => attr.dataType }
         child.execute().mapPartitions { iter =>
+          ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
           val cacheProvider: CacheProvider = CacheProvider.getInstance
           val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
             cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
@@ -439,7 +443,9 @@ class CarbonDecoderRDD(
     val prev: RDD[InternalRow],
     output: Seq[Attribute],
     serializedTableInfo: Array[Byte])
-  extends CarbonRDDWithTableInfo[InternalRow](prev, serializedTableInfo) {
+  extends CarbonRDDWithTableInfo[InternalRow](relations.head.carbonRelation.sparkSession,
+    prev,
+    serializedTableInfo) {
 
   def canBeDecoded(attr: Attribute): Boolean = {
     profile match {
@@ -543,7 +549,8 @@ class CarbonDecoderRDD(
     dicts
   }
 
-  override protected def getPartitions: Array[Partition] = firstParent[InternalRow].partitions
+  override protected def internalGetPartitions: Array[Partition] =
+    firstParent[InternalRow].partitions
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 7f26888..8253c4d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,11 +17,8 @@
 
 package org.apache.spark.sql
 
-import java.io.File
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.util.Try
-
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -29,7 +26,6 @@ import org.apache.spark.sql.events.MergeIndexEventListener
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
 import org.apache.spark.sql.hive._
-import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -101,6 +97,8 @@ class CarbonEnv {
           threadLevelCarbonSessionInfo.setThreadParams(currentThreadSesssionInfo.getThreadParams)
         }
         ThreadLocalSessionInfo.setCarbonSessionInfo(threadLevelCarbonSessionInfo)
+        ThreadLocalSessionInfo.setConfigurationToCurrentThread(sparkSession
+          .sessionState.newHadoopConf())
         val config = new CarbonSQLConf(sparkSession)
         if (sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT).isEmpty) {
           config.addDefaultCarbonParams()
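
CarbonEnv now registers the session's Hadoop configuration on the current (driver) thread as part of session initialization. The point of the ordering is that later FileFactory.getConfiguration calls on that thread presumably see the same credentials. A small sketch of that ordering; the helper name is invented, and sparkContext.hadoopConfiguration is used only because it is public API, whereas the diff itself uses sessionState.newHadoopConf().

    import org.apache.spark.sql.SparkSession

    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.core.util.ThreadLocalSessionInfo

    object DriverThreadSetup {
      // Register the conf first, then anything on this thread that asks FileFactory
      // for a configuration should get the one carrying the session's settings.
      def onDriverThread(spark: SparkSession): Unit = {
        ThreadLocalSessionInfo.setConfigurationToCurrentThread(spark.sparkContext.hadoopConfiguration)
        val conf = FileFactory.getConfiguration
        require(conf != null, "expected a configuration to be available on this thread")
      }
    }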

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index c59bb08..5af64ff 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -40,7 +40,6 @@ import org.apache.carbondata.common.annotations.InterfaceAudience
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.store.SparkCarbonStore
 import org.apache.carbondata.streaming.CarbonStreamingQueryListener
 
@@ -432,6 +431,8 @@ object CarbonSession {
     }
     // preserve thread parameters across call
     ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+    ThreadLocalSessionInfo
+      .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index b162294..693a8c4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -329,7 +329,6 @@ object CarbonSource {
         .contains("true")
       tableInfo.setTransactionalTable(isTransactionalTable)
       if (isTransactionalTable && !metaStore.isReadFromHiveMetaStore) {
-        CarbonInputFormatUtil.setS3Configurations(sparkSession.sessionState.newHadoopConf())
         // save to disk
         metaStore.saveToDisk(tableInfo, properties("tablePath"))
         // remove schema string from map as we don't store carbon schema to hive metastore

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 5bff9aa..24ef0db 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -54,7 +54,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
         val sparkSession = SparkSession.getActiveSession.get
         if(!carbonTable.isStreamingSink) {
           if (null != compactedSegments && !compactedSegments.isEmpty) {
-            mergeIndexFilesForCompactedSegments(sparkSession.sparkContext,
+            mergeIndexFilesForCompactedSegments(sparkSession,
               carbonTable,
               compactedSegments)
           } else {
@@ -63,7 +63,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
 
             segmentFileNameMap
               .put(loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp))
-            CommonUtil.mergeIndexFiles(sparkSession.sparkContext,
+            CommonUtil.mergeIndexFiles(sparkSession,
               Seq(loadModel.getSegmentId),
               segmentFileNameMap,
               carbonTable.getTablePath,
@@ -77,9 +77,9 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
         val alterTableCompactionPostEvent = event.asInstanceOf[AlterTableCompactionPostEvent]
         val carbonTable = alterTableCompactionPostEvent.carbonTable
         val mergedLoads = alterTableCompactionPostEvent.compactedLoads
-        val sparkContext = alterTableCompactionPostEvent.sparkSession.sparkContext
+        val sparkSession = alterTableCompactionPostEvent.sparkSession
         if(!carbonTable.isStreamingSink) {
-          mergeIndexFilesForCompactedSegments(sparkContext, carbonTable, mergedLoads)
+          mergeIndexFilesForCompactedSegments(sparkSession, carbonTable, mergedLoads)
         }
       case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
         val exceptionEvent = event.asInstanceOf[AlterTableMergeIndexEvent]
@@ -123,7 +123,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
               // store (store <= 1.1 version) and create merge Index file as per new store so that
               // old store is also upgraded to new store
               CommonUtil.mergeIndexFiles(
-                sparkContext = sparkSession.sparkContext,
+                sparkSession = sparkSession,
                 segmentIds = validSegmentIds,
                 segmentFileNameToSegmentIdMap = segmentFileNameMap,
                 tablePath = carbonMainTable.getTablePath,
@@ -155,7 +155,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
     }
   }
 
-  def mergeIndexFilesForCompactedSegments(sparkContext: SparkContext,
+  def mergeIndexFilesForCompactedSegments(sparkSession: SparkSession,
     carbonTable: CarbonTable,
     mergedLoads: util.List[String]): Unit = {
     // get only the valid segments of the table
@@ -182,7 +182,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
     val validMergedSegIds = validSegments
       .filter { seg => mergedSegmentIds.contains(seg.getSegmentNo) }.map(_.getSegmentNo)
     if (null != validMergedSegIds && !validMergedSegIds.isEmpty) {
-      CommonUtil.mergeIndexFiles(sparkContext,
+      CommonUtil.mergeIndexFiles(sparkSession,
           validMergedSegIds,
           segmentFileNameMap,
           carbonTable.getTablePath,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 6c74ad2..7cf8c1e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -18,15 +18,13 @@
 package org.apache.spark.sql.execution.command.management
 
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.util.CarbonSparkUtil
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
 
 case class CarbonInsertIntoCommand(
     relation: CarbonDatasourceHadoopRelation,
@@ -45,6 +43,9 @@ case class CarbonInsertIntoCommand(
         case other => false
       } isDefined
     }
+
+    ThreadLocalSessionInfo
+      .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
     val isPersistEnabledUserValue = CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED,
         CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 460d7e6..516f9af 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.management
 
 import java.text.SimpleDateFormat
 import java.util
-import java.util.{List, UUID}
+import java.util.UUID
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -33,7 +33,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Expression, GenericInternalRow, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
@@ -59,16 +59,16 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICar
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil, ObjectSerializationUtil}
+import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -77,8 +77,8 @@ import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataPro
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
 import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark}
-import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, SerializableConfiguration}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
 
 case class CarbonLoadDataCommand(
     databaseNameOp: Option[String],
@@ -977,8 +977,11 @@ case class CarbonLoadDataCommand(
           array
         }
       }
-    val finalRDD = convertRDD.mapPartitionsWithIndex {case(index, rows) =>
+    val conf = sparkSession.sparkContext
+      .broadcast(new SerializableConfiguration(sparkSession.sessionState.newHadoopConf()))
+    val finalRDD = convertRDD.mapPartitionsWithIndex { case(index, rows) =>
         DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
         DataLoadProcessorStepOnSpark.inputAndconvertFunc(
           rows,
           index,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 8633243..b77632d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -41,13 +41,15 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
 import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.exception.MultipleMatchingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.spark.DeleteDelataResultImpl
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
 
 object DeleteExecution {
   val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
@@ -118,17 +120,18 @@ object DeleteExecution {
         blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
         keyRdd.partitions.length)
 
+    val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession
+      .sessionState.newHadoopConf()))
+
     val rdd = rowContRdd.join(keyRdd)
     res = rdd.mapPartitionsWithIndex(
       (index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) =>
         Iterator[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] {
-
+          ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
           var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]()
           while (records.hasNext) {
             val ((key), (rowCountDetailsVO, groupedRows)) = records.next
             val segmentId = key.substring(0, key.indexOf(CarbonCommonConstants.FILE_SEPARATOR))
-            val loadDetail =
-              metadataDetails.find(_.getLoadName.equals(segmentId)).get
             result = result ++
                      deleteDeltaFunc(index,
                        key,
@@ -138,8 +141,7 @@ object DeleteExecution {
                        isStandardTable)
           }
           result
-        }
-    ).collect()
+        }).collect()
 
     // if no loads are present then no need to do anything.
     if (res.flatten.isEmpty) {
@@ -328,7 +330,7 @@ object DeleteExecution {
   private def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
   (CarbonTableInputFormat[Array[Object]], Job) = {
     val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
-    val jobConf: JobConf = new JobConf(new Configuration)
+    val jobConf: JobConf = new JobConf(FileFactory.getConfiguration)
     val job: Job = new Job(jobConf)
     FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
     (carbonInputFormat, job)
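
SerializableConfiguration, imported above from org.apache.carbondata.spark.rdd, is not shown in this diff. A wrapper of roughly this shape, mirroring Spark's own org.apache.spark.util.SerializableConfiguration, is presumably what lets a Hadoop Configuration ride inside a broadcast; the class name here is invented to avoid confusion with the real one.

    import java.io.{ObjectInputStream, ObjectOutputStream}

    import org.apache.hadoop.conf.Configuration

    // Illustrative wrapper only: Configuration is Writable but not java.io.Serializable,
    // so serialize it through the Writable API inside writeObject/readObject.
    class SerializableConfSketch(@transient var value: Configuration) extends Serializable {

      private def writeObject(out: ObjectOutputStream): Unit = {
        out.defaultWriteObject()
        value.write(out)            // ObjectOutputStream is a DataOutput
      }

      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        value = new Configuration(false)
        value.readFields(in)        // ObjectInputStream is a DataInput
      }
    }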

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 8c88d0e..66066ed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -32,7 +32,10 @@ import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
+import org.apache.carbondata.core.util.{ThreadLocalSessionInfo}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
 
 object HorizontalCompaction {
 
@@ -188,8 +191,11 @@ object HorizontalCompaction {
 
       val timestamp = factTimeStamp
       val updateStatusDetails = segmentUpdateStatusManager.getUpdateStatusDetails
+      val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession
+        .sessionState.newHadoopConf()))
       val result = rdd1.mapPartitions(iter =>
         new Iterator[Seq[CarbonDataMergerUtilResult]] {
+          ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
           override def hasNext: Boolean = iter.hasNext
 
           override def next(): Seq[CarbonDataMergerUtilResult] = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index 45cacfa..1e987b0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -173,7 +173,7 @@ case class CarbonAlterTableDropHivePartitionCommand(
       val segments = new SegmentStatusManager(table.getAbsoluteTableIdentifier)
         .getValidAndInvalidSegments.getValidSegments
       // First drop the partitions from partition mapper files of each segment
-      val tuples = new CarbonDropPartitionRDD(sparkSession.sparkContext,
+      val tuples = new CarbonDropPartitionRDD(sparkSession,
         table.getTablePath,
         segments.asScala,
         carbonPartitionsTobeDropped,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 7e11170..22ff5c4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -82,7 +82,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
         carbonTable.getAbsoluteTableIdentifier,
         sparkSession.sparkContext).process
       // generate dictionary files for the newly added columns
-      new AlterTableAddColumnRDD(sparkSession.sparkContext,
+      new AlterTableAddColumnRDD(sparkSession,
         newCols,
         carbonTable.getAbsoluteTableIdentifier).collect()
       timeStamp = System.currentTimeMillis
@@ -110,7 +110,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
         LOGGER.error(e, "Alter table add columns failed")
         if (newCols.nonEmpty) {
           LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
-          new AlterTableDropColumnRDD(sparkSession.sparkContext,
+          new AlterTableDropColumnRDD(sparkSession,
             newCols,
             carbonTable.getAbsoluteTableIdentifier).collect()
           AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 23dbf9e..1dbe28c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -149,7 +149,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       // TODO: 1. add check for deletion of index tables
       // delete dictionary files for dictionary column and clear dictionary cache from memory
-      new AlterTableDropColumnRDD(sparkSession.sparkContext,
+      new AlterTableDropColumnRDD(sparkSession,
         dictionaryColumns,
         carbonTable.getAbsoluteTableIdentifier).collect()
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index c403d52..1beda11 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.spark.util.CarbonSparkUtil
 
@@ -49,7 +49,9 @@ case class CarbonCreateTableCommand(
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val tableName = tableInfo.getFactTable.getTableName
     var databaseOpt : Option[String] = None
-    if(tableInfo.getDatabaseName != null) {
+    ThreadLocalSessionInfo
+      .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
+    if (tableInfo.getDatabaseName != null) {
       databaseOpt = Some(tableInfo.getDatabaseName)
     }
     val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 94b988c..f4fb90a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
 
 /**
  * Carbon strategies for ddl commands
@@ -91,6 +91,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           case e: NoSuchDatabaseException =>
             CarbonProperties.getStorePath
         }
+        ThreadLocalSessionInfo
+          .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
         FileUtils.createDatabaseDirectory(dbName, dbLocation, sparkSession.sparkContext)
         ExecutedCommandExec(createDb) :: Nil
       case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index 80f781e..b6667df 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 
 /**
@@ -39,7 +40,7 @@ object TableLoader {
   def extractOptions(propertiesFile: String): immutable.Map[String, String] = {
     val props = new Properties
     val path = new Path(propertiesFile)
-    val fs = path.getFileSystem(new Configuration())
+    val fs = path.getFileSystem(FileFactory.getConfiguration)
     props.load(fs.open(path))
     val elments = props.entrySet().iterator()
     val map = new mutable.HashMap[String, String]()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index b0711ba..8e68ef3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -50,6 +50,8 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeConverter;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Executor class for executing the query on the selected segments to be merged.
  * This will fire a select * query and get the raw result.
@@ -103,7 +105,8 @@ public class CarbonCompactionExecutor {
    *
    * @return List of Carbon iterators
    */
-  public List<RawResultIterator> processTableBlocks() throws QueryExecutionException, IOException {
+  public List<RawResultIterator> processTableBlocks(Configuration configuration) throws
+      QueryExecutionException, IOException {
     List<RawResultIterator> resultList =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     List<TableBlockInfo> list = null;
@@ -131,7 +134,8 @@ public class CarbonCompactionExecutor {
                 .size());
         queryModel.setTableBlockInfos(list);
         resultList.add(
-            new RawResultIterator(executeBlockList(list, segmentId, task), sourceSegProperties,
+            new RawResultIterator(executeBlockList(list, segmentId, task, configuration),
+                sourceSegProperties,
                 destinationSegProperties));
       }
     }
@@ -174,14 +178,14 @@ public class CarbonCompactionExecutor {
    * @return
    */
   private CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList,
-      String segmentId, String taskId)
+      String segmentId, String taskId, Configuration configuration)
       throws QueryExecutionException, IOException {
     queryModel.setTableBlockInfos(blockList);
     QueryStatisticsRecorder executorRecorder = CarbonTimeStatisticsFactory
         .createExecutorRecorder(queryModel.getQueryId() + "_" + segmentId + "_" + taskId);
     queryStatisticsRecorders.add(executorRecorder);
     queryModel.setStatisticsRecorder(executorRecorder);
-    QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+    QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel, configuration);
     queryExecutorList.add(queryExecutor);
     return queryExecutor.execute(queryModel);
   }
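
processTableBlocks and executeBlockList now take a Configuration and pass it through to QueryExecutorFactory, so the compaction path no longer manufactures its own. A hedged caller-side sketch in Scala; the object and method names are invented, and only the processTableBlocks signature comes from the diff.

    import org.apache.hadoop.conf.Configuration

    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.processing.merger.CarbonCompactionExecutor

    object CompactionCallSketch {
      // The compaction task runs on an executor thread whose configuration has already
      // been restored, so it hands that same conf down to the query executors it spawns.
      def runCompaction(executor: CarbonCompactionExecutor) = {
        val conf: Configuration = FileFactory.getConfiguration
        executor.processTableBlocks(conf)   // signature introduced by this patch
      }
    }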

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
index 01db4f6..dd5969f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
@@ -35,6 +35,8 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.util.CarbonUtil;
 
+import org.apache.hadoop.conf.Configuration;
+
 public abstract class AbstractCarbonQueryExecutor {
 
   private static final LogService LOGGER =
@@ -50,10 +52,11 @@ public abstract class AbstractCarbonQueryExecutor {
    * @param blockList
    * @return
    */
-  CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList)
+  CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList,
+      Configuration configuration)
       throws QueryExecutionException, IOException {
     queryModel.setTableBlockInfos(blockList);
-    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel, configuration);
     return queryExecutor.execute(queryModel);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
index daabd24..d32757c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
@@ -34,6 +34,8 @@ import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator;
 import org.apache.carbondata.core.util.DataTypeConverter;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Used to read carbon blocks when add/split partition
  */
@@ -48,7 +50,7 @@ public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor {
   }
 
   public List<PartitionSpliterRawResultIterator> processDataBlocks(
-      String segmentId, DataTypeConverter converter)
+      String segmentId, DataTypeConverter converter, Configuration configuration)
       throws QueryExecutionException, IOException {
     List<TableBlockInfo> list = null;
     queryModel = new QueryModelBuilder(carbonTable)
@@ -64,7 +66,7 @@ public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor {
       list = taskBlockInfo.getTableBlockInfoList(task);
       LOGGER.info("for task -" + task + "-block size is -" + list.size());
       queryModel.setTableBlockInfos(list);
-      resultList.add(new PartitionSpliterRawResultIterator(executeBlockList(list)));
+      resultList.add(new PartitionSpliterRawResultIterator(executeBlockList(list, configuration)));
     }
     return resultList;
   }
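
The same threading applies to the partition split path: AbstractCarbonQueryExecutor.executeBlockList() and CarbonSplitExecutor.processDataBlocks() now take the Configuration from the caller and pass it on to QueryExecutorFactory. A hedged sketch of a caller, with the executor, segment id and converter assumed to be set up elsewhere:

    import java.util.List;

    import org.apache.carbondata.core.datastore.impl.FileFactory;
    import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator;
    import org.apache.carbondata.core.util.DataTypeConverter;
    import org.apache.carbondata.processing.partition.spliter.CarbonSplitExecutor;

    // Sketch: pass the session/executor Configuration down the split path explicitly.
    static List<PartitionSpliterRawResultIterator> splitSegment(CarbonSplitExecutor executor,
        String segmentId, DataTypeConverter converter) throws Exception {
      return executor.processDataBlocks(segmentId, converter, FileFactory.getConfiguration());
    }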

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index e3dabb1..4859dd2 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -29,6 +29,8 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.util.CarbonSessionInfo;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
 
 import org.apache.hadoop.conf.Configuration;
@@ -60,6 +62,7 @@ public class CarbonReaderBuilder {
   CarbonReaderBuilder(String tablePath, String tableName) {
     this.tablePath = tablePath;
     this.tableName = tableName;
+    ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo());
   }
 
 

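In the SDK, the reader builder constructor now seeds an empty CarbonSessionInfo into the thread local, so later code on the same thread that resolves the Hadoop configuration through the session (for example FileFactory) has a session object to work against. A hedged sketch of what that means for an SDK caller: CarbonReader.builder(tablePath, tableName) is assumed to be the public entry point that invokes this package-private constructor, ThreadLocalSessionInfo.getCarbonSessionInfo() is assumed to be the matching getter, and the path and table name are hypothetical.

    import org.apache.carbondata.core.util.CarbonSessionInfo;
    import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
    import org.apache.carbondata.sdk.file.CarbonReader;

    // Sketch: after creating a builder, the current thread carries a CarbonSessionInfo.
    static void demonstrateSessionSeeding() {
      CarbonReader.builder("/tmp/carbon-table", "_temp");          // hypothetical arguments
      CarbonSessionInfo info = ThreadLocalSessionInfo.getCarbonSessionInfo();
      assert info != null;   // set by the CarbonReaderBuilder constructor shown above
    }
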
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index d41e3d0..065b4a9 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -44,6 +44,8 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonSessionInfo;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.ThriftWriter;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -70,6 +72,10 @@ public class CarbonWriterBuilder {
   private int localDictionaryThreshold;
   private boolean isLocalDictionaryEnabled;
 
+  public CarbonWriterBuilder() {
+    ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo());
+  }
+
   /**
    * Sets the output path of the writer builder
    * @param path is the absolute path where output files are written

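The writer builder gets the same treatment: its public constructor now guarantees a CarbonSessionInfo on the current thread, which matters for writer paths that fetch their configuration from FileFactory (see the JsonCarbonWriter change below). A hedged sketch, assuming CarbonWriter.builder() is the SDK entry point that constructs CarbonWriterBuilder:

    import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
    import org.apache.carbondata.sdk.file.CarbonWriter;
    import org.apache.carbondata.sdk.file.CarbonWriterBuilder;

    // Sketch: constructing the builder seeds the thread-local session that later
    // FileFactory.getConfiguration() calls on this thread rely on.
    static CarbonWriterBuilder newSeededBuilder() {
      CarbonWriterBuilder builder = CarbonWriter.builder();        // assumed entry point
      assert ThreadLocalSessionInfo.getCarbonSessionInfo() != null;
      return builder;
    }
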
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
index 73742b0..b6e7ad5 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
@@ -23,6 +23,7 @@ import java.util.Random;
 import java.util.UUID;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -47,7 +48,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
   private ObjectArrayWritable writable;
 
   JsonCarbonWriter(CarbonLoadModel loadModel) throws IOException {
-    Configuration OutputHadoopConf = new Configuration();
+    Configuration OutputHadoopConf = FileFactory.getConfiguration();
     CarbonTableOutputFormat.setLoadModel(OutputHadoopConf, loadModel);
     CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat();
     JobID jobId = new JobID(UUID.randomUUID().toString(), 0);