You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/08/28 09:00:33 UTC

[3/3] carbondata git commit: [CARBONDATA-2844] [CARBONDATA-2865] Pass SK/AK to executor by serializing hadoop configuration from driver.

[CARBONDATA-2844] [CARBONDATA-2865] Pass SK/AK to executor by serializing hadoop configuration from driver.

Add the SK/AK (secret key/access key) to the thread local so that a new SK/AK can be passed to FileFactory on each query.
Refactor FileFactory to accept its configuration from the thread local.
Fix the compatibility issue from 1.3.x to 1.5.x [CARBONDATA-2865].

This closes #2623


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2a9604cd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2a9604cd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2a9604cd

Branch: refs/heads/master
Commit: 2a9604cd840dc1a552afcff23059ac9bf624e161
Parents: 1fb1f19
Author: kunal642 <ku...@gmail.com>
Authored: Wed Aug 8 21:50:44 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Aug 28 14:30:19 2018 +0530

----------------------------------------------------------------------
 .../core/datastore/impl/DFSFileReaderImpl.java  |  8 ++-
 .../datastore/impl/DefaultFileTypeProvider.java |  4 +-
 .../core/datastore/impl/FileFactory.java        | 27 +++++++---
 .../core/datastore/impl/FileTypeInterface.java  |  2 +-
 .../scan/executor/QueryExecutorFactory.java     | 12 +++--
 .../executor/impl/AbstractQueryExecutor.java    |  5 +-
 .../scan/executor/impl/DetailQueryExecutor.java |  6 +++
 .../impl/SearchModeDetailQueryExecutor.java     |  4 +-
 .../SearchModeVectorDetailQueryExecutor.java    |  5 +-
 .../impl/VectorDetailQueryExecutor.java         |  6 +++
 .../carbondata/core/util/CarbonProperties.java  |  6 +--
 .../apache/carbondata/core/util/CarbonUtil.java |  6 +--
 .../carbondata/core/util/SessionParams.java     |  5 +-
 .../core/util/ThreadLocalSessionInfo.java       | 20 +++++++
 .../store/impl/DFSFileReaderImplUnitTest.java   |  3 +-
 .../datamap/lucene/LuceneDataMapWriter.java     |  2 +-
 .../carbondata/hadoop/CarbonRecordReader.java   | 10 ++--
 .../hadoop/api/CarbonInputFormat.java           |  3 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  1 -
 .../hadoop/api/CarbonTableOutputFormat.java     |  7 ++-
 .../hadoop/util/CarbonInputFormatUtil.java      | 17 ------
 .../carbondata/hive/CarbonHiveRecordReader.java |  2 +-
 .../presto/CarbondataPageSourceProvider.java    |  3 +-
 .../PrestoCarbonVectorizedRecordReader.java     |  3 +-
 .../presto/impl/CarbonTableReader.java          |  2 +-
 .../carbondata/presto/server/PrestoServer.scala |  1 +
 ...eneFineGrainDataMapWithSearchModeSuite.scala |  3 +-
 .../createTable/TestCreateTableAsSelect.scala   | 10 ++--
 .../carbondata/spark/load/CsvRDDHelper.scala    |  9 ++--
 .../load/DataLoadProcessBuilderOnSpark.scala    |  8 ++-
 .../load/DataLoadProcessorStepOnSpark.scala     | 10 ++--
 .../spark/rdd/AlterTableAddColumnRDD.scala      |  9 ++--
 .../spark/rdd/AlterTableDropColumnRDD.scala     | 11 ++--
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  |  7 +--
 .../spark/rdd/CarbonDropPartitionRDD.scala      | 11 ++--
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 28 +++++-----
 .../spark/rdd/CarbonIUDMergerRDD.scala          | 15 +++---
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 15 +++---
 .../apache/carbondata/spark/rdd/CarbonRDD.scala | 55 +++++++++-----------
 .../spark/rdd/CarbonScanPartitionRDD.scala      | 15 +++---
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 15 +++---
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 53 ++++++-------------
 .../carbondata/spark/rdd/SparkDataMapJob.scala  | 21 ++++----
 .../carbondata/spark/rdd/StreamHandoffRDD.scala | 14 +++--
 .../carbondata/spark/util/CommonUtil.scala      | 10 ++--
 .../spark/util/GlobalDictionaryUtil.scala       | 20 ++++---
 .../apache/spark/rdd/CarbonMergeFilesRDD.scala  | 11 ++--
 .../apache/spark/rdd/DataLoadCoalescedRDD.scala |  7 +--
 .../command/carbonTableSchemaCommon.scala       |  2 +-
 .../apache/spark/sql/util/SparkSQLUtil.scala    |  4 ++
 .../org/apache/spark/util/PartitionUtils.scala  |  2 +-
 .../VectorizedCarbonRecordReader.java           |  3 +-
 .../datasources/SparkCarbonFileFormat.scala     |  3 +-
 .../datamap/IndexDataMapRebuildRDD.scala        | 16 +++---
 .../spark/rdd/CarbonDataRDDFactory.scala        | 26 +++++----
 .../spark/rdd/CarbonTableCompactor.scala        |  4 +-
 .../org/apache/spark/sql/CarbonCountStar.scala  |  6 ++-
 .../sql/CarbonDatasourceHadoopRelation.scala    |  1 -
 .../spark/sql/CarbonDictionaryDecoder.scala     | 15 ++++--
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  6 +--
 .../org/apache/spark/sql/CarbonSession.scala    |  3 +-
 .../org/apache/spark/sql/CarbonSource.scala     |  1 -
 .../sql/events/MergeIndexEventListener.scala    | 14 ++---
 .../management/CarbonInsertIntoCommand.scala    |  7 +--
 .../management/CarbonLoadDataCommand.scala      | 19 ++++---
 .../command/mutation/DeleteExecution.scala      | 16 +++---
 .../command/mutation/HorizontalCompaction.scala |  6 +++
 ...rbonAlterTableDropHivePartitionCommand.scala |  2 +-
 .../CarbonAlterTableAddColumnCommand.scala      |  4 +-
 .../CarbonAlterTableDropColumnCommand.scala     |  2 +-
 .../table/CarbonCreateTableCommand.scala        |  6 ++-
 .../sql/execution/strategy/DDLStrategy.scala    |  4 +-
 .../org/apache/spark/util/TableLoader.scala     |  3 +-
 .../merger/CarbonCompactionExecutor.java        | 12 +++--
 .../spliter/AbstractCarbonQueryExecutor.java    |  7 ++-
 .../partition/spliter/CarbonSplitExecutor.java  |  6 ++-
 .../sdk/file/CarbonReaderBuilder.java           |  3 ++
 .../sdk/file/CarbonWriterBuilder.java           |  6 +++
 .../carbondata/sdk/file/JsonCarbonWriter.java   |  3 +-
 .../store/worker/SearchRequestHandler.java      |  3 +-
 80 files changed, 428 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
index 1a0cd41..e86fa12 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.FileReader;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,7 +38,10 @@ public class DFSFileReaderImpl implements FileReader {
 
   private boolean readPageByPage;
 
-  public DFSFileReaderImpl() {
+  private Configuration configuration;
+
+  public DFSFileReaderImpl(Configuration configuration) {
+    this.configuration = configuration;
     this.fileNameAndStreamCache =
         new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
   }
@@ -60,7 +64,7 @@ public class DFSFileReaderImpl implements FileReader {
     FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath);
     if (null == fileChannel) {
       Path pt = new Path(filePath);
-      FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+      FileSystem fs = pt.getFileSystem(configuration);
       fileChannel = fs.open(pt);
       fileNameAndStreamCache.put(filePath, fileChannel);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
index c4761c9..937b5b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 
 public class DefaultFileTypeProvider implements FileTypeInterface {
 
-  public FileReader getFileHolder(FileFactory.FileType fileType) {
+  public FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration) {
     switch (fileType) {
       case LOCAL:
         return new FileReaderImpl();
@@ -37,7 +37,7 @@ public class DefaultFileTypeProvider implements FileTypeInterface {
       case ALLUXIO:
       case VIEWFS:
       case S3:
-        return new DFSFileReaderImpl();
+        return new DFSFileReaderImpl(configuration);
       default:
         return new FileReaderImpl();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index e353623..b07d11b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -59,11 +60,23 @@ public final class FileFactory {
   }
 
   public static Configuration getConfiguration() {
-    return configuration;
+    Configuration conf;
+    Object confObject = ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo()
+        .getNonSerializableExtraInfo().get("carbonConf");
+    if (confObject == null) {
+      conf = configuration;
+    } else {
+      conf = (Configuration) confObject;
+    }
+    return conf;
   }
 
   public static FileReader getFileHolder(FileType fileType) {
-    return fileFileTypeInterface.getFileHolder(fileType);
+    return fileFileTypeInterface.getFileHolder(fileType, getConfiguration());
+  }
+
+  public static FileReader getFileHolder(FileType fileType, Configuration configuration) {
+    return fileFileTypeInterface.getFileHolder(fileType, configuration);
   }
 
   public static FileType getFileType(String path) {
@@ -100,7 +113,7 @@ public final class FileFactory {
 
   public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize)
       throws IOException {
-    return getDataInputStream(path, fileType, bufferSize, configuration);
+    return getDataInputStream(path, fileType, bufferSize, getConfiguration());
   }
   public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize,
       Configuration configuration) throws IOException {
@@ -306,7 +319,7 @@ public final class FileFactory {
         // this method was new in hadoop 2.7, otherwise use CarbonFile.truncate to do this.
         try {
           Path pt = new Path(path);
-          FileSystem fs = pt.getFileSystem(configuration);
+          FileSystem fs = pt.getFileSystem(getConfiguration());
           Method truncateMethod = fs.getClass().getDeclaredMethod("truncate",
               new Class[]{Path.class, long.class});
           truncateMethod.invoke(fs, new Object[]{pt, newSize});
@@ -414,7 +427,7 @@ public final class FileFactory {
       case VIEWFS:
       case S3:
         Path path = new Path(filePath);
-        FileSystem fs = path.getFileSystem(configuration);
+        FileSystem fs = path.getFileSystem(getConfiguration());
         return fs.getContentSummary(path).getLength();
       case LOCAL:
       default:
@@ -442,7 +455,7 @@ public final class FileFactory {
    * @throws IOException
    */
   public static FileSystem getFileSystem(Path path) throws IOException {
-    return path.getFileSystem(configuration);
+    return path.getFileSystem(getConfiguration());
   }
 
 
@@ -455,7 +468,7 @@ public final class FileFactory {
       case VIEWFS:
         try {
           Path path = new Path(directoryPath);
-          FileSystem fs = path.getFileSystem(FileFactory.configuration);
+          FileSystem fs = path.getFileSystem(getConfiguration());
           if (!fs.exists(path)) {
             fs.mkdirs(path);
             fs.setPermission(path, permission);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
index 358d2ef..8b0fcc4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 
 public interface FileTypeInterface {
 
-  FileReader getFileHolder(FileFactory.FileType fileType);
+  FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration);
   CarbonFile getCarbonFile(String path, FileFactory.FileType fileType);
   CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Configuration configuration);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
index b790f1c..2a9c7f4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
@@ -23,24 +23,26 @@ import org.apache.carbondata.core.scan.executor.impl.VectorDetailQueryExecutor;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.util.CarbonProperties;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Factory class to get the query executor from RDD
  * This will return the executor based on query type
  */
 public class QueryExecutorFactory {
 
-  public static QueryExecutor getQueryExecutor(QueryModel queryModel) {
+  public static QueryExecutor getQueryExecutor(QueryModel queryModel, Configuration configuration) {
     if (CarbonProperties.isSearchModeEnabled()) {
       if (queryModel.isVectorReader()) {
-        return new SearchModeVectorDetailQueryExecutor();
+        return new SearchModeVectorDetailQueryExecutor(configuration);
       } else {
-        return new SearchModeDetailQueryExecutor();
+        return new SearchModeDetailQueryExecutor(configuration);
       }
     } else {
       if (queryModel.isVectorReader()) {
-        return new VectorDetailQueryExecutor();
+        return new VectorDetailQueryExecutor(configuration);
       } else {
-        return new DetailQueryExecutor();
+        return new DetailQueryExecutor(configuration);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 259889b..ece2f8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -68,10 +68,12 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * This class provides a skeletal implementation of the {@link QueryExecutor}
@@ -96,7 +98,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
    */
   protected CarbonIterator queryIterator;
 
-  public AbstractQueryExecutor() {
+  public AbstractQueryExecutor(Configuration configuration) {
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration);
     queryProperties = new QueryExecutorProperties();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
index 46ef43d..e11c576 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
@@ -27,6 +27,8 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.result.iterator.DetailQueryResultIterator;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Below class will be used to execute the detail query
  * For executing the detail query it will pass all the block execution
@@ -34,6 +36,10 @@ import org.apache.carbondata.core.scan.result.iterator.DetailQueryResultIterator
  */
 public class DetailQueryExecutor extends AbstractQueryExecutor<RowBatch> {
 
+  public DetailQueryExecutor(Configuration configuration) {
+    super(configuration);
+  }
+
   @Override
   public CarbonIterator<RowBatch> execute(QueryModel queryModel)
       throws QueryExecutionException, IOException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
index ae14327..6d03540 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
@@ -31,13 +31,15 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator;
 import org.apache.carbondata.core.util.CarbonProperties;
 
+import org.apache.hadoop.conf.Configuration;
 
 public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object> {
   private static final LogService LOGGER =
           LogServiceFactory.getLogService(SearchModeDetailQueryExecutor.class.getName());
   private static ExecutorService executorService = null;
 
-  public SearchModeDetailQueryExecutor() {
+  public SearchModeDetailQueryExecutor(Configuration configuration) {
+    super(configuration);
     if (executorService == null) {
       initThreadPool();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
index 705c451..418ef42 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
@@ -32,6 +32,8 @@ import org.apache.carbondata.core.util.CarbonProperties;
 
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Below class will be used to execute the detail query and returns columnar vectors.
  */
@@ -40,7 +42,8 @@ public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<O
           LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
   private static ExecutorService executorService = null;
 
-  public SearchModeVectorDetailQueryExecutor() {
+  public SearchModeVectorDetailQueryExecutor(Configuration configuration) {
+    super(configuration);
     if (executorService == null) {
       initThreadPool();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
index 7787e4c..46397c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
@@ -26,11 +26,17 @@ import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.iterator.VectorDetailQueryResultIterator;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Below class will be used to execute the detail query and returns columnar vectors.
  */
 public class VectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
 
+  public VectorDetailQueryExecutor(Configuration configuration) {
+    super(configuration);
+  }
+
   @Override
   public CarbonIterator<Object> execute(QueryModel queryModel)
       throws QueryExecutionException, IOException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index c3a4934..58fef17 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -23,11 +23,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.text.SimpleDateFormat;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -94,7 +94,7 @@ public final class CarbonProperties {
   /**
    * It is purely for testing
    */
-  private Map<String, String> addedProperty = new HashMap<>();
+  private Map<String, String> addedProperty = new ConcurrentHashMap<>();
 
   /**
    * Private constructor this will call load properties method to load all the
@@ -407,7 +407,7 @@ public final class CarbonProperties {
    * @param lockTypeConfigured
    */
   private void validateAndConfigureLockType(String lockTypeConfigured) {
-    Configuration configuration = new Configuration(true);
+    Configuration configuration = FileFactory.getConfiguration();
     String defaultFs = configuration.get("fs.defaultFS");
     if (null != defaultFs && (defaultFs.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)
         || defaultFs.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || defaultFs

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 9aaa58c..c5e2e8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -134,8 +134,6 @@ public final class CarbonUtil {
    */
   private static final int CONST_HUNDRED = 100;
 
-  private static final Configuration conf = new Configuration(true);
-
   /**
    * dfs.bytes-per-checksum
    * HDFS checksum length, block size for a file should be exactly divisible
@@ -662,7 +660,7 @@ public final class CarbonUtil {
    */
   public static String checkAndAppendHDFSUrl(String filePath) {
     String currentPath = filePath;
-    String defaultFsUrl = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
+    String defaultFsUrl = FileFactory.getConfiguration().get(CarbonCommonConstants.FS_DEFAULT_FS);
     String baseDFSUrl = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_DDL_BASE_HDFS_URL, "");
     if (checkIfPrefixExists(filePath)) {
@@ -699,7 +697,7 @@ public final class CarbonUtil {
       filePath = "/" + filePath;
     }
     currentPath = filePath;
-    String defaultFsUrl = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
+    String defaultFsUrl = FileFactory.getConfiguration().get(CarbonCommonConstants.FS_DEFAULT_FS);
     if (defaultFsUrl == null) {
       return currentPath;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 169c003..51b157f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.util;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.common.logging.LogService;
@@ -57,12 +58,12 @@ public class SessionParams implements Serializable, Cloneable {
   private static final long serialVersionUID = -7801994600594915264L;
 
   private Map<String, String> sProps;
-  private Map<String, String> addedProps;
+  private ConcurrentHashMap<String, String> addedProps;
   // below field to be used when we want the objects to be serialized
   private Map<String, Object> extraInfo;
   public SessionParams() {
     sProps = new HashMap<>();
-    addedProps = new HashMap<>();
+    addedProps = new ConcurrentHashMap<>();
     extraInfo = new HashMap<>();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
index df525bc..f85a350 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.util;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * This class maintains ThreadLocal session params
  */
@@ -31,4 +33,22 @@ public class ThreadLocalSessionInfo {
   public static CarbonSessionInfo getCarbonSessionInfo() {
     return threadLocal.get();
   }
+
+  public static synchronized CarbonSessionInfo getOrCreateCarbonSessionInfo() {
+    CarbonSessionInfo info = threadLocal.get();
+    if (info == null || info.getSessionParams() == null) {
+      info = new CarbonSessionInfo();
+      info.setSessionParams(new SessionParams());
+      threadLocal.set(info);
+    }
+    return info;
+  }
+
+  public static void setConfigurationToCurrentThread(Configuration configuration) {
+    getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo().put("carbonConf", configuration);
+  }
+
+  public static void unsetAll() {
+    threadLocal.remove();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
index 30144c1..5033713 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.impl.DFSFileReaderImpl;
 
 import mockit.Mock;
 import mockit.MockUp;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,7 +46,7 @@ public class DFSFileReaderImplUnitTest {
   private static File fileWithEmptyContent;
 
   @BeforeClass public static void setup() {
-    dfsFileHolder = new DFSFileReaderImpl();
+    dfsFileHolder = new DFSFileReaderImpl(new Configuration());
     file = new File("Test.carbondata");
     fileWithEmptyContent = new File("TestEXception.carbondata");
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index 605ec89..bdb17ed 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -165,7 +165,7 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
     // the indexWriter closes the FileSystem on closing the writer, so for a new configuration
     // and disable the cache for the index writer, it will be closed on closing the writer
-    Configuration conf = new Configuration();
+    Configuration conf = FileFactory.getConfiguration();
     conf.set("fs.hdfs.impl.disable.cache", "true");
 
     // create a index writer

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index a54e7a4..0d38906 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -57,15 +58,16 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
   private boolean skipClearDataMapAtClose = false;
 
   public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport,
-      InputMetricsStats inputMetricsStats) {
-    this(queryModel, readSupport);
+      InputMetricsStats inputMetricsStats, Configuration configuration) {
+    this(queryModel, readSupport, configuration);
     this.inputMetricsStats = inputMetricsStats;
   }
 
-  public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport) {
+  public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport,
+      Configuration configuration) {
     this.queryModel = queryModel;
     this.readSupport = readSupport;
-    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel, configuration);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 21ef6cf..3ebd6d6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -583,7 +583,8 @@ m filterExpression
     Configuration configuration = taskAttemptContext.getConfiguration();
     QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
     CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
-    return new CarbonRecordReader<T>(queryModel, readSupport);
+    return new CarbonRecordReader<T>(queryModel, readSupport,
+        taskAttemptContext.getConfiguration());
   }
 
   public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index af2cf83..ec201b9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -132,7 +132,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
-
     CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 5938c20..5cc275b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
 import org.apache.carbondata.processing.loading.DataLoadExecutor;
 import org.apache.carbondata.processing.loading.TableProcessingOperations;
@@ -231,7 +232,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
 
   @Override
   public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter(
-      TaskAttemptContext taskAttemptContext) throws IOException {
+      final TaskAttemptContext taskAttemptContext) throws IOException {
     final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
     //if loadModel having taskNo already(like in SDK) then no need to overwrite
     short sdkUserCore = loadModel.getSdkUserCores();
@@ -249,10 +250,12 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext);
     final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
     final ExecutorService executorService = Executors.newFixedThreadPool(1,
-        new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));;
+        new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));
     // It should be started in new thread as the underlying iterator uses blocking queue.
     Future future = executorService.submit(new Thread() {
       @Override public void run() {
+        ThreadLocalSessionInfo.setConfigurationToCurrentThread(taskAttemptContext
+            .getConfiguration());
         try {
           dataLoadExecutor
               .execute(loadModel, tempStoreLocations, iterators);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index af7397b..7641427 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datamap.DataMapJob;
 import org.apache.carbondata.core.datamap.DataMapUtil;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -177,20 +176,4 @@ public class CarbonInputFormatUtil {
     return new JobID(jobtrackerID, batch);
   }
 
-  public static void setS3Configurations(Configuration hadoopConf) {
-    FileFactory.getConfiguration()
-        .set("fs.s3a.access.key", hadoopConf.get("fs.s3a.access.key", ""));
-    FileFactory.getConfiguration()
-        .set("fs.s3a.secret.key", hadoopConf.get("fs.s3a.secret.key", ""));
-    FileFactory.getConfiguration()
-        .set("fs.s3a.endpoint", hadoopConf.get("fs.s3a.endpoint", ""));
-    FileFactory.getConfiguration().set(CarbonCommonConstants.S3_ACCESS_KEY,
-        hadoopConf.get(CarbonCommonConstants.S3_ACCESS_KEY, ""));
-    FileFactory.getConfiguration().set(CarbonCommonConstants.S3_SECRET_KEY,
-        hadoopConf.get(CarbonCommonConstants.S3_SECRET_KEY, ""));
-    FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_ACCESS_KEY,
-        hadoopConf.get(CarbonCommonConstants.S3N_ACCESS_KEY, ""));
-    FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_SECRET_KEY,
-        hadoopConf.get(CarbonCommonConstants.S3N_SECRET_KEY, ""));
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
index 57bcca3..4ed2b91 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
@@ -63,7 +63,7 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
 
   public CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport<ArrayWritable> readSupport,
       InputSplit inputSplit, JobConf jobConf) throws IOException {
-    super(queryModel, readSupport);
+    super(queryModel, readSupport, jobConf);
     initialize(inputSplit, jobConf);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index 5b15b22..3ec815d 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -100,7 +100,8 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
     checkArgument(carbondataSplit.getConnectorId().equals(connectorId),
         "split is not for this connector");
     QueryModel queryModel = createQueryModel(carbondataSplit, columns);
-    QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+    QueryExecutor queryExecutor =
+        QueryExecutorFactory.getQueryExecutor(queryModel, new Configuration());
     try {
       CarbonIterator iterator = queryExecutor.execute(queryModel);
       readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
index 32e163a..9935b54 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
@@ -114,7 +114,8 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
     queryModel.setTableBlockInfos(tableBlockInfoList);
     queryModel.setVectorReader(true);
     try {
-      queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+      queryExecutor =
+          QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration());
       iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
     } catch (QueryExecutionException e) {
       throw new InterruptedException(e.getMessage());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 7916932..5a1e140 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -414,7 +414,7 @@ public class CarbonTableReader {
     List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
     CarbonTable carbonTable = tableCacheModel.carbonTable;
     TableInfo tableInfo = tableCacheModel.carbonTable.getTableInfo();
-    Configuration config = new Configuration();
+    Configuration config = FileFactory.getConfiguration();
     config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
     String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
     config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
index 127c4c9..2f3b8f4 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
@@ -32,6 +32,7 @@ import com.facebook.presto.tests.DistributedQueryRunner
 import com.google.common.collect.ImmutableMap
 import org.slf4j.{Logger, LoggerFactory}
 
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.presto.CarbondataPlugin
 
 object PrestoServer {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
index 0ac6e72..6cbe747 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
@@ -41,8 +41,7 @@ class LuceneFineGrainDataMapWithSearchModeSuite extends QueryTest with BeforeAnd
     val n = 500000
     sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode()
     CarbonProperties
-      .getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "100s")
+      .getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "100s")
     LuceneFineGrainDataMapSuite.createFile(file2, n)
     sql("create database if not exists lucene")
     sql("use lucene")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
index 062e5ba..c95e5a4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
@@ -146,13 +146,13 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test create table as select with TBLPROPERTIES") {
-    sql("DROP TABLE IF EXISTS ctas_tblproperties_test")
+    sql("DROP TABLE IF EXISTS ctas_tblproperties_testt")
     sql(
-      "create table ctas_tblproperties_test stored by 'carbondata' TBLPROPERTIES" +
+      "create table ctas_tblproperties_testt stored by 'carbondata' TBLPROPERTIES" +
         "('DICTIONARY_INCLUDE'='key', 'sort_scope'='global_sort') as select * from carbon_ctas_test")
-    checkAnswer(sql("select * from ctas_tblproperties_test"), sql("select * from carbon_ctas_test"))
+    checkAnswer(sql("select * from ctas_tblproperties_testt"), sql("select * from carbon_ctas_test"))
     val carbonTable = CarbonEnv.getInstance(Spark2TestQueryExecutor.spark).carbonMetastore
-      .lookupRelation(Option("default"), "ctas_tblproperties_test")(Spark2TestQueryExecutor.spark)
+      .lookupRelation(Option("default"), "ctas_tblproperties_testt")(Spark2TestQueryExecutor.spark)
       .asInstanceOf[CarbonRelation].carbonTable
     val metadataFolderPath: CarbonFile = FileFactory.getCarbonFile(carbonTable.getMetadataPath)
     assert(metadataFolderPath.exists())
@@ -419,7 +419,7 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
     sql("DROP TABLE IF EXISTS ctas_select_where_parquet")
     sql("DROP TABLE IF EXISTS ctas_select_where_orc")
-    sql("DROP TABLE IF EXISTS ctas_tblproperties_test")
+    sql("DROP TABLE IF EXISTS ctas_tblproperties_testt")
     sql("DROP TABLE IF EXISTS ctas_if_table_name")
     sql("DROP TABLE IF EXISTS source_table")
     sql("DROP TABLE IF EXISTS target_table")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
index 36d8c51..8d6dd32 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -37,6 +37,8 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P
 import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.rdd.SerializableConfiguration
@@ -108,17 +110,18 @@ object CsvRDDHelper {
     closePartition()
 
     // 2. read function
-    val serializableConfiguration = new SerializableConfiguration(jobConf)
+    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
     val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
       override def apply(file: PartitionedFile): Iterator[InternalRow] = {
         new Iterator[InternalRow] {
-          val hadoopConf = serializableConfiguration.value
           val jobTrackerId: String = {
             val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
             formatter.format(new Date())
           }
+          ThreadLocalSessionInfo.setConfigurationToCurrentThread(serializableConfiguration.value)
           val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
-          val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+          val hadoopAttemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration,
+            attemptId)
           val inputSplit =
             new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
           var finished = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index dc238fb..be40b13 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -29,11 +29,13 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
 
 /**
  * Use sortBy operator in spark to load the data
@@ -64,6 +66,7 @@ object DataLoadProcessBuilderOnSpark {
     val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
     val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
 
+    val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     // 1. Input
     val inputRDD = originRDD
       .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
@@ -73,6 +76,7 @@ object DataLoadProcessBuilderOnSpark {
 
     // 2. Convert
     val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
       DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
         convertStepRowCounter)
     }.filter(_ != null)// Filter the bad record
@@ -116,7 +120,7 @@ object DataLoadProcessBuilderOnSpark {
     // 4. Write
     sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
       DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, modelBroadcast,
-        writeStepRowCounter))
+        writeStepRowCounter, conf))
 
     // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
     // not have any functional impact as spark automatically monitors the cache usage on each node

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 73ed769..f17bd91 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.load
 import scala.util.Random
 
 import com.univocity.parsers.common.TextParsingException
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{Accumulator, SparkEnv, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.Row
@@ -29,7 +30,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
 import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
@@ -40,7 +42,7 @@ import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImp
 import org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
 import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil}
-import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
+import org.apache.carbondata.spark.rdd.{NewRddIterator, SerializableConfiguration, StringArrayRow}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
 
 object DataLoadProcessorStepOnSpark {
@@ -227,7 +229,9 @@ object DataLoadProcessorStepOnSpark {
       rows: Iterator[CarbonRow],
       index: Int,
       modelBroadcast: Broadcast[CarbonLoadModel],
-      rowCounter: Accumulator[Int]) {
+      rowCounter: Accumulator[Int],
+      conf: Broadcast[SerializableConfiguration]) {
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
     var model: CarbonLoadModel = null
     var tableName: String = null
     var rowConverter: RowConverterImpl = null

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index 7c1edea..f7aa623 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -17,7 +17,8 @@
 
 package org.apache.carbondata.spark.rdd
 
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -47,15 +48,15 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
 /**
  * This class is aimed at generating dictionary file for the newly added columns
  */
-class AlterTableAddColumnRDD[K, V](sc: SparkContext,
+class AlterTableAddColumnRDD[K, V](@transient sparkSession: SparkSession,
     @transient newColumns: Seq[ColumnSchema],
     identifier: AbsoluteTableIdentifier)
-  extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) {
+  extends CarbonRDD[(Int, SegmentStatus)](sparkSession, Nil) {
 
   val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
     CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
       new AddColumnPartition(id, column._2, column._1)
     }.toArray

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index 0dbb4f0..a0d06b8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -17,11 +17,12 @@
 
 package org.apache.carbondata.spark.rdd
 
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.statusmanager.SegmentStatus
@@ -44,12 +45,12 @@ class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Pa
 /**
  * This class is aimed at generating dictionary file for the newly added columns
  */
-class AlterTableDropColumnRDD[K, V](sc: SparkContext,
+class AlterTableDropColumnRDD[K, V](@transient ss: SparkSession,
     @transient newColumns: Seq[ColumnSchema],
     carbonTableIdentifier: AbsoluteTableIdentifier)
-  extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) {
+  extends CarbonRDD[(Int, SegmentStatus)](ss, Nil) {
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
       new DropColumnPartition(id, column._2, column._1)
     }.toArray

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 85a6f41..86a5043 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -39,7 +39,8 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
     partitionIds: Seq[String],
     bucketId: Int,
     identifier: AbsoluteTableIdentifier,
-    prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {
+    prev: RDD[Array[AnyRef]])
+  extends CarbonRDD[(K, V)](alterPartitionModel.sqlContext.sparkSession, prev) {
 
   var storeLocation: String = null
   val carbonLoadModel = alterPartitionModel.carbonLoadModel
@@ -50,14 +51,14 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
   val factTableName = carbonTable.getTableName
   val partitionInfo = carbonTable.getPartitionInfo(factTableName)
 
-  override protected def getPartitions: Array[Partition] = {
+  override protected def internalGetPartitions: Array[Partition] = {
     val sc = alterPartitionModel.sqlContext.sparkContext
     sc.setLocalProperty("spark.scheduler.pool", "DDL")
     sc.setLocalProperty("spark.job.interruptOnCancel", "true")
     firstParent[Array[AnyRef]].partitions
   }
 
-  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
+  override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava
     val iter = new Iterator[(K, V)] {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
index d56e1c2..e2d1eff 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
@@ -21,7 +21,8 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.indexstore.PartitionSpec
@@ -37,19 +38,19 @@ case class CarbonDropPartition(rddId: Int, val idx: Int, segment: Segment)
 
 /**
  * RDD to drop the partitions from segment files of all segments.
- * @param sc
+ * @param ss
  * @param tablePath
  * @param segments segments to be cleaned
  */
 class CarbonDropPartitionRDD(
-    sc: SparkContext,
+    @transient ss: SparkSession,
     tablePath: String,
     segments: Seq[Segment],
     partitions: util.List[PartitionSpec],
     uniqueId: String)
-  extends CarbonRDD[(String, String)](sc, Nil, sc.hadoopConfiguration) {
+  extends CarbonRDD[(String, String)](ss, Nil) {
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     segments.zipWithIndex.map {s =>
       CarbonDropPartition(id, s._2, s._1)
     }.toArray

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 2ec8b9c..9265c7f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -31,7 +31,7 @@ import com.univocity.parsers.common.TextParsingException
 import org.apache.commons.lang3.{ArrayUtils, StringUtils}
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SparkSession}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
@@ -174,11 +174,12 @@ case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends S
  * @param model a model package load info
  */
 class CarbonAllDictionaryCombineRDD(
+    @transient sparkSession: SparkSession,
     prev: RDD[(String, Iterable[String])],
     model: DictionaryLoadModel)
-  extends CarbonRDD[(Int, ColumnDistinctValues)](prev) {
+  extends CarbonRDD[(Int, ColumnDistinctValues)](sparkSession, prev) {
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     firstParent[(String, Iterable[String])].partitions
   }
 
@@ -267,11 +268,12 @@ class StringArrayRow(var values: Array[String]) extends Row {
  * @param model a model package load info
  */
 class CarbonBlockDistinctValuesCombineRDD(
+    @transient ss: SparkSession,
     prev: RDD[Row],
     model: DictionaryLoadModel)
-  extends CarbonRDD[(Int, ColumnDistinctValues)](prev) {
+  extends CarbonRDD[(Int, ColumnDistinctValues)](ss, prev) {
 
-  override def getPartitions: Array[Partition] = firstParent[Row].partitions
+  override def internalGetPartitions: Array[Partition] = firstParent[Row].partitions
   override def internalCompute(split: Partition,
       context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -325,11 +327,14 @@ class CarbonBlockDistinctValuesCombineRDD(
  * @param model a model package load info
  */
 class CarbonGlobalDictionaryGenerateRDD(
+    @transient sparkSession: SparkSession,
     prev: RDD[(Int, ColumnDistinctValues)],
     model: DictionaryLoadModel)
-  extends CarbonRDD[(Int, SegmentStatus)](prev) {
+  extends CarbonRDD[(Int, SegmentStatus)](sparkSession, prev) {
+
+  override def internalGetPartitions: Array[Partition] =
+    firstParent[(Int, ColumnDistinctValues)].partitions
 
-  override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
 
   override def internalCompute(split: Partition,
       context: TaskContext): Iterator[(Int, SegmentStatus)] = {
@@ -492,21 +497,20 @@ class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
  * Use external column dict to generate global dictionary
  *
  * @param carbonLoadModel carbon load model
- * @param sparkContext    spark context
+ * @param ss              spark session
  * @param table           carbon table identifier
  * @param dimensions      carbon dimenisons having predefined dict
  * @param dictFolderPath  path of dictionary folder
  */
 class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     dictionaryLoadModel: DictionaryLoadModel,
-    sparkContext: SparkContext,
+    @transient ss: SparkSession,
     table: CarbonTableIdentifier,
     dimensions: Array[CarbonDimension],
     dictFolderPath: String)
-  extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil,
-    sparkContext.hadoopConfiguration) {
+  extends CarbonRDD[(Int, ColumnDistinctValues)](ss, Nil) {
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     val primDimensions = dictionaryLoadModel.primDimensions
     val primDimLength = primDimensions.length
     val result = new Array[Partition](primDimLength)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 3aaf0ae..762b920 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -24,14 +24,15 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.Partition
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.command.CarbonMergerMapping
 
-import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
-import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.api.CarbonInputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -41,23 +42,23 @@ import org.apache.carbondata.spark.MergeResult
  * IUD carbon merger RDD
  * */
 class CarbonIUDMergerRDD[K, V](
-    sc: SparkContext,
+    @transient ss: SparkSession,
     result: MergeResult[K, V],
     carbonLoadModel: CarbonLoadModel,
     carbonMergerMapping: CarbonMergerMapping,
     confExecutorsTemp: String)
-  extends CarbonMergerRDD[K, V](sc,
+  extends CarbonMergerRDD[K, V](ss,
     result,
     carbonLoadModel,
     carbonMergerMapping,
     confExecutorsTemp) {
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
     val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
       tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
     )
-    val jobConf: JobConf = new JobConf(new Configuration)
+    val jobConf: JobConf = new JobConf(FileFactory.getConfiguration)
     SparkHadoopUtil.get.addCredentials(jobConf)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index f9f65a7..a0425b7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block._
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
@@ -60,15 +61,15 @@ import org.apache.carbondata.spark.MergeResult
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
 class CarbonMergerRDD[K, V](
-    sc: SparkContext,
+    @transient ss: SparkSession,
     result: MergeResult[K, V],
     carbonLoadModel: CarbonLoadModel,
     carbonMergerMapping: CarbonMergerMapping,
     confExecutorsTemp: String)
-  extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
+  extends CarbonRDD[(K, V)](ss, Nil) {
 
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-  sc.setLocalProperty("spark.job.interruptOnCancel", "true")
+  ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL")
+  ss.sparkContext.setLocalProperty("spark.job.interruptOnCancel", "true")
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
   var storeLocation: String = null
@@ -183,7 +184,7 @@ class CarbonMergerRDD[K, V](
         }
         try {
           // fire a query and get the results.
-          rawResultIteratorList = exec.processTableBlocks()
+          rawResultIteratorList = exec.processTableBlocks(FileFactory.getConfiguration)
         } catch {
           case e: Throwable =>
             LOGGER.error(e)
@@ -269,7 +270,7 @@ class CarbonMergerRDD[K, V](
     iter
   }
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
     val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
       tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
@@ -277,7 +278,7 @@ class CarbonMergerRDD[K, V](
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
       carbonTable)
-    val jobConf: JobConf = new JobConf(new Configuration)
+    val jobConf: JobConf = new JobConf(getConf)
     SparkHadoopUtil.get.addCredentials(jobConf)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 54a7530..04f20b1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -17,28 +17,24 @@
 
 package org.apache.carbondata.spark.rdd
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
-
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.util.SparkSQLUtil
 
-import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util._
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 
 /**
  * This RDD maintains session level ThreadLocal
  */
-abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
-    @transient private var deps: Seq[Dependency[_]],
-    @transient hadoopConf: Configuration) extends RDD[T](sc, deps) {
+abstract class CarbonRDD[T: ClassTag](@transient ss: SparkSession,
+    @transient private var deps: Seq[Dependency[_]]) extends RDD[T](ss.sparkContext, deps) {
 
   val carbonSessionInfo: CarbonSessionInfo = {
     var info = ThreadLocalSessionInfo.getCarbonSessionInfo
@@ -50,24 +46,27 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
     info
   }
 
-  private val confBytes = {
-    val bao = new ByteArrayOutputStream()
-    val oos = new ObjectOutputStream(bao)
-    hadoopConf.write(oos)
-    oos.close()
-    CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
-  }
+  val config: Broadcast[SerializableConfiguration] = sparkContext
+    .broadcast(new SerializableConfiguration(SparkSQLUtil.sessionState(ss).newHadoopConf()))
 
   /** Construct an RDD with just a one-to-one dependency on one parent */
-  def this(@transient oneParent: RDD[_]) =
-    this (oneParent.context, List(new OneToOneDependency(oneParent)),
-      oneParent.sparkContext.hadoopConfiguration)
+  def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_]) =
+    this (sparkSession, List(new OneToOneDependency(oneParent)))
+
+  protected def internalGetPartitions: Array[Partition]
+
+  override def getPartitions: Array[Partition] = {
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(config.value.value)
+    internalGetPartitions
+  }
 
   // RDD compute logic should be here
   def internalCompute(split: Partition, context: TaskContext): Iterator[T]
 
   final def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    CarbonInputFormatUtil.setS3Configurations(getConf)
+    TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll())
+    carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", config
+      .value.value)
     ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
     TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
     val carbonTaskInfo = new CarbonTaskInfo
@@ -79,13 +78,7 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
   }
 
   def getConf: Configuration = {
-    val configuration = new Configuration(false)
-    val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor
-      .unCompressByte(confBytes))
-    val ois = new ObjectInputStream(bai)
-    configuration.readFields(ois)
-    ois.close()
-    configuration
+    config.value.value
   }
 }
 
@@ -93,12 +86,14 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
  * This RDD contains TableInfo object which is serialized and deserialized in driver and executor
  */
 abstract class CarbonRDDWithTableInfo[T: ClassTag](
-    @transient sc: SparkContext,
+    @transient ss: SparkSession,
     @transient private var deps: Seq[Dependency[_]],
-    serializedTableInfo: Array[Byte]) extends CarbonRDD[T](sc, deps, sc.hadoopConfiguration) {
+    serializedTableInfo: Array[Byte]) extends CarbonRDD[T](ss, deps) {
 
-  def this(@transient oneParent: RDD[_], serializedTableInfo: Array[Byte]) =
-    this (oneParent.context, List(new OneToOneDependency(oneParent)), serializedTableInfo)
+  def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_],
+      serializedTableInfo: Array[Byte]) = {
+    this (sparkSession, List(new OneToOneDependency(oneParent)), serializedTableInfo)
+  }
 
   def getTableInfo: TableInfo = TableInfo.deserialize(serializedTableInfo)
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
index 9452777..241720a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.command.AlterPartitionModel
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.util.CarbonException
@@ -36,6 +37,7 @@ import org.apache.spark.util.PartitionUtils
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo}
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataTypes
@@ -65,7 +67,7 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
     absoluteTableIdentifier: AbsoluteTableIdentifier,
     partitionIds: Seq[String],
     bucketId: Int)
-  extends RDD[(AnyRef, Array[AnyRef])](alterPartitionModel.sqlContext.sparkContext, Nil) {
+  extends CarbonRDD[(AnyRef, Array[AnyRef])](alterPartitionModel.sqlContext.sparkSession, Nil) {
 
   private val queryId = alterPartitionModel.sqlContext.sparkContext.getConf
     .get("queryId", System.nanoTime() + "")
@@ -91,9 +93,9 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
   val dictionaryIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]()
   val measureIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]()
 
-  override def getPartitions: Array[Partition] = {
+  override def internalGetPartitions: Array[Partition] = {
     val parallelism = sparkContext.defaultParallelism
-    val jobConf = new JobConf(new Configuration)
+    val jobConf = new JobConf(FileFactory.getConfiguration)
     val job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonTableInputFormat(absoluteTableIdentifier,
       partitionIds.toList.asJava, job)
@@ -127,8 +129,8 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
     result.toArray(new Array[Partition](result.size()))
   }
 
-  override def compute(split: Partition, context: TaskContext):
-    Iterator[(AnyRef, Array[AnyRef])] = {
+  override def internalCompute(split: Partition, context: TaskContext):
+  Iterator[(AnyRef, Array[AnyRef])] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
       var exec : CarbonSplitExecutor = null
       val rows : java.util.List[(AnyRef, Array[AnyRef])] = new ArrayList[(AnyRef, Array[AnyRef])]()
@@ -142,7 +144,8 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
         var result : java.util.List[PartitionSpliterRawResultIterator] = null
         try {
           exec = new CarbonSplitExecutor(segmentMapping, carbonTable)
-          result = exec.processDataBlocks(segmentId, new SparkDataTypeConverterImpl())
+          result = exec.processDataBlocks(segmentId, new SparkDataTypeConverterImpl(),
+            FileFactory.getConfiguration)
         } catch {
           case e: Throwable =>
             LOGGER.error(e)