You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/08/28 09:00:33 UTC
[3/3] carbondata git commit: [CARBONDATA-2844] [CARBONDATA-2865] Pass
SK/AK to executor by serializing hadoop configuration from driver.
[CARBONDATA-2844] [CARBONDATA-2865] Pass SK/AK to executor by serializing hadoop configuration from driver.
Add SK/AK to a thread local so that on each query a new SK/AK can be passed to FileFactory.
Refactor FileFactory to accept the Hadoop configuration from the thread local.
Fixed compatibility issue from 1.3.x to 1.5.x [CARBONDATA-2865].
This closes #2623
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2a9604cd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2a9604cd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2a9604cd
Branch: refs/heads/master
Commit: 2a9604cd840dc1a552afcff23059ac9bf624e161
Parents: 1fb1f19
Author: kunal642 <ku...@gmail.com>
Authored: Wed Aug 8 21:50:44 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Aug 28 14:30:19 2018 +0530
----------------------------------------------------------------------
.../core/datastore/impl/DFSFileReaderImpl.java | 8 ++-
.../datastore/impl/DefaultFileTypeProvider.java | 4 +-
.../core/datastore/impl/FileFactory.java | 27 +++++++---
.../core/datastore/impl/FileTypeInterface.java | 2 +-
.../scan/executor/QueryExecutorFactory.java | 12 +++--
.../executor/impl/AbstractQueryExecutor.java | 5 +-
.../scan/executor/impl/DetailQueryExecutor.java | 6 +++
.../impl/SearchModeDetailQueryExecutor.java | 4 +-
.../SearchModeVectorDetailQueryExecutor.java | 5 +-
.../impl/VectorDetailQueryExecutor.java | 6 +++
.../carbondata/core/util/CarbonProperties.java | 6 +--
.../apache/carbondata/core/util/CarbonUtil.java | 6 +--
.../carbondata/core/util/SessionParams.java | 5 +-
.../core/util/ThreadLocalSessionInfo.java | 20 +++++++
.../store/impl/DFSFileReaderImplUnitTest.java | 3 +-
.../datamap/lucene/LuceneDataMapWriter.java | 2 +-
.../carbondata/hadoop/CarbonRecordReader.java | 10 ++--
.../hadoop/api/CarbonInputFormat.java | 3 +-
.../hadoop/api/CarbonTableInputFormat.java | 1 -
.../hadoop/api/CarbonTableOutputFormat.java | 7 ++-
.../hadoop/util/CarbonInputFormatUtil.java | 17 ------
.../carbondata/hive/CarbonHiveRecordReader.java | 2 +-
.../presto/CarbondataPageSourceProvider.java | 3 +-
.../PrestoCarbonVectorizedRecordReader.java | 3 +-
.../presto/impl/CarbonTableReader.java | 2 +-
.../carbondata/presto/server/PrestoServer.scala | 1 +
...eneFineGrainDataMapWithSearchModeSuite.scala | 3 +-
.../createTable/TestCreateTableAsSelect.scala | 10 ++--
.../carbondata/spark/load/CsvRDDHelper.scala | 9 ++--
.../load/DataLoadProcessBuilderOnSpark.scala | 8 ++-
.../load/DataLoadProcessorStepOnSpark.scala | 10 ++--
.../spark/rdd/AlterTableAddColumnRDD.scala | 9 ++--
.../spark/rdd/AlterTableDropColumnRDD.scala | 11 ++--
.../spark/rdd/AlterTableLoadPartitionRDD.scala | 7 +--
.../spark/rdd/CarbonDropPartitionRDD.scala | 11 ++--
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 28 +++++-----
.../spark/rdd/CarbonIUDMergerRDD.scala | 15 +++---
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 15 +++---
.../apache/carbondata/spark/rdd/CarbonRDD.scala | 55 +++++++++-----------
.../spark/rdd/CarbonScanPartitionRDD.scala | 15 +++---
.../carbondata/spark/rdd/CarbonScanRDD.scala | 15 +++---
.../spark/rdd/NewCarbonDataLoadRDD.scala | 53 ++++++-------------
.../carbondata/spark/rdd/SparkDataMapJob.scala | 21 ++++----
.../carbondata/spark/rdd/StreamHandoffRDD.scala | 14 +++--
.../carbondata/spark/util/CommonUtil.scala | 10 ++--
.../spark/util/GlobalDictionaryUtil.scala | 20 ++++---
.../apache/spark/rdd/CarbonMergeFilesRDD.scala | 11 ++--
.../apache/spark/rdd/DataLoadCoalescedRDD.scala | 7 +--
.../command/carbonTableSchemaCommon.scala | 2 +-
.../apache/spark/sql/util/SparkSQLUtil.scala | 4 ++
.../org/apache/spark/util/PartitionUtils.scala | 2 +-
.../VectorizedCarbonRecordReader.java | 3 +-
.../datasources/SparkCarbonFileFormat.scala | 3 +-
.../datamap/IndexDataMapRebuildRDD.scala | 16 +++---
.../spark/rdd/CarbonDataRDDFactory.scala | 26 +++++----
.../spark/rdd/CarbonTableCompactor.scala | 4 +-
.../org/apache/spark/sql/CarbonCountStar.scala | 6 ++-
.../sql/CarbonDatasourceHadoopRelation.scala | 1 -
.../spark/sql/CarbonDictionaryDecoder.scala | 15 ++++--
.../scala/org/apache/spark/sql/CarbonEnv.scala | 6 +--
.../org/apache/spark/sql/CarbonSession.scala | 3 +-
.../org/apache/spark/sql/CarbonSource.scala | 1 -
.../sql/events/MergeIndexEventListener.scala | 14 ++---
.../management/CarbonInsertIntoCommand.scala | 7 +--
.../management/CarbonLoadDataCommand.scala | 19 ++++---
.../command/mutation/DeleteExecution.scala | 16 +++---
.../command/mutation/HorizontalCompaction.scala | 6 +++
...rbonAlterTableDropHivePartitionCommand.scala | 2 +-
.../CarbonAlterTableAddColumnCommand.scala | 4 +-
.../CarbonAlterTableDropColumnCommand.scala | 2 +-
.../table/CarbonCreateTableCommand.scala | 6 ++-
.../sql/execution/strategy/DDLStrategy.scala | 4 +-
.../org/apache/spark/util/TableLoader.scala | 3 +-
.../merger/CarbonCompactionExecutor.java | 12 +++--
.../spliter/AbstractCarbonQueryExecutor.java | 7 ++-
.../partition/spliter/CarbonSplitExecutor.java | 6 ++-
.../sdk/file/CarbonReaderBuilder.java | 3 ++
.../sdk/file/CarbonWriterBuilder.java | 6 +++
.../carbondata/sdk/file/JsonCarbonWriter.java | 3 +-
.../store/worker/SearchRequestHandler.java | 3 +-
80 files changed, 428 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
index 1a0cd41..e86fa12 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileReaderImpl.java
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,7 +38,10 @@ public class DFSFileReaderImpl implements FileReader {
private boolean readPageByPage;
- public DFSFileReaderImpl() {
+ private Configuration configuration;
+
+ public DFSFileReaderImpl(Configuration configuration) {
+ this.configuration = configuration;
this.fileNameAndStreamCache =
new HashMap<String, FSDataInputStream>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
}
@@ -60,7 +64,7 @@ public class DFSFileReaderImpl implements FileReader {
FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath);
if (null == fileChannel) {
Path pt = new Path(filePath);
- FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+ FileSystem fs = pt.getFileSystem(configuration);
fileChannel = fs.open(pt);
fileNameAndStreamCache.put(filePath, fileChannel);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
index c4761c9..937b5b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration;
public class DefaultFileTypeProvider implements FileTypeInterface {
- public FileReader getFileHolder(FileFactory.FileType fileType) {
+ public FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration) {
switch (fileType) {
case LOCAL:
return new FileReaderImpl();
@@ -37,7 +37,7 @@ public class DefaultFileTypeProvider implements FileTypeInterface {
case ALLUXIO:
case VIEWFS:
case S3:
- return new DFSFileReaderImpl();
+ return new DFSFileReaderImpl(configuration);
default:
return new FileReaderImpl();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index e353623..b07d11b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
@@ -59,11 +60,23 @@ public final class FileFactory {
}
public static Configuration getConfiguration() {
- return configuration;
+ Configuration conf;
+ Object confObject = ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo()
+ .getNonSerializableExtraInfo().get("carbonConf");
+ if (confObject == null) {
+ conf = configuration;
+ } else {
+ conf = (Configuration) confObject;
+ }
+ return conf;
}
public static FileReader getFileHolder(FileType fileType) {
- return fileFileTypeInterface.getFileHolder(fileType);
+ return fileFileTypeInterface.getFileHolder(fileType, getConfiguration());
+ }
+
+ public static FileReader getFileHolder(FileType fileType, Configuration configuration) {
+ return fileFileTypeInterface.getFileHolder(fileType, configuration);
}
public static FileType getFileType(String path) {
@@ -100,7 +113,7 @@ public final class FileFactory {
public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize)
throws IOException {
- return getDataInputStream(path, fileType, bufferSize, configuration);
+ return getDataInputStream(path, fileType, bufferSize, getConfiguration());
}
public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize,
Configuration configuration) throws IOException {
@@ -306,7 +319,7 @@ public final class FileFactory {
// this method was new in hadoop 2.7, otherwise use CarbonFile.truncate to do this.
try {
Path pt = new Path(path);
- FileSystem fs = pt.getFileSystem(configuration);
+ FileSystem fs = pt.getFileSystem(getConfiguration());
Method truncateMethod = fs.getClass().getDeclaredMethod("truncate",
new Class[]{Path.class, long.class});
truncateMethod.invoke(fs, new Object[]{pt, newSize});
@@ -414,7 +427,7 @@ public final class FileFactory {
case VIEWFS:
case S3:
Path path = new Path(filePath);
- FileSystem fs = path.getFileSystem(configuration);
+ FileSystem fs = path.getFileSystem(getConfiguration());
return fs.getContentSummary(path).getLength();
case LOCAL:
default:
@@ -442,7 +455,7 @@ public final class FileFactory {
* @throws IOException
*/
public static FileSystem getFileSystem(Path path) throws IOException {
- return path.getFileSystem(configuration);
+ return path.getFileSystem(getConfiguration());
}
@@ -455,7 +468,7 @@ public final class FileFactory {
case VIEWFS:
try {
Path path = new Path(directoryPath);
- FileSystem fs = path.getFileSystem(FileFactory.configuration);
+ FileSystem fs = path.getFileSystem(getConfiguration());
if (!fs.exists(path)) {
fs.mkdirs(path);
fs.setPermission(path, permission);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
index 358d2ef..8b0fcc4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
public interface FileTypeInterface {
- FileReader getFileHolder(FileFactory.FileType fileType);
+ FileReader getFileHolder(FileFactory.FileType fileType, Configuration configuration);
CarbonFile getCarbonFile(String path, FileFactory.FileType fileType);
CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Configuration configuration);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
index b790f1c..2a9c7f4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
@@ -23,24 +23,26 @@ import org.apache.carbondata.core.scan.executor.impl.VectorDetailQueryExecutor;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Factory class to get the query executor from RDD
* This will return the executor based on query type
*/
public class QueryExecutorFactory {
- public static QueryExecutor getQueryExecutor(QueryModel queryModel) {
+ public static QueryExecutor getQueryExecutor(QueryModel queryModel, Configuration configuration) {
if (CarbonProperties.isSearchModeEnabled()) {
if (queryModel.isVectorReader()) {
- return new SearchModeVectorDetailQueryExecutor();
+ return new SearchModeVectorDetailQueryExecutor(configuration);
} else {
- return new SearchModeDetailQueryExecutor();
+ return new SearchModeDetailQueryExecutor(configuration);
}
} else {
if (queryModel.isVectorReader()) {
- return new VectorDetailQueryExecutor();
+ return new VectorDetailQueryExecutor(configuration);
} else {
- return new DetailQueryExecutor();
+ return new DetailQueryExecutor(configuration);
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 259889b..ece2f8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -68,10 +68,12 @@ import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
/**
* This class provides a skeletal implementation of the {@link QueryExecutor}
@@ -96,7 +98,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
*/
protected CarbonIterator queryIterator;
- public AbstractQueryExecutor() {
+ public AbstractQueryExecutor(Configuration configuration) {
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration);
queryProperties = new QueryExecutorProperties();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
index 46ef43d..e11c576 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/DetailQueryExecutor.java
@@ -27,6 +27,8 @@ import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.result.RowBatch;
import org.apache.carbondata.core.scan.result.iterator.DetailQueryResultIterator;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Below class will be used to execute the detail query
* For executing the detail query it will pass all the block execution
@@ -34,6 +36,10 @@ import org.apache.carbondata.core.scan.result.iterator.DetailQueryResultIterator
*/
public class DetailQueryExecutor extends AbstractQueryExecutor<RowBatch> {
+ public DetailQueryExecutor(Configuration configuration) {
+ super(configuration);
+ }
+
@Override
public CarbonIterator<RowBatch> execute(QueryModel queryModel)
throws QueryExecutionException, IOException {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
index ae14327..6d03540 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
@@ -31,13 +31,15 @@ import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.hadoop.conf.Configuration;
public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object> {
private static final LogService LOGGER =
LogServiceFactory.getLogService(SearchModeDetailQueryExecutor.class.getName());
private static ExecutorService executorService = null;
- public SearchModeDetailQueryExecutor() {
+ public SearchModeDetailQueryExecutor(Configuration configuration) {
+ super(configuration);
if (executorService == null) {
initThreadPool();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
index 705c451..418ef42 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
@@ -32,6 +32,8 @@ import org.apache.carbondata.core.util.CarbonProperties;
import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Below class will be used to execute the detail query and returns columnar vectors.
*/
@@ -40,7 +42,8 @@ public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<O
LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
private static ExecutorService executorService = null;
- public SearchModeVectorDetailQueryExecutor() {
+ public SearchModeVectorDetailQueryExecutor(Configuration configuration) {
+ super(configuration);
if (executorService == null) {
initThreadPool();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
index 7787e4c..46397c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/VectorDetailQueryExecutor.java
@@ -26,11 +26,17 @@ import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.result.iterator.VectorDetailQueryResultIterator;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Below class will be used to execute the detail query and returns columnar vectors.
*/
public class VectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
+ public VectorDetailQueryExecutor(Configuration configuration) {
+ super(configuration);
+ }
+
@Override
public CarbonIterator<Object> execute(QueryModel queryModel)
throws QueryExecutionException, IOException {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index c3a4934..58fef17 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -23,11 +23,11 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.text.SimpleDateFormat;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -94,7 +94,7 @@ public final class CarbonProperties {
/**
* It is purely for testing
*/
- private Map<String, String> addedProperty = new HashMap<>();
+ private Map<String, String> addedProperty = new ConcurrentHashMap<>();
/**
* Private constructor this will call load properties method to load all the
@@ -407,7 +407,7 @@ public final class CarbonProperties {
* @param lockTypeConfigured
*/
private void validateAndConfigureLockType(String lockTypeConfigured) {
- Configuration configuration = new Configuration(true);
+ Configuration configuration = FileFactory.getConfiguration();
String defaultFs = configuration.get("fs.defaultFS");
if (null != defaultFs && (defaultFs.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)
|| defaultFs.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || defaultFs
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 9aaa58c..c5e2e8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -134,8 +134,6 @@ public final class CarbonUtil {
*/
private static final int CONST_HUNDRED = 100;
- private static final Configuration conf = new Configuration(true);
-
/**
* dfs.bytes-per-checksum
* HDFS checksum length, block size for a file should be exactly divisible
@@ -662,7 +660,7 @@ public final class CarbonUtil {
*/
public static String checkAndAppendHDFSUrl(String filePath) {
String currentPath = filePath;
- String defaultFsUrl = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
+ String defaultFsUrl = FileFactory.getConfiguration().get(CarbonCommonConstants.FS_DEFAULT_FS);
String baseDFSUrl = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_DDL_BASE_HDFS_URL, "");
if (checkIfPrefixExists(filePath)) {
@@ -699,7 +697,7 @@ public final class CarbonUtil {
filePath = "/" + filePath;
}
currentPath = filePath;
- String defaultFsUrl = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
+ String defaultFsUrl = FileFactory.getConfiguration().get(CarbonCommonConstants.FS_DEFAULT_FS);
if (defaultFsUrl == null) {
return currentPath;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 169c003..51b157f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.util;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.carbondata.common.constants.LoggerAction;
import org.apache.carbondata.common.logging.LogService;
@@ -57,12 +58,12 @@ public class SessionParams implements Serializable, Cloneable {
private static final long serialVersionUID = -7801994600594915264L;
private Map<String, String> sProps;
- private Map<String, String> addedProps;
+ private ConcurrentHashMap<String, String> addedProps;
// below field to be used when we want the objects to be serialized
private Map<String, Object> extraInfo;
public SessionParams() {
sProps = new HashMap<>();
- addedProps = new HashMap<>();
+ addedProps = new ConcurrentHashMap<>();
extraInfo = new HashMap<>();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
index df525bc..f85a350 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.core.util;
+import org.apache.hadoop.conf.Configuration;
+
/**
* This class maintains ThreadLocal session params
*/
@@ -31,4 +33,22 @@ public class ThreadLocalSessionInfo {
public static CarbonSessionInfo getCarbonSessionInfo() {
return threadLocal.get();
}
+
+ public static synchronized CarbonSessionInfo getOrCreateCarbonSessionInfo() {
+ CarbonSessionInfo info = threadLocal.get();
+ if (info == null || info.getSessionParams() == null) {
+ info = new CarbonSessionInfo();
+ info.setSessionParams(new SessionParams());
+ threadLocal.set(info);
+ }
+ return info;
+ }
+
+ public static void setConfigurationToCurrentThread(Configuration configuration) {
+ getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo().put("carbonConf", configuration);
+ }
+
+ public static void unsetAll() {
+ threadLocal.remove();
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
index 30144c1..5033713 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.impl.DFSFileReaderImpl;
import mockit.Mock;
import mockit.MockUp;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,7 +46,7 @@ public class DFSFileReaderImplUnitTest {
private static File fileWithEmptyContent;
@BeforeClass public static void setup() {
- dfsFileHolder = new DFSFileReaderImpl();
+ dfsFileHolder = new DFSFileReaderImpl(new Configuration());
file = new File("Test.carbondata");
fileWithEmptyContent = new File("TestEXception.carbondata");
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index 605ec89..bdb17ed 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -165,7 +165,7 @@ public class LuceneDataMapWriter extends DataMapWriter {
// the indexWriter closes the FileSystem on closing the writer, so for a new configuration
// and disable the cache for the index writer, it will be closed on closing the writer
- Configuration conf = new Configuration();
+ Configuration conf = FileFactory.getConfiguration();
conf.set("fs.hdfs.impl.disable.cache", "true");
// create a index writer
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index a54e7a4..0d38906 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -57,15 +58,16 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
private boolean skipClearDataMapAtClose = false;
public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport,
- InputMetricsStats inputMetricsStats) {
- this(queryModel, readSupport);
+ InputMetricsStats inputMetricsStats, Configuration configuration) {
+ this(queryModel, readSupport, configuration);
this.inputMetricsStats = inputMetricsStats;
}
- public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport) {
+ public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport,
+ Configuration configuration) {
this.queryModel = queryModel;
this.readSupport = readSupport;
- this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+ this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel, configuration);
}
@Override
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 21ef6cf..3ebd6d6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -583,7 +583,8 @@ m filterExpression
Configuration configuration = taskAttemptContext.getConfiguration();
QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
- return new CarbonRecordReader<T>(queryModel, readSupport);
+ return new CarbonRecordReader<T>(queryModel, readSupport,
+ taskAttemptContext.getConfiguration());
}
public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index af2cf83..ec201b9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -132,7 +132,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
-
CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
if (null == carbonTable) {
throw new IOException("Missing/Corrupt schema file for table.");
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 5938c20..5cc275b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
import org.apache.carbondata.processing.loading.DataLoadExecutor;
import org.apache.carbondata.processing.loading.TableProcessingOperations;
@@ -231,7 +232,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
@Override
public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter(
- TaskAttemptContext taskAttemptContext) throws IOException {
+ final TaskAttemptContext taskAttemptContext) throws IOException {
final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
//if loadModel having taskNo already(like in SDK) then no need to overwrite
short sdkUserCore = loadModel.getSdkUserCores();
@@ -249,10 +250,12 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext);
final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
final ExecutorService executorService = Executors.newFixedThreadPool(1,
- new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));;
+ new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));
// It should be started in new thread as the underlying iterator uses blocking queue.
Future future = executorService.submit(new Thread() {
@Override public void run() {
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(taskAttemptContext
+ .getConfiguration());
try {
dataLoadExecutor
.execute(loadModel, tempStoreLocations, iterators);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index af7397b..7641427 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
import org.apache.carbondata.core.datamap.DataMapJob;
import org.apache.carbondata.core.datamap.DataMapUtil;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -177,20 +176,4 @@ public class CarbonInputFormatUtil {
return new JobID(jobtrackerID, batch);
}
- public static void setS3Configurations(Configuration hadoopConf) {
- FileFactory.getConfiguration()
- .set("fs.s3a.access.key", hadoopConf.get("fs.s3a.access.key", ""));
- FileFactory.getConfiguration()
- .set("fs.s3a.secret.key", hadoopConf.get("fs.s3a.secret.key", ""));
- FileFactory.getConfiguration()
- .set("fs.s3a.endpoint", hadoopConf.get("fs.s3a.endpoint", ""));
- FileFactory.getConfiguration().set(CarbonCommonConstants.S3_ACCESS_KEY,
- hadoopConf.get(CarbonCommonConstants.S3_ACCESS_KEY, ""));
- FileFactory.getConfiguration().set(CarbonCommonConstants.S3_SECRET_KEY,
- hadoopConf.get(CarbonCommonConstants.S3_SECRET_KEY, ""));
- FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_ACCESS_KEY,
- hadoopConf.get(CarbonCommonConstants.S3N_ACCESS_KEY, ""));
- FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_SECRET_KEY,
- hadoopConf.get(CarbonCommonConstants.S3N_SECRET_KEY, ""));
- }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
index 57bcca3..4ed2b91 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
@@ -63,7 +63,7 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
public CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport<ArrayWritable> readSupport,
InputSplit inputSplit, JobConf jobConf) throws IOException {
- super(queryModel, readSupport);
+ super(queryModel, readSupport, jobConf);
initialize(inputSplit, jobConf);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index 5b15b22..3ec815d 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -100,7 +100,8 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
checkArgument(carbondataSplit.getConnectorId().equals(connectorId),
"split is not for this connector");
QueryModel queryModel = createQueryModel(carbondataSplit, columns);
- QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+ QueryExecutor queryExecutor =
+ QueryExecutorFactory.getQueryExecutor(queryModel, new Configuration());
try {
CarbonIterator iterator = queryExecutor.execute(queryModel);
readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
index 32e163a..9935b54 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java
@@ -114,7 +114,8 @@ class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
queryModel.setTableBlockInfos(tableBlockInfoList);
queryModel.setVectorReader(true);
try {
- queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+ queryExecutor =
+ QueryExecutorFactory.getQueryExecutor(queryModel, taskAttemptContext.getConfiguration());
iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
} catch (QueryExecutionException e) {
throw new InterruptedException(e.getMessage());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 7916932..5a1e140 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -414,7 +414,7 @@ public class CarbonTableReader {
List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
CarbonTable carbonTable = tableCacheModel.carbonTable;
TableInfo tableInfo = tableCacheModel.carbonTable.getTableInfo();
- Configuration config = new Configuration();
+ Configuration config = FileFactory.getConfiguration();
config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
index 127c4c9..2f3b8f4 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
@@ -32,6 +32,7 @@ import com.facebook.presto.tests.DistributedQueryRunner
import com.google.common.collect.ImmutableMap
import org.slf4j.{Logger, LoggerFactory}
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.presto.CarbondataPlugin
object PrestoServer {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
index 0ac6e72..6cbe747 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
@@ -41,8 +41,7 @@ class LuceneFineGrainDataMapWithSearchModeSuite extends QueryTest with BeforeAnd
val n = 500000
sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode()
CarbonProperties
- .getInstance()
- .addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "100s")
+ .getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "100s")
LuceneFineGrainDataMapSuite.createFile(file2, n)
sql("create database if not exists lucene")
sql("use lucene")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
index 062e5ba..c95e5a4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
@@ -146,13 +146,13 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
}
test("test create table as select with TBLPROPERTIES") {
- sql("DROP TABLE IF EXISTS ctas_tblproperties_test")
+ sql("DROP TABLE IF EXISTS ctas_tblproperties_testt")
sql(
- "create table ctas_tblproperties_test stored by 'carbondata' TBLPROPERTIES" +
+ "create table ctas_tblproperties_testt stored by 'carbondata' TBLPROPERTIES" +
"('DICTIONARY_INCLUDE'='key', 'sort_scope'='global_sort') as select * from carbon_ctas_test")
- checkAnswer(sql("select * from ctas_tblproperties_test"), sql("select * from carbon_ctas_test"))
+ checkAnswer(sql("select * from ctas_tblproperties_testt"), sql("select * from carbon_ctas_test"))
val carbonTable = CarbonEnv.getInstance(Spark2TestQueryExecutor.spark).carbonMetastore
- .lookupRelation(Option("default"), "ctas_tblproperties_test")(Spark2TestQueryExecutor.spark)
+ .lookupRelation(Option("default"), "ctas_tblproperties_testt")(Spark2TestQueryExecutor.spark)
.asInstanceOf[CarbonRelation].carbonTable
val metadataFolderPath: CarbonFile = FileFactory.getCarbonFile(carbonTable.getMetadataPath)
assert(metadataFolderPath.exists())
@@ -419,7 +419,7 @@ class TestCreateTableAsSelect extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS ctas_select_where_carbon")
sql("DROP TABLE IF EXISTS ctas_select_where_parquet")
sql("DROP TABLE IF EXISTS ctas_select_where_orc")
- sql("DROP TABLE IF EXISTS ctas_tblproperties_test")
+ sql("DROP TABLE IF EXISTS ctas_tblproperties_testt")
sql("DROP TABLE IF EXISTS ctas_if_table_name")
sql("DROP TABLE IF EXISTS source_table")
sql("DROP TABLE IF EXISTS target_table")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
index 36d8c51..8d6dd32 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -37,6 +37,8 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P
import org.apache.spark.sql.util.SparkSQLUtil.sessionState
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.spark.rdd.SerializableConfiguration
@@ -108,17 +110,18 @@ object CsvRDDHelper {
closePartition()
// 2. read function
- val serializableConfiguration = new SerializableConfiguration(jobConf)
+ val serializableConfiguration = new SerializableConfiguration(hadoopConf)
val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
override def apply(file: PartitionedFile): Iterator[InternalRow] = {
new Iterator[InternalRow] {
- val hadoopConf = serializableConfiguration.value
val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
formatter.format(new Date())
}
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(serializableConfiguration.value)
val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
- val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+ val hadoopAttemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration,
+ attemptId)
val inputSplit =
new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
var finished = false
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index dc238fb..be40b13 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -29,11 +29,13 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
/**
* Use sortBy operator in spark to load the data
@@ -64,6 +66,7 @@ object DataLoadProcessBuilderOnSpark {
val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
+ val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// 1. Input
val inputRDD = originRDD
.mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
@@ -73,6 +76,7 @@ object DataLoadProcessBuilderOnSpark {
// 2. Convert
val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
convertStepRowCounter)
}.filter(_ != null)// Filter the bad record
@@ -116,7 +120,7 @@ object DataLoadProcessBuilderOnSpark {
// 4. Write
sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, modelBroadcast,
- writeStepRowCounter))
+ writeStepRowCounter, conf))
// clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
// not have any functional impact as spark automatically monitors the cache usage on each node
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 73ed769..f17bd91 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.load
import scala.util.Random
import com.univocity.parsers.common.TextParsingException
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.{Accumulator, SparkEnv, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Row
@@ -29,7 +30,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations}
import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
@@ -40,7 +42,7 @@ import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImp
import org.apache.carbondata.processing.sort.sortdata.SortParameters
import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil}
-import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
+import org.apache.carbondata.spark.rdd.{NewRddIterator, SerializableConfiguration, StringArrayRow}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
object DataLoadProcessorStepOnSpark {
@@ -227,7 +229,9 @@ object DataLoadProcessorStepOnSpark {
rows: Iterator[CarbonRow],
index: Int,
modelBroadcast: Broadcast[CarbonLoadModel],
- rowCounter: Accumulator[Int]) {
+ rowCounter: Accumulator[Int],
+ conf: Broadcast[SerializableConfiguration]) {
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
var model: CarbonLoadModel = null
var tableName: String = null
var rowConverter: RowConverterImpl = null
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index 7c1edea..f7aa623 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -17,7 +17,8 @@
package org.apache.carbondata.spark.rdd
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -47,15 +48,15 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
/**
* This class is aimed at generating dictionary file for the newly added columns
*/
-class AlterTableAddColumnRDD[K, V](sc: SparkContext,
+class AlterTableAddColumnRDD[K, V](@transient sparkSession: SparkSession,
@transient newColumns: Seq[ColumnSchema],
identifier: AbsoluteTableIdentifier)
- extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) {
+ extends CarbonRDD[(Int, SegmentStatus)](sparkSession, Nil) {
val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
newColumns.zipWithIndex.map { column =>
new AddColumnPartition(id, column._2, column._1)
}.toArray
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index 0dbb4f0..a0d06b8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -17,11 +17,12 @@
package org.apache.carbondata.spark.rdd
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.statusmanager.SegmentStatus
@@ -44,12 +45,12 @@ class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Pa
/**
* This class is aimed at generating dictionary file for the newly added columns
*/
-class AlterTableDropColumnRDD[K, V](sc: SparkContext,
+class AlterTableDropColumnRDD[K, V](@transient ss: SparkSession,
@transient newColumns: Seq[ColumnSchema],
carbonTableIdentifier: AbsoluteTableIdentifier)
- extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) {
+ extends CarbonRDD[(Int, SegmentStatus)](ss, Nil) {
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
newColumns.zipWithIndex.map { column =>
new DropColumnPartition(id, column._2, column._1)
}.toArray
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 85a6f41..86a5043 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -39,7 +39,8 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
partitionIds: Seq[String],
bucketId: Int,
identifier: AbsoluteTableIdentifier,
- prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {
+ prev: RDD[Array[AnyRef]])
+ extends CarbonRDD[(K, V)](alterPartitionModel.sqlContext.sparkSession, prev) {
var storeLocation: String = null
val carbonLoadModel = alterPartitionModel.carbonLoadModel
@@ -50,14 +51,14 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
val factTableName = carbonTable.getTableName
val partitionInfo = carbonTable.getPartitionInfo(factTableName)
- override protected def getPartitions: Array[Partition] = {
+ override protected def internalGetPartitions: Array[Partition] = {
val sc = alterPartitionModel.sqlContext.sparkContext
sc.setLocalProperty("spark.scheduler.pool", "DDL")
sc.setLocalProperty("spark.job.interruptOnCancel", "true")
firstParent[Array[AnyRef]].partitions
}
- override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
+ override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava
val iter = new Iterator[(K, V)] {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
index d56e1c2..e2d1eff 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
@@ -21,7 +21,8 @@ import java.util
import scala.collection.JavaConverters._
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.indexstore.PartitionSpec
@@ -37,19 +38,19 @@ case class CarbonDropPartition(rddId: Int, val idx: Int, segment: Segment)
/**
* RDD to drop the partitions from segment files of all segments.
- * @param sc
+ * @param ss
* @param tablePath
* @param segments segments to be cleaned
*/
class CarbonDropPartitionRDD(
- sc: SparkContext,
+ @transient ss: SparkSession,
tablePath: String,
segments: Seq[Segment],
partitions: util.List[PartitionSpec],
uniqueId: String)
- extends CarbonRDD[(String, String)](sc, Nil, sc.hadoopConfiguration) {
+ extends CarbonRDD[(String, String)](ss, Nil) {
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
segments.zipWithIndex.map {s =>
CarbonDropPartition(id, s._2, s._1)
}.toArray
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 2ec8b9c..9265c7f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -31,7 +31,7 @@ import com.univocity.parsers.common.TextParsingException
import org.apache.commons.lang3.{ArrayUtils, StringUtils}
import org.apache.spark._
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
@@ -174,11 +174,12 @@ case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends S
* @param model a model package load info
*/
class CarbonAllDictionaryCombineRDD(
+ @transient sparkSession: SparkSession,
prev: RDD[(String, Iterable[String])],
model: DictionaryLoadModel)
- extends CarbonRDD[(Int, ColumnDistinctValues)](prev) {
+ extends CarbonRDD[(Int, ColumnDistinctValues)](sparkSession, prev) {
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
firstParent[(String, Iterable[String])].partitions
}
@@ -267,11 +268,12 @@ class StringArrayRow(var values: Array[String]) extends Row {
* @param model a model package load info
*/
class CarbonBlockDistinctValuesCombineRDD(
+ @transient ss: SparkSession,
prev: RDD[Row],
model: DictionaryLoadModel)
- extends CarbonRDD[(Int, ColumnDistinctValues)](prev) {
+ extends CarbonRDD[(Int, ColumnDistinctValues)](ss, prev) {
- override def getPartitions: Array[Partition] = firstParent[Row].partitions
+ override def internalGetPartitions: Array[Partition] = firstParent[Row].partitions
override def internalCompute(split: Partition,
context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -325,11 +327,14 @@ class CarbonBlockDistinctValuesCombineRDD(
* @param model a model package load info
*/
class CarbonGlobalDictionaryGenerateRDD(
+ @transient sparkSession: SparkSession,
prev: RDD[(Int, ColumnDistinctValues)],
model: DictionaryLoadModel)
- extends CarbonRDD[(Int, SegmentStatus)](prev) {
+ extends CarbonRDD[(Int, SegmentStatus)](sparkSession, prev) {
+
+ override def internalGetPartitions: Array[Partition] =
+ firstParent[(Int, ColumnDistinctValues)].partitions
- override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
override def internalCompute(split: Partition,
context: TaskContext): Iterator[(Int, SegmentStatus)] = {
@@ -492,21 +497,20 @@ class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
* Use external column dict to generate global dictionary
*
* @param carbonLoadModel carbon load model
- * @param sparkContext spark context
+ * @param ss spark session
* @param table carbon table identifier
* @param dimensions carbon dimensions having predefined dict
* @param dictFolderPath path of dictionary folder
*/
class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
dictionaryLoadModel: DictionaryLoadModel,
- sparkContext: SparkContext,
+ @transient ss: SparkSession,
table: CarbonTableIdentifier,
dimensions: Array[CarbonDimension],
dictFolderPath: String)
- extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil,
- sparkContext.hadoopConfiguration) {
+ extends CarbonRDD[(Int, ColumnDistinctValues)](ss, Nil) {
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
val primDimensions = dictionaryLoadModel.primDimensions
val primDimLength = primDimensions.length
val result = new Array[Partition](primDimLength)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 3aaf0ae..762b920 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -24,14 +24,15 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.Partition
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.CarbonMergerMapping
-import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
-import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.api.CarbonInputFormat
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -41,23 +42,23 @@ import org.apache.carbondata.spark.MergeResult
* IUD carbon merger RDD
* */
class CarbonIUDMergerRDD[K, V](
- sc: SparkContext,
+ @transient ss: SparkSession,
result: MergeResult[K, V],
carbonLoadModel: CarbonLoadModel,
carbonMergerMapping: CarbonMergerMapping,
confExecutorsTemp: String)
- extends CarbonMergerRDD[K, V](sc,
+ extends CarbonMergerRDD[K, V](ss,
result,
carbonLoadModel,
carbonMergerMapping,
confExecutorsTemp) {
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
val startTime = System.currentTimeMillis()
val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
)
- val jobConf: JobConf = new JobConf(new Configuration)
+ val jobConf: JobConf = new JobConf(FileFactory.getConfiguration)
SparkHadoopUtil.get.addCredentials(jobConf)
val job: Job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index f9f65a7..a0425b7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.block._
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
@@ -60,15 +61,15 @@ import org.apache.carbondata.spark.MergeResult
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
class CarbonMergerRDD[K, V](
- sc: SparkContext,
+ @transient ss: SparkSession,
result: MergeResult[K, V],
carbonLoadModel: CarbonLoadModel,
carbonMergerMapping: CarbonMergerMapping,
confExecutorsTemp: String)
- extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
+ extends CarbonRDD[(K, V)](ss, Nil) {
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
- sc.setLocalProperty("spark.job.interruptOnCancel", "true")
+ ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL")
+ ss.sparkContext.setLocalProperty("spark.job.interruptOnCancel", "true")
private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
var storeLocation: String = null
@@ -183,7 +184,7 @@ class CarbonMergerRDD[K, V](
}
try {
// fire a query and get the results.
- rawResultIteratorList = exec.processTableBlocks()
+ rawResultIteratorList = exec.processTableBlocks(FileFactory.getConfiguration)
} catch {
case e: Throwable =>
LOGGER.error(e)
@@ -269,7 +270,7 @@ class CarbonMergerRDD[K, V](
iter
}
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
val startTime = System.currentTimeMillis()
val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
@@ -277,7 +278,7 @@ class CarbonMergerRDD[K, V](
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
carbonTable)
- val jobConf: JobConf = new JobConf(new Configuration)
+ val jobConf: JobConf = new JobConf(getConf)
SparkHadoopUtil.get.addCredentials(jobConf)
val job: Job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 54a7530..04f20b1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -17,28 +17,24 @@
package org.apache.carbondata.spark.rdd
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
-
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.util.SparkSQLUtil
-import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.util._
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
/**
* This RDD maintains session level ThreadLocal
*/
-abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
- @transient private var deps: Seq[Dependency[_]],
- @transient hadoopConf: Configuration) extends RDD[T](sc, deps) {
+abstract class CarbonRDD[T: ClassTag](@transient ss: SparkSession,
+ @transient private var deps: Seq[Dependency[_]]) extends RDD[T](ss.sparkContext, deps) {
val carbonSessionInfo: CarbonSessionInfo = {
var info = ThreadLocalSessionInfo.getCarbonSessionInfo
@@ -50,24 +46,27 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
info
}
- private val confBytes = {
- val bao = new ByteArrayOutputStream()
- val oos = new ObjectOutputStream(bao)
- hadoopConf.write(oos)
- oos.close()
- CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
- }
+ val config: Broadcast[SerializableConfiguration] = sparkContext
+ .broadcast(new SerializableConfiguration(SparkSQLUtil.sessionState(ss).newHadoopConf()))
/** Construct an RDD with just a one-to-one dependency on one parent */
- def this(@transient oneParent: RDD[_]) =
- this (oneParent.context, List(new OneToOneDependency(oneParent)),
- oneParent.sparkContext.hadoopConfiguration)
+ def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_]) =
+ this (sparkSession, List(new OneToOneDependency(oneParent)))
+
+ protected def internalGetPartitions: Array[Partition]
+
+ override def getPartitions: Array[Partition] = {
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(config.value.value)
+ internalGetPartitions
+ }
// RDD compute logic should be here
def internalCompute(split: Partition, context: TaskContext): Iterator[T]
final def compute(split: Partition, context: TaskContext): Iterator[T] = {
- CarbonInputFormatUtil.setS3Configurations(getConf)
+ TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll())
+ carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", config
+ .value.value)
ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
val carbonTaskInfo = new CarbonTaskInfo
@@ -79,13 +78,7 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
}
def getConf: Configuration = {
- val configuration = new Configuration(false)
- val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor
- .unCompressByte(confBytes))
- val ois = new ObjectInputStream(bai)
- configuration.readFields(ois)
- ois.close()
- configuration
+ config.value.value
}
}
@@ -93,12 +86,14 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
* This RDD contains TableInfo object which is serialized and deserialized in driver and executor
*/
abstract class CarbonRDDWithTableInfo[T: ClassTag](
- @transient sc: SparkContext,
+ @transient ss: SparkSession,
@transient private var deps: Seq[Dependency[_]],
- serializedTableInfo: Array[Byte]) extends CarbonRDD[T](sc, deps, sc.hadoopConfiguration) {
+ serializedTableInfo: Array[Byte]) extends CarbonRDD[T](ss, deps) {
- def this(@transient oneParent: RDD[_], serializedTableInfo: Array[Byte]) =
- this (oneParent.context, List(new OneToOneDependency(oneParent)), serializedTableInfo)
+ def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_],
+ serializedTableInfo: Array[Byte]) = {
+ this (sparkSession, List(new OneToOneDependency(oneParent)), serializedTableInfo)
+ }
def getTableInfo: TableInfo = TableInfo.deserialize(serializedTableInfo)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9604cd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
index 9452777..241720a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.AlterPartitionModel
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.util.CarbonException
@@ -36,6 +37,7 @@ import org.apache.spark.util.PartitionUtils
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo}
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.DataTypes
@@ -65,7 +67,7 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
absoluteTableIdentifier: AbsoluteTableIdentifier,
partitionIds: Seq[String],
bucketId: Int)
- extends RDD[(AnyRef, Array[AnyRef])](alterPartitionModel.sqlContext.sparkContext, Nil) {
+ extends CarbonRDD[(AnyRef, Array[AnyRef])](alterPartitionModel.sqlContext.sparkSession, Nil) {
private val queryId = alterPartitionModel.sqlContext.sparkContext.getConf
.get("queryId", System.nanoTime() + "")
@@ -91,9 +93,9 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
val dictionaryIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]()
val measureIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]()
- override def getPartitions: Array[Partition] = {
+ override def internalGetPartitions: Array[Partition] = {
val parallelism = sparkContext.defaultParallelism
- val jobConf = new JobConf(new Configuration)
+ val jobConf = new JobConf(FileFactory.getConfiguration)
val job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonTableInputFormat(absoluteTableIdentifier,
partitionIds.toList.asJava, job)
@@ -127,8 +129,8 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
result.toArray(new Array[Partition](result.size()))
}
- override def compute(split: Partition, context: TaskContext):
- Iterator[(AnyRef, Array[AnyRef])] = {
+ override def internalCompute(split: Partition, context: TaskContext):
+ Iterator[(AnyRef, Array[AnyRef])] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
var exec : CarbonSplitExecutor = null
val rows : java.util.List[(AnyRef, Array[AnyRef])] = new ArrayList[(AnyRef, Array[AnyRef])]()
@@ -142,7 +144,8 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
var result : java.util.List[PartitionSpliterRawResultIterator] = null
try {
exec = new CarbonSplitExecutor(segmentMapping, carbonTable)
- result = exec.processDataBlocks(segmentId, new SparkDataTypeConverterImpl())
+ result = exec.processDataBlocks(segmentId, new SparkDataTypeConverterImpl(),
+ FileFactory.getConfiguration)
} catch {
case e: Throwable =>
LOGGER.error(e)