You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/14 16:59:43 UTC
[5/7] carbondata git commit: [CARBONDATA-1573] [Integration] Support
Database Location Configuration while Creating Database/ Support Creation of
carbon Table in the database location
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java b/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
index 8bb2052..86ba9c8 100644
--- a/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
@@ -33,6 +33,7 @@ import java.util.Properties;
import java.util.UUID;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnIdentifier;
import org.apache.carbondata.core.util.path.CarbonStorePath;
@@ -65,6 +66,8 @@ public class CarbonDictionaryWriterImplTest {
private CarbonTableIdentifier carbonTableIdentifier;
+ private AbsoluteTableIdentifier absoluteTableIdentifier;
+
private String databaseName;
private String tableName;
@@ -100,8 +103,9 @@ public class CarbonDictionaryWriterImplTest {
this.carbonStorePath = props.getProperty("storePath", "carbonStore");
this.columnIdentifier = new ColumnIdentifier("Name", null, null);
carbonTableIdentifier = new CarbonTableIdentifier(databaseName, tableName, UUID.randomUUID().toString());
+ absoluteTableIdentifier = new AbsoluteTableIdentifier(carbonStorePath, carbonTableIdentifier);
this.dictionaryColumnUniqueIdentifier =
- new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, columnIdentifier,
+ new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, columnIdentifier,
columnIdentifier.getDataType(),
CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier));
deleteStorePath();
@@ -183,8 +187,7 @@ public class CarbonDictionaryWriterImplTest {
*/
private CarbonDictionaryWriterImpl prepareWriter() throws IOException {
initDictionaryDirPaths();
- return new CarbonDictionaryWriterImpl(this.carbonStorePath, carbonTableIdentifier,
- dictionaryColumnUniqueIdentifier);
+ return new CarbonDictionaryWriterImpl(dictionaryColumnUniqueIdentifier);
}
/**
@@ -438,8 +441,7 @@ public class CarbonDictionaryWriterImplTest {
*/
private List<CarbonDictionaryColumnMetaChunk> readDictionaryMetadataFile() throws IOException {
CarbonDictionaryMetadataReaderImpl columnMetadataReaderImpl =
- new CarbonDictionaryMetadataReaderImpl(this.carbonStorePath, this.carbonTableIdentifier,
- this.dictionaryColumnUniqueIdentifier);
+ new CarbonDictionaryMetadataReaderImpl(this.dictionaryColumnUniqueIdentifier);
List<CarbonDictionaryColumnMetaChunk> dictionaryMetaChunkList = null;
// read metadata file
try {
@@ -457,8 +459,7 @@ public class CarbonDictionaryWriterImplTest {
private List<byte[]> readDictionaryFile(long dictionaryStartOffset, long dictionaryEndOffset)
throws IOException {
CarbonDictionaryReaderImpl dictionaryReader =
- new CarbonDictionaryReaderImpl(this.carbonStorePath, this.carbonTableIdentifier,
- this.dictionaryColumnUniqueIdentifier);
+ new CarbonDictionaryReaderImpl(this.dictionaryColumnUniqueIdentifier);
List<byte[]> dictionaryValues = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
try {
if (0 == dictionaryEndOffset) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java b/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
index d04d8a2..e64726a 100644
--- a/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.UUID;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnIdentifier;
import org.apache.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReader;
@@ -45,6 +46,7 @@ public class CarbonDictionarySortIndexWriterImplTest {
private String storePath;
private CarbonTableIdentifier carbonTableIdentifier = null;
+ private AbsoluteTableIdentifier absoluteTableIdentifier = null;
private ColumnIdentifier columnIdentifier = null;
private CarbonDictionaryWriter dictionaryWriter = null;
private CarbonDictionarySortIndexWriter dictionarySortIndexWriter = null;
@@ -54,15 +56,21 @@ public class CarbonDictionarySortIndexWriterImplTest {
storePath = "target/carbonStore";
carbonTableIdentifier =
new CarbonTableIdentifier("testSchema", "carbon", UUID.randomUUID().toString());
+ String tablePath =
+ storePath + "/" + carbonTableIdentifier.getDatabaseName() + "/" + carbonTableIdentifier
+ .getTableName();
+ absoluteTableIdentifier = new AbsoluteTableIdentifier(tablePath, carbonTableIdentifier);
columnIdentifier = new ColumnIdentifier("Name", null, null);
- DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, columnIdentifier, columnIdentifier.getDataType(),
- CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier));
+ DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
+ new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, columnIdentifier,
+ columnIdentifier.getDataType(),
+ CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier));
dictionaryWriter =
- new CarbonDictionaryWriterImpl(storePath, carbonTableIdentifier, dictionaryColumnUniqueIdentifier);
+ new CarbonDictionaryWriterImpl(dictionaryColumnUniqueIdentifier);
dictionarySortIndexWriter =
- new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, dictionaryColumnUniqueIdentifier, storePath);
+ new CarbonDictionarySortIndexWriterImpl(dictionaryColumnUniqueIdentifier);
carbonDictionarySortIndexReader =
- new CarbonDictionarySortIndexReaderImpl(carbonTableIdentifier, dictionaryColumnUniqueIdentifier, storePath);
+ new CarbonDictionarySortIndexReaderImpl(dictionaryColumnUniqueIdentifier);
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
index 4c5b359..8be1e2e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
@@ -31,9 +31,9 @@ public class CacheClient {
private CacheAccessClient<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
segmentAccessClient;
- public CacheClient(String storePath) {
+ public CacheClient() {
Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> segmentCache =
- CacheProvider.getInstance().createCache(CacheType.DRIVER_BTREE, storePath);
+ CacheProvider.getInstance().createCache(CacheType.DRIVER_BTREE);
segmentAccessClient = new CacheAccessClient<>(segmentCache);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index e5aac84..0aa2974 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -82,7 +82,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -96,7 +95,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.util.StringUtils;
/**
* Carbon Input format class representing one carbon table
@@ -116,7 +114,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
-
+ private static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
+ private static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
// a cache for carbon table, it will be used in task side
private CarbonTable carbonTable;
@@ -313,11 +312,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
throws IOException {
String dirs = configuration.get(INPUT_DIR, "");
- String[] inputPaths = StringUtils.split(dirs);
- if (inputPaths.length == 0) {
- throw new InvalidPathException("No input paths specified in job");
- }
- return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
+ return AbsoluteTableIdentifier
+ .from(dirs, getDatabaseName(configuration), getTableName(configuration));
}
/**
@@ -331,7 +327,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
*/
@Override public List<InputSplit> getSplits(JobContext job) throws IOException {
AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
- CacheClient cacheClient = new CacheClient(identifier.getStorePath());
+ CacheClient cacheClient = new CacheClient();
try {
List<String> invalidSegments = new ArrayList<>();
List<UpdateVO> invalidTimestampsList = new ArrayList<>();
@@ -727,7 +723,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
*/
public BlockMappingVO getBlockRowCount(JobContext job,
AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException, KeyGenException {
- CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath());
+ CacheClient cacheClient = new CacheClient();
try {
SegmentUpdateStatusManager updateStatusManager =
new SegmentUpdateStatusManager(absoluteTableIdentifier);
@@ -978,4 +974,24 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
return new String[] { "0" };
}
+ public static void setDatabaseName(Configuration configuration, String databaseName) {
+ if (null != databaseName) {
+ configuration.set(DATABASE_NAME, databaseName);
+ }
+ }
+
+ public static String getDatabaseName(Configuration configuration) {
+ return configuration.get(DATABASE_NAME);
+ }
+
+ public static void setTableName(Configuration configuration, String tableName) {
+ if (null != tableName) {
+ configuration.set(TABLE_NAME, tableName);
+ }
+ }
+
+ public static String getTableName(Configuration configuration) {
+ return configuration.get(TABLE_NAME);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index f3963ad..6e840e2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
@@ -88,7 +89,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -98,7 +98,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.util.StringUtils;
/**
* Input format of CarbonData file.
@@ -123,6 +122,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
+ public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
+ public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
// a cache for carbon table, it will be used in task side
private CarbonTable carbonTable;
@@ -288,12 +289,13 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
throws IOException {
- String dirs = configuration.get(INPUT_DIR, "");
- String[] inputPaths = StringUtils.split(dirs);
- if (inputPaths.length == 0) {
- throw new InvalidPathException("No input paths specified in job");
+ String tablePath = configuration.get(INPUT_DIR, "");
+ try {
+ return AbsoluteTableIdentifier
+ .from(tablePath, getDatabaseName(configuration), getTableName(configuration));
+ } catch (InvalidConfigurationException e) {
+ throw new IOException(e);
}
- return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
}
/**
@@ -941,4 +943,34 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
}
return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter);
}
+
+ public static void setDatabaseName(Configuration configuration, String databaseName) {
+ if (null != databaseName) {
+ configuration.set(DATABASE_NAME, databaseName);
+ }
+ }
+
+ public static String getDatabaseName(Configuration configuration)
+ throws InvalidConfigurationException {
+ String databseName = configuration.get(DATABASE_NAME);
+ if (null == databseName) {
+ throw new InvalidConfigurationException("Database name is not set.");
+ }
+ return databseName;
+ }
+
+ public static void setTableName(Configuration configuration, String tableName) {
+ if (null != tableName) {
+ configuration.set(TABLE_NAME, tableName);
+ }
+ }
+
+ public static String getTableName(Configuration configuration)
+ throws InvalidConfigurationException {
+ String tableName = configuration.get(TABLE_NAME);
+ if (tableName == null) {
+ throw new InvalidConfigurationException("Table name is not set");
+ }
+ return tableName;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
index 66a06ba..1b875bc 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
@@ -99,8 +99,6 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
DataMapDistributable distributable = (DataMapDistributable)inputSplit;
- AbsoluteTableIdentifier identifier =
- AbsoluteTableIdentifier.fromTablePath(distributable.getTablePath());
TableDataMap dataMap = DataMapStoreManager.getInstance()
.getDataMap(identifier, distributable.getDataMapName(),
distributable.getDataMapFactoryClass());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
index 4268ee2..37796db 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
@@ -98,7 +98,7 @@ class InMemoryBTreeIndex implements Index {
private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
JobContext job, AbsoluteTableIdentifier identifier) throws IOException {
Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
- CacheClient cacheClient = new CacheClient(identifier.getStorePath());
+ CacheClient cacheClient = new CacheClient();
TableSegmentUniqueIdentifier segmentUniqueIdentifier =
new TableSegmentUniqueIdentifier(identifier, segment.getId());
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
index 32d879f..f4927dd 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
@@ -61,10 +61,10 @@ public class DictionaryDecodeReadSupport<T> implements CarbonReadSupport<T> {
.hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonColumns[i].isComplex()) {
CacheProvider cacheProvider = CacheProvider.getInstance();
Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider
- .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath());
+ .createCache(CacheType.FORWARD_DICTIONARY);
dataTypes[i] = carbonColumns[i].getDataType();
dictionaries[i] = forwardDictionaryCache.get(new DictionaryColumnUniqueIdentifier(
- absoluteTableIdentifier.getCarbonTableIdentifier(),
+ absoluteTableIdentifier,
carbonColumns[i].getColumnIdentifier(), dataTypes[i],
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)));
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
index eb07f7e..e6feb93 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
@@ -96,7 +96,7 @@ public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
boolean isDirectDictionary =
CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY);
DictionaryColumnUniqueIdentifier dictionarIdentifier =
- new DictionaryColumnUniqueIdentifier(carbontable.getCarbonTableIdentifier(),
+ new DictionaryColumnUniqueIdentifier(carbontable.getAbsoluteTableIdentifier(),
child.getColumnIdentifier(), child.getDataType(),
CarbonStorePath.getCarbonTablePath(carbontable.getAbsoluteTableIdentifier()));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
index 4e3e6cf..a22461d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
@@ -279,7 +279,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
fileSplit.getStart() == 0);
cacheProvider = CacheProvider.getInstance();
- cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, carbonTable.getStorePath());
+ cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);
queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache);
outputSchema = new StructType(CarbonTypeUtil.convertCarbonSchemaToSparkSchema(projection));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index b4444be..630828a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -82,6 +82,10 @@ public class CarbonInputFormatUtil {
AbsoluteTableIdentifier identifier,
Job job) throws IOException {
CarbonTableInputFormat<V> carbonInputFormat = new CarbonTableInputFormat<>();
+ carbonInputFormat.setDatabaseName(job.getConfiguration(),
+ identifier.getCarbonTableIdentifier().getDatabaseName());
+ carbonInputFormat
+ .setTableName(job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
return carbonInputFormat;
}
@@ -90,6 +94,10 @@ public class CarbonInputFormatUtil {
AbsoluteTableIdentifier identifier, List<String> partitionId, Job job) throws IOException {
CarbonTableInputFormat<V> carbonTableInputFormat = new CarbonTableInputFormat<>();
carbonTableInputFormat.setPartitionIdList(job.getConfiguration(), partitionId);
+ carbonTableInputFormat.setDatabaseName(job.getConfiguration(),
+ identifier.getCarbonTableIdentifier().getDatabaseName());
+ carbonTableInputFormat
+ .setTableName(job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
return carbonTableInputFormat;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
index f6c9e59..f1ce324 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
@@ -61,7 +61,7 @@ public class SchemaReader {
TableInfo wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(tableInfo,
identifier.getCarbonTableIdentifier().getDatabaseName(), tableName,
- identifier.getStorePath());
+ identifier.getTablePath());
wrapperTableInfo.setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath));
CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
return CarbonMetadata.getInstance().getCarbonTable(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
index 865dabe..068d8b3 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
@@ -189,6 +189,10 @@ public class CarbonInputMapperTest extends TestCase {
if (filter != null) {
CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter);
}
+ CarbonInputFormat.setDatabaseName(job.getConfiguration(),
+ abs.getCarbonTableIdentifier().getDatabaseName());
+ CarbonInputFormat.setTableName(job.getConfiguration(),
+ abs.getCarbonTableIdentifier().getTableName());
FileInputFormat.addInputPath(job, new Path(abs.getTablePath()));
CarbonUtil.deleteFoldersAndFiles(new File(outPath + "1"));
FileOutputFormat.setOutputPath(job, new Path(outPath + "1"));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 3b5d736..b4145ef 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -99,14 +99,15 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
public class StoreCreator {
private static AbsoluteTableIdentifier absoluteTableIdentifier;
+ private static String storePath = null;
static {
try {
- String storePath = new File("target/store").getCanonicalPath();
+ storePath = new File("target/store").getCanonicalPath();
String dbName = "testdb";
String tableName = "testtable";
absoluteTableIdentifier =
- new AbsoluteTableIdentifier(storePath, new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+ new AbsoluteTableIdentifier(storePath +"/testdb/testtable", new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
} catch (IOException ex) {
}
@@ -126,7 +127,7 @@ public class StoreCreator {
loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
loadModel.setFactFilePath(factFilePath);
loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
- loadModel.setStorePath(absoluteTableIdentifier.getStorePath());
+ loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
loadModel.setDateFormat(null);
loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
@@ -163,17 +164,17 @@ public class StoreCreator {
try {
String factFilePath =
new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
- File storeDir = new File(absoluteTableIdentifier.getStorePath());
+ File storeDir = new File(storePath);
CarbonUtil.deleteFoldersAndFiles(storeDir);
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS,
- absoluteTableIdentifier.getStorePath());
+ storePath);
CarbonTable table = createTable(absoluteTableIdentifier);
writeDictionary(factFilePath, table);
CarbonLoadModel loadModel =
buildCarbonLoadModel(table, factFilePath, absoluteTableIdentifier);
- executeGraph(loadModel, absoluteTableIdentifier.getStorePath());
+ executeGraph(loadModel, storePath);
} catch (Exception e) {
e.printStackTrace();
@@ -183,7 +184,7 @@ public class StoreCreator {
public static CarbonTable createTable(
AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
TableInfo tableInfo = new TableInfo();
- tableInfo.setStorePath(absoluteTableIdentifier.getStorePath());
+ tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
TableSchema tableSchema = new TableSchema();
tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
@@ -276,8 +277,7 @@ public class StoreCreator {
tableInfo.setLastUpdatedTime(System.currentTimeMillis());
tableInfo.setFactTable(tableSchema);
CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
+ .getCarbonTablePath(absoluteTableIdentifier);
String schemaFilePath = carbonTablePath.getSchemaFilePath();
String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
tableInfo.setMetaDataFilepath(schemaMetadataPath);
@@ -327,34 +327,33 @@ public class StoreCreator {
}
Cache dictCache = CacheProvider.getInstance()
- .createCache(CacheType.REVERSE_DICTIONARY, absoluteTableIdentifier.getStorePath());
+ .createCache(CacheType.REVERSE_DICTIONARY);
for (int i = 0; i < set.length; i++) {
ColumnIdentifier columnIdentifier = new ColumnIdentifier(dims.get(i).getColumnId(), null, null);
DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
- new DictionaryColumnUniqueIdentifier(table.getCarbonTableIdentifier(), columnIdentifier,
+ new DictionaryColumnUniqueIdentifier(table.getAbsoluteTableIdentifier(), columnIdentifier,
columnIdentifier.getDataType(), CarbonStorePath
- .getCarbonTablePath(table.getStorePath(), table.getCarbonTableIdentifier()));
+ .getCarbonTablePath(table.getAbsoluteTableIdentifier().getTablePath(),
+ table.getCarbonTableIdentifier()));
CarbonDictionaryWriter writer =
- new CarbonDictionaryWriterImpl(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier(), dictionaryColumnUniqueIdentifier);
+ new CarbonDictionaryWriterImpl(dictionaryColumnUniqueIdentifier);
for (String value : set[i]) {
writer.write(value);
}
writer.close();
writer.commit();
Dictionary dict = (Dictionary) dictCache.get(
- new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier(),
+ new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
columnIdentifier, dims.get(i).getDataType(),CarbonStorePath
- .getCarbonTablePath(table.getStorePath(), table.getCarbonTableIdentifier())));
+ .getCarbonTablePath(table.getAbsoluteTableIdentifier().getTablePath(),
+ table.getCarbonTableIdentifier())));
CarbonDictionarySortInfoPreparator preparator =
new CarbonDictionarySortInfoPreparator();
List<String> newDistinctValues = new ArrayList<String>();
CarbonDictionarySortInfo dictionarySortInfo =
preparator.getDictionarySortInfo(newDistinctValues, dict, dims.get(i).getDataType());
CarbonDictionarySortIndexWriter carbonDictionaryWriter =
- new CarbonDictionarySortIndexWriterImpl(
- absoluteTableIdentifier.getCarbonTableIdentifier(), dictionaryColumnUniqueIdentifier,
- absoluteTableIdentifier.getStorePath());
+ new CarbonDictionarySortIndexWriterImpl(dictionaryColumnUniqueIdentifier);
try {
carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex());
carbonDictionaryWriter.writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted());
@@ -379,7 +378,7 @@ public class StoreCreator {
String databaseName = loadModel.getDatabaseName();
String tableName = loadModel.getTableName();
String tempLocationKey = databaseName + '_' + tableName + "_1";
- CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation);
+ CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation + "/" + databaseName + "/" + tableName);
CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
CarbonProperties.getInstance().addProperty("send.signal.load", "false");
CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true");
@@ -421,7 +420,7 @@ public class StoreCreator {
CSVRecordReaderIterator readerIterator = new CSVRecordReaderIterator(recordReader, blockDetails, hadoopAttemptContext);
new DataLoadExecutor().execute(loadModel,
- new String[] {storeLocation},
+ new String[] {storeLocation + "/" + databaseName + "/" + tableName},
new CarbonIterator[]{readerIterator});
writeLoadMetadata(loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
index 2a19271..2f770cd 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -84,12 +83,11 @@ public class CarbonDictionaryDecodeReadSupport<T> implements CarbonReadSupport<T
.hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonColumns[i].isComplex()) {
CacheProvider cacheProvider = CacheProvider.getInstance();
Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider
- .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath());
+ .createCache(CacheType.FORWARD_DICTIONARY);
dataTypes[i] = carbonColumns[i].getDataType();
dictionaries[i] = forwardDictionaryCache.get(
- new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier(),
- carbonColumns[i].getColumnIdentifier(), dataTypes[i],
- CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)));
+ new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
+ carbonColumns[i].getColumnIdentifier()));
} else {
dataTypes[i] = carbonColumns[i].getDataType();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 4cbc692..aabd3df 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -77,8 +77,8 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
}
}
}
- AbsoluteTableIdentifier absoluteTableIdentifier =
- AbsoluteTableIdentifier.fromTablePath(validInputPath);
+ AbsoluteTableIdentifier absoluteTableIdentifier = AbsoluteTableIdentifier
+ .from(validInputPath, getDatabaseName(configuration), getTableName(configuration));
// read the schema file to get the absoluteTableIdentifier having the correct table id
// persisted in the schema
CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala b/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
index 9c1d51e..11839c9 100644
--- a/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
+++ b/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
@@ -97,7 +97,7 @@ object HiveExample {
statement
.execute(
"ALTER TABLE HIVE_CARBON_EXAMPLE SET LOCATION " +
- s"'file:///$store/default/hive_carbon_example' ")
+ s"'file:///$store/hive_carbon_example' ")
val sql = "SELECT * FROM HIVE_CARBON_EXAMPLE"
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index e49dcee..65c7373 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -93,8 +93,7 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
Configuration conf = new Configuration();
conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
String carbonTablePath = PathFactory.getInstance()
- .getCarbonTablePath(targetTable.getAbsoluteTableIdentifier().getStorePath(),
- targetTable.getCarbonTableIdentifier(), null).getPath();
+ .getCarbonTablePath(targetTable.getAbsoluteTableIdentifier(), null).getPath();
conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
JobConf jobConf = new JobConf(conf);
@@ -123,6 +122,10 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
try {
CarbonTableInputFormat
.setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath()));
+ CarbonTableInputFormat
+ .setDatabaseName(conf, identifier.getCarbonTableIdentifier().getDatabaseName());
+ CarbonTableInputFormat
+ .setTableName(conf, identifier.getCarbonTableIdentifier().getTableName());
} catch (IOException e) {
throw new RuntimeException("Unable to create the CarbonTableInputFormat", e);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 9839fc8..8e6abd4 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -293,14 +293,20 @@ public class CarbonTableReader {
// If table is not previously cached, then:
// Step 1: get store path of the table and cache it.
- String storePath = config.getStorePath();
// create table identifier. the table id is randomly generated.
cache.carbonTableIdentifier =
new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(),
UUID.randomUUID().toString());
+ String storePath = config.getStorePath();
+ String tablePath = storePath + "/" + cache.carbonTableIdentifier.getDatabaseName() + "/"
+ + cache.carbonTableIdentifier.getTableName();
+
// get the store path of the table.
+
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ new AbsoluteTableIdentifier(tablePath, cache.carbonTableIdentifier);
cache.carbonTablePath =
- PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier, null);
+ PathFactory.getInstance().getCarbonTablePath(absoluteTableIdentifier, null);
// cache the table
cc.put(table, cache);
@@ -325,8 +331,8 @@ public class CarbonTableReader {
SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
// wrapperTableInfo is the code level information of a table in carbondata core, different from the Thrift TableInfo.
TableInfo wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
- storePath);
+ .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+ tablePath);
wrapperTableInfo.setMetaDataFilepath(
CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
@@ -354,9 +360,10 @@ public class CarbonTableReader {
Configuration config = new Configuration();
config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
String carbonTablePath = PathFactory.getInstance()
- .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier().getStorePath(),
- carbonTable.getCarbonTableIdentifier(), null).getPath();
+ .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier(), null).getPath();
config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+ config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
+ config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getFactTableName());
try {
CarbonTableInputFormat.setTableInfo(config, tableInfo);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
index b9a9f0d..c8e74a3 100644
--- a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
+++ b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
@@ -59,11 +59,11 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
val cacheProvider: CacheProvider = CacheProvider.getInstance
val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
cacheProvider
- .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath)
+ .createCache(CacheType.FORWARD_DICTIONARY)
dataTypes(index) = carbonColumn.getDataType
dictionaries(index) = forwardDictionaryCache
- .get(new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier
- .getCarbonTableIdentifier, carbonColumn.getColumnIdentifier))
+ .get(new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
+ carbonColumn.getColumnIdentifier))
dictionarySliceArray(index) = createSliceArrayBlock(dictionaries(index))
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index e8f5d6d..17a4188 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -71,16 +71,16 @@ object CarbonDataStoreCreator {
val dbName: String = "testdb"
val tableName: String = "testtable"
val absoluteTableIdentifier = new AbsoluteTableIdentifier(
- storePath,
+ storePath + "/"+ dbName + "/" + tableName,
new CarbonTableIdentifier(dbName,
tableName,
UUID.randomUUID().toString))
val factFilePath: String = new File(dataFilePath).getCanonicalPath
- val storeDir: File = new File(absoluteTableIdentifier.getStorePath)
+ val storeDir: File = new File(absoluteTableIdentifier.getTablePath)
CarbonUtil.deleteFoldersAndFiles(storeDir)
CarbonProperties.getInstance.addProperty(
CarbonCommonConstants.STORE_LOCATION_HDFS,
- absoluteTableIdentifier.getStorePath)
+ absoluteTableIdentifier.getTablePath)
val table: CarbonTable = createTable(absoluteTableIdentifier)
writeDictionary(factFilePath, table, absoluteTableIdentifier)
val schema: CarbonDataLoadSchema = new CarbonDataLoadSchema(table)
@@ -95,7 +95,7 @@ object CarbonDataStoreCreator {
absoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
loadModel.setFactFilePath(factFilePath)
loadModel.setLoadMetadataDetails(new ArrayList[LoadMetadataDetails]())
- loadModel.setStorePath(absoluteTableIdentifier.getStorePath)
+ loadModel.setTablePath(absoluteTableIdentifier.getTablePath)
CarbonProperties.getInstance
.addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING, "true")
@@ -131,7 +131,7 @@ object CarbonDataStoreCreator {
loadModel.setPartitionId("0")
loadModel.setFactTimeStamp(System.currentTimeMillis())
loadModel.setMaxColumns("15")
- executeGraph(loadModel, absoluteTableIdentifier.getStorePath)
+ executeGraph(loadModel, storePath)
} catch {
case e: Exception => e.printStackTrace()
@@ -140,7 +140,7 @@ object CarbonDataStoreCreator {
private def createTable(absoluteTableIdentifier: AbsoluteTableIdentifier): CarbonTable = {
val tableInfo: TableInfo = new TableInfo()
- tableInfo.setStorePath(absoluteTableIdentifier.getStorePath)
+ tableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
tableInfo.setDatabaseName(
absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName)
val tableSchema: TableSchema = new TableSchema()
@@ -293,7 +293,7 @@ object CarbonDataStoreCreator {
tableInfo.setLastUpdatedTime(System.currentTimeMillis())
tableInfo.setFactTable(tableSchema)
val carbonTablePath: CarbonTablePath = CarbonStorePath.getCarbonTablePath(
- absoluteTableIdentifier.getStorePath,
+ absoluteTableIdentifier.getTablePath,
absoluteTableIdentifier.getCarbonTableIdentifier)
val schemaFilePath: String = carbonTablePath.getSchemaFilePath
val schemaMetadataPath: String =
@@ -351,22 +351,19 @@ object CarbonDataStoreCreator {
line = reader.readLine()
}
val dictCache: Cache[DictionaryColumnUniqueIdentifier, ReverseDictionary] = CacheProvider
- .getInstance.createCache(CacheType.REVERSE_DICTIONARY,
- absoluteTableIdentifier.getStorePath)
+ .getInstance.createCache(CacheType.REVERSE_DICTIONARY)
for (i <- set.indices) {
val columnIdentifier: ColumnIdentifier =
new ColumnIdentifier(dims.get(i).getColumnId, null, null)
val dictionaryColumnUniqueIdentifier: DictionaryColumnUniqueIdentifier =
new DictionaryColumnUniqueIdentifier(
- table.getCarbonTableIdentifier,
+ table.getAbsoluteTableIdentifier,
columnIdentifier,
columnIdentifier.getDataType,
- CarbonStorePath.getCarbonTablePath(table.getStorePath,
+ CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier.getTablePath,
table.getCarbonTableIdentifier)
)
val writer: CarbonDictionaryWriter = new CarbonDictionaryWriterImpl(
- absoluteTableIdentifier.getStorePath,
- absoluteTableIdentifier.getCarbonTableIdentifier,
dictionaryColumnUniqueIdentifier)
for (value <- set(i)) {
writer.write(value)
@@ -376,10 +373,10 @@ object CarbonDataStoreCreator {
val dict: Dictionary = dictCache
.get(
new DictionaryColumnUniqueIdentifier(
- absoluteTableIdentifier.getCarbonTableIdentifier,
+ absoluteTableIdentifier,
columnIdentifier,
dims.get(i).getDataType,
- CarbonStorePath.getCarbonTablePath(table.getStorePath,
+ CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier.getTablePath,
table.getCarbonTableIdentifier)
))
.asInstanceOf[Dictionary]
@@ -391,10 +388,7 @@ object CarbonDataStoreCreator {
dict,
dims.get(i).getDataType)
val carbonDictionaryWriter: CarbonDictionarySortIndexWriter =
- new CarbonDictionarySortIndexWriterImpl(
- absoluteTableIdentifier.getCarbonTableIdentifier,
- dictionaryColumnUniqueIdentifier,
- absoluteTableIdentifier.getStorePath)
+ new CarbonDictionarySortIndexWriterImpl(dictionaryColumnUniqueIdentifier)
try {
carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex)
carbonDictionaryWriter.writeInvertedSortIndex(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
index 8115e27..f49e475 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
@@ -52,7 +52,7 @@ class CarbonV1toV3CompatabilityTestCase extends QueryTest with BeforeAndAfterAll
localspark.sessionState.asInstanceOf[CarbonSessionState].metadataHive
.runSqlHive(
s"ALTER TABLE default.t3 SET SERDEPROPERTIES" +
- s"('tablePath'='$storeLocation/default/t3')")
+ s"('tablePath'='$storeLocation/default/t3', 'dbname'='default', 'tablename'='t3')")
localspark.sql("show tables").show()
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index 01146ee..2e26d7f 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -25,9 +25,10 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
/**
* Test Class for AlterTableTestCase to verify all scenerios
@@ -106,12 +107,15 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
}
private def getIndexFileCount(dbName: String, tableName: String, segment: String): Int = {
- val identifier = AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sqlContext.sparkSession).storePath, dbName, tableName)
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName)
+ val identifier = carbonTable.getAbsoluteTableIdentifier
val path = CarbonTablePath
.getSegmentPath(identifier.getTablePath, segment)
val carbonFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter {
- override def accept(file: CarbonFile): Boolean = file.getName.endsWith(CarbonTablePath
- .INDEX_FILE_EXT)
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.endsWith(CarbonTablePath
+ .INDEX_FILE_EXT)
+ }
})
if (carbonFiles != null) {
carbonFiles.length
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/resources/dblocation/test.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/dblocation/test.csv b/integration/spark-common-test/src/test/resources/dblocation/test.csv
new file mode 100644
index 0000000..1dceb63
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/dblocation/test.csv
@@ -0,0 +1,6 @@
+c1,c2,c3,c5
+a,1,aa,aaa
+b,2,bb,bbb
+c,3,cc,ccc
+d,4,dd,ddd
+e,5,ee,eee
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 23d1292..a34f479 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -49,8 +49,8 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
datbaseName: String,
tableName: String): Boolean = {
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(datbaseName + "_" + tableName)
- val partitionPath = CarbonStorePath.getCarbonTablePath(storeLocation,
- carbonTable.getCarbonTableIdentifier).getPartitionDir("0")
+ val partitionPath = CarbonStorePath
+ .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
val fileType: FileFactory.FileType = FileFactory.getFileType(partitionPath)
val carbonFile = FileFactory.getCarbonFile(partitionPath, fileType)
val segments: ArrayBuffer[String] = ArrayBuffer()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 34981a2..ea1bbfd 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -24,6 +24,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.path.CarbonStorePath
+
class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
var timeStampPropOrig: String = _
override def beforeAll {
@@ -224,7 +227,10 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("insert overwrite table CarbonOverwrite select * from THive")
sql("insert overwrite table HiveOverwrite select * from THive")
checkAnswer(sql("select count(*) from CarbonOverwrite"), sql("select count(*) from HiveOverwrite"))
- val folder = new File(s"$storeLocation/default/carbonoverwrite/Fact/Part0/")
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default" + "_" +"carbonoverwrite")
+ val partitionPath = CarbonStorePath
+ .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
+ val folder = new File(partitionPath)
assert(folder.isDirectory)
assert(folder.list().length == 1)
}
@@ -244,7 +250,11 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' overwrite INTO table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe
rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')")
sql(s"LOAD DATA local INPATH '$resourcesPath/100_olap.csv' overwrite INTO TABLE HiveOverwrite")
checkAnswer(sql("select count(*) from TCarbonSourceOverwrite"), sql("select count(*) from HiveOverwrite"))
- val folder = new File(s"$storeLocation/default/tcarbonsourceoverwrite/Fact/Part0/")
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default" + "_" +"tcarbonsourceoverwrite")
+ val partitionPath = CarbonStorePath
+ .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
+ val folder = new File(partitionPath)
+
assert(folder.isDirectory)
assert(folder.list().length == 1)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
index c7b39ad..3873b0d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
@@ -25,6 +25,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.util.CarbonProperties
class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
@@ -527,7 +528,9 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf
}
private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
- val store = storeLocation + "/default/" + tableName + "/Fact/Part0/Segment_" + segmentNo
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default" + "_" + tableName)
+ val store = carbonTable.getAbsoluteTableIdentifier.getTablePath + "/Fact/Part0/Segment_" +
+ segmentNo
new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
index 78c835a..1de5c73 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
@@ -28,6 +28,9 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.path.CarbonStorePath
+
class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
val filePath: String = s"$resourcesPath/globalsort"
val file1: String = resourcesPath + "/globalsort/sample1.csv"
@@ -528,7 +531,9 @@ class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndA
}
private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
- val store = storeLocation + "/default/" + tableName + "/Fact/Part0/Segment_" + segmentNo
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default" + "_" + tableName)
+ val store = carbonTable.getAbsoluteTableIdentifier.getTablePath + "/Fact/Part0/Segment_" +
+ segmentNo
new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
index 29f3492..508ca6c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
@@ -42,14 +42,12 @@ class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll {
CarbonCommonConstants.DATABASE_DEFAULT_NAME, "compactionlocktesttable", "1")
)
val carbonTablePath: CarbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath,
- absoluteTableIdentifier.getCarbonTableIdentifier
- )
+ .getCarbonTablePath(absoluteTableIdentifier)
val dataPath: String = carbonTablePath.getMetadataDirectoryPath
val carbonLock: ICarbonLock =
CarbonLockFactory
- .getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier, LockUsage.COMPACTION_LOCK)
+ .getCarbonLockObj(absoluteTableIdentifier, LockUsage.COMPACTION_LOCK)
override def beforeAll {
CarbonProperties.getInstance()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index 111ede7..9bf916e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -20,7 +20,7 @@ import scala.collection.JavaConverters._
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier
import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper
import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -76,12 +76,13 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
test("delete merged folder and check segments") {
// delete merged segments
sql("clean files for table ignoremajor")
- val identifier = new AbsoluteTableIdentifier(
- CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(
- CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
- )
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+
+ val carbonTable = CarbonMetadata.getInstance()
+ .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "ignoremajor")
+ val absoluteTableIdentifier = carbonTable
+ .getAbsoluteTableIdentifier
+ val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
+ absoluteTableIdentifier)
// merged segment should not be there
val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
@@ -89,9 +90,8 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
assert(segments.contains("2.1"))
assert(!segments.contains("2"))
assert(!segments.contains("3"))
- val cacheClient = new CacheClient(CarbonProperties.getInstance.
- getProperty(CarbonCommonConstants.STORE_LOCATION));
- val segmentIdentifier = new TableSegmentUniqueIdentifier(identifier, "2")
+ val cacheClient = new CacheClient();
+ val segmentIdentifier = new TableSegmentUniqueIdentifier(absoluteTableIdentifier, "2")
val wrapper: SegmentTaskIndexWrapper = cacheClient.getSegmentAccessClient.
getIfPresent(segmentIdentifier)
assert(null == wrapper)
@@ -109,12 +109,12 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
catch {
case _:Throwable => assert(true)
}
- val carbontablePath = CarbonStorePath
- .getCarbonTablePath(CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(
- CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
- )
+
+ val carbonTable = CarbonMetadata.getInstance()
+ .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "ignoremajor")
+ val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+
+ val carbontablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
.getMetadataDirectoryPath
val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
@@ -130,13 +130,11 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
"delete from table ignoremajor where segment.starttime before " +
" '2222-01-01 19:35:01'"
)
+ val carbonTable = CarbonMetadata.getInstance()
+ .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "ignoremajor")
+ val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val carbontablePath = CarbonStorePath
- .getCarbonTablePath(CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(
- CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
- )
- .getMetadataDirectoryPath
+ .getCarbonTablePath(absoluteTableIdentifier).getMetadataDirectoryPath
val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
// status should remain as compacted for segment 2.
@@ -171,12 +169,12 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
"('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
)
sql("alter table testmajor compact 'major'")
- val identifier = new AbsoluteTableIdentifier(
- CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(
- CarbonCommonConstants.DATABASE_DEFAULT_NAME, "testmajor", "ttt")
- )
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+
+ val carbonTable = CarbonMetadata.getInstance()
+ .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "testmajor")
+ val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+ val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
+ absoluteTableIdentifier)
// merged segment should not be there
val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
index 4976c24..02560d9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -19,7 +19,8 @@ package org.apache.carbondata.spark.testsuite.datacompaction
import scala.collection.JavaConverters._
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
@@ -77,14 +78,12 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
var status = false
var noOfRetries = 0
while (!status && noOfRetries < 10) {
+ val carbonTable = CarbonMetadata.getInstance()
+ .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "stopmajor")
+ val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- val identifier = new AbsoluteTableIdentifier(
- CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(
- CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", noOfRetries + "")
- )
-
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+ val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
+ absoluteTableIdentifier)
val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
// segments.foreach(seg =>
@@ -111,12 +110,12 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
// delete merged segments
sql("clean files for table stopmajor")
- val identifier = new AbsoluteTableIdentifier(
- CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", "rrr")
- )
+ val carbonTable = CarbonMetadata.getInstance()
+ .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + "stopmajor")
+ val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+ val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
+ absoluteTableIdentifier)
// merged segment should not be there
val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
index cb3da40..ae25894 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -27,6 +27,8 @@ import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.test.util.QueryTest
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.path.CarbonStorePath
class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
var filePath: String = _
@@ -186,8 +188,12 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
}
def getIndexfileCount(tableName: String, segmentNo: String = "0"): Int = {
- val store = storeLocation +"/default/"+ tableName + "/Fact/Part0/Segment_"+segmentNo
- new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()
+ val carbonTable = CarbonMetadata.getInstance()
+ .getCarbonTable(CarbonCommonConstants.DATABASE_DEFAULT_NAME + "_" + tableName)
+ val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+ val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", segmentNo)
+ new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
}
override def afterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
index c4152a1..44bb2dd 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
@@ -22,10 +22,12 @@ import java.io.{File, FilenameFilter}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.reader.CarbonIndexFileReader
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.metadata.CarbonMetadata
+
class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
var originVersion = ""
@@ -45,7 +47,10 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
val testData = s"$resourcesPath/sample.csv"
sql(s"LOAD DATA LOCAL INPATH '$testData' into table test_table_v3")
val indexReader = new CarbonIndexFileReader()
- val carbonIndexPaths = new File(s"$storeLocation/default/test_table_v3/Fact/Part0/Segment_0/")
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_test_table_v3")
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+ val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", "0")
+ val carbonIndexPaths = new File(segmentDir)
.listFiles(new FilenameFilter {
override def accept(dir: File, name: String): Boolean = {
name.endsWith(CarbonTablePath.getCarbonIndexExtension)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1155d4d8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 5d0c055..9f941f0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -29,6 +29,9 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.path.CarbonStorePath
+
class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
var filePath: String = s"$resourcesPath/globalsort"
@@ -236,11 +239,13 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
| STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort_update")
- sql("UPDATE carbon_localsort_update SET (name) = ('bb') WHERE id = 2").show
+ sql("UPDATE carbon_localsort_update SET (name) = ('bb') WHERE id = 2").show
+ sql("select * from carbon_localsort_update").show()
sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+ sql("select * from carbon_globalsort").show()
sql("UPDATE carbon_globalsort SET (name) = ('bb') WHERE id = 2").show
-
+ sql("select * from carbon_globalsort").show()
checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort"), Seq(Row(12)))
checkAnswer(sql("SELECT name FROM carbon_globalsort WHERE id = 2"), Seq(Row("bb")))
checkAnswer(sql("SELECT * FROM carbon_globalsort ORDER BY name"),
@@ -326,7 +331,9 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
}
private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
- val store = storeLocation + "/default/" + tableName + "/Fact/Part0/Segment_" + segmentNo
- new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default"+"_"+tableName)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+ val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", segmentNo)
+ new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
}
}