Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/08/28 09:00:31 UTC
[1/3] carbondata git commit: [CARBONDATA-2844] [CARBONDATA-2865] Pass SK/AK to executor by serializing hadoop configuration from driver.
Repository: carbondata
Updated Branches:
refs/heads/master 1fb1f19f2 -> 2a9604cd8
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index b93d80f..30aa415 100644
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -54,6 +54,7 @@ import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
import org.apache.carbondata.hadoop.CarbonRecordReader;
import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
+import org.apache.hadoop.conf.Configuration;
import org.apache.spark.search.SearchRequest;
import org.apache.spark.search.SearchResult;
import org.apache.spark.search.ShutdownRequest;
@@ -135,7 +136,7 @@ public class SearchRequestHandler {
// In search mode, reader will read multiple blocks by using a thread pool
CarbonRecordReader<CarbonRow> reader =
- new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport());
+ new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport(), new Configuration());
// read all rows by the reader
List<CarbonRow> rows = new LinkedList<>();
[3/3] carbondata git commit: [CARBONDATA-2844] [CARBONDATA-2865] Pass SK/AK to executor by serializing hadoop configuration from driver.
Posted by ra...@apache.org.
[CARBONDATA-2844] [CARBONDATA-2865] Pass SK/AK to executor by serializing hadoop configuration from driver.
Add SK/AK to the thread local so that on each query the new SK/AK can be passed to FileFactory.
Refactor FileFactory to accept the configuration from the thread local.
Fix the 1.3.x to 1.5.x compatibility issue [CARBONDATA-2865].
This closes #2623
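
As a quick orientation before the diff, here is a minimal, hedged sketch of the executor-side flow this change introduces. The class and method names are taken from the diff below; the credential values are placeholders only.

    import org.apache.carbondata.core.datastore.impl.FileFactory;
    import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
    import org.apache.hadoop.conf.Configuration;

    public class ExecutorSideConfSketch {
      public static void main(String[] args) {
        // The driver sets SK/AK on its hadoop Configuration and serializes it with the
        // task; this stands in for the deserialized copy arriving on the executor.
        Configuration confFromDriver = new Configuration();
        confFromDriver.set("fs.s3a.access.key", "<access-key>");  // placeholder value
        confFromDriver.set("fs.s3a.secret.key", "<secret-key>");  // placeholder value

        // Bind the conf to the current task thread...
        ThreadLocalSessionInfo.setConfigurationToCurrentThread(confFromDriver);

        // ...so later FileFactory calls resolve it instead of the static default conf.
        Configuration resolved = FileFactory.getConfiguration();
        System.out.println(resolved.get("fs.s3a.access.key"));
      }
    }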
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2a9604cd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2a9604cd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2a9604cd
Branch: refs/heads/master
Commit: 2a9604cd840dc1a552afcff23059ac9bf624e161
Parents: 1fb1f19
Author: kunal642 <ku...@gmail.com>
Authored: Wed Aug 8 21:50:44 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Aug 28 14:30:19 2018 +0530
----------------------------------------------------------------------
.../core/datastore/impl/DFSFileReaderImpl.java | 8 ++-
.../datastore/impl/DefaultFileTypeProvider.java | 4 +-
.../core/datastore/impl/FileFactory.java | 27 +++++++---
.../core/datastore/impl/FileTypeInterface.java | 2 +-
.../scan/executor/QueryExecutorFactory.java | 12 +++--
.../executor/impl/AbstractQueryExecutor.java | 5 +-
.../scan/executor/impl/DetailQueryExecutor.java | 6 +++
.../impl/SearchModeDetailQueryExecutor.java | 4 +-
.../SearchModeVectorDetailQueryExecutor.java | 5 +-
.../impl/VectorDetailQueryExecutor.java | 6 +++
.../carbondata/core/util/CarbonProperties.java | 6 +--
.../apache/carbondata/core/util/CarbonUtil.java | 6 +--
.../carbondata/core/util/SessionParams.java | 5 +-
.../core/util/ThreadLocalSessionInfo.java | 20 +++++++
.../store/impl/DFSFileReaderImplUnitTest.java | 3 +-
.../datamap/lucene/LuceneDataMapWriter.java | 2 +-
.../carbondata/hadoop/CarbonRecordReader.java | 10 ++--
.../hadoop/api/CarbonInputFormat.java | 3 +-
.../hadoop/api/CarbonTableInputFormat.java | 1 -
.../hadoop/api/CarbonTableOutputFormat.java | 7 ++-
.../hadoop/util/CarbonInputFormatUtil.java | 17 ------
.../carbondata/hive/CarbonHiveRecordReader.java | 2 +-
.../presto/CarbondataPageSourceProvider.java | 3 +-
.../PrestoCarbonVectorizedRecordReader.java | 3 +-
.../presto/impl/CarbonTableReader.java | 2 +-
.../carbondata/presto/server/PrestoServer.scala | 1 +
...eneFineGrainDataMapWithSearchModeSuite.scala | 3 +-
.../createTable/TestCreateTableAsSelect.scala | 10 ++--
.../carbondata/spark/load/CsvRDDHelper.scala | 9 ++--
.../load/DataLoadProcessBuilderOnSpark.scala | 8 ++-
.../load/DataLoadProcessorStepOnSpark.scala | 10 ++--
.../spark/rdd/AlterTableAddColumnRDD.scala | 9 ++--
.../spark/rdd/AlterTableDropColumnRDD.scala | 11 ++--
.../spark/rdd/AlterTableLoadPartitionRDD.scala | 7 +--
.../spark/rdd/CarbonDropPartitionRDD.scala | 11 ++--
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 28 +++++-----
.../spark/rdd/CarbonIUDMergerRDD.scala | 15 +++---
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 15 +++---
.../apache/carbondata/spark/rdd/CarbonRDD.scala | 55 +++++++++-----------
.../spark/rdd/CarbonScanPartitionRDD.scala | 15 +++---
.../carbondata/spark/rdd/CarbonScanRDD.scala | 15 +++---
.../spark/rdd/NewCarbonDataLoadRDD.scala | 53 ++++++-------------
.../carbondata/spark/rdd/SparkDataMapJob.scala | 21 ++++----
.../carbondata/spark/rdd/StreamHandoffRDD.scala | 14 +++--
.../carbondata/spark/util/CommonUtil.scala | 10 ++--
.../spark/util/GlobalDictionaryUtil.scala | 20 ++++---
.../apache/spark/rdd/CarbonMergeFilesRDD.scala | 11 ++--
.../apache/spark/rdd/DataLoadCoalescedRDD.scala | 7 +--
.../command/carbonTableSchemaCommon.scala | 2 +-
.../apache/spark/sql/util/SparkSQLUtil.scala | 4 ++
.../org/apache/spark/util/PartitionUtils.scala | 2 +-
.../VectorizedCarbonRecordReader.java | 3 +-
.../datasources/SparkCarbonFileFormat.scala | 3 +-
.../datamap/IndexDataMapRebuildRDD.scala | 16 +++---
.../spark/rdd/CarbonDataRDDFactory.scala | 26 +++++----
.../spark/rdd/CarbonTableCompactor.scala | 4 +-
.../org/apache/spark/sql/CarbonCountStar.scala | 6 ++-
.../sql/CarbonDatasourceHadoopRelation.scala | 1 -
.../spark/sql/CarbonDictionaryDecoder.scala | 15 ++++--
.../scala/org/apache/spark/sql/CarbonEnv.scala | 6 +--
.../org/apache/spark/sql/CarbonSession.scala | 3 +-
.../org/apache/spark/sql/CarbonSource.scala | 1 -
.../sql/events/MergeIndexEventListener.scala | 14 ++---
.../management/CarbonInsertIntoCommand.scala | 7 +--
.../management/CarbonLoadDataCommand.scala | 19 ++++---
.../command/mutation/DeleteExecution.scala | 16 +++---
.../command/mutation/HorizontalCompaction.scala | 6 +++
...rbonAlterTableDropHivePartitionCommand.scala | 2 +-
.../CarbonAlterTableAddColumnCommand.scala | 4 +-
.../CarbonAlterTableDropColumnCommand.scala | 2 +-
.../table/CarbonCreateTableCommand.scala | 6 ++-
.../sql/execution/strategy/DDLStrategy.scala | 4 +-
.../org/apache/spark/util/TableLoader.scala | 3 +-
.../merger/CarbonCompactionExecutor.java | 12 +++--
.../spliter/AbstractCarbonQueryExecutor.java | 7 ++-
.../partition/spliter/CarbonSplitExecutor.java | 6 ++-
.../sdk/file/CarbonReaderBuilder.java | 3 ++
.../sdk/file/CarbonWriterBuilder.java | 6 +++
.../carbondata/sdk/file/JsonCarbonWriter.java | 3 +-
.../store/worker/SearchRequestHandler.java | 3 +-
80 files changed, 428 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
index 1a0cd41..e86fa12 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,7 +38,10 @@ public class DFSFileReaderImpl implements FileReader {
private boolean readPageByPage;
- public DFSFileReaderImpl() {
+ private Configuration configuration;
+
+ public DFSFileReaderImpl(Configuration configuration) {
+ this.configuration = configuration;
this.fileNameAndStreamCache =
new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
}
@@ -60,7 +64,7 @@ public class DFSFileReaderImpl implements FileReader {
FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath);
if (null == fileChannel) {
Path pt = new Path(filePath);
- FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+ FileSystem fs = pt.getFileSystem(configuration);
fileChannel = fs.open(pt);
fileNameAndStreamCache.put(filePath, fileChannel);
}
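
For illustration only, a hedged sketch of how a DFS reader is now built against an explicit Configuration, resolved via FileFactory (which is thread-local aware after this change):

    import org.apache.carbondata.core.datastore.FileReader;
    import org.apache.carbondata.core.datastore.impl.DFSFileReaderImpl;
    import org.apache.carbondata.core.datastore.impl.FileFactory;
    import org.apache.hadoop.conf.Configuration;

    class DfsReaderConfSketch {
      static FileReader newReader() {
        // Prefers the conf bound to the current thread, falling back to the static conf.
        Configuration conf = FileFactory.getConfiguration();
        return new DFSFileReaderImpl(conf);  // FileSystem lookups now use this conf
      }
    }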
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
index c4761c9..937b5b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration;
public class DefaultFileTypeProvider implements FileTypeInterface {
- public FileReader getFileHolder(FileFactory.FileType fileType) {
+ public FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration) {
switch (fileType) {
case LOCAL:
return new FileReaderImpl();
@@ -37,7 +37,7 @@ public class DefaultFileTypeProvider implements FileTypeInterface {
case ALLUXIO:
case VIEWFS:
case S3:
- return new DFSFileReaderImpl();
+ return new DFSFileReaderImpl(configuration);
default:
return new FileReaderImpl();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index e353623..b07d11b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
@@ -59,11 +60,23 @@ public final class FileFactory {
}
public static Configuration getConfiguration() {
- return configuration;
+ Configuration conf;
+ Object confObject = ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo()
+ .getNonSerializableExtraInfo().get("carbonConf");
+ if (confObject == null) {
+ conf = configuration;
+ } else {
+ conf = (Configuration) confObject;
+ }
+ return conf;
}
public static FileReader getFileHolder(FileType fileType) {
- return fileFileTypeInterface.getFileHolder(fileType);
+ return fileFileTypeInterface.getFileHolder(fileType, getConfiguration());
+ }
+
+ public static FileReader getFileHolder(FileType fileType, Configuration configuration) {
+ return fileFileTypeInterface.getFileHolder(fileType, configuration);
}
public static FileType getFileType(String path) {
@@ -100,7 +113,7 @@ public final class FileFactory {
public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize)
throws IOException {
- return getDataInputStream(path, fileType, bufferSize, configuration);
+ return getDataInputStream(path, fileType, bufferSize, getConfiguration());
}
public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize,
Configuration configuration) throws IOException {
@@ -306,7 +319,7 @@ public final class FileFactory {
// this method was new in hadoop 2.7, otherwise use CarbonFile.truncate to do this.
try {
Path pt = new Path(path);
- FileSystem fs = pt.getFileSystem(configuration);
+ FileSystem fs = pt.getFileSystem(getConfiguration());
Method truncateMethod = fs.getClass().getDeclaredMethod("truncate",
new Class[]{Path.class, long.class});
truncateMethod.invoke(fs, new Object[]{pt, newSize});
@@ -414,7 +427,7 @@ public final class FileFactory {
case VIEWFS:
case S3:
Path path = new Path(filePath);
- FileSystem fs = path.getFileSystem(configuration);
+ FileSystem fs = path.getFileSystem(getConfiguration());
return fs.getContentSummary(path).getLength();
case LOCAL:
default:
@@ -442,7 +455,7 @@ public final class FileFactory {
* @throws IOException
*/
public static FileSystem getFileSystem(Path path) throws IOException {
- return path.getFileSystem(configuration);
+ return path.getFileSystem(getConfiguration());
}
@@ -455,7 +468,7 @@ public final class FileFactory {
case VIEWFS:
try {
Path path = new Path(directoryPath);
- FileSystem fs = path.getFileSystem(FileFactory.configuration);
+ FileSystem fs = path.getFileSystem(getConfiguration());
if (!fs.exists(path)) {
fs.mkdirs(path);
fs.setPermission(path, permission);
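
A hedged usage sketch of the two entry points above: an explicitly supplied Configuration is handed straight through, otherwise the old signature now falls back to the thread-local conf before the static one.

    import org.apache.carbondata.core.datastore.FileReader;
    import org.apache.carbondata.core.datastore.impl.FileFactory;
    import org.apache.hadoop.conf.Configuration;

    class FileFactoryConfSketch {
      static FileReader openReader(String path, Configuration explicitConf) {
        FileFactory.FileType fileType = FileFactory.getFileType(path);
        if (explicitConf != null) {
          // New overload: the caller-supplied conf goes straight to the file type provider.
          return FileFactory.getFileHolder(fileType, explicitConf);
        }
        // Old entry point: now resolves the thread-local "carbonConf" before the static conf.
        return FileFactory.getFileHolder(fileType);
      }
    }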
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
index 358d2ef..8b0fcc4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
public interface FileTypeInterface {
- FileReader getFileHolder(FileFactory.FileType fileType);
+ FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration);
CarbonFile getCarbonFile(String path, FileFactory.FileType fileType);
CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Configuration configuration);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
index b790f1c..2a9c7f4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
@@ -23,24 +23,26 @@ import org.apache.carbondata.core.scan.executor.impl.VectorDetailQueryExecutor;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Factory class to get the query executor from RDD
* This will return the executor based on query type
*/
public class QueryExecutorFactory {
- public static QueryExecutor getQueryExecutor(QueryModel queryModel) {
+ public static QueryExecutor getQueryExecutor(QueryModel queryModel, Configuration configuration) {
if (CarbonProperties.isSearchModeEnabled()) {
if (queryModel.isVectorReader()) {
- return new SearchModeVectorDetailQueryExecutor();
+ return new SearchModeVectorDetailQueryExecutor(configuration);
} else {
- return new SearchModeDetailQueryExecutor();
+ return new SearchModeDetailQueryExecutor(configuration);
}
} else {
if (queryModel.isVectorReader()) {
- return new VectorDetailQueryExecutor();
+ return new VectorDetailQueryExecutor(configuration);
} else {
- return new DetailQueryExecutor();
+ return new DetailQueryExecutor(configuration);
}
}
}
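
A hedged sketch of a caller of the new factory signature (assuming QueryExecutor lives alongside the factory); the Configuration would typically be the task attempt's hadoop conf carrying the SK/AK.

    import org.apache.carbondata.core.scan.executor.QueryExecutor;
    import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
    import org.apache.carbondata.core.scan.model.QueryModel;
    import org.apache.hadoop.conf.Configuration;

    class QueryExecutorConfSketch {
      static QueryExecutor buildExecutor(QueryModel queryModel, Configuration taskConf) {
        // The factory forwards the conf to the executor constructor, which installs it
        // into the thread-local session info before block reading starts.
        return QueryExecutorFactory.getQueryExecutor(queryModel, taskConf);
      }
    }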
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 259889b..ece2f8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -68,10 +68,12 @@ import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
/**
* This class provides a skeletal implementation of the {@link QueryExecutor}
@@ -96,7 +98,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
*/
protected CarbonIterator queryIterator;
- public AbstractQueryExecutor() {
+ public AbstractQueryExecutor(Configuration configuration) {
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration);
queryProperties = new QueryExecutorProperties();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
index 46ef43d..e11c576 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
@@ -27,6 +27,8 @@ import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.result.RowBatch;
import org.apache.carbondata.core.scan.result.iterator.DetailQueryResultIterator;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Below class will be used to execute the detail query
* For executing the detail query it will pass all the block execution
@@ -34,6 +36,10 @@ import org.apache.carbondata.core.scan.result.iterator.DetailQueryResultIterator
*/
public class DetailQueryExecutor extends AbstractQueryExecutor<RowBatch> {
+ public DetailQueryExecutor(Configuration configuration) {
+ super(configuration);
+ }
+
@Override
public CarbonIterator<RowBatch> execute(QueryModel queryModel)
throws QueryExecutionException, IOException {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
index ae14327..6d03540 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
@@ -31,13 +31,15 @@ import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.hadoop.conf.Configuration;
public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object> {
private static final LogService LOGGER =
LogServiceFactory.getLogService(SearchModeDetailQueryExecutor.class.getName());
private static ExecutorService executorService = null;
- public SearchModeDetailQueryExecutor() {
+ public SearchModeDetailQueryExecutor(Configuration configuration) {
+ super(configuration);
if (executorService == null) {
initThreadPool();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
index 705c451..418ef42 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
@@ -32,6 +32,8 @@ import org.apache.carbondata.core.util.CarbonProperties;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Below class will be used to execute the detail query and returns columnar vectors.
*/
@@ -40,7 +42,8 @@ public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<O
LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
private static ExecutorService executorService = null;
- public SearchModeVectorDetailQueryExecutor() {
+ public SearchModeVectorDetailQueryExecutor(Configuration configuration) {
+ super(configuration);
if (executorService == null) {
initThreadPool();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
index 7787e4c..46397c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
@@ -26,11 +26,17 @@ import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.result.iterator.VectorDetailQueryResultIterator;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Below class will be used to execute the detail query and returns columnar vectors.
*/
public class VectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
+ public VectorDetailQueryExecutor(Configuration configuration) {
+ super(configuration);
+ }
+
@Override
public CarbonIterator<Object> execute(QueryModel queryModel)
throws QueryExecutionException, IOException {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index c3a4934..58fef17 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -23,11 +23,11 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.text.SimpleDateFormat;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -94,7 +94,7 @@ public final class CarbonProperties {
/**
* It is purely for testing
*/
- private Map<String, String> addedProperty = new HashMap<>();
+ private Map<String, String> addedProperty = new ConcurrentHashMap<>();
/**
* Private constructor this will call load properties method to load all the
@@ -407,7 +407,7 @@ public final class CarbonProperties {
* @param lockTypeConfigured
*/
private void validateAndConfigureLockType(String lockTypeConfigured) {
- Configuration configuration = new Configuration(true);
+ Configuration configuration = FileFactory.getConfiguration();
String defaultFs = configuration.get("fs.defaultFS");
if (null != defaultFs && (defaultFs.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)
|| defaultFs.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || defaultFs
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 9aaa58c..c5e2e8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -134,8 +134,6 @@ public final class CarbonUtil {
*/
private static final int CONST_HUNDRED = 100;
- private static final Configuration conf = new Configuration(true);
-
/**
* dfs.bytes-per-checksum
* HDFS checksum length, block size for a file should be exactly divisible
@@ -662,7 +660,7 @@ public final class CarbonUtil {
*/
public static String checkAndAppendHDFSUrl(String filePath) {
String currentPath = filePath;
- String defaultFsUrl = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
+ String defaultFsUrl = FileFactory.getConfiguration().get(CarbonCommonConstants.FS_DEFAULT_FS);
String baseDFSUrl = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_DDL_BASE_HDFS_URL, "");
if (checkIfPrefixExists(filePath)) {
@@ -699,7 +697,7 @@ public final class CarbonUtil {
filePath = "/" + filePath;
}
currentPath = filePath;
- String defaultFsUrl = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
+ String defaultFsUrl = FileFactory.getConfiguration().get(CarbonCommonConstants.FS_DEFAULT_FS);
if (defaultFsUrl == null) {
return currentPath;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 169c003..51b157f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.util;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.carbondata.common.constants.LoggerAction;
import org.apache.carbondata.common.logging.LogService;
@@ -57,12 +58,12 @@ public class SessionParams implements Serializable, Cloneable {
private static final long serialVersionUID = -7801994600594915264L;
private Map<String, String> sProps;
- private Map<String, String> addedProps;
+ private ConcurrentHashMap<String, String> addedProps;
// below field to be used when we want the objects to be serialized
private Map<String, Object> extraInfo;
public SessionParams() {
sProps = new HashMap<>();
- addedProps = new HashMap<>();
+ addedProps = new ConcurrentHashMap<>();
extraInfo = new HashMap<>();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
index df525bc..f85a350 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.core.util;
+import org.apache.hadoop.conf.Configuration;
+
/**
* This class maintains ThreadLocal session params
*/
@@ -31,4 +33,22 @@ public class ThreadLocalSessionInfo {
public static CarbonSessionInfo getCarbonSessionInfo() {
return threadLocal.get();
}
+
+ public static synchronized CarbonSessionInfo getOrCreateCarbonSessionInfo() {
+ CarbonSessionInfo info = threadLocal.get();
+ if (info == null || info.getSessionParams() == null) {
+ info = new CarbonSessionInfo();
+ info.setSessionParams(new SessionParams());
+ threadLocal.set(info);
+ }
+ return info;
+ }
+
+ public static void setConfigurationToCurrentThread(Configuration configuration) {
+ getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo().put("carbonConf", configuration);
+ }
+
+ public static void unsetAll() {
+ threadLocal.remove();
+ }
}
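
One plausible usage pattern for the new helpers (a sketch, not code from this commit): bind the deserialized driver conf at task start and clear it when the task finishes, so pooled threads do not keep stale credentials.

    import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
    import org.apache.hadoop.conf.Configuration;

    class TaskThreadConfSketch {
      static void runWithConf(Configuration confFromDriver, Runnable work) {
        // setConfigurationToCurrentThread stores the conf under "carbonConf" in the
        // non-serializable extra info of the thread's CarbonSessionInfo.
        ThreadLocalSessionInfo.setConfigurationToCurrentThread(confFromDriver);
        try {
          work.run();
        } finally {
          ThreadLocalSessionInfo.unsetAll();  // drop per-thread session state
        }
      }
    }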
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
index 30144c1..5033713 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.impl.DFSFileReaderImpl;
import mockit.Mock;
import mockit.MockUp;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,7 +46,7 @@ public class DFSFileReaderImplUnitTest {
private static File fileWithEmptyContent;
@BeforeClass public static void setup() {
- dfsFileHolder = new DFSFileReaderImpl();
+ dfsFileHolder = new DFSFileReaderImpl(new Configuration());
file = new File("Test.carbondata");
fileWithEmptyContent = new File("TestEXception.carbondata");
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index 605ec89..bdb17ed 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -165,7 +165,7 @@ public class LuceneDataMapWriter extends DataMapWriter {
// the indexWriter closes the FileSystem on closing the writer, so for a new configuration
// and disable the cache for the index writer, it will be closed on closing the writer
- Configuration conf = new Configuration();
+ Configuration conf = FileFactory.getConfiguration();
conf.set("fs.hdfs.impl.disable.cache", "true");
// create a index writer
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index a54e7a4..0d38906 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -57,15 +58,16 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
private boolean skipClearDataMapAtClose = false;
public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport,
- InputMetricsStats inputMetricsStats) {
- this(queryModel, readSupport);
+ InputMetricsStats inputMetricsStats, Configuration configuration) {
+ this(queryModel, readSupport, configuration);
this.inputMetricsStats = inputMetricsStats;
}
- public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport) {
+ public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport,
+ Configuration configuration) {
this.queryModel = queryModel;
this.readSupport = readSupport;
- this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+ this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel, configuration);
}
@Override
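
A hedged sketch of constructing the reader with the new third argument; in the input format below, the conf comes from the TaskAttemptContext.

    import org.apache.carbondata.core.scan.model.QueryModel;
    import org.apache.carbondata.hadoop.CarbonRecordReader;
    import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
    import org.apache.hadoop.conf.Configuration;

    class RecordReaderConfSketch {
      static <T> CarbonRecordReader<T> create(QueryModel queryModel,
          CarbonReadSupport<T> readSupport, Configuration taskConf) {
        // The conf (carrying the SK/AK) rides along into QueryExecutorFactory.
        return new CarbonRecordReader<>(queryModel, readSupport, taskConf);
      }
    }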
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 21ef6cf..3ebd6d6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -583,7 +583,8 @@ m filterExpression
Configuration configuration = taskAttemptContext.getConfiguration();
QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
- return new CarbonRecordReader<T>(queryModel, readSupport);
+ return new CarbonRecordReader<T>(queryModel, readSupport,
+ taskAttemptContext.getConfiguration());
}
public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index af2cf83..ec201b9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -132,7 +132,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
-
CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
if (null == carbonTable) {
throw new IOException("Missing/Corrupt schema file for table.");
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 5938c20..5cc275b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
import org.apache.carbondata.processing.loading.DataLoadExecutor;
import org.apache.carbondata.processing.loading.TableProcessingOperations;
@@ -231,7 +232,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
@Override
public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter(
- TaskAttemptContext taskAttemptContext) throws IOException {
+ final TaskAttemptContext taskAttemptContext) throws IOException {
final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
//if loadModel having taskNo already(like in SDK) then no need to overwrite
short sdkUserCore = loadModel.getSdkUserCores();
@@ -249,10 +250,12 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext);
final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
final ExecutorService executorService = Executors.newFixedThreadPool(1,
- new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));;
+ new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));
// It should be started in new thread as the underlying iterator uses blocking queue.
Future future = executorService.submit(new Thread() {
@Override public void run() {
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(taskAttemptContext
+ .getConfiguration());
try {
dataLoadExecutor
.execute(loadModel, tempStoreLocations, iterators);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index af7397b..7641427 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
import org.apache.carbondata.core.datamap.DataMapJob;
import org.apache.carbondata.core.datamap.DataMapUtil;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -177,20 +176,4 @@ public class CarbonInputFormatUtil {
return new JobID(jobtrackerID, batch);
}
- public static void setS3Configurations(Configuration hadoopConf) {
- FileFactory.getConfiguration()
- .set("fs.s3a.access.key", hadoopConf.get("fs.s3a.access.key", ""));
- FileFactory.getConfiguration()
- .set("fs.s3a.secret.key", hadoopConf.get("fs.s3a.secret.key", ""));
- FileFactory.getConfiguration()
- .set("fs.s3a.endpoint", hadoopConf.get("fs.s3a.endpoint", ""));
- FileFactory.getConfiguration().set(CarbonCommonConstants.S3_ACCESS_KEY,
- hadoopConf.get(CarbonCommonConstants.S3_ACCESS_KEY, ""));
- FileFactory.getConfiguration().set(CarbonCommonConstants.S3_SECRET_KEY,
- hadoopConf.get(CarbonCommonConstants.S3_SECRET_KEY, ""));
- FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_ACCESS_KEY,
- hadoopConf.get(CarbonCommonConstants.S3N_ACCESS_KEY, ""));
- FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_SECRET_KEY,
- hadoopConf.get(CarbonCommonConstants.S3N_SECRET_KEY, ""));
- }
}
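
With setS3Configurations removed, the credentials are no longer copied into FileFactory's static conf. A hedged sketch of the driver side, where they simply stay on the job configuration that is later serialized to the executors (values are placeholders):

    import org.apache.hadoop.conf.Configuration;

    class DriverSideS3ConfSketch {
      static Configuration buildJobConf() {
        Configuration jobConf = new Configuration();
        jobConf.set("fs.s3a.access.key", "<access-key>");
        jobConf.set("fs.s3a.secret.key", "<secret-key>");
        jobConf.set("fs.s3a.endpoint", "<endpoint>");
        return jobConf;  // shipped with the tasks instead of mutating a static conf
      }
    }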
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
index 57bcca3..4ed2b91 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
@@ -63,7 +63,7 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
public CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport<ArrayWritable> readSupport,
InputSplit inputSplit, JobConf jobConf) throws IOException {
- super(queryModel, readSupport);
+ super(queryModel, readSupport, jobConf);
initialize(inputSplit, jobConf);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index 5b15b22..3ec815d 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -100,7 +100,8 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
checkArgument(carbondataSplit.getConnectorId().equals(connectorId),
"split is not for this connector");
QueryModel queryModel = createQueryModel(carbondataSplit, columns);
- QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+ QueryExecutor queryExecutor =
+ QueryExecutorFactory.getQueryExecutor(queryModel, new Configuration());
try {
CarbonIterator iterator = queryExecutor.execute(queryModel);
readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
index 32e163a..9935b54 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
@@ -114,7 +114,8 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
queryModel.setTableBlockInfos(tableBlockInfoList);
queryModel.setVectorReader(true);
try {
- queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+ queryExecutor =
+ QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration());
iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
} catch (QueryExecutionException e) {
throw new InterruptedException(e.getMessage());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 7916932..5a1e140 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -414,7 +414,7 @@ public class CarbonTableReader {
List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
CarbonTable carbonTable = tableCacheModel.carbonTable;
TableInfo tableInfo = tableCacheModel.carbonTable.getTableInfo();
- Configuration config = new Configuration();
+ Configuration config = FileFactory.getConfiguration();
config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
index 127c4c9..2f3b8f4 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
@@ -32,6 +32,7 @@ import com.facebook.presto.tests.DistributedQueryRunner
import com.google.common.collect.ImmutableMap
import org.slf4j.{Logger, LoggerFactory}
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.presto.CarbondataPlugin
object PrestoServer {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
index 0ac6e72..6cbe747 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
@@ -41,8 +41,7 @@ class LuceneFineGrainDataMapWithSearchModeSuite extends QueryTest with BeforeAnd
val n = 500000
sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode()
CarbonProperties
- .getInstance()
- .addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "100s")
+ .getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "100s")
LuceneFineGrainDataMapSuite.createFile(file2, n)
sql("create database if not exists lucene")
sql("use lucene")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
index 062e5ba..c95e5a4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
@@ -146,13 +146,13 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
}
test("test create table as select with TBLPROPERTIES") {
- sql("DROP TABLE IF EXISTS ctas_tblproperties_test")
+ sql("DROP TABLE IF EXISTS ctas_tblproperties_testt")
sql(
- "create table ctas_tblproperties_test stored by 'carbondata' TBLPROPERTIES" +
+ "create table ctas_tblproperties_testt stored by 'carbondata' TBLPROPERTIES" +
"('DICTIONARY_INCLUDE'='key', 'sort_scope'='global_sort') as select * from carbon_ctas_test")
- checkAnswer(sql("select * from ctas_tblproperties_test"), sql("select * from carbon_ctas_test"))
+ checkAnswer(sql("select * from ctas_tblproperties_testt"), sql("select * from carbon_ctas_test"))
val carbonTable = CarbonEnv.getInstance(Spark2TestQueryExecutor.spark).carbonMetastore
- .lookupRelation(Option("default"), "ctas_tblproperties_test")(Spark2TestQueryExecutor.spark)
+ .lookupRelation(Option("default"), "ctas_tblproperties_testt")(Spark2TestQueryExecutor.spark)
.asInstanceOf[CarbonRelation].carbonTable
val metadataFolderPath: CarbonFile = FileFactory.getCarbonFile(carbonTable.getMetadataPath)
assert(metadataFolderPath.exists())
@@ -419,7 +419,7 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
sql("DROP TABLE IF EXISTS ctas_select_where_parquet")
sql("DROP TABLE IF EXISTS ctas_select_where_orc")
- sql("DROP TABLE IF EXISTS ctas_tblproperties_test")
+ sql("DROP TABLE IF EXISTS ctas_tblproperties_testt")
sql("DROP TABLE IF EXISTS ctas_if_table_name")
sql("DROP TABLE IF EXISTS source_table")
sql("DROP TABLE IF EXISTS target_table")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
index 36d8c51..8d6dd32 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -37,6 +37,8 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P
import org.apache.spark.sql.util.SparkSQLUtil.sessionState
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.spark.rdd.SerializableConfiguration
@@ -108,17 +110,18 @@ object CsvRDDHelper {
closePartition()
// 2. read function
- val serializableConfiguration = new SerializableConfiguration(jobConf)
+ val serializableConfiguration = new SerializableConfiguration(hadoopConf)
val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
override def apply(file: PartitionedFile): Iterator[InternalRow] = {
new Iterator[InternalRow] {
- val hadoopConf = serializableConfiguration.value
val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
formatter.format(new Date())
}
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(serializableConfiguration.value)
val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
- val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+ val hadoopAttemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration,
+ attemptId)
val inputSplit =
new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
var finished = false
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index dc238fb..be40b13 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -29,11 +29,13 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
/**
* Use sortBy operator in spark to load the data
@@ -64,6 +66,7 @@ object DataLoadProcessBuilderOnSpark {
val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
+ val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// 1. Input
val inputRDD = originRDD
.mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
@@ -73,6 +76,7 @@ object DataLoadProcessBuilderOnSpark {
// 2. Convert
val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
convertStepRowCounter)
}.filter(_ != null)// Filter the bad record
@@ -116,7 +120,7 @@ object DataLoadProcessBuilderOnSpark {
// 4. Write
sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, modelBroadcast,
- writeStepRowCounter))
+ writeStepRowCounter, conf))
// clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
// not have any functional impact as spark automatically monitors the cache usage on each node
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 73ed769..f17bd91 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.load
import scala.util.Random
import com.univocity.parsers.common.TextParsingException
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.{Accumulator, SparkEnv, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Row
@@ -29,7 +30,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations}
import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
@@ -40,7 +42,7 @@ import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImp
import org.apache.carbondata.processing.sort.sortdata.SortParameters
import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil}
-import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
+import org.apache.carbondata.spark.rdd.{NewRddIterator, SerializableConfiguration, StringArrayRow}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
object DataLoadProcessorStepOnSpark {
@@ -227,7 +229,9 @@ object DataLoadProcessorStepOnSpark {
rows: Iterator[CarbonRow],
index: Int,
modelBroadcast: Broadcast[CarbonLoadModel],
- rowCounter: Accumulator[Int]) {
+ rowCounter: Accumulator[Int],
+ conf: Broadcast[SerializableConfiguration]) {
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
var model: CarbonLoadModel = null
var tableName: String = null
var rowConverter: RowConverterImpl = null
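convertFunc and writeFunc now receive the configuration as a Broadcast[SerializableConfiguration]. The wrapper class itself is not shown in these hunks; the sketch below shows what such a wrapper typically looks like (modeled on Spark's org.apache.spark.util.SerializableConfiguration) and is an assumption about, not a copy of, the class referenced under org.apache.carbondata.spark.rdd:

  import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
  import org.apache.hadoop.conf.Configuration

  // Sketch only: a Serializable holder that re-serializes a Hadoop Configuration
  // through its Writable interface so it can travel inside a Spark broadcast.
  class SerializableConfigurationSketch(@transient var value: Configuration)
    extends Serializable {

    @throws(classOf[IOException])
    private def writeObject(out: ObjectOutputStream): Unit = {
      out.defaultWriteObject()
      value.write(out)       // Writable serialization of the configuration entries
    }

    @throws(classOf[IOException])
    private def readObject(in: ObjectInputStream): Unit = {
      value = new Configuration(false)
      value.readFields(in)   // rebuild the configuration on the executor side
    }
  }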
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index 7c1edea..f7aa623 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -17,7 +17,8 @@
package org.apache.carbondata.spark.rdd
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -47,15 +48,15 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
/**
* This class is aimed at generating dictionary file for the newly added columns
*/
-class AlterTableAddColumnRDD[K, V](sc: SparkContext,
+class AlterTableAddColumnRDD[K, V](@transient sparkSession: SparkSession,
@transient newColumns: Seq[ColumnSchema],
identifier: AbsoluteTableIdentifier)
- extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) {
+ extends CarbonRDD[(Int, SegmentStatus)](sparkSession, Nil) {
val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
newColumns.zipWithIndex.map { column =>
new AddColumnPartition(id, column._2, column._1)
}.toArray
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index 0dbb4f0..a0d06b8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -17,11 +17,12 @@
package org.apache.carbondata.spark.rdd
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.statusmanager.SegmentStatus
@@ -44,12 +45,12 @@ class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Pa
/**
* This class is aimed at generating dictionary file for the newly added columns
*/
-class AlterTableDropColumnRDD[K, V](sc: SparkContext,
+class AlterTableDropColumnRDD[K, V](@transient ss: SparkSession,
@transient newColumns: Seq[ColumnSchema],
carbonTableIdentifier: AbsoluteTableIdentifier)
- extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) {
+ extends CarbonRDD[(Int, SegmentStatus)](ss, Nil) {
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
newColumns.zipWithIndex.map { column =>
new DropColumnPartition(id, column._2, column._1)
}.toArray
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 85a6f41..86a5043 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -39,7 +39,8 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
partitionIds: Seq[String],
bucketId: Int,
identifier: AbsoluteTableIdentifier,
- prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {
+ prev: RDD[Array[AnyRef]])
+ extends CarbonRDD[(K, V)](alterPartitionModel.sqlContext.sparkSession, prev) {
var storeLocation: String = null
val carbonLoadModel = alterPartitionModel.carbonLoadModel
@@ -50,14 +51,14 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
val factTableName = carbonTable.getTableName
val partitionInfo = carbonTable.getPartitionInfo(factTableName)
- override protected def getPartitions: Array[Partition] = {
+ override protected def internalGetPartitions: Array[Partition] = {
val sc = alterPartitionModel.sqlContext.sparkContext
sc.setLocalProperty("spark.scheduler.pool", "DDL")
sc.setLocalProperty("spark.job.interruptOnCancel", "true")
firstParent[Array[AnyRef]].partitions
}
- override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
+ override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava
val iter = new Iterator[(K, V)] {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
index d56e1c2..e2d1eff 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
@@ -21,7 +21,8 @@ import java.util
import scala.collection.JavaConverters._
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.indexstore.PartitionSpec
@@ -37,19 +38,19 @@ case class CarbonDropPartition(rddId: Int, val idx: Int, segment: Segment)
/**
* RDD to drop the partitions from segment files of all segments.
- * @param sc
+ * @param ss
* @param tablePath
* @param segments segments to be cleaned
*/
class CarbonDropPartitionRDD(
- sc: SparkContext,
+ @transient ss: SparkSession,
tablePath: String,
segments: Seq[Segment],
partitions: util.List[PartitionSpec],
uniqueId: String)
- extends CarbonRDD[(String, String)](sc, Nil, sc.hadoopConfiguration) {
+ extends CarbonRDD[(String, String)](ss, Nil) {
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
segments.zipWithIndex.map {s =>
CarbonDropPartition(id, s._2, s._1)
}.toArray
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 2ec8b9c..9265c7f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -31,7 +31,7 @@ import com.univocity.parsers.common.TextParsingException
import org.apache.commons.lang3.{ArrayUtils, StringUtils}
import org.apache.spark._
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
@@ -174,11 +174,12 @@ case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends S
* @param model a model package load info
*/
class CarbonAllDictionaryCombineRDD(
+ @transient sparkSession: SparkSession,
prev: RDD[(String, Iterable[String])],
model: DictionaryLoadModel)
- extends CarbonRDD[(Int, ColumnDistinctValues)](prev) {
+ extends CarbonRDD[(Int, ColumnDistinctValues)](sparkSession, prev) {
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
firstParent[(String, Iterable[String])].partitions
}
@@ -267,11 +268,12 @@ class StringArrayRow(var values: Array[String]) extends Row {
* @param model a model package load info
*/
class CarbonBlockDistinctValuesCombineRDD(
+ @transient ss: SparkSession,
prev: RDD[Row],
model: DictionaryLoadModel)
- extends CarbonRDD[(Int, ColumnDistinctValues)](prev) {
+ extends CarbonRDD[(Int, ColumnDistinctValues)](ss, prev) {
- override def getPartitions: Array[Partition] = firstParent[Row].partitions
+ override def internalGetPartitions: Array[Partition] = firstParent[Row].partitions
override def internalCompute(split: Partition,
context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -325,11 +327,14 @@ class CarbonBlockDistinctValuesCombineRDD(
* @param model a model package load info
*/
class CarbonGlobalDictionaryGenerateRDD(
+ @transient sparkSession: SparkSession,
prev: RDD[(Int, ColumnDistinctValues)],
model: DictionaryLoadModel)
- extends CarbonRDD[(Int, SegmentStatus)](prev) {
+ extends CarbonRDD[(Int, SegmentStatus)](sparkSession, prev) {
+
+ override def internalGetPartitions: Array[Partition] =
+ firstParent[(Int, ColumnDistinctValues)].partitions
- override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
override def internalCompute(split: Partition,
context: TaskContext): Iterator[(Int, SegmentStatus)] = {
@@ -492,21 +497,20 @@ class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
* Use external column dict to generate global dictionary
*
* @param carbonLoadModel carbon load model
- * @param sparkContext spark context
+ * @param sparkSession spark context
* @param table carbon table identifier
* @param dimensions carbon dimenisons having predefined dict
* @param dictFolderPath path of dictionary folder
*/
class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
dictionaryLoadModel: DictionaryLoadModel,
- sparkContext: SparkContext,
+ @transient ss: SparkSession,
table: CarbonTableIdentifier,
dimensions: Array[CarbonDimension],
dictFolderPath: String)
- extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil,
- sparkContext.hadoopConfiguration) {
+ extends CarbonRDD[(Int, ColumnDistinctValues)](ss, Nil) {
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
val primDimensions = dictionaryLoadModel.primDimensions
val primDimLength = primDimensions.length
val result = new Array[Partition](primDimLength)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 3aaf0ae..762b920 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -24,14 +24,15 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.Partition
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.CarbonMergerMapping
-import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
-import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.api.CarbonInputFormat
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -41,23 +42,23 @@ import org.apache.carbondata.spark.MergeResult
* IUD carbon merger RDD
* */
class CarbonIUDMergerRDD[K, V](
- sc: SparkContext,
+ @transient ss: SparkSession,
result: MergeResult[K, V],
carbonLoadModel: CarbonLoadModel,
carbonMergerMapping: CarbonMergerMapping,
confExecutorsTemp: String)
- extends CarbonMergerRDD[K, V](sc,
+ extends CarbonMergerRDD[K, V](ss,
result,
carbonLoadModel,
carbonMergerMapping,
confExecutorsTemp) {
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
val startTime = System.currentTimeMillis()
val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
)
- val jobConf: JobConf = new JobConf(new Configuration)
+ val jobConf: JobConf = new JobConf(FileFactory.getConfiguration)
SparkHadoopUtil.get.addCredentials(jobConf)
val job: Job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index f9f65a7..a0425b7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.block._
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
@@ -60,15 +61,15 @@ import org.apache.carbondata.spark.MergeResult
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
class CarbonMergerRDD[K, V](
- sc: SparkContext,
+ @transient ss: SparkSession,
result: MergeResult[K, V],
carbonLoadModel: CarbonLoadModel,
carbonMergerMapping: CarbonMergerMapping,
confExecutorsTemp: String)
- extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
+ extends CarbonRDD[(K, V)](ss, Nil) {
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
- sc.setLocalProperty("spark.job.interruptOnCancel", "true")
+ ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL")
+ ss.sparkContext.setLocalProperty("spark.job.interruptOnCancel", "true")
private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
var storeLocation: String = null
@@ -183,7 +184,7 @@ class CarbonMergerRDD[K, V](
}
try {
// fire a query and get the results.
- rawResultIteratorList = exec.processTableBlocks()
+ rawResultIteratorList = exec.processTableBlocks(FileFactory.getConfiguration)
} catch {
case e: Throwable =>
LOGGER.error(e)
@@ -269,7 +270,7 @@ class CarbonMergerRDD[K, V](
iter
}
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
val startTime = System.currentTimeMillis()
val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
@@ -277,7 +278,7 @@ class CarbonMergerRDD[K, V](
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
carbonTable)
- val jobConf: JobConf = new JobConf(new Configuration)
+ val jobConf: JobConf = new JobConf(getConf)
SparkHadoopUtil.get.addCredentials(jobConf)
val job: Job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 54a7530..04f20b1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -17,28 +17,24 @@
package org.apache.carbondata.spark.rdd
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
-
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.util.SparkSQLUtil
-import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.util._
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
/**
* This RDD maintains session level ThreadLocal
*/
-abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
- @transient private var deps: Seq[Dependency[_]],
- @transient hadoopConf: Configuration) extends RDD[T](sc, deps) {
+abstract class CarbonRDD[T: ClassTag](@transient ss: SparkSession,
+ @transient private var deps: Seq[Dependency[_]]) extends RDD[T](ss.sparkContext, deps) {
val carbonSessionInfo: CarbonSessionInfo = {
var info = ThreadLocalSessionInfo.getCarbonSessionInfo
@@ -50,24 +46,27 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
info
}
- private val confBytes = {
- val bao = new ByteArrayOutputStream()
- val oos = new ObjectOutputStream(bao)
- hadoopConf.write(oos)
- oos.close()
- CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
- }
+ val config: Broadcast[SerializableConfiguration] = sparkContext
+ .broadcast(new SerializableConfiguration(SparkSQLUtil.sessionState(ss).newHadoopConf()))
/** Construct an RDD with just a one-to-one dependency on one parent */
- def this(@transient oneParent: RDD[_]) =
- this (oneParent.context, List(new OneToOneDependency(oneParent)),
- oneParent.sparkContext.hadoopConfiguration)
+ def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_]) =
+ this (sparkSession, List(new OneToOneDependency(oneParent)))
+
+ protected def internalGetPartitions: Array[Partition]
+
+ override def getPartitions: Array[Partition] = {
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(config.value.value)
+ internalGetPartitions
+ }
// RDD compute logic should be here
def internalCompute(split: Partition, context: TaskContext): Iterator[T]
final def compute(split: Partition, context: TaskContext): Iterator[T] = {
- CarbonInputFormatUtil.setS3Configurations(getConf)
+ TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll())
+ carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", config
+ .value.value)
ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
val carbonTaskInfo = new CarbonTaskInfo
@@ -79,13 +78,7 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
}
def getConf: Configuration = {
- val configuration = new Configuration(false)
- val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor
- .unCompressByte(confBytes))
- val ois = new ObjectInputStream(bai)
- configuration.readFields(ois)
- ois.close()
- configuration
+ config.value.value
}
}
@@ -93,12 +86,14 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
* This RDD contains TableInfo object which is serialized and deserialized in driver and executor
*/
abstract class CarbonRDDWithTableInfo[T: ClassTag](
- @transient sc: SparkContext,
+ @transient ss: SparkSession,
@transient private var deps: Seq[Dependency[_]],
- serializedTableInfo: Array[Byte]) extends CarbonRDD[T](sc, deps, sc.hadoopConfiguration) {
+ serializedTableInfo: Array[Byte]) extends CarbonRDD[T](ss, deps) {
- def this(@transient oneParent: RDD[_], serializedTableInfo: Array[Byte]) =
- this (oneParent.context, List(new OneToOneDependency(oneParent)), serializedTableInfo)
+ def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_],
+ serializedTableInfo: Array[Byte]) = {
+ this (sparkSession, List(new OneToOneDependency(oneParent)), serializedTableInfo)
+ }
def getTableInfo: TableInfo = TableInfo.deserialize(serializedTableInfo)
}
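CarbonRDD now takes a SparkSession, broadcasts the session's Hadoop configuration itself, and turns getPartitions/compute into wrappers that install the thread-local configuration and session info before delegating to internalGetPartitions/internalCompute. A hypothetical subclass under the new contract would look roughly like this (ExampleRDD and its Int payload are illustrative only):

  import org.apache.spark.{Partition, TaskContext}
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.SparkSession
  import org.apache.carbondata.spark.rdd.CarbonRDD

  class ExampleRDD(@transient ss: SparkSession, @transient parent: RDD[Int])
    extends CarbonRDD[Int](ss, parent) {

    // called by CarbonRDD.getPartitions after the broadcast conf is installed
    override def internalGetPartitions: Array[Partition] =
      firstParent[Int].partitions

    // called by CarbonRDD.compute after session info and task info are set up
    override def internalCompute(split: Partition, context: TaskContext): Iterator[Int] =
      firstParent[Int].iterator(split, context).map(_ + 1)
  }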
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
index 9452777..241720a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.AlterPartitionModel
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.util.CarbonException
@@ -36,6 +37,7 @@ import org.apache.spark.util.PartitionUtils
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo}
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.DataTypes
@@ -65,7 +67,7 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
absoluteTableIdentifier: AbsoluteTableIdentifier,
partitionIds: Seq[String],
bucketId: Int)
- extends RDD[(AnyRef, Array[AnyRef])](alterPartitionModel.sqlContext.sparkContext, Nil) {
+ extends CarbonRDD[(AnyRef, Array[AnyRef])](alterPartitionModel.sqlContext.sparkSession, Nil) {
private val queryId = alterPartitionModel.sqlContext.sparkContext.getConf
.get("queryId", System.nanoTime() + "")
@@ -91,9 +93,9 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
val dictionaryIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]()
val measureIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]()
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
val parallelism = sparkContext.defaultParallelism
- val jobConf = new JobConf(new Configuration)
+ val jobConf = new JobConf(FileFactory.getConfiguration)
val job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonTableInputFormat(absoluteTableIdentifier,
partitionIds.toList.asJava, job)
@@ -127,8 +129,8 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
result.toArray(new Array[Partition](result.size()))
}
- override def compute(split: Partition, context: TaskContext):
- Iterator[(AnyRef, Array[AnyRef])] = {
+ override def internalCompute(split: Partition, context: TaskContext):
+ Iterator[(AnyRef, Array[AnyRef])] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
var exec : CarbonSplitExecutor = null
val rows : java.util.List[(AnyRef, Array[AnyRef])] = new ArrayList[(AnyRef, Array[AnyRef])]()
@@ -142,7 +144,8 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
var result : java.util.List[PartitionSpliterRawResultIterator] = null
try {
exec = new CarbonSplitExecutor(segmentMapping, carbonTable)
- result = exec.processDataBlocks(segmentId, new SparkDataTypeConverterImpl())
+ result = exec.processDataBlocks(segmentId, new SparkDataTypeConverterImpl(),
+ FileFactory.getConfiguration)
} catch {
case e: Throwable =>
LOGGER.error(e)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index e60d5b8..f5d96fc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -44,6 +44,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
import org.apache.carbondata.core.datastore.block.Distributable
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.TableInfo
@@ -79,7 +80,7 @@ class CarbonScanRDD[T: ClassTag](
@transient val partitionNames: Seq[PartitionSpec],
val dataTypeConverterClz: Class[_ <: DataTypeConverter] = classOf[SparkDataTypeConverterImpl],
val readSupportClz: Class[_ <: CarbonReadSupport[_]] = SparkReadSupport.readSupportClass)
- extends CarbonRDDWithTableInfo[T](spark.sparkContext, Nil, serializedTableInfo) {
+ extends CarbonRDDWithTableInfo[T](spark, Nil, serializedTableInfo) {
private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
private val jobTrackerId: String = {
@@ -92,7 +93,7 @@ class CarbonScanRDD[T: ClassTag](
@transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
val startTime = System.currentTimeMillis()
var partitions: Array[Partition] = Array.empty[Partition]
var getSplitsStartTime: Long = -1
@@ -105,7 +106,7 @@ class CarbonScanRDD[T: ClassTag](
var numBlocks = 0
try {
- val conf = new Configuration()
+ val conf = FileFactory.getConfiguration
val jobConf = new JobConf(conf)
SparkHadoopUtil.get.addCredentials(jobConf)
val job = Job.getInstance(jobConf)
@@ -405,7 +406,7 @@ class CarbonScanRDD[T: ClassTag](
val executionId = context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
val taskId = split.index
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
- val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
+ val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
TaskMetricsMap.getInstance().registerThreadCallback()
@@ -436,14 +437,16 @@ class CarbonScanRDD[T: ClassTag](
"true")
if (carbonRecordReader == null) {
new CarbonRecordReader(model,
- format.getReadSupportClass(attemptContext.getConfiguration), inputMetricsStats)
+ format.getReadSupportClass(attemptContext.getConfiguration),
+ inputMetricsStats,
+ attemptContext.getConfiguration)
} else {
carbonRecordReader
}
} else {
new CarbonRecordReader(model,
format.getReadSupportClass(attemptContext.getConfiguration),
- inputMetricsStats)
+ inputMetricsStats, attemptContext.getConfiguration)
}
}
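On the scan path every new Configuration() becomes FileFactory.getConfiguration, so the credentials broadcast from the driver reach the record readers. FileFactory's internals are not part of these hunks; the Scala sketch below only illustrates the presumed lookup order (thread-local session configuration first, default Configuration as fallback), with the "carbonConf" key taken from the CarbonRDD hunk above:

  import org.apache.hadoop.conf.Configuration
  import org.apache.carbondata.core.util.ThreadLocalSessionInfo

  object FileFactoryConfSketch {
    // Presumed behaviour only; the real logic lives in FileFactory.java.
    def currentConf(): Configuration = {
      val info = ThreadLocalSessionInfo.getCarbonSessionInfo
      val stashed =
        if (info == null) null
        else info.getNonSerializableExtraInfo.get("carbonConf").asInstanceOf[Configuration]
      if (stashed != null) stashed else new Configuration()
    }
  }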
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 3848bad..1ada51b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.command.ExecutionErrors
import org.apache.spark.util.SparkUtil
@@ -41,11 +41,11 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.common.logging.impl.StandardLogService
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalSessionInfo, ThreadLocalTaskInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses, TableProcessingOperations}
import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator}
import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -160,29 +160,20 @@ class SparkPartitionLoader(model: CarbonLoadModel,
* It loads the data to carbon using @AbstractDataLoadProcessorStep
*/
class NewCarbonDataLoadRDD[K, V](
- sc: SparkContext,
+ @transient ss: SparkSession,
result: DataLoadResult[K, V],
carbonLoadModel: CarbonLoadModel,
- blocksGroupBy: Array[(String, Array[BlockDetails])],
- @transient hadoopConf: Configuration)
- extends CarbonRDD[(K, V)](sc, Nil, hadoopConf) {
+ blocksGroupBy: Array[(String, Array[BlockDetails])])
+ extends CarbonRDD[(K, V)](ss, Nil) {
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
+ ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL")
private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
formatter.format(new Date())
}
- private val confBytes = {
- val bao = new ByteArrayOutputStream()
- val oos = new ObjectOutputStream(bao)
- hadoopConf.write(oos)
- oos.close()
- CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
- }
-
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
blocksGroupBy.zipWithIndex.map { b =>
new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
}
@@ -245,10 +236,7 @@ class NewCarbonDataLoadRDD[K, V](
def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = {
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, theSplit.index, 0)
- var configuration: Configuration = getConf
- if (configuration == null) {
- configuration = new Configuration()
- }
+ val configuration: Configuration = FileFactory.getConfiguration
CommonUtil.configureCSVInputFormat(configuration, carbonLoadModel)
val hadoopAttemptContext = new TaskAttemptContextImpl(configuration, attemptId)
val format = new CSVInputFormat
@@ -319,24 +307,13 @@ class NewCarbonDataLoadRDD[K, V](
* @see org.apache.carbondata.processing.newflow.DataLoadExecutor
*/
class NewDataFrameLoaderRDD[K, V](
- sc: SparkContext,
+ @transient ss: SparkSession,
result: DataLoadResult[K, V],
carbonLoadModel: CarbonLoadModel,
- prev: DataLoadCoalescedRDD[Row],
- @transient hadoopConf: Configuration) extends CarbonRDD[(K, V)](prev) {
-
- private val confBytes = {
- val bao = new ByteArrayOutputStream()
- val oos = new ObjectOutputStream(bao)
- hadoopConf.write(oos)
- oos.close()
- CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
- }
+ prev: DataLoadCoalescedRDD[Row]) extends CarbonRDD[(K, V)](ss, prev) {
override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val hadoopConf = getConf
- CarbonInputFormatUtil.setS3Configurations(hadoopConf)
val iter = new Iterator[(K, V)] {
val loadMetadataDetails = new LoadMetadataDetails()
val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
@@ -404,7 +381,7 @@ class NewDataFrameLoaderRDD[K, V](
}
iter
}
- override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+ override protected def internalGetPartitions: Array[Partition] = firstParent[Row].partitions
}
/**
@@ -528,10 +505,10 @@ class LazyRddIterator(serializer: SerializerInstance,
* @see org.apache.carbondata.processing.newflow.DataLoadExecutor
*/
class PartitionTableDataLoaderRDD[K, V](
- sc: SparkContext,
+ @transient ss: SparkSession,
result: DataLoadResult[K, V],
carbonLoadModel: CarbonLoadModel,
- prev: RDD[Row]) extends CarbonRDD[(K, V)](prev) {
+ prev: RDD[Row]) extends CarbonRDD[(K, V)](ss, prev) {
override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -596,6 +573,6 @@ class PartitionTableDataLoaderRDD[K, V](
iter
}
- override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+ override protected def internalGetPartitions: Array[Partition] = firstParent[Row].partitions
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index 43ee31b..b8e73d5 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -25,9 +25,12 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledException}
+import org.apache.spark.{Partition, TaskContext, TaskKilledException}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.core.datamap.{AbstractDataMapJob, DistributableDataMapFormat}
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.ExtendedBlocklet
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
@@ -38,8 +41,8 @@ class SparkDataMapJob extends AbstractDataMapJob {
override def execute(dataMapFormat: DistributableDataMapFormat,
filter: FilterResolverIntf): util.List[ExtendedBlocklet] = {
- new DataMapPruneRDD(SparkContext.getOrCreate(), dataMapFormat, filter).collect().toList
- .asJava
+ new DataMapPruneRDD(SparkSQLUtil.getSparkSession, dataMapFormat, filter).collect()
+ .toList.asJava
}
}
@@ -51,13 +54,13 @@ class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit) exte
/**
* RDD to prune the datamaps across spark cluster
- * @param sc
+ * @param ss
* @param dataMapFormat
*/
-class DataMapPruneRDD(sc: SparkContext,
+class DataMapPruneRDD(@transient ss: SparkSession,
dataMapFormat: DistributableDataMapFormat,
resolverIntf: FilterResolverIntf)
- extends CarbonRDD[(ExtendedBlocklet)](sc, Nil, sc.hadoopConfiguration) {
+ extends CarbonRDD[(ExtendedBlocklet)](ss, Nil) {
private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -67,7 +70,7 @@ class DataMapPruneRDD(sc: SparkContext,
override def internalCompute(split: Partition,
context: TaskContext): Iterator[ExtendedBlocklet] = {
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
- val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
+ val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
val inputSplit = split.asInstanceOf[DataMapRDDPartition].inputSplit
val reader = dataMapFormat.createRecordReader(inputSplit, attemptContext)
reader.initialize(inputSplit, attemptContext)
@@ -102,8 +105,8 @@ class DataMapPruneRDD(sc: SparkContext,
iter
}
- override protected def getPartitions: Array[Partition] = {
- val job = Job.getInstance(new Configuration())
+ override protected def internalGetPartitions: Array[Partition] = {
+ val job = Job.getInstance(FileFactory.getConfiguration)
val splits = dataMapFormat.getSplits(job)
splits.asScala.zipWithIndex.map(f => new DataMapRDDPartition(id, f._2, f._1)).toArray
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index f7bbf06..39e1875 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -98,11 +98,10 @@ class StreamingRawResultIterator(
* execute streaming segment handoff
*/
class StreamHandoffRDD[K, V](
- sc: SparkContext,
+ @transient ss: SparkSession,
result: HandoffResult[K, V],
carbonLoadModel: CarbonLoadModel,
- handOffSegmentId: String
-) extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
+ handOffSegmentId: String) extends CarbonRDD[(K, V)](ss, Nil) {
private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -111,8 +110,7 @@ class StreamHandoffRDD[K, V](
override def internalCompute(
split: Partition,
- context: TaskContext
- ): Iterator[(K, V)] = {
+ context: TaskContext): Iterator[(K, V)] = {
carbonLoadModel.setTaskNo("" + split.index)
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
@@ -148,7 +146,7 @@ class StreamHandoffRDD[K, V](
): util.ArrayList[RawResultIterator] = {
val inputSplit = split.asInstanceOf[HandoffPartition].split.value
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
- val hadoopConf = new Configuration()
+ val hadoopConf = getConf
CarbonInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName)
CarbonInputFormat.setTableName(hadoopConf, carbonTable.getTableName)
CarbonInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath)
@@ -200,7 +198,7 @@ class StreamHandoffRDD[K, V](
/**
* get the partitions of the handoff segment
*/
- override protected def getPartitions: Array[Partition] = {
+ override protected def internalGetPartitions: Array[Partition] = {
val job = Job.getInstance(FileFactory.getConfiguration)
val inputFormat = new CarbonTableInputFormat[Array[Object]]()
val segmentList = new util.ArrayList[Segment](1)
@@ -323,7 +321,7 @@ object StreamHandoffRDD {
// convert a streaming segment to columnar segment
val status = new StreamHandoffRDD(
- sparkSession.sparkContext,
+ sparkSession,
new HandoffResultImpl(),
carbonLoadModel,
handoffSegmenId).collect()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 1cd4d77..e79c63b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.rdd.CarbonMergeFilesRDD
-import org.apache.spark.sql.{Row, RowFactory}
+import org.apache.spark.sql.{Row, RowFactory, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
@@ -839,7 +839,7 @@ object CommonUtil {
* which do not store the blocklet info to current
* version
*/
- def mergeIndexFiles(sparkContext: SparkContext,
+ def mergeIndexFiles(sparkSession: SparkSession,
segmentIds: Seq[String],
segmentFileNameToSegmentIdMap: java.util.Map[String, String],
tablePath: String,
@@ -848,7 +848,7 @@ object CommonUtil {
readFileFooterFromCarbonDataFile: Boolean = false): Unit = {
if (mergeIndexProperty) {
new CarbonMergeFilesRDD(
- sparkContext,
+ sparkSession,
carbonTable,
segmentIds,
segmentFileNameToSegmentIdMap,
@@ -860,7 +860,7 @@ object CommonUtil {
CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
new CarbonMergeFilesRDD(
- sparkContext,
+ sparkSession,
carbonTable,
segmentIds,
segmentFileNameToSegmentIdMap,
@@ -871,7 +871,7 @@ object CommonUtil {
case _: Exception =>
if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
new CarbonMergeFilesRDD(
- sparkContext,
+ sparkSession,
carbonTable,
segmentIds,
segmentFileNameToSegmentIdMap,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 1bb3912..67c4c9b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -461,9 +461,11 @@ object GlobalDictionaryUtil {
dictFolderPath, forPreDefDict = true)
// new RDD to achieve distributed column dict generation
val extInputRDD = new CarbonColumnDictGenerateRDD(carbonLoadModel, dictLoadModel,
- sqlContext.sparkContext, table, dimensions, dictFolderPath)
+ sqlContext.sparkSession, table, dimensions, dictFolderPath)
.partitionBy(new ColumnPartitioner(dictLoadModel.primDimensions.length))
- val statusList = new CarbonGlobalDictionaryGenerateRDD(extInputRDD, dictLoadModel).collect()
+ val statusList = new CarbonGlobalDictionaryGenerateRDD(sqlContext.sparkSession, extInputRDD,
+ dictLoadModel)
+ .collect()
// check result status
checkStatus(carbonLoadModel, sqlContext, dictLoadModel, statusList)
}
@@ -670,10 +672,13 @@ object GlobalDictionaryUtil {
val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
requireDimension, dictfolderPath, false)
// combine distinct value in a block and partition by column
- val inputRDD = new CarbonBlockDistinctValuesCombineRDD(dictRdd, model)
+ val inputRDD = new CarbonBlockDistinctValuesCombineRDD(sqlContext.sparkSession, dictRdd,
+ model)
.partitionBy(new ColumnPartitioner(model.primDimensions.length))
// generate global dictionary files
- val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
+ val statusList = new CarbonGlobalDictionaryGenerateRDD(sqlContext.sparkSession,
+ inputRDD, model)
+ .collect()
// check result status
checkStatus(carbonLoadModel, sqlContext, model, statusList)
} else {
@@ -731,10 +736,13 @@ object GlobalDictionaryUtil {
val allDictionaryRdd = readAllDictionaryFiles(sqlContext, headers,
requireColumnNames, allDictionaryPathAppended, accumulator)
// read exist dictionary and combine
- val inputRDD = new CarbonAllDictionaryCombineRDD(allDictionaryRdd, model)
+ val inputRDD = new CarbonAllDictionaryCombineRDD(sqlContext.sparkSession,
+ allDictionaryRdd, model)
.partitionBy(new ColumnPartitioner(model.primDimensions.length))
// generate global dictionary files
- val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
+ val statusList = new CarbonGlobalDictionaryGenerateRDD(sqlContext.sparkSession, inputRDD,
+ model)
+ .collect()
// check result status
checkStatus(carbonLoadModel, sqlContext, model, statusList)
// if the dictionary contains wrong format record, throw ex
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
index 1acdf7e..b5147f0 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
@@ -17,7 +17,8 @@
package org.apache.spark.rdd
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -35,20 +36,20 @@ case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentId: String)
/**
* RDD to merge all carbonindex files of each segment to carbonindex file into the same segment.
- * @param sc
+ * @param ss
* @param carbonTable
* @param segments segments to be merged
*/
class CarbonMergeFilesRDD(
- sc: SparkContext,
+ @transient ss: SparkSession,
carbonTable: CarbonTable,
segments: Seq[String],
segmentFileNameToSegmentIdMap: java.util.Map[String, String],
isHivePartitionedTable: Boolean,
readFileFooterFromCarbonDataFile: Boolean)
- extends CarbonRDD[String](sc, Nil, sc.hadoopConfiguration) {
+ extends CarbonRDD[String](ss, Nil) {
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
segments.zipWithIndex.map {s =>
CarbonMergeFilePartition(id, s._2, s._1)
}.toArray
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
index 6a97477..2854c91 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import scala.reflect.ClassTag
import org.apache.spark._
+import org.apache.spark.sql.SparkSession
import org.apache.carbondata.spark.rdd.CarbonRDD
@@ -27,12 +28,12 @@ import org.apache.carbondata.spark.rdd.CarbonRDD
case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
class DataLoadCoalescedRDD[T: ClassTag](
+ @transient sparkSession: SparkSession,
@transient var prev: RDD[T],
nodeList: Array[String])
- extends CarbonRDD[DataLoadPartitionWrap[T]](prev.context, Nil,
- prev.sparkContext.hadoopConfiguration) {
+ extends CarbonRDD[DataLoadPartitionWrap[T]](sparkSession, Nil) {
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
new DataLoadPartitionCoalescer(prev, nodeList).run
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 1b48c08..da22658 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -117,7 +117,7 @@ case class CarbonMergerMapping(
var maxSegmentColCardinality: Array[Int],
// maxSegmentColumnSchemaList is list of column schema of last segment of compaction
var maxSegmentColumnSchemaList: List[ColumnSchema],
- currentPartitions: Option[Seq[PartitionSpec]])
+ @transient currentPartitions: Option[Seq[PartitionSpec]])
case class NodeInfo(TaskId: String, noOfBlocks: Int)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 2e31a82..b5fda85 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -27,4 +27,8 @@ object SparkSQLUtil {
def execute(logicalPlan: LogicalPlan, sparkSession: SparkSession): DataFrame = {
Dataset.ofRows(sparkSession, logicalPlan)
}
+
+ def getSparkSession: SparkSession = {
+ SparkSession.getDefaultSession.get
+ }
}
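SparkSQLUtil.getSparkSession simply unwraps SparkSession.getDefaultSession, so it is only safe to call after a session has been created; otherwise Option.get throws NoSuchElementException. A small usage sketch (builder settings are illustrative):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.util.SparkSQLUtil

  object GetSessionExample {
    def main(args: Array[String]): Unit = {
      // getOrCreate() registers the default session, so the helper can find it
      val spark = SparkSession.builder().master("local[1]").appName("carbon-example").getOrCreate()
      assert(SparkSQLUtil.getSparkSession eq spark)
      spark.stop()
    }
  }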
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index fdbf400..e2aa9ae 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -159,7 +159,7 @@ object PartitionUtils {
partitionIds: List[String], oldPartitionIdList: List[Int],
partitionInfo: PartitionInfo,
carbonTable: CarbonTable): java.util.List[TableBlockInfo] = {
- val jobConf = new JobConf(new Configuration)
+ val jobConf = new JobConf(FileFactory.getConfiguration)
val job = new Job(jobConf)
val format = CarbonInputFormatUtil
.createCarbonTableInputFormat(identifier, partitionIds.asJava, job)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 2c53d20..f237552 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -136,7 +136,8 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
queryModel.setTableBlockInfos(tableBlockInfoList);
queryModel.setVectorReader(true);
try {
- queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+ queryExecutor =
+ QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration());
iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
} catch (QueryExecutionException e) {
if (ExceptionUtils.indexOfThrowable(e, FileNotFoundException.class) > 0) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index d321cab..81b395e 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -341,7 +341,8 @@ class SparkCarbonFileFormat extends FileFormat
split.setDetailInfo(info)
info.setBlockSize(file.length)
// Read the footer offset and set.
- val reader = FileFactory.getFileHolder(FileFactory.getFileType(file.filePath))
+ val reader = FileFactory.getFileHolder(FileFactory.getFileType(file.filePath),
+ broadcastedHadoopConf.value.value)
val buffer = reader
.readByteBuffer(FileFactory.getUpdatedFilePath(file.filePath), file.length - 8, 8)
info.setBlockFooterOffset(buffer.getLong)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 8309065..82c64a4 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -292,14 +292,13 @@ class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Ar
}
class IndexDataMapRebuildRDD[K, V](
- session: SparkSession,
+ @transient session: SparkSession,
result: RefreshResult[K, V],
@transient tableInfo: TableInfo,
dataMapName: String,
indexColumns: Array[CarbonColumn],
- segments: Set[Segment]
-) extends CarbonRDDWithTableInfo[(K, V)](
- session.sparkContext, Nil, tableInfo.serialize()) {
+ segments: Set[Segment])
+ extends CarbonRDDWithTableInfo[(K, V)](session, Nil, tableInfo.serialize()) {
private val dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName)
private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
@@ -323,7 +322,7 @@ class IndexDataMapRebuildRDD[K, V](
inputMetrics.initBytesReadCallback(context, inputSplit)
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
- val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
+ val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
val format = createInputFormat(segment.get, attemptContext)
val model = format.createQueryModel(inputSplit, attemptContext)
@@ -351,7 +350,8 @@ class IndexDataMapRebuildRDD[K, V](
} else {
new OriginalReadSupport(indexColumns.map(_.getDataType))
}
- reader = new CarbonRecordReader[Array[Object]](model, readSupport, inputMetrics)
+ reader = new CarbonRecordReader[Array[Object]](model, readSupport, inputMetrics,
+ attemptContext.getConfiguration)
reader.initialize(inputSplit, attemptContext)
// skip clear datamap and we will do this after rebuild

reader.setSkipClearDataMapAtClose(true)
@@ -439,11 +439,11 @@ class IndexDataMapRebuildRDD[K, V](
format
}
- override protected def getPartitions = {
+ override protected def internalGetPartitions = {
if (!dataMapSchema.isIndexDataMap) {
throw new UnsupportedOperationException
}
- val conf = new Configuration()
+ val conf = FileFactory.getConfiguration
val jobConf = new JobConf(conf)
SparkHadoopUtil.get.addCredentials(jobConf)
val job = Job.getInstance(jobConf)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 6e322b3..0fd4e34 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -62,9 +62,10 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.scan.partition.PartitionUtil
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable}
@@ -329,7 +330,8 @@ object CarbonDataRDDFactory {
dataFrame,
carbonLoadModel,
updateModel,
- carbonTable)
+ carbonTable,
+ hadoopConf)
res.foreach { resultOfSeg =>
resultOfSeg.foreach { resultOfBlock =>
if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) {
@@ -680,7 +682,8 @@ object CarbonDataRDDFactory {
dataFrame: Option[DataFrame],
carbonLoadModel: CarbonLoadModel,
updateModel: Option[UpdateTableModel],
- carbonTable: CarbonTable): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = {
+ carbonTable: CarbonTable,
+ hadoopConf: Configuration): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = {
val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate
val updateRdd = dataFrame.get.rdd
@@ -720,7 +723,9 @@ object CarbonDataRDDFactory {
// because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism,
// so segmentIdIndex=partitionId/parallelism, this has been verified.
+ val conf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
partitionByRdd.map(_._2).mapPartitions { partition =>
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
val partitionId = TaskContext.getPartitionId()
val segIdIndex = partitionId / segmentUpdateParallelism
val randomPart = partitionId - segIdIndex * segmentUpdateParallelism
@@ -1070,7 +1075,7 @@ object CarbonDataRDDFactory {
try {
val rdd = repartitionInputData(sqlContext, dataFrame, carbonLoadModel, hadoopConf)
new PartitionTableDataLoaderRDD(
- sqlContext.sparkContext,
+ sqlContext.sparkSession,
new DataLoadResultImpl(),
carbonLoadModel,
rdd
@@ -1099,14 +1104,14 @@ object CarbonDataRDDFactory {
val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
nodeNumOfData,
sqlContext.sparkContext)
- val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
+ val newRdd = new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, rdd, nodes.toArray
+ .distinct)
new NewDataFrameLoaderRDD(
- sqlContext.sparkContext,
+ sqlContext.sparkSession,
new DataLoadResultImpl(),
carbonLoadModel,
- newRdd,
- sqlContext.sparkContext.hadoopConfiguration
+ newRdd
).collect()
} catch {
case ex: Exception =>
@@ -1207,11 +1212,10 @@ object CarbonDataRDDFactory {
}.toArray
new NewCarbonDataLoadRDD(
- sqlContext.sparkContext,
+ sqlContext.sparkSession,
new DataLoadResultImpl(),
carbonLoadModel,
- blocksGroupBy,
- hadoopConf
+ blocksGroupBy
).collect()
}
}
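
The pattern this hunk repeats (and which CarbonDictionaryDecoder, CarbonLoadDataCommand, DeleteExecution and HorizontalCompaction below follow as well) is: broadcast the driver's hadoop configuration wrapped in SerializableConfiguration, then put it on the thread local at the start of every partition so FileFactory sees the SK/AK on the executor. A condensed sketch of that pattern, assuming the classes shown in this patch; the helper name and the 'process' function are illustrative:

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    import org.apache.carbondata.core.util.ThreadLocalSessionInfo
    import org.apache.carbondata.spark.rdd.SerializableConfiguration

    object ConfPropagation {
      // 'process' stands for whatever per-row work the load or update path does.
      def mapPartitionsWithDriverConf[T, U: ClassTag](
          sparkSession: SparkSession,
          rdd: RDD[T])(process: Iterator[T] => Iterator[U]): RDD[U] = {
        val confBroadcast = sparkSession.sparkContext.broadcast(
          new SerializableConfiguration(sparkSession.sessionState.newHadoopConf()))
        rdd.mapPartitions { rows =>
          // Executor side: make the driver's configuration (including any S3
          // access/secret keys) visible to FileFactory for this task's thread.
          ThreadLocalSessionInfo.setConfigurationToCurrentThread(confBroadcast.value.value)
          process(rows)
        }
      }
    }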
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index fcc649e..d9884e1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -186,7 +186,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
val mergeStatus =
if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
new CarbonIUDMergerRDD(
- sc.sparkContext,
+ sc.sparkSession,
new MergeResultImpl(),
carbonLoadModel,
carbonMergerMapping,
@@ -194,7 +194,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
).collect
} else {
new CarbonMergerRDD(
- sc.sparkContext,
+ sc.sparkSession,
new MergeResultImpl(),
carbonLoadModel,
carbonMergerMapping,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 6622246..9b78db0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -31,9 +31,11 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -44,6 +46,8 @@ case class CarbonCountStar(
outUnsafeRows: Boolean = true) extends LeafExecNode {
override def doExecute(): RDD[InternalRow] = {
+ ThreadLocalSessionInfo
+ .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val (job, tableInputFormat) = createCarbonInputFormat(absoluteTableIdentifier)
CarbonInputFormat.setQuerySegment(job.getConfiguration, absoluteTableIdentifier)
@@ -73,7 +77,7 @@ case class CarbonCountStar(
private def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier
): (Job, CarbonTableInputFormat[Array[Object]]) = {
val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
- val jobConf: JobConf = new JobConf(new Configuration)
+ val jobConf: JobConf = new JobConf(FileFactory.getConfiguration)
SparkHadoopUtil.get.addCredentials(jobConf)
CarbonInputFormat.setTableInfo(jobConf, carbonTable.getTableInfo)
val job = new Job(jobConf)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index b5842a9..8a0404c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -56,7 +56,6 @@ case class CarbonDatasourceHadoopRelation(
caseInsensitiveMap("tablename"))
lazy val databaseName: String = carbonTable.getDatabaseName
lazy val tableName: String = carbonTable.getTableName
- CarbonInputFormatUtil.setS3Configurations(sparkSession.sessionState.newHadoopConf())
CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
@transient lazy val carbonRelation: CarbonRelation =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 34a8dce..d0ed56e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -41,9 +41,10 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.scan.executor.util.QueryUtil
-import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.core.util.{DataTypeUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
-import org.apache.carbondata.spark.rdd.CarbonRDDWithTableInfo
+import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, SerializableConfiguration}
/**
* It decodes the data.
@@ -75,9 +76,12 @@ case class CarbonDictionaryDecoder(
(carbonTable.getTableName, carbonTable)
}.toMap
+ val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession
+ .sessionState.newHadoopConf()))
if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
val dataTypes = child.output.map { attr => attr.dataType }
child.execute().mapPartitions { iter =>
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
val cacheProvider: CacheProvider = CacheProvider.getInstance
val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
@@ -439,7 +443,9 @@ class CarbonDecoderRDD(
val prev: RDD[InternalRow],
output: Seq[Attribute],
serializedTableInfo: Array[Byte])
- extends CarbonRDDWithTableInfo[InternalRow](prev, serializedTableInfo) {
+ extends CarbonRDDWithTableInfo[InternalRow](relations.head.carbonRelation.sparkSession,
+ prev,
+ serializedTableInfo) {
def canBeDecoded(attr: Attribute): Boolean = {
profile match {
@@ -543,7 +549,8 @@ class CarbonDecoderRDD(
dicts
}
- override protected def getPartitions: Array[Partition] = firstParent[InternalRow].partitions
+ override protected def internalGetPartitions: Array[Partition] =
+ firstParent[InternalRow].partitions
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 7f26888..8253c4d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,11 +17,8 @@
package org.apache.spark.sql
-import java.io.File
import java.util.concurrent.ConcurrentHashMap
-import scala.util.Try
-
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -29,7 +26,6 @@ import org.apache.spark.sql.events.MergeIndexEventListener
import org.apache.spark.sql.execution.command.preaaggregate._
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
import org.apache.spark.sql.hive._
-import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -101,6 +97,8 @@ class CarbonEnv {
threadLevelCarbonSessionInfo.setThreadParams(currentThreadSesssionInfo.getThreadParams)
}
ThreadLocalSessionInfo.setCarbonSessionInfo(threadLevelCarbonSessionInfo)
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(sparkSession
+ .sessionState.newHadoopConf())
val config = new CarbonSQLConf(sparkSession)
if (sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT).isEmpty) {
config.addDefaultCarbonParams()
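
On the driver the same thread-local mechanism is primed directly from the session, which is why the command and DDL paths below can replace new Configuration() with FileFactory.getConfiguration. A small sketch of that contract; the object and method names are illustrative:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.SparkSession
    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.core.util.ThreadLocalSessionInfo

    object DriverConfExample {
      // After a CarbonEnv / CarbonSession entry point has run this,
      // FileFactory.getConfiguration on the driver returns the session's
      // hadoop conf instead of a bare new Configuration().
      def driverSideConf(sparkSession: SparkSession): Configuration = {
        ThreadLocalSessionInfo.setConfigurationToCurrentThread(
          sparkSession.sessionState.newHadoopConf())
        FileFactory.getConfiguration
      }
    }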
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index c59bb08..5af64ff 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -40,7 +40,6 @@ import org.apache.carbondata.common.annotations.InterfaceAudience
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.store.SparkCarbonStore
import org.apache.carbondata.streaming.CarbonStreamingQueryListener
@@ -432,6 +431,8 @@ object CarbonSession {
}
// preserve thread parameters across call
ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+ ThreadLocalSessionInfo
+ .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index b162294..693a8c4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -329,7 +329,6 @@ object CarbonSource {
.contains("true")
tableInfo.setTransactionalTable(isTransactionalTable)
if (isTransactionalTable && !metaStore.isReadFromHiveMetaStore) {
- CarbonInputFormatUtil.setS3Configurations(sparkSession.sessionState.newHadoopConf())
// save to disk
metaStore.saveToDisk(tableInfo, properties("tablePath"))
// remove schema string from map as we don't store carbon schema to hive metastore
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 5bff9aa..24ef0db 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -54,7 +54,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
val sparkSession = SparkSession.getActiveSession.get
if(!carbonTable.isStreamingSink) {
if (null != compactedSegments && !compactedSegments.isEmpty) {
- mergeIndexFilesForCompactedSegments(sparkSession.sparkContext,
+ mergeIndexFilesForCompactedSegments(sparkSession,
carbonTable,
compactedSegments)
} else {
@@ -63,7 +63,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
segmentFileNameMap
.put(loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp))
- CommonUtil.mergeIndexFiles(sparkSession.sparkContext,
+ CommonUtil.mergeIndexFiles(sparkSession,
Seq(loadModel.getSegmentId),
segmentFileNameMap,
carbonTable.getTablePath,
@@ -77,9 +77,9 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
val alterTableCompactionPostEvent = event.asInstanceOf[AlterTableCompactionPostEvent]
val carbonTable = alterTableCompactionPostEvent.carbonTable
val mergedLoads = alterTableCompactionPostEvent.compactedLoads
- val sparkContext = alterTableCompactionPostEvent.sparkSession.sparkContext
+ val sparkSession = alterTableCompactionPostEvent.sparkSession
if(!carbonTable.isStreamingSink) {
- mergeIndexFilesForCompactedSegments(sparkContext, carbonTable, mergedLoads)
+ mergeIndexFilesForCompactedSegments(sparkSession, carbonTable, mergedLoads)
}
case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
val exceptionEvent = event.asInstanceOf[AlterTableMergeIndexEvent]
@@ -123,7 +123,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
// store (store <= 1.1 version) and create merge Index file as per new store so that
// old store is also upgraded to new store
CommonUtil.mergeIndexFiles(
- sparkContext = sparkSession.sparkContext,
+ sparkSession = sparkSession,
segmentIds = validSegmentIds,
segmentFileNameToSegmentIdMap = segmentFileNameMap,
tablePath = carbonMainTable.getTablePath,
@@ -155,7 +155,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
}
}
- def mergeIndexFilesForCompactedSegments(sparkContext: SparkContext,
+ def mergeIndexFilesForCompactedSegments(sparkSession: SparkSession,
carbonTable: CarbonTable,
mergedLoads: util.List[String]): Unit = {
// get only the valid segments of the table
@@ -182,7 +182,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
val validMergedSegIds = validSegments
.filter { seg => mergedSegmentIds.contains(seg.getSegmentNo) }.map(_.getSegmentNo)
if (null != validMergedSegIds && !validMergedSegIds.isEmpty) {
- CommonUtil.mergeIndexFiles(sparkContext,
+ CommonUtil.mergeIndexFiles(sparkSession,
validMergedSegIds,
segmentFileNameMap,
carbonTable.getTablePath,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 6c74ad2..7cf8c1e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -18,15 +18,13 @@
package org.apache.spark.sql.execution.command.management
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
import org.apache.spark.storage.StorageLevel
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.util.CarbonSparkUtil
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
case class CarbonInsertIntoCommand(
relation: CarbonDatasourceHadoopRelation,
@@ -45,6 +43,9 @@ case class CarbonInsertIntoCommand(
case other => false
} isDefined
}
+
+ ThreadLocalSessionInfo
+ .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
val isPersistEnabledUserValue = CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED,
CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 460d7e6..516f9af 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.management
import java.text.SimpleDateFormat
import java.util
-import java.util.{List, UUID}
+import java.util.UUID
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -33,7 +33,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Expression, GenericInternalRow, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
@@ -59,16 +59,16 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICar
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil, ObjectSerializationUtil}
+import org.apache.carbondata.core.util._
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.events.exception.PreEventException
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.TableProcessingOperations
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.exception.NoRetryException
import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -77,8 +77,8 @@ import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataPro
import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark}
-import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, SerializableConfiguration}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
case class CarbonLoadDataCommand(
databaseNameOp: Option[String],
@@ -977,8 +977,11 @@ case class CarbonLoadDataCommand(
array
}
}
- val finalRDD = convertRDD.mapPartitionsWithIndex {case(index, rows) =>
+ val conf = sparkSession.sparkContext
+ .broadcast(new SerializableConfiguration(sparkSession.sessionState.newHadoopConf()))
+ val finalRDD = convertRDD.mapPartitionsWithIndex { case(index, rows) =>
DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
DataLoadProcessorStepOnSpark.inputAndconvertFunc(
rows,
index,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 8633243..b77632d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -41,13 +41,15 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.exception.MultipleMatchingException
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.spark.DeleteDelataResultImpl
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
object DeleteExecution {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
@@ -118,17 +120,18 @@ object DeleteExecution {
blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
keyRdd.partitions.length)
+ val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession
+ .sessionState.newHadoopConf()))
+
val rdd = rowContRdd.join(keyRdd)
res = rdd.mapPartitionsWithIndex(
(index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) =>
Iterator[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] {
-
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]()
while (records.hasNext) {
val ((key), (rowCountDetailsVO, groupedRows)) = records.next
val segmentId = key.substring(0, key.indexOf(CarbonCommonConstants.FILE_SEPARATOR))
- val loadDetail =
- metadataDetails.find(_.getLoadName.equals(segmentId)).get
result = result ++
deleteDeltaFunc(index,
key,
@@ -138,8 +141,7 @@ object DeleteExecution {
isStandardTable)
}
result
- }
- ).collect()
+ }).collect()
// if no loads are present then no need to do anything.
if (res.flatten.isEmpty) {
@@ -328,7 +330,7 @@ object DeleteExecution {
private def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
(CarbonTableInputFormat[Array[Object]], Job) = {
val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
- val jobConf: JobConf = new JobConf(new Configuration)
+ val jobConf: JobConf = new JobConf(FileFactory.getConfiguration)
val job: Job = new Job(jobConf)
FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
(carbonInputFormat, job)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 8c88d0e..66066ed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -32,7 +32,10 @@ import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
+import org.apache.carbondata.core.util.{ThreadLocalSessionInfo}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
object HorizontalCompaction {
@@ -188,8 +191,11 @@ object HorizontalCompaction {
val timestamp = factTimeStamp
val updateStatusDetails = segmentUpdateStatusManager.getUpdateStatusDetails
+ val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession
+ .sessionState.newHadoopConf()))
val result = rdd1.mapPartitions(iter =>
new Iterator[Seq[CarbonDataMergerUtilResult]] {
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
override def hasNext: Boolean = iter.hasNext
override def next(): Seq[CarbonDataMergerUtilResult] = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index 45cacfa..1e987b0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -173,7 +173,7 @@ case class CarbonAlterTableDropHivePartitionCommand(
val segments = new SegmentStatusManager(table.getAbsoluteTableIdentifier)
.getValidAndInvalidSegments.getValidSegments
// First drop the partitions from partition mapper files of each segment
- val tuples = new CarbonDropPartitionRDD(sparkSession.sparkContext,
+ val tuples = new CarbonDropPartitionRDD(sparkSession,
table.getTablePath,
segments.asScala,
carbonPartitionsTobeDropped,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 7e11170..22ff5c4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -82,7 +82,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
carbonTable.getAbsoluteTableIdentifier,
sparkSession.sparkContext).process
// generate dictionary files for the newly added columns
- new AlterTableAddColumnRDD(sparkSession.sparkContext,
+ new AlterTableAddColumnRDD(sparkSession,
newCols,
carbonTable.getAbsoluteTableIdentifier).collect()
timeStamp = System.currentTimeMillis
@@ -110,7 +110,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
LOGGER.error(e, "Alter table add columns failed")
if (newCols.nonEmpty) {
LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
- new AlterTableDropColumnRDD(sparkSession.sparkContext,
+ new AlterTableDropColumnRDD(sparkSession,
newCols,
carbonTable.getAbsoluteTableIdentifier).collect()
AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 23dbf9e..1dbe28c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -149,7 +149,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
// TODO: 1. add check for deletion of index tables
// delete dictionary files for dictionary column and clear dictionary cache from memory
- new AlterTableDropColumnRDD(sparkSession.sparkContext,
+ new AlterTableDropColumnRDD(sparkSession,
dictionaryColumns,
carbonTable.getAbsoluteTableIdentifier).collect()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index c403d52..1beda11 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.spark.util.CarbonSparkUtil
@@ -49,7 +49,9 @@ case class CarbonCreateTableCommand(
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val tableName = tableInfo.getFactTable.getTableName
var databaseOpt : Option[String] = None
- if(tableInfo.getDatabaseName != null) {
+ ThreadLocalSessionInfo
+ .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
+ if (tableInfo.getDatabaseName != null) {
databaseOpt = Some(tableInfo.getDatabaseName)
}
val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 94b988c..f4fb90a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
/**
* Carbon strategies for ddl commands
@@ -91,6 +91,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
case e: NoSuchDatabaseException =>
CarbonProperties.getStorePath
}
+ ThreadLocalSessionInfo
+ .setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
FileUtils.createDatabaseDirectory(dbName, dbLocation, sparkSession.sparkContext)
ExecutedCommandExec(createDb) :: Nil
case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index 80f781e..b6667df 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
/**
@@ -39,7 +40,7 @@ object TableLoader {
def extractOptions(propertiesFile: String): immutable.Map[String, String] = {
val props = new Properties
val path = new Path(propertiesFile)
- val fs = path.getFileSystem(new Configuration())
+ val fs = path.getFileSystem(FileFactory.getConfiguration)
props.load(fs.open(path))
val elments = props.entrySet().iterator()
val map = new mutable.HashMap[String, String]()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index b0711ba..8e68ef3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -50,6 +50,8 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeConverter;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Executor class for executing the query on the selected segments to be merged.
* This will fire a select * query and get the raw result.
@@ -103,7 +105,8 @@ public class CarbonCompactionExecutor {
*
* @return List of Carbon iterators
*/
- public List<RawResultIterator> processTableBlocks() throws QueryExecutionException, IOException {
+ public List<RawResultIterator> processTableBlocks(Configuration configuration) throws
+ QueryExecutionException, IOException {
List<RawResultIterator> resultList =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<TableBlockInfo> list = null;
@@ -131,7 +134,8 @@ public class CarbonCompactionExecutor {
.size());
queryModel.setTableBlockInfos(list);
resultList.add(
- new RawResultIterator(executeBlockList(list, segmentId, task), sourceSegProperties,
+ new RawResultIterator(executeBlockList(list, segmentId, task, configuration),
+ sourceSegProperties,
destinationSegProperties));
}
}
@@ -174,14 +178,14 @@ public class CarbonCompactionExecutor {
* @return
*/
private CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList,
- String segmentId, String taskId)
+ String segmentId, String taskId, Configuration configuration)
throws QueryExecutionException, IOException {
queryModel.setTableBlockInfos(blockList);
QueryStatisticsRecorder executorRecorder = CarbonTimeStatisticsFactory
.createExecutorRecorder(queryModel.getQueryId() + "_" + segmentId + "_" + taskId);
queryStatisticsRecorders.add(executorRecorder);
queryModel.setStatisticsRecorder(executorRecorder);
- QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+ QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel, configuration);
queryExecutorList.add(queryExecutor);
return queryExecutor.execute(queryModel);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
index 01db4f6..dd5969f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
@@ -35,6 +35,8 @@ import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.result.RowBatch;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.hadoop.conf.Configuration;
+
public abstract class AbstractCarbonQueryExecutor {
private static final LogService LOGGER =
@@ -50,10 +52,11 @@ public abstract class AbstractCarbonQueryExecutor {
* @param blockList
* @return
*/
- CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList)
+ CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList,
+ Configuration configuration)
throws QueryExecutionException, IOException {
queryModel.setTableBlockInfos(blockList);
- this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+ this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel, configuration);
return queryExecutor.execute(queryModel);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
index daabd24..d32757c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
@@ -34,6 +34,8 @@ import org.apache.carbondata.core.scan.model.QueryModelBuilder;
import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator;
import org.apache.carbondata.core.util.DataTypeConverter;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Used to read carbon blocks when add/split partition
*/
@@ -48,7 +50,7 @@ public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor {
}
public List<PartitionSpliterRawResultIterator> processDataBlocks(
- String segmentId, DataTypeConverter converter)
+ String segmentId, DataTypeConverter converter, Configuration configuration)
throws QueryExecutionException, IOException {
List<TableBlockInfo> list = null;
queryModel = new QueryModelBuilder(carbonTable)
@@ -64,7 +66,7 @@ public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor {
list = taskBlockInfo.getTableBlockInfoList(task);
LOGGER.info("for task -" + task + "-block size is -" + list.size());
queryModel.setTableBlockInfos(list);
- resultList.add(new PartitionSpliterRawResultIterator(executeBlockList(list)));
+ resultList.add(new PartitionSpliterRawResultIterator(executeBlockList(list, configuration)));
}
return resultList;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index e3dabb1..4859dd2 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -29,6 +29,8 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.util.CarbonSessionInfo;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
import org.apache.hadoop.conf.Configuration;
@@ -60,6 +62,7 @@ public class CarbonReaderBuilder {
CarbonReaderBuilder(String tablePath, String tableName) {
this.tablePath = tablePath;
this.tableName = tableName;
+ ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo());
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index d41e3d0..065b4a9 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -44,6 +44,8 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonSessionInfo;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.ThriftWriter;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -70,6 +72,10 @@ public class CarbonWriterBuilder {
private int localDictionaryThreshold;
private boolean isLocalDictionaryEnabled;
+ public CarbonWriterBuilder() {
+ ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo());
+ }
+
/**
* Sets the output path of the writer builder
* @param path is the absolute path where output files are written
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
index 73742b0..b6e7ad5 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
@@ -23,6 +23,7 @@ import java.util.Random;
import java.util.UUID;
import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -47,7 +48,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
private ObjectArrayWritable writable;
JsonCarbonWriter(CarbonLoadModel loadModel) throws IOException {
- Configuration OutputHadoopConf = new Configuration();
+ Configuration OutputHadoopConf = FileFactory.getConfiguration();
CarbonTableOutputFormat.setLoadModel(OutputHadoopConf, loadModel);
CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat();
JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
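
The SDK builders above now install a CarbonSessionInfo on the constructing thread, and JsonCarbonWriter takes its configuration from FileFactory rather than creating a fresh one, so a standalone (non-Spark) SDK process can supply SK/AK the same way. A hedged sketch of such a setup; the fs.s3a.* property names and the environment lookups are assumptions, not part of this patch:

    import org.apache.hadoop.conf.Configuration
    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}

    object SdkConfExample {
      def main(args: Array[String]): Unit = {
        // Illustrative only: credential property names come from hadoop-aws.
        val conf = new Configuration()
        conf.set("fs.s3a.access.key", sys.env.getOrElse("AWS_ACCESS_KEY_ID", ""))
        conf.set("fs.s3a.secret.key", sys.env.getOrElse("AWS_SECRET_ACCESS_KEY", ""))
        // Same mechanism the builders rely on: with a session info and the
        // configuration on the thread local, FileFactory.getConfiguration
        // picks this conf up instead of creating a new one.
        ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo())
        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf)
        val effective = FileFactory.getConfiguration
        println(effective.get("fs.s3a.access.key"))
      }
    }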