You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2021/04/22 09:55:28 UTC
[carbondata] branch master updated: [CARBONDATA-4037] Improve the
table status and segment file writing
This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 71910fb [CARBONDATA-4037] Improve the table status and segment file writing
71910fb is described below
commit 71910fba7929353b2ee9b6582b1fb39b44abe137
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Fri Oct 16 23:33:15 2020 +0530
[CARBONDATA-4037] Improve the table status and segment file writing
Why is this PR needed?
Currently, we update the table status and segment files multiple times for a
single IUD/merge/compact operation and delete the index files immediately after
merge. When concurrent queries are run, a user query may try to access segment
index files that are no longer present, which is an
availability issue.
What changes were proposed in this PR?
1. Generate the segment file after merge index, and update the table status at the beginning
and after merge index. If the merge index or table status update fails, the load will also fail.
order:
create table status file => index files => merge index => generate segment file => update table status
* Same order is now maintained for SI, compaction, IUD, addHivePartition, addSegment scenarios.
* Whenever segment file needs to be updated for main table, a new segment file is created
instead of updating existing one.
2. When compact 'segment_index' is triggered,
For new tables - if there are no index files to merge, a warning message is logged and the operation exits.
For old tables - index files are not deleted.
3. After SI small files merge,
For newly loaded SI segments - DeleteOldIndexOrMergeFiles deletes immediately after merge.
For segments that are already present (rebuild) - old index files and data files are not deleted.
4. Removed the carbon.merge.index.in.segment property from config-parameters. This property
is to be used for debugging/test purposes.
Note: Cleaning of stale index/segment files will be handled in CARBONDATA-4074.
This closes #3988
---
.../core/constants/CarbonCommonConstants.java | 12 -
.../carbondata/core/index/IndexStoreManager.java | 20 +-
.../blockletindex/SegmentIndexFileStore.java | 16 --
.../carbondata/core/metadata/SegmentFileStore.java | 217 +++++++++++------
.../carbondata/core/mutate/CarbonUpdateUtil.java | 14 +-
.../LatestFilesReadCommittedScope.java | 31 ++-
.../TableStatusReadCommittedScope.java | 38 ++-
.../carbondata/core/util/CarbonTestUtil.java | 40 +++
.../apache/carbondata/core/util/CarbonUtil.java | 18 +-
.../core/writer/CarbonIndexFileMergeWriter.java | 142 ++++++-----
docs/configuration-parameters.md | 1 -
.../hadoop/api/CarbonOutputCommitter.java | 7 +-
.../CarbonDataFileMergeTestCaseOnSI.scala | 16 +-
.../CarbonIndexFileMergeTestCaseWithSI.scala | 116 ++++-----
.../hive/MapredCarbonOutputCommitter.java | 10 +-
.../PrestoInsertIntoTableTestCase.scala | 6 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 14 +-
.../spark/rdd/CarbonTableCompactor.scala | 3 +-
.../org/apache/spark/rdd/CarbonMergeFilesRDD.scala | 38 ++-
.../spark/sql/events/MergeIndexEventListener.scala | 37 ++-
.../command/management/CarbonAddLoadCommand.scala | 25 +-
.../CarbonAlterTableAddHivePartitionCommand.scala | 143 ++++++-----
.../AlterTableMergeIndexSIEventListener.scala | 21 +-
.../events/SILoadEventListener.scala | 4 -
.../SILoadEventListenerForFailedSegments.scala | 5 -
.../spark/sql/secondaryindex/load/Compactor.scala | 17 +-
.../secondaryindex/rdd/SecondaryIndexCreator.scala | 51 ++--
.../secondaryindex/util/SecondaryIndexUtil.scala | 146 +++++------
.../CarbonIndexFileMergeTestCase.scala | 267 +++++++++++++--------
.../testsuite/iud/UpdateCarbonTableTestCase.scala | 10 +-
.../merger/CompactionResultSortProcessor.java | 9 +-
.../merger/RowResultMergerProcessor.java | 8 +-
.../processing/util/CarbonLoaderUtil.java | 22 +-
33 files changed, 891 insertions(+), 633 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 9840540..7598dea9 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -372,18 +372,6 @@ public final class CarbonCommonConstants {
public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true";
/**
- * It is the user defined property to specify whether to throw exception or not in case
- * if the MERGE INDEX JOB is failed. Default value - TRUE
- * TRUE - throws exception and fails the corresponding LOAD job
- * FALSE - Logs the exception and continue with the LOAD
- */
- @CarbonProperty
- public static final String CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION =
- "carbon.merge.index.failure.throw.exception";
-
- public static final String CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION_DEFAULT = "true";
-
- /**
* property to be used for specifying the max byte limit for string/varchar data type till
* where storing min/max in data file will be considered
*/
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexStoreManager.java b/core/src/main/java/org/apache/carbondata/core/index/IndexStoreManager.java
index 321a774..760a3d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexStoreManager.java
@@ -30,7 +30,6 @@ import org.apache.carbondata.common.exceptions.sql.MalformedIndexCommandExceptio
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.index.dev.IndexFactory;
import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
@@ -44,7 +43,6 @@ import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
@@ -507,16 +505,16 @@ public final class IndexStoreManager {
UpdateVO updateVO =
SegmentUpdateStatusManager.getInvalidTimestampRange(segment.getLoadMetadataDetails());
SegmentRefreshInfo segmentRefreshInfo;
+ String segmentFileName = segment.getSegmentFileName();
if ((updateVO != null && updateVO.getLatestUpdateTimestamp() != null)
- || segment.getSegmentFileName() != null) {
- long segmentFileTimeStamp;
- if (null != segment.getLoadMetadataDetails()) {
- segmentFileTimeStamp = segment.getLoadMetadataDetails().getLastModifiedTime();
- } else {
- segmentFileTimeStamp = FileFactory.getCarbonFile(CarbonTablePath
- .getSegmentFilePath(table.getTablePath(), segment.getSegmentFileName()))
- .getLastModifiedTime();
- }
+ || segmentFileName != null) {
+ // Do not use getLastModifiedTime API on segment file carbon file object as it will
+ // slow down operation in Object stores like S3. Now the segment file is always written
+ // for operations which was overwriting earlier, so this timestamp can be checked always
+ // to check whether to refresh the cache or not.
+ long segmentFileTimeStamp = Long.parseLong(segmentFileName
+ .substring(segmentFileName.indexOf(CarbonCommonConstants.UNDERSCORE) + 1,
+ segmentFileName.lastIndexOf(CarbonCommonConstants.POINT)));
segmentRefreshInfo =
new SegmentRefreshInfo(updateVO.getLatestUpdateTimestamp(), 0, segmentFileTimeStamp);
} else {
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index a82ba8e..60c8981 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -338,22 +338,6 @@ public class SegmentIndexFileStore {
* List all the index files of the segment.
*
* @param carbonFile directory
- * @return
- */
- public static CarbonFile[] getCarbonIndexFiles(CarbonFile carbonFile) {
- return carbonFile.listFiles(new CarbonFileFilter() {
- @Override
- public boolean accept(CarbonFile file) {
- return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
- .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
- }
- });
- }
-
- /**
- * List all the index files of the segment.
- *
- * @param carbonFile directory
*/
public static void getCarbonIndexFilesRecursively(CarbonFile carbonFile,
List<CarbonFile> indexFiles) {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 50ba335..0bd517c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -37,6 +37,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -69,7 +70,6 @@ import org.apache.carbondata.core.util.DataFileFooterConverter;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
import org.apache.carbondata.core.util.TrashUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
@@ -103,9 +103,10 @@ public class SegmentFileStore {
* Write segment information to the segment folder with index file name and
* corresponding partitions.
*/
- public static void writeSegmentFile(String tablePath, final String taskNo, String location,
- String timeStamp, List<String> partitionNames) throws IOException {
- writeSegmentFile(tablePath, taskNo, location, timeStamp, partitionNames, false);
+ public static void writeSegmentFileForPartitionTable(String tablePath, final String taskNo,
+ String location, String timeStamp, List<String> partitionNames) throws IOException {
+ writeSegmentFileForPartitionTable(tablePath, taskNo, location, timeStamp, partitionNames,
+ false);
}
/**
@@ -162,8 +163,9 @@ public class SegmentFileStore {
* Write segment information to the segment folder with index file name and
* corresponding partitions.
*/
- public static void writeSegmentFile(String tablePath, final String taskNo, String location,
- String timeStamp, List<String> partitionNames, boolean isMergeIndexFlow) throws IOException {
+ public static void writeSegmentFileForPartitionTable(String tablePath, final String taskNo,
+ String location, String timeStamp, List<String> partitionNames, boolean isMergeIndexFlow)
+ throws IOException {
String tempFolderLoc = timeStamp + ".tmp";
String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
@@ -172,8 +174,12 @@ public class SegmentFileStore {
}
CarbonFile tempFolder;
if (isMergeIndexFlow) {
+ // To get the index files from the partition path directly.
+ // For old stores after merge index is triggered, we wont have '.tmp' folder.
tempFolder = FileFactory.getCarbonFile(location);
} else {
+ // When mergeindex property is disabled during compaction, we write temporary segment files
+ // with index files present in '.tmp' folder of partition path.
tempFolder = FileFactory
.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
}
@@ -199,13 +205,7 @@ public class SegmentFileStore {
folderDetails.setRelative(isRelative);
folderDetails.setPartitions(partitionNames);
folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
- for (CarbonFile file : carbonFiles) {
- if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
- folderDetails.setMergeFileName(file.getName());
- } else {
- folderDetails.getFiles().add(file.getName());
- }
- }
+ setIndexFileNamesToFolderDetails(folderDetails, carbonFiles);
segmentFile.addPath(location, folderDetails);
String path = null;
if (isMergeIndexFlow) {
@@ -283,13 +283,7 @@ public class SegmentFileStore {
folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
folderDetails.setRelative(false);
segmentFile.addPath(segment.getSegmentPath(), folderDetails);
- for (CarbonFile file : indexFiles) {
- if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
- folderDetails.setMergeFileName(file.getName());
- } else {
- folderDetails.getFiles().add(file.getName());
- }
- }
+ setIndexFileNamesToFolderDetails(folderDetails, indexFiles);
String segmentFileFolder = CarbonTablePath.getSegmentFilesLocation(tablePath);
CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFileFolder);
if (!carbonFile.exists()) {
@@ -304,6 +298,17 @@ public class SegmentFileStore {
return false;
}
+ public static void setIndexFileNamesToFolderDetails(FolderDetails folderDetails,
+ CarbonFile[] indexFiles) {
+ for (CarbonFile file : indexFiles) {
+ if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ folderDetails.setMergeFileName(file.getName());
+ } else {
+ folderDetails.getFiles().add(file.getName());
+ }
+ }
+ }
+
public static boolean writeSegmentFileForOthers(
CarbonTable carbonTable,
Segment segment,
@@ -350,21 +355,6 @@ public class SegmentFileStore {
return false;
}
- public static void mergeIndexAndWriteSegmentFile(CarbonTable carbonTable, String segmentId,
- String UUID) {
- String tablePath = carbonTable.getTablePath();
- String segmentFileName = genSegmentFileName(segmentId, UUID) + CarbonTablePath.SEGMENT_EXT;
- try {
- SegmentFileStore sfs = new SegmentFileStore(tablePath, segmentFileName);
- List<CarbonFile> carbonIndexFiles = sfs.getIndexCarbonFiles();
- new CarbonIndexFileMergeWriter(carbonTable)
- .writeMergeIndexFileBasedOnSegmentFile(segmentId, null, sfs,
- carbonIndexFiles.toArray(new CarbonFile[carbonIndexFiles.size()]), UUID, null);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
/**
* Write segment file to the metadata folder of the table selecting only the current load files
*
@@ -401,13 +391,7 @@ public class SegmentFileStore {
FolderDetails folderDetails = new FolderDetails();
folderDetails.setRelative(absSegPath == null);
folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
- for (CarbonFile file : indexFiles) {
- if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
- folderDetails.setMergeFileName(file.getName());
- } else {
- folderDetails.getFiles().add(file.getName());
- }
- }
+ setIndexFileNamesToFolderDetails(folderDetails, indexFiles);
String segmentRelativePath = "/";
if (!supportFlatFolder) {
if (absSegPath != null) {
@@ -441,6 +425,107 @@ public class SegmentFileStore {
}
/**
+ * Get stale and invalid index files that have already been merged.
+ * As we are not deleting index files immediately after updating old tables,
+ * we will have old index files in segment folder.
+ * This method is called in following scenarios:
+ * 1. During read, when segment file is not present or gets deleted.
+ * 2. When writing segment index size in tablestatus file.
+ */
+ public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+ throws IOException {
+ SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+ Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
+ long lastModifiedTime = 0L;
+ String validMergeIndexFile = null;
+ List<String> mergeIndexFileNames = new ArrayList<>();
+ boolean isIndexFilesPresent = false;
+ for (String indexFile : indexFiles) {
+ if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+ isIndexFilesPresent = true;
+ }
+ if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ long indexFileTimeStamp =
+ Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(indexFile));
+ // In case there are more than 1 mergeindex files present, latest one is considered as valid
+ if (indexFileTimeStamp > lastModifiedTime) {
+ lastModifiedTime = indexFileTimeStamp;
+ validMergeIndexFile = indexFile;
+ }
+ mergeIndexFileNames.add(indexFile);
+ }
+ }
+ // Possible scenarios where we have >1 merge index file:
+ // 1. SI load when MT has stale index files. As during SI load MT segment file is not updated,
+ // we directly read from segment directory.
+ // 2. In SI table, after small files index merge(refresh command) we will have more than one
+ // mergeindex file as we are not deleting old file immediately. If segment file gets deleted,
+ // then it reads from segment directory.
+ if (mergeIndexFileNames.size() > 1 && validMergeIndexFile != null) {
+ final String validIndexFileName = validMergeIndexFile;
+ // get the invalid mergeindex files by excluding the valid file.
+ mergedAndInvalidIndexFiles.addAll(
+ mergeIndexFileNames.stream().filter(file -> !file.equalsIgnoreCase(validIndexFileName))
+ .collect(Collectors.toSet()));
+ }
+ // If both index and merge index files are present, read the valid merge index file and add its
+ // mapped index files to the excluding list.
+ // Example: We have xxx.index and xxx.mergeindex files in a segment directory.
+ // Here the index file (xxx.index) is considered invalid.
+ if (isIndexFilesPresent && validMergeIndexFile != null) {
+ indexFileStore.readMergeFile(validMergeIndexFile);
+ Map<String, List<String>> carbonMergeFileToIndexFilesMap =
+ indexFileStore.getCarbonMergeFileToIndexFilesMap();
+ String segmentPath =
+ validMergeIndexFile.substring(0, validMergeIndexFile.lastIndexOf(File.separator) + 1);
+ mergedAndInvalidIndexFiles
+ .addAll(carbonMergeFileToIndexFilesMap.get(validMergeIndexFile).stream()
+ .map(file -> segmentPath + file).collect(Collectors.toSet()));
+ }
+ if (mergeIndexFileNames.size() == 0 && indexFiles.size() > 1) {
+ // if more than one index file present with different timestamps, then stale/invalid
+ // data is present. Latest one is considered as valid.
+ Long validFile = indexFiles.stream()
+ .map(file -> Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(file)))
+ .max(Long::compareTo).get();
+ mergedAndInvalidIndexFiles.addAll(
+ indexFiles.stream().filter(file -> !file.contains(validFile.toString()))
+ .collect(Collectors.toSet()));
+ }
+ return mergedAndInvalidIndexFiles;
+ }
+
+ /**
+ * Get valid index files excluding the invalid ones.
+ * This method is called while calculating index size using segment directory.
+ */
+ public static Object getValidCarbonIndexFiles(Object carbonFiles) throws IOException {
+ // For supporting local or default file system.
+ if (carbonFiles instanceof CarbonFile[]) {
+ Set<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles(
+ Arrays.stream((CarbonFile[]) carbonFiles).map(file -> file.getAbsolutePath())
+ .collect(Collectors.toList()));
+ if (!mergedAndInvalidIndexFiles.isEmpty()) {
+ return Arrays.stream((CarbonFile[]) carbonFiles)
+ .filter(file -> !mergedAndInvalidIndexFiles.contains(file.getAbsolutePath()))
+ .toArray(CarbonFile[]::new);
+ }
+ }
+ // For supporting HDFS/S3/other file systems.
+ else if (carbonFiles instanceof FileStatus[]) {
+ Set<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles(
+ Arrays.stream((FileStatus[]) carbonFiles).map(file -> file.getPath().toString())
+ .collect(Collectors.toList()));
+ if (!mergedAndInvalidIndexFiles.isEmpty()) {
+ return Arrays.stream((FileStatus[]) carbonFiles)
+ .filter(file -> !mergedAndInvalidIndexFiles.contains(file.getPath().toString()))
+ .toArray(FileStatus[]::new);
+ }
+ }
+ return carbonFiles;
+ }
+
+ /**
* Move the loaded data from source folder to destination folder.
*/
private static void moveFromTempFolder(String source, String dest) {
@@ -654,13 +739,23 @@ public class SegmentFileStore {
String location = spec.getLocation().toString();
CarbonFile carbonFile = FileFactory.getCarbonFile(location);
- CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
- @Override
- public boolean accept(CarbonFile file) {
- return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
- }
- });
+ CarbonFile[] listFiles =
+ carbonFile.listFiles(file -> CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath()));
if (listFiles != null && listFiles.length > 0) {
+ Set<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles(
+ Arrays.stream(listFiles).map(CarbonFile::getAbsolutePath).collect(Collectors.toList()));
+ // In add hive partition after merge index is written, delete index files that are merged.
+ for (CarbonFile indexFile : listFiles) {
+ if (mergedAndInvalidIndexFiles.contains(indexFile.getAbsolutePath())) {
+ indexFile.delete();
+ }
+ }
+ CarbonFile[] carbonIndexFiles = listFiles;
+ if (!mergedAndInvalidIndexFiles.isEmpty()) {
+ carbonIndexFiles = Arrays.stream(listFiles)
+ .filter(file -> !mergedAndInvalidIndexFiles.contains(file.getAbsolutePath()))
+ .toArray(CarbonFile[]::new);
+ }
boolean isRelative = false;
if (location.startsWith(tablePath)) {
location = location.substring(tablePath.length());
@@ -671,13 +766,7 @@ public class SegmentFileStore {
folderDetails.setRelative(isRelative);
folderDetails.setPartitions(spec.getPartitions());
folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
- for (CarbonFile file : listFiles) {
- if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
- folderDetails.setMergeFileName(file.getName());
- } else {
- folderDetails.getFiles().add(file.getName());
- }
- }
+ setIndexFileNamesToFolderDetails(folderDetails, carbonIndexFiles);
localSegmentFile.addPath(location, folderDetails);
if (segmentFile == null) {
segmentFile = localSegmentFile;
@@ -1336,24 +1425,6 @@ public class SegmentFileStore {
}
/**
- * This method returns the list of index/merge index files for a segment in carbonTable.
- */
- public static Set<String> getIndexFilesListForSegment(Segment segment, String tablePath)
- throws IOException {
- Set<String> indexFiles;
- if (segment.getSegmentFileName() == null) {
- String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo());
- indexFiles =
- new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(segmentPath).keySet();
- } else {
- SegmentFileStore segmentFileStore =
- new SegmentFileStore(tablePath, segment.getSegmentFileName());
- indexFiles = segmentFileStore.getIndexAndMergeFiles().keySet();
- }
- return indexFiles;
- }
-
- /**
* It contains the segment information like location, partitions and related index files
*/
public static class SegmentFile implements Serializable {
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 41c4b2b..c02af10 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -290,6 +290,8 @@ public class CarbonUpdateUtil {
LoadMetadataDetails[] listOfLoadFolderDetailsArray =
SegmentStatusManager.readLoadMetadata(metaDataFilepath);
+ // to update table status only when required.
+ boolean isUpdateRequired = false;
for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
// we are storing the link between the 2 status files in the segment 0 only.
@@ -304,6 +306,7 @@ public class CarbonUpdateUtil {
if (segmentsToBeDeleted.contains(new Segment(loadMetadata.getLoadName()))) {
loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
loadMetadata.setModificationOrDeletionTimestamp(Long.parseLong(updatedTimeStamp));
+ isUpdateRequired = true;
}
}
for (Segment segName : updatedSegmentsList) {
@@ -318,19 +321,23 @@ public class CarbonUpdateUtil {
}
// update end timestamp for each time.
loadMetadata.setUpdateDeltaEndTimestamp(updatedTimeStamp);
+ isUpdateRequired = true;
}
if (segmentFilesTobeUpdated
.contains(Segment.toSegment(loadMetadata.getLoadName(), null))) {
loadMetadata.setSegmentFile(loadMetadata.getLoadName() + "_" + updatedTimeStamp
+ CarbonTablePath.SEGMENT_EXT);
+ isUpdateRequired = true;
}
}
}
}
try {
- SegmentStatusManager
- .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+ if (isUpdateRequired || isUpdateStatusFileUpdateRequired) {
+ SegmentStatusManager
+ .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+ }
} catch (IOException e) {
return false;
}
@@ -508,7 +515,8 @@ public class CarbonUpdateUtil {
int maxTime;
try {
maxTime = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
+ .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME,
+ Integer.toString(CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME)));
} catch (NumberFormatException e) {
maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
}
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index c4f948a..9077707 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.core.readcommitter;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -24,6 +25,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -31,6 +34,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
@@ -126,7 +130,7 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
}
@Override
- public Map<String, String> getCommittedIndexFile(Segment segment) {
+ public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
Map<String, String> indexFileStore = new HashMap<>();
Map<String, List<String>> snapShot = readCommittedIndexFileSnapShot.getSegmentIndexFileMap();
String segName;
@@ -135,15 +139,26 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
} else {
segName = segment.getSegmentFileName();
}
- List<String> index = snapShot.get(segName);
- if (null == index) {
- index = new LinkedList<>();
+ List<String> indexFiles = snapShot.get(segName);
+ if (null == indexFiles) {
+ indexFiles = new LinkedList<>();
}
- for (String indexPath : index) {
- if (indexPath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
- indexFileStore.put(indexPath, indexPath.substring(indexPath.lastIndexOf('/') + 1));
+ Set<String> mergedIndexFiles =
+ SegmentFileStore.getInvalidAndMergedIndexFiles(indexFiles);
+ List<String> filteredIndexFiles = indexFiles;
+ if (!mergedIndexFiles.isEmpty()) {
+ // do not include already merged index files details.
+ filteredIndexFiles = indexFiles.stream().filter(
+ file -> !mergedIndexFiles.contains(file))
+ .collect(Collectors.toList());
+ }
+ for (String indexFile : filteredIndexFiles) {
+ if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ indexFileStore
+ .put(indexFile, indexFile.substring(indexFile.lastIndexOf(File.separator) + 1));
+ break;
} else {
- indexFileStore.put(indexPath, null);
+ indexFileStore.put(indexFile, null);
}
}
return indexFileStore;
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index cd3992b..2c9ab84 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -18,11 +18,14 @@
package org.apache.carbondata.core.readcommitter;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -79,13 +82,25 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
@Override
public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
Map<String, String> indexFiles;
- if (segment.getSegmentFileName() == null) {
+ SegmentFileStore fileStore = null;
+ if (segment.getSegmentFileName() != null) {
+ fileStore = new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+ }
+ if (segment.getSegmentFileName() == null || fileStore.getSegmentFile() == null) {
String path =
CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
+ Set<String> mergedIndexFiles =
+ SegmentFileStore.getInvalidAndMergedIndexFiles(new ArrayList<>(indexFiles.keySet()));
+ Map<String, String> filteredIndexFiles = indexFiles;
+ if (mergedIndexFiles.size() > 0) {
+ // do not include already merged index files details.
+ filteredIndexFiles = indexFiles.entrySet().stream()
+ .filter(indexFile -> !mergedIndexFiles.contains(indexFile.getKey()))
+ .collect(HashMap::new, (m, v) -> m.put(v.getKey(), v.getValue()), HashMap::putAll);
+ }
+ return filteredIndexFiles;
} else {
- SegmentFileStore fileStore =
- new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
indexFiles = fileStore.getIndexAndMergeFiles();
if (fileStore.getSegmentFile() != null) {
segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
@@ -97,12 +112,15 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
public SegmentRefreshInfo getCommittedSegmentRefreshInfo(Segment segment, UpdateVO updateVo) {
SegmentRefreshInfo segmentRefreshInfo;
long segmentFileTimeStamp = 0L;
- if (null != segment.getLoadMetadataDetails()) {
- segmentFileTimeStamp = segment.getLoadMetadataDetails().getLastModifiedTime();
- } else if (null != segment.getSegmentFileName()) {
- segmentFileTimeStamp = FileFactory.getCarbonFile(CarbonTablePath
- .getSegmentFilePath(identifier.getTablePath(), segment.getSegmentFileName()))
- .getLastModifiedTime();
+ String segmentFileName = segment.getSegmentFileName();
+ if (null != segmentFileName) {
+ // Do not call getLastModifiedTime on the segment file's CarbonFile object, as it slows down
+ // operations on object stores like S3. A new segment file is now always written by
+ // operations that previously overwrote the existing one, so the timestamp in the segment
+ // file name can always be used to decide whether to refresh the cache or not.
+ segmentFileTimeStamp = Long.parseLong(segmentFileName
+ .substring(segmentFileName.indexOf(CarbonCommonConstants.UNDERSCORE) + 1,
+ segmentFileName.lastIndexOf(CarbonCommonConstants.POINT)));
}
if (updateVo != null) {
segmentRefreshInfo =
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonTestUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonTestUtil.java
index 822c7fa..25bbc0b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonTestUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonTestUtil.java
@@ -23,6 +23,7 @@ import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
@@ -43,10 +44,14 @@ import org.apache.carbondata.core.datastore.page.LazyColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.LocalDictionaryChunk;
public class CarbonTestUtil {
@@ -161,6 +166,41 @@ public class CarbonTestUtil {
return isLocalDictionaryGenerated;
}
+ public static int getSegmentFileCount(String tableName) throws IOException {
+ CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableName);
+ CarbonFile segmentsFolder = FileFactory
+ .getCarbonFile(CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath()));
+ assert (segmentsFolder.isFileExist());
+ return segmentsFolder.listFiles(true).size();
+ }
+
+ public static int getIndexFileCount(String tableName, String segment) throws IOException {
+ return getIndexFileCount(tableName, segment, null);
+ }
+
+ public static int getIndexFileCount(String tableName,
+ String segment, String extension) throws IOException {
+ if (extension == null) {
+ extension = CarbonTablePath.INDEX_FILE_EXT;
+ }
+ CarbonTable table = CarbonMetadata.getInstance().getCarbonTable(tableName);
+ String path = CarbonTablePath
+ .getSegmentPath(table.getAbsoluteTableIdentifier().getTablePath(), segment);
+ boolean recursive = false;
+ if (table.isHivePartitionTable()) {
+ path = table.getAbsoluteTableIdentifier().getTablePath();
+ recursive = true;
+ }
+ List<CarbonFile> carbonFiles = FileFactory.getCarbonFile(path).listFiles(recursive,
+ file -> file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+ .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
+ CarbonFile[] validIndexFiles = (CarbonFile[]) SegmentFileStore
+ .getValidCarbonIndexFiles(carbonFiles.toArray(new CarbonFile[carbonFiles.size()]));
+ String finalExtension = extension;
+ return Arrays.stream(validIndexFiles).filter(file -> file.getName().endsWith(finalExtension))
+ .toArray().length;
+ }
+
public static void copy(String oldLoc, String newLoc) throws IOException {
CarbonFile oldFolder = FileFactory.getCarbonFile(oldLoc);
FileFactory.mkdirs(newLoc, FileFactory.getConfiguration());
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index a5e6a12..7b762ba 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -114,7 +114,6 @@ import org.apache.carbondata.format.IndexHeader;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.io.input.ClassLoaderObjectInputStream;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
@@ -2476,7 +2475,9 @@ public final class CarbonUtil {
if (fs.exists(path)) {
FileStatus[] fileStatuses = fs.listStatus(path);
if (null != fileStatuses) {
- for (FileStatus dataAndIndexStatus : fileStatuses) {
+ FileStatus[] validDataAndIndexStatus =
+ (FileStatus[]) SegmentFileStore.getValidCarbonIndexFiles(fileStatuses);
+ for (FileStatus dataAndIndexStatus : validDataAndIndexStatus) {
String pathName = dataAndIndexStatus.getPath().getName();
if (pathName.endsWith(CarbonTablePath.getCarbonIndexExtension()) || pathName
.endsWith(CarbonTablePath.getCarbonMergeIndexExtension())) {
@@ -2491,17 +2492,18 @@ public final class CarbonUtil {
case LOCAL:
default:
segmentPath = FileFactory.getUpdatedFilePath(segmentPath);
- File file = new File(segmentPath);
- File[] segmentFiles = file.listFiles();
- if (null != segmentFiles) {
- for (File dataAndIndexFile : segmentFiles) {
+ CarbonFile[] dataAndIndexFiles = FileFactory.getCarbonFile(segmentPath).listFiles();
+ if (null != dataAndIndexFiles) {
+ CarbonFile[] validDataAndIndexFiles =
+ (CarbonFile[]) SegmentFileStore.getValidCarbonIndexFiles(dataAndIndexFiles);
+ for (CarbonFile dataAndIndexFile : validDataAndIndexFiles) {
if (dataAndIndexFile.getCanonicalPath()
.endsWith(CarbonTablePath.getCarbonIndexExtension()) || dataAndIndexFile
.getCanonicalPath().endsWith(CarbonTablePath.getCarbonMergeIndexExtension())) {
- carbonIndexSize += FileUtils.sizeOf(dataAndIndexFile);
+ carbonIndexSize += dataAndIndexFile.getSize();
} else if (dataAndIndexFile.getCanonicalPath()
.endsWith(CarbonTablePath.getCarbonDataExtension())) {
- carbonDataSize += FileUtils.sizeOf(dataAndIndexFile);
+ carbonDataSize += dataAndIndexFile.getSize();
}
}
}
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 3f26aa2..a619f15 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -36,7 +36,9 @@ import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
@@ -70,18 +72,18 @@ public class CarbonIndexFileMergeWriter {
* @param tablePath
* @param indexFileNamesTobeAdded while merging, it considers only these files.
* If null, then consider all
- * @param readFileFooterFromCarbonDataFile flag to read file footer information from carbondata
+ * @param isOldStoreIndexFilesPresent flag to read file footer information from carbondata
* file. This will used in case of upgrade from version
* which do not store the blocklet info to current version
* @throws IOException
*/
- private String mergeCarbonIndexFilesOfSegment(String segmentId,
- String tablePath, List<String> indexFileNamesTobeAdded,
- boolean readFileFooterFromCarbonDataFile, String uuid, String partitionPath) {
+ private String mergeCarbonIndexFilesOfSegment(String segmentId, String tablePath,
+ List<String> indexFileNamesTobeAdded, boolean isOldStoreIndexFilesPresent, String uuid,
+ String partitionPath) {
+ Segment segment = Segment.getSegment(segmentId, tablePath);
+ String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
try {
- Segment segment = Segment.getSegment(segmentId, tablePath);
- String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
- CarbonFile[] indexFiles;
+ List<CarbonFile> indexFiles = new ArrayList<>();
SegmentFileStore sfs = null;
if (segment != null && segment.getSegmentFileName() != null) {
sfs = new SegmentFileStore(tablePath, segment.getSegmentFileName());
@@ -95,28 +97,28 @@ public class CarbonIndexFileMergeWriter {
indexFilesInPartition.add(indexCarbonFile);
}
}
- indexFiles = indexFilesInPartition.toArray(new CarbonFile[indexFilesInPartition.size()]);
+ indexFiles = indexFilesInPartition;
} else {
- indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+ indexFiles = indexCarbonFiles;
}
- } else {
- indexFiles = SegmentIndexFileStore
- .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration(), uuid);
}
- if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
- if (sfs == null) {
- return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
- readFileFooterFromCarbonDataFile, segmentPath, indexFiles, segmentId, uuid);
- } else {
- return writeMergeIndexFileBasedOnSegmentFile(segmentId, indexFileNamesTobeAdded, sfs,
- indexFiles, uuid, partitionPath);
+ if (sfs == null || indexFiles.isEmpty()) {
+ if (table.isHivePartitionTable()) {
+ segmentPath = partitionPath;
}
+ return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
+ isOldStoreIndexFilesPresent, segmentPath, segmentId, uuid, true);
+ } else {
+ return writeMergeIndexFileBasedOnSegmentFile(segmentId, indexFileNamesTobeAdded,
+ isOldStoreIndexFilesPresent, sfs,
+ indexFiles.toArray(new CarbonFile[0]), uuid, partitionPath);
}
} catch (Exception e) {
- LOGGER.error(
- "Failed to merge index files in path: " + tablePath, e);
+ String message =
+ "Failed to merge index files in path: " + segmentPath + ". " + e.getMessage();
+ LOGGER.error(message);
+ throw new RuntimeException(message, e);
}
- return null;
}
/**
@@ -184,30 +186,49 @@ public class CarbonIndexFileMergeWriter {
return indexLocationMap;
}
- private String writeMergeIndexFileBasedOnSegmentFolder(List<String> indexFileNamesTobeAdded,
- boolean readFileFooterFromCarbonDataFile, String segmentPath, CarbonFile[] indexFiles,
- String segmentId, String uuid) throws IOException {
+ public String writeMergeIndexFileBasedOnSegmentFolder(List<String> indexFileNamesTobeAdded,
+ boolean isOldStoreIndexFilesPresent, String segmentPath,
+ String segmentId, String uuid, boolean readBasedOnUUID) throws IOException {
+ CarbonFile[] indexFiles = null;
SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
- if (readFileFooterFromCarbonDataFile) {
+ if (isOldStoreIndexFilesPresent) {
// this case will be used in case of upgrade where old store will not have the blocklet
// info in the index file and therefore blocklet info need to be read from the file footer
// in the carbondata file
- fileStore.readAllIndexAndFillBlockletInfo(segmentPath, uuid);
+ fileStore.readAllIndexAndFillBlockletInfo(segmentPath, null);
} else {
- fileStore.readAllIIndexOfSegment(segmentPath, uuid);
+ if (readBasedOnUUID) {
+ indexFiles = SegmentIndexFileStore
+ .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration(), uuid);
+ fileStore.readAllIIndexOfSegment(segmentPath, uuid);
+ } else {
+ // The uuid can be different, when we add load from external path.
+ indexFiles =
+ SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
+ fileStore.readAllIIndexOfSegment(segmentPath);
+ }
}
Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
- writeMergeIndexFile(indexFileNamesTobeAdded, segmentPath, indexMap, segmentId, uuid);
- for (CarbonFile indexFile : indexFiles) {
- indexFile.delete();
+ Map<String, List<String>> mergeToIndexFileMap = fileStore.getCarbonMergeFileToIndexFilesMap();
+ if (!mergeToIndexFileMap.containsValue(new ArrayList<>(indexMap.keySet()))) {
+ writeMergeIndexFile(indexFileNamesTobeAdded, segmentPath, indexMap, segmentId, uuid);
+ // If alter merge index is triggered for old tables, do not delete the index files
+ // immediately, so that concurrent queries do not hit an "index file not found" failure.
+ if (!isOldStoreIndexFilesPresent && indexFiles != null) {
+ for (CarbonFile indexFile : indexFiles) {
+ indexFile.delete();
+ }
+ }
}
return null;
}
public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
- List<String> indexFileNamesTobeAdded, SegmentFileStore segmentFileStore,
- CarbonFile[] indexFiles, String uuid, String partitionPath) throws IOException {
+ List<String> indexFileNamesTobeAdded, boolean isOldStoreIndexFilesPresent,
+ SegmentFileStore segmentFileStore, CarbonFile[] indexFiles, String uuid, String partitionPath)
+ throws IOException {
SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
+ String newSegmentFileName = null;
// in case of partition table, merge index file to be created for each partition
if (null != partitionPath) {
for (CarbonFile indexFile : indexFiles) {
@@ -245,9 +266,10 @@ public class CarbonIndexFileMergeWriter {
for (PartitionSpec partitionSpec : partitionSpecs) {
if (partitionSpec.getLocation().toString().equals(partitionPath)) {
try {
- SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
- segmentId + CarbonCommonConstants.UNDERSCORE + uuid + "",
- partitionSpec.getPartitions(), true);
+ SegmentFileStore
+ .writeSegmentFileForPartitionTable(table.getTablePath(), mergeIndexFile,
+ partitionPath, segmentId + CarbonCommonConstants.UNDERSCORE + uuid + "",
+ partitionSpec.getPartitions(), true);
} catch (Exception ex) {
// delete merge index file if created,
// keep only index files as segment file writing is failed
@@ -260,12 +282,23 @@ public class CarbonIndexFileMergeWriter {
}
}
}
- String newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid)
- + CarbonTablePath.SEGMENT_EXT;
+ if (table.isIndexTable()) {
+ // To maintain same segment file name mapping between parent and SI table.
+ IndexMetadata indexMetadata = table.getIndexMetadata();
+ LoadMetadataDetails[] loadDetails = SegmentStatusManager
+ .readLoadMetadata(CarbonTablePath.getMetadataPath(indexMetadata.getParentTablePath()));
+ LoadMetadataDetails loadMetaDetail = Arrays.stream(loadDetails)
+ .filter(loadDetail -> loadDetail.getLoadName().equals(segmentId)).findFirst().get();
+ newSegmentFileName = loadMetaDetail.getSegmentFile();
+ } else {
+ // Instead of modifying existing segment file, generate uuid to write into new segment file.
+ uuid = String.valueOf(System.currentTimeMillis());
+ newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid)
+ + CarbonTablePath.SEGMENT_EXT;
+ }
String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath())
+ CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
if (!table.isHivePartitionTable()) {
- String content = SegmentStatusManager.readFileAsString(path);
try {
SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
} catch (Exception ex) {
@@ -280,18 +313,15 @@ public class CarbonIndexFileMergeWriter {
boolean status = SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName,
table.getCarbonTableIdentifier().getTableId(), segmentFileStore);
if (!status) {
- // revert to original segment file as the table status update has failed.
- SegmentStatusManager.writeStringIntoFile(path, content);
- // delete merge index file.
- for (String file : mergeIndexFiles) {
- FileFactory.getCarbonFile(file).delete();
- }
- // no need to delete index files, so return from here.
- return uuid;
+ throw new IOException("Table status update with mergeIndex file has failed");
}
}
- for (CarbonFile file : indexFiles) {
- file.delete();
+ // If alter merge index is triggered for old tables, do not delete the index files
+ // immediately, so that concurrent queries do not hit an "index file not found" failure.
+ if (!isOldStoreIndexFilesPresent) {
+ for (CarbonFile file : indexFiles) {
+ file.delete();
+ }
}
return uuid;
}
@@ -329,8 +359,9 @@ public class CarbonIndexFileMergeWriter {
* @param segmentId
*/
public String mergeCarbonIndexFilesOfSegment(String segmentId, String uuid, String tablePath,
- String partitionPath) {
- return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, false, uuid, partitionPath);
+ String partitionPath, boolean isOldStoreIndexFilesPresent) {
+ return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null,
+ isOldStoreIndexFilesPresent, uuid, partitionPath);
}
/**
@@ -345,15 +376,6 @@ public class CarbonIndexFileMergeWriter {
readFileFooterFromCarbonDataFile, uuid, null);
}
- private boolean isCarbonIndexFilePresent(CarbonFile[] indexFiles) {
- for (CarbonFile file : indexFiles) {
- if (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
- return true;
- }
- }
- return false;
- }
-
/**
* It writes thrift object to file
*
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 89cdfb3..f65d36f 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -115,7 +115,6 @@ This section provides the details of all the configurations required for the Car
| carbon.enable.page.level.reader.in.compaction|false|Enabling page level reader for compaction reduces the memory usage while compacting more number of segments. It allows reading only page by page instead of reading whole blocklet to memory. **NOTE:** Please refer to [file-structure-of-carbondata](./file-structure-of-carbondata.md#carbondata-file-format) to understand the storage format of CarbonData and concepts of pages.|
| carbon.concurrent.compaction | true | Compaction of different tables can be executed concurrently. This configuration determines whether to compact all qualifying tables in parallel or not. **NOTE:** Compacting concurrently is a resource demanding operation and needs more resources there by affecting the query performance also. This configuration is **deprecated** and might be removed in future releases. |
| carbon.compaction.prefetch.enable | false | Compaction operation is similar to Query + data load where in data from qualifying segments are queried and data loading performed to generate a new single segment. This configuration determines whether to query ahead data from segments and feed it for data loading. **NOTE:** This configuration is disabled by default as it needs extra resources for querying extra data. Based on the memory availability on the cluster, user can enable it to imp [...]
-| carbon.merge.index.in.segment | true | Each CarbonData file has a companion CarbonIndex file which maintains the metadata about the data. These CarbonIndex files are read and loaded into driver and is used subsequently for pruning of data during queries. These CarbonIndex files are very small in size(few KB) and are many. Reading many small files from HDFS is not efficient and leads to slow IO performance. Hence these CarbonIndex files belonging to a segment can be combined into a sin [...]
| carbon.enable.range.compaction | true | To configure Ranges-based Compaction to be used or not for RANGE_COLUMN. If true after compaction also the data would be present in ranges. |
| carbon.si.segment.merge | false | Making this true degrade the LOAD performance. When the number of small files increase for SI segments(it can happen as number of columns will be less and we store position id and reference columns), user an either set to true which will merge the data files for upcoming loads or run SI refresh command which does this job for all segments. (REFRESH INDEX <index_table>) |
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 98f5502..727bd91 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -156,11 +156,6 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
OperationContext operationContext = (OperationContext) getOperationContext();
CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
String uuid = "";
- SegmentFileStore.updateTableStatusFile(carbonTable, loadModel.getSegmentId(),
- segmentFileName + CarbonTablePath.SEGMENT_EXT,
- carbonTable.getCarbonTableIdentifier().getTableId(),
- new SegmentFileStore(carbonTable.getTablePath(),
- segmentFileName + CarbonTablePath.SEGMENT_EXT));
newMetaEntry.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
CarbonLoaderUtil
.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, loadModel.getFactTimeStamp(),
@@ -267,6 +262,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
.getProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT));
if (!isMergeIndex) {
+ // By default, carbon.merge.index.in.segment is true, so this code is used for
+ // developer debugging purposes.
Map<String, Set<String>> indexFileNameMap = (Map<String, Set<String>>) ObjectSerializationUtil
.convertStringToObject(context.getConfiguration().get("carbon.index.files.name"));
List<String> partitionList =
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
index 97d013d..cfad7cd 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
@@ -110,8 +110,8 @@ class CarbonDataFileMergeTestCaseOnSI
.addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
sql("REFRESH INDEX nonindexmerge_index1 ON TABLE nonindexmerge").collect()
checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
- assert(getDataFileCount("nonindexmerge_index1", "0") < 7)
- assert(getDataFileCount("nonindexmerge_index1", "1") < 7)
+ assert(getDataFileCount("nonindexmerge_index1", "0") > 7)
+ assert(getDataFileCount("nonindexmerge_index1", "1") > 7)
checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
}
@@ -128,12 +128,12 @@ class CarbonDataFileMergeTestCaseOnSI
sql("REFRESH INDEX nonindexmerge_index2 ON TABLE nonindexmerge WHERE SEGMENT.ID IN(0)")
.collect()
checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
- assert(getDataFileCount("nonindexmerge_index2", "0") < 7)
+ assert(getDataFileCount("nonindexmerge_index2", "0") > 7)
assert(getDataFileCount("nonindexmerge_index2", "1") == 20)
sql("REFRESH INDEX nonindexmerge_index2 ON TABLE nonindexmerge WHERE SEGMENT.ID IN(1)")
.collect()
checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
- assert(getDataFileCount("nonindexmerge_index2", "1") < 7)
+ assert(getDataFileCount("nonindexmerge_index2", "1") > 7)
checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
}
@@ -227,8 +227,8 @@ class CarbonDataFileMergeTestCaseOnSI
val df1 = sql("""Select * from nonindexmerge where name='n16000'""")
.queryExecution.sparkPlan
assert(isFilterPushedDownToSI(df1))
- assert(getDataFileCount("nonindexmerge_index1", "0") < 15)
- assert(getDataFileCount("nonindexmerge_index1", "1") < 15)
+ assert(getDataFileCount("nonindexmerge_index1", "0") > 15)
+ assert(getDataFileCount("nonindexmerge_index1", "1") > 15)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants
.CARBON_SI_SEGMENT_MERGE, "true")
}
@@ -318,8 +318,8 @@ class CarbonDataFileMergeTestCaseOnSI
val df1 = sql("""Select * from nonindexmerge where name='n16000'""")
.queryExecution.sparkPlan
assert(isFilterPushedDownToSI(df1))
- assert(getDataFileCount("nonindexmerge_index1", "0") < 15)
- assert(getDataFileCount("nonindexmerge_index1", "1") < 15)
+ assert(getDataFileCount("nonindexmerge_index1", "0") > 15)
+ assert(getDataFileCount("nonindexmerge_index1", "1") > 15)
checkAnswer(sql(" select count(*) from nonindexmerge_index1"), rows)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants
.CARBON_SI_SEGMENT_MERGE, "true")
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergeindex/CarbonIndexFileMergeTestCaseWithSI.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergeindex/CarbonIndexFileMergeTestCaseWithSI.scala
index 40f25d1..d2025fb 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergeindex/CarbonIndexFileMergeTestCaseWithSI.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergeindex/CarbonIndexFileMergeTestCaseWithSI.scala
@@ -24,10 +24,7 @@ import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTestUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
class CarbonIndexFileMergeTestCaseWithSI
@@ -81,8 +78,8 @@ class CarbonIndexFileMergeTestCaseWithSI
sql("CREATE INDEX nonindexmerge_index on table nonindexmerge (name) AS 'carbondata'")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='20')")
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index", "0") == 20)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
sql("DROP TABLE IF EXISTS indexmerge")
@@ -95,8 +92,8 @@ class CarbonIndexFileMergeTestCaseWithSI
sql("CREATE INDEX indexmerge_index1 on table indexmerge (name) AS 'carbondata'")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='20')")
- assert(getIndexFileCount("default_indexmerge", "0") == 0)
- assert(getIndexFileCount("default_indexmerge_index1", "0") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_indexmerge", "0") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_indexmerge_index1", "0") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""),
sql("""Select count(*) from indexmerge"""))
}
@@ -117,17 +114,17 @@ class CarbonIndexFileMergeTestCaseWithSI
s"'GLOBAL_SORT_PARTITIONS'='20')")
sql("CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS 'carbondata'")
val rows = sql("""Select count(*) from nonindexmerge""").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index1", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index1", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index1", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index1", "1") == 20)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
- assert(getIndexFileCount("default_nonindexmerge_index1", "0") == 0)
- assert(getIndexFileCount("default_nonindexmerge_index1", "1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index1", "0") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index1", "1") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
}
@@ -147,15 +144,15 @@ class CarbonIndexFileMergeTestCaseWithSI
s"'GLOBAL_SORT_PARTITIONS'='20')")
sql("CREATE INDEX nonindexmerge_index2 on table nonindexmerge (name) AS 'carbondata'")
val rows = sql("""Select count(*) from nonindexmerge""").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index2", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index2", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index2", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index2", "1") == 20)
sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
- assert(getIndexFileCount("default_nonindexmerge_index2", "0") == 0)
- assert(getIndexFileCount("default_nonindexmerge_index2", "1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index2", "0") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index2", "1") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
}
@@ -179,15 +176,15 @@ class CarbonIndexFileMergeTestCaseWithSI
s"'GLOBAL_SORT_PARTITIONS'='20')")
sql("CREATE INDEX nonindexmerge_index3 on table nonindexmerge (name) AS 'carbondata'")
val rows = sql("""Select count(*) from nonindexmerge""").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index3", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index3", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index3", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index3", "1") == 20)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
- assert(getIndexFileCount("default_nonindexmerge_index3", "0.1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index3", "0.1") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
@@ -215,52 +212,37 @@ class CarbonIndexFileMergeTestCaseWithSI
s"'GLOBAL_SORT_PARTITIONS'='20')")
sql("CREATE INDEX nonindexmerge_index4 on table nonindexmerge (name) AS 'carbondata'")
val rows = sql("""Select count(*) from nonindexmerge""").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index4", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index4", "1") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index4", "2") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index4", "3") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "2") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "3") == 20)
sql("alter table nonindexmerge set tblproperties('global_sort_partitions'='20')")
sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
sql("ALTER TABLE nonindexmerge COMPACT 'segment_index'").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "0.1") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "2.1") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "0.2") == 0)
- assert(getIndexFileCount("default_nonindexmerge_index4", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index4", "1") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index4", "2") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index4", "3") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index4", "0.1") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index4", "2.1") == 20)
- assert(getIndexFileCount("default_nonindexmerge_index4", "0.2") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2.1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.2") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "2") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "3") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "0.1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "2.1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "0.2") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
}
- private def getIndexFileCount(tableName: String, segment: String): Int = {
- val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
- val path = CarbonTablePath
- .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
- val carbonFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter {
- override def accept(file: CarbonFile): Boolean = file.getName.endsWith(CarbonTablePath
- .INDEX_FILE_EXT)
- })
- if (carbonFiles != null) {
- carbonFiles.length
- } else {
- 0
- }
- }
-
private def createFile(fileName: String, line: Int = 10000, start: Int = 0): Boolean = {
try {
val write = new PrintWriter(fileName);
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
index 2b265d4..7543f90 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
@@ -23,8 +23,10 @@ import java.util.UUID;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
import org.apache.carbondata.hadoop.api.CarbonOutputCommitter;
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
import org.apache.carbondata.hive.util.HiveCarbonUtil;
@@ -128,11 +130,13 @@ public class MapredCarbonOutputCommitter extends OutputCommitter {
Configuration configuration = jobContext.getConfiguration();
CarbonLoadModel carbonLoadModel = MapredCarbonOutputFormat.getLoadModel(configuration);
ThreadLocalSessionInfo.unsetAll();
+ CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
+ new CarbonIndexFileMergeWriter(carbonTable)
+ .mergeCarbonIndexFilesOfSegment(carbonLoadModel.getSegmentId(),
+ carbonTable.getTablePath(), false,
+ String.valueOf(carbonLoadModel.getFactTimeStamp()));
SegmentFileStore.writeSegmentFile(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(),
carbonLoadModel.getSegmentId(), String.valueOf(carbonLoadModel.getFactTimeStamp()));
- SegmentFileStore
- .mergeIndexAndWriteSegmentFile(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(),
- carbonLoadModel.getSegmentId(), String.valueOf(carbonLoadModel.getFactTimeStamp()));
CarbonTableOutputFormat.setLoadModel(configuration, carbonLoadModel);
carbonOutputCommitter.commitJob(jobContext);
} catch (Exception e) {
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
index 5c5a1b7..03c6e1c 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTable
import org.apache.carbondata.core.metadata.schema.SchemaReader
import org.apache.carbondata.core.metadata.schema.table.TableSchema
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonTestUtil, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.presto.server.PrestoServer
import org.apache.carbondata.presto.util.CarbonDataStoreCreator
@@ -161,9 +161,7 @@ class PrestoInsertIntoTableTestCase
file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
}
}).length == 2)
- val segmentsPath = CarbonTablePath.getSegmentFilesLocation(tablePath)
- assert(FileFactory.getCarbonFile(segmentsPath).isFileExist &&
- FileFactory.getCarbonFile(segmentsPath).listFiles(true).size() == 2)
+ assert(CarbonTestUtil.getSegmentFileCount("testdb_testtable") == 2)
val metadataFolderPath = CarbonTablePath.getMetadataPath(tablePath)
FileFactory.getCarbonFile(metadataFolderPath).listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 754fc63..9405f3a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -491,19 +491,8 @@ object CarbonDataRDDFactory {
val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
carbonLoadModel.getSegmentId,
segmentMetaDataAccumulator)
- val segmentFileName =
- SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
- String.valueOf(carbonLoadModel.getFactTimeStamp), segmentMetaDataInfo)
- // clear segmentMetaDataAccumulator
segmentMetaDataAccumulator.reset()
- SegmentFileStore.updateTableStatusFile(
- carbonTable,
- carbonLoadModel.getSegmentId,
- segmentFileName,
- carbonTable.getCarbonTableIdentifier.getTableId,
- new SegmentFileStore(carbonTable.getTablePath, segmentFileName))
-
operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
carbonLoadModel.getSegmentId)
val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
@@ -512,6 +501,9 @@ object CarbonDataRDDFactory {
carbonLoadModel)
OperationListenerBus.getInstance()
.fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+ val segmentFileName =
+ SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
+ String.valueOf(carbonLoadModel.getFactTimeStamp), segmentMetaDataInfo)
val (done, writtenSegment) =
updateTableStatus(
sqlContext.sparkSession,
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index d8072cf..642387c 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -352,6 +352,8 @@ class CarbonTableCompactor(
s"to ${segmentFileName} failed.")
}
} else {
+ // By default carbon.merge.index.in.segment is true, so this code is used only for
+ // developer debugging purposes.
val readPath =
CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
CarbonCommonConstants.FILE_SEPARATOR + carbonLoadModel.getFactTimeStamp + ".tmp"
@@ -408,7 +410,6 @@ class CarbonTableCompactor(
MVManagerInSpark.get(sc.sparkSession))
if (!statusFileUpdate) {
- // no need to call merge index if table status update has failed
LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
s"${ carbonLoadModel.getTableName }")
throw new Exception(s"Compaction failed to update metadata for table" +
diff --git a/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
index 70a9dd8..974035c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
@@ -17,7 +17,8 @@
package org.apache.spark.rdd
-import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
+import java.io.IOException
+import java.util.concurrent.{Executors, ExecutorService}
import scala.collection.JavaConverters._
@@ -30,7 +31,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
@@ -87,7 +88,8 @@ object CarbonMergeFilesRDD {
carbonTable.isHivePartitionTable,
readFileFooterFromCarbonDataFile,
partitionInfo,
- tempFolderPath).collect()
+ tempFolderPath,
+ currPartitionSpec).collect()
} else {
try {
if (isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
@@ -148,10 +150,7 @@ object CarbonMergeFilesRDD {
val message = "Merge Index files request is failed " +
s"for table ${ carbonTable.getTableUniqueName }. " + ex.getMessage
LOGGER.error(message)
- if (isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION,
- CarbonCommonConstants.CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION_DEFAULT)) {
- throw new RuntimeException(message, ex)
- }
+ throw new RuntimeException(message, ex)
}
}
if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
@@ -185,13 +184,25 @@ object CarbonMergeFilesRDD {
val readPath: String = CarbonTablePath.getSegmentFilesLocation(tablePath) +
CarbonCommonConstants.FILE_SEPARATOR + segmentId + "_" +
segmentFileNameToSegmentIdMap.get(segmentId) + ".tmp"
+ // Generate new timestamp for segment file writing instead of overwriting existing one.
+ val uuid = String.valueOf(System.currentTimeMillis)
+ val newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid)
// Merge all partition files into a single file.
- val segmentFileName: String = SegmentFileStore
- .genSegmentFileName(segmentId, segmentFileNameToSegmentIdMap.get(segmentId))
- SegmentFileStore
+ val segmentFile = SegmentFileStore
.mergeSegmentFiles(readPath,
- segmentFileName,
+ newSegmentFileName,
CarbonTablePath.getSegmentFilesLocation(tablePath))
+ if (segmentFile != null) {
+ val sfs = new SegmentFileStore(tablePath, newSegmentFileName +
+ CarbonTablePath.SEGMENT_EXT)
+ // when compacting segment_index, update table status with the new segment file name
+ val status = SegmentFileStore.updateTableStatusFile(carbonTable, segmentId,
+ newSegmentFileName + CarbonTablePath.SEGMENT_EXT,
+ carbonTable.getCarbonTableIdentifier.getTableId, sfs)
+ if (!status) {
+ throw new IOException("Table status update with mergeIndex file has failed")
+ }
+ }
})
}
mergeIndexSize
@@ -280,12 +291,13 @@ class CarbonMergeFilesRDD(
var segmentFile: SegmentFileStore.SegmentFile = null
var indexSize: String = ""
- if (isHivePartitionedTable && partitionInfo.isEmpty) {
+ if (isHivePartitionedTable && readFileFooterFromCarbonDataFile) {
CarbonLoaderUtil.mergeIndexFilesInPartitionedSegment(
carbonTable,
split.segmentId,
segmentFileNameToSegmentIdMap.get(split.segmentId),
- split.partitionPath)
+ split.partitionPath,
+ readFileFooterFromCarbonDataFile)
} else if (isHivePartitionedTable && !partitionInfo.isEmpty) {
val folderDetails = CarbonLoaderUtil
.mergeIndexFilesInPartitionedTempSegment(carbonTable,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 3872db5..c92bce8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -29,10 +29,11 @@ import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.MergeIndexUtil
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatusManager}
import org.apache.carbondata.core.util.{DataLoadMetrics, ObjectSerializationUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -122,10 +123,10 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
.put(loadMetadataDetails.getLoadName,
String.valueOf(loadMetadataDetails.getLoadStartTime))
})
- val segmentsToMerge =
+ val validSegments =
+ CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
+ var segmentsToMerge =
if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
- val validSegments =
- CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
validSegments.foreach { segment =>
// do not add ROW_V1 format
@@ -140,11 +141,35 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
.get
.filterNot(streamingSegment.contains(_))
}
+ validSegments.filter(x => segmentsToMerge.contains(x.getSegmentNo)).foreach { segment =>
+ val segmentFile = segment.getSegmentFileName
+ val sfs = new SegmentFileStore(carbonMainTable.getTablePath, segmentFile)
+ if (sfs.getSegmentFile != null) {
+ val indexFiles = sfs.getIndexCarbonFiles
+ val segmentPath = CarbonTablePath
+ .getSegmentPath(carbonMainTable.getTablePath, segment.getSegmentNo)
+ if (indexFiles.size() == 0) {
+ LOGGER.warn(s"No index files present in path: $segmentPath to merge")
+ // skip this segment; merge is called only for segments that have index files
+ segmentsToMerge = segmentsToMerge.toStream
+ .filterNot(s => s.equals(segment.getSegmentNo)).toList
+ }
+ }
+ }
// in case of merge index file creation using Alter DDL command
// readFileFooterFromCarbonDataFile flag should be true. This flag is check for legacy
// store (store <= 1.1 version) and create merge Index file as per new store so that
// old store is also upgraded to new store
val startTime = System.currentTimeMillis()
+ val partitionInfo: util.List[String] = operationContext
+ .getProperty("partitionPath")
+ .asInstanceOf[util.List[String]]
+ val currPartitionSpec = operationContext.getProperty("carbon.currentpartition")
+ val currPartitionSpecOption: Option[String] = if (currPartitionSpec == null) {
+ None
+ } else {
+ Option(currPartitionSpec.asInstanceOf[String])
+ }
CarbonMergeFilesRDD.mergeIndexFiles(
sparkSession = sparkSession,
segmentIds = segmentsToMerge,
@@ -152,7 +177,9 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
tablePath = carbonMainTable.getTablePath,
carbonTable = carbonMainTable,
mergeIndexProperty = true,
- readFileFooterFromCarbonDataFile = true)
+ readFileFooterFromCarbonDataFile = true,
+ partitionInfo = partitionInfo,
+ currPartitionSpec = currPartitionSpecOption)
LOGGER.info("Total time taken for merge index "
+ (System.currentTimeMillis() - startTime) + "ms")
// clear Block index Cache
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index b01c125..e319185 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -327,6 +327,15 @@ case class CarbonAddLoadCommand(
System.nanoTime().toString) + CarbonTablePath.SEGMENT_EXT,
segmentPath,
new util.HashMap[String, String](options.asJava))
+ // Merge the index files of the added segment, only if it is in carbon format
+ if (isCarbonFormat) {
+ CarbonLoaderUtil.mergeIndexFilesInAddLoadSegment(carbonTable,
+ model.getSegmentId,
+ segmentPath,
+ model.getFactTimeStamp.toString)
+ // clear Block index Cache
+ SegmentFileStore.clearBlockIndexCache(carbonTable, model.getSegmentId)
+ }
val writeSegment =
if (isCarbonFormat) {
SegmentFileStore.writeSegmentFile(carbonTable, segment)
@@ -335,22 +344,6 @@ case class CarbonAddLoadCommand(
carbonTable, segment, partitionSpecOp.orNull, partitionDataFiles.asJava)
}
- // This event will trigger merge index job, only trigger it if it is carbon file
- if (isCarbonFormat) {
- operationContext.setProperty(
- carbonTable.getTableUniqueName + "_Segment",
- model.getSegmentId)
- // when this event is triggered, SI load listener will be called for all the SI tables under
- // this main table, no need to load the SI tables for add load command, so add this property
- // to check in SI load event listener to avoid loading to SI.
- operationContext.setProperty("isAddLoad", "true")
- val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
- new LoadTablePreStatusUpdateEvent(
- carbonTable.getCarbonTableIdentifier,
- model)
- OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
- }
-
val success = if (writeSegment) {
SegmentFileStore.updateTableStatusFile(
carbonTable,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index 7933a25..eaa62d8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command.partition
import java.util
import scala.collection.JavaConverters._
+import scala.util.control.Breaks.{break, breakable}
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -31,11 +32,13 @@ import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonUtil, ObjectSerializationUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{withEvents, AlterTableMergeIndexEvent, OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
@@ -105,20 +108,6 @@ case class CarbonAlterTableAddHivePartitionCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
// Partitions with physical data should be registered to as a new segment.
if (partitionSpecsAndLocsTobeAdded != null && partitionSpecsAndLocsTobeAdded.size() > 0) {
- val segmentFile = SegmentFileStore.getSegmentFileForPhysicalDataPartitions(table.getTablePath,
- partitionSpecsAndLocsTobeAdded)
- if (segmentFile != null) {
- val indexToSchemas = SegmentFileStore.getSchemaFiles(segmentFile, table.getTablePath)
- val tableColumns = table.getTableInfo.getFactTable.getListOfColumns.asScala
- val isSameSchema = indexToSchemas.asScala.exists{ case(key, columnSchemas) =>
- columnSchemas.asScala.exists { col =>
- tableColumns.exists(p => p.getColumnUniqueId.equals(col.getColumnUniqueId))
- } && columnSchemas.size() == tableColumns.length
- }
- if (!isSameSchema) {
- throw new UnsupportedOperationException(
- "Schema of index files located in location is not matching with current table schema")
- }
val loadModel = new CarbonLoadModel
val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
.getOrElse(CarbonCommonConstants.COMPRESSOR,
@@ -128,42 +117,28 @@ case class CarbonAlterTableAddHivePartitionCommand(
loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
// create operationContext to fire load events
val operationContext: OperationContext = new OperationContext
- val (tableIndexes, indexOperationContext) = CommonLoadUtils.firePreLoadEvents(
- sparkSession = sparkSession,
- carbonLoadModel = loadModel,
- uuid = "",
- factPath = "",
- null,
- null,
- isOverwriteTable = false,
- isDataFrame = false,
- updateModel = None,
- operationContext = operationContext)
+ var hasIndexFiles = false
+ breakable {
+ partitionSpecsAndLocsTobeAdded.asScala.foreach(partitionSpecAndLocs => {
+ val location = partitionSpecAndLocs.getLocation.toString
+ val carbonFile = FileFactory.getCarbonFile(location)
+ val listFiles: Array[CarbonFile] = carbonFile.listFiles(new CarbonFileFilter() {
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
+ file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
+ }
+ })
+ if (listFiles != null && listFiles.length > 0) {
+ hasIndexFiles = true
+ break()
+ }
+ })
+ }
+ // only if the partition path has some index files,
+ // make entry in table status and trigger merge index event.
+ if (hasIndexFiles) {
// Create new entry in tablestatus file
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, false)
- val newMetaEntry = loadModel.getCurrentLoadMetadataDetail
- val segmentFileName =
- SegmentFileStore.genSegmentFileName(
- loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp)) +
- CarbonTablePath.SEGMENT_EXT
- newMetaEntry.setSegmentFile(segmentFileName)
- // set path to identify it as external added partition
- newMetaEntry.setPath(partitionSpecsAndLocsTobeAdded.asScala
- .map(_.getLocation.toString).mkString(","))
- val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
- CarbonUtil.checkAndCreateFolderWithPermission(segmentsLoc)
- val segmentPath = segmentsLoc + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName
- SegmentFileStore.writeSegmentFile(segmentFile, segmentPath)
- CarbonLoaderUtil.populateNewLoadMetaEntry(
- newMetaEntry,
- SegmentStatus.SUCCESS,
- loadModel.getFactTimeStamp,
- true)
- // Add size to the entry
- CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId, table)
- // Make the load as success in table status
- CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false)
-
// Normally, application will use Carbon SDK to write files into a partition folder, then
// add the folder to partitioned carbon table.
// If there are many threads writes to the same partition folder, there will be many
@@ -177,6 +152,10 @@ case class CarbonAlterTableAddHivePartitionCommand(
} else {
Some(Seq(loadModel.getSegmentId).toList)
}
+ operationContext.setProperty("partitionPath",
+ partitionSpecsAndLocsTobeAdded.asScala.map(_.getLocation.toString).asJava)
+ operationContext.setProperty("carbon.currentpartition",
+ ObjectSerializationUtil.convertObjectToString(partitionSpecsAndLocsTobeAdded))
val alterTableModel = AlterTableModel(
dbName = Some(table.getDatabaseName),
tableName = table.getTableName,
@@ -185,14 +164,64 @@ case class CarbonAlterTableAddHivePartitionCommand(
factTimeStamp = Some(System.currentTimeMillis()),
customSegmentIds = customSegmentIds)
val mergeIndexEvent = AlterTableMergeIndexEvent(sparkSession, table, alterTableModel)
- OperationListenerBus.getInstance.fireEvent(mergeIndexEvent, new OperationContext)
- // fire event to load data to materialized views
- CommonLoadUtils.firePostLoadEvents(sparkSession,
- loadModel,
- tableIndexes,
- indexOperationContext,
- table,
- operationContext)
+ OperationListenerBus.getInstance.fireEvent(mergeIndexEvent, operationContext)
+ val newMetaEntry = loadModel.getCurrentLoadMetadataDetail
+ val segmentFileName =
+ SegmentFileStore.genSegmentFileName(
+ loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp)) +
+ CarbonTablePath.SEGMENT_EXT
+ newMetaEntry.setSegmentFile(segmentFileName)
+ // set path to identify it as external added partition
+ newMetaEntry.setPath(partitionSpecsAndLocsTobeAdded.asScala
+ .map(_.getLocation.toString).mkString(","))
+ val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
+ CarbonUtil.checkAndCreateFolderWithPermission(segmentsLoc)
+ val segmentPath = segmentsLoc + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName
+ val segmentFile = SegmentFileStore.getSegmentFileForPhysicalDataPartitions(table
+ .getTablePath,
+ partitionSpecsAndLocsTobeAdded)
+ if (segmentFile != null) {
+ val indexToSchemas = SegmentFileStore.getSchemaFiles(segmentFile, table.getTablePath)
+ val tableColumns = table.getTableInfo.getFactTable.getListOfColumns.asScala
+ val isSameSchema = indexToSchemas.asScala.exists { case (key, columnSchemas) =>
+ columnSchemas.asScala.exists { col =>
+ tableColumns.exists(p => p.getColumnUniqueId.equals(col.getColumnUniqueId))
+ } && columnSchemas.size() == tableColumns.length
+ }
+ if (!isSameSchema) {
+ throw new UnsupportedOperationException(
+ "Schema of index files located in location is not matching with current table schema")
+ }
+ val (tableIndexes, indexOperationContext) = CommonLoadUtils.firePreLoadEvents(
+ sparkSession = sparkSession,
+ carbonLoadModel = loadModel,
+ uuid = "",
+ factPath = "",
+ new util.HashMap[String, String](),
+ new util.HashMap[String, String](),
+ isOverwriteTable = false,
+ isDataFrame = false,
+ updateModel = None,
+ operationContext = operationContext)
+ SegmentFileStore.writeSegmentFile(segmentFile, segmentPath)
+ CarbonLoaderUtil.populateNewLoadMetaEntry(
+ newMetaEntry,
+ SegmentStatus.SUCCESS,
+ loadModel.getFactTimeStamp,
+ true)
+ // Add size to the entry
+ CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry,
+ loadModel.getSegmentId, table)
+ // Make the load as success in table status
+ CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false)
+ // fire event to load data to materialized views
+ CommonLoadUtils.firePostLoadEvents(sparkSession,
+ loadModel,
+ tableIndexes,
+ indexOperationContext,
+ table,
+ operationContext)
+ }
}
}
Seq.empty[Row]
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableMergeIndexSIEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableMergeIndexSIEventListener.scala
index 6bb14cf..8cda55f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableMergeIndexSIEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableMergeIndexSIEventListener.scala
@@ -33,8 +33,10 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.index.Segment
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.index.IndexType
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
@@ -85,7 +87,21 @@ class AlterTableMergeIndexSIEventListener
.asScala
val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
validSegments.foreach { segment =>
- validSegmentIds += segment.getSegmentNo
+ val segmentFile = segment.getSegmentFileName
+ val sfs = new SegmentFileStore(indexCarbonTable.getTablePath, segmentFile)
+ if (sfs.getSegmentFile != null) {
+ val indexFiles = sfs.getIndexCarbonFiles
+ val segmentPath = CarbonTablePath
+ .getSegmentPath(indexCarbonTable.getTablePath, segment.getSegmentNo)
+ if (indexFiles.size() == 0) {
+ LOGGER.warn(s"No index files present in path: $segmentPath to merge")
+ } else {
+ // call merge only for segments that have index files
+ validSegmentIds += segment.getSegmentNo
+ }
+ } else {
+ validSegmentIds += segment.getSegmentNo
+ }
}
// Just launch job to merge index for all index tables
CarbonMergeFilesRDD.mergeIndexFiles(
@@ -94,7 +110,8 @@ class AlterTableMergeIndexSIEventListener
SegmentStatusManager.mapSegmentToStartTime(carbonMainTable),
indexCarbonTable.getTablePath,
indexCarbonTable,
- mergeIndexProperty = true)
+ mergeIndexProperty = true,
+ readFileFooterFromCarbonDataFile = true)
}
}
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
index 0b40ed8..15f9048 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
@@ -43,10 +43,6 @@ class SILoadEventListener extends OperationEventListener with Logging {
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
event match {
case _: LoadTablePreStatusUpdateEvent =>
- if (operationContext.getProperty("isAddLoad") != null &&
- operationContext.getProperty("isAddLoad").toString.toBoolean) {
- return
- }
LOGGER.info("Load pre status update event-listener called")
val loadTablePreStatusUpdateEvent = event.asInstanceOf[LoadTablePreStatusUpdateEvent]
val carbonLoadModel = loadTablePreStatusUpdateEvent.getCarbonLoadModel
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index b371947..a7677ea 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -48,11 +48,6 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
val loadTablePostStatusUpdateEvent = event.asInstanceOf[LoadTablePostStatusUpdateEvent]
val carbonLoadModel = loadTablePostStatusUpdateEvent.getCarbonLoadModel
val sparkSession = SparkSession.getActiveSession.get
- // Avoid loading segment to SI for add load command
- if (operationContext.getProperty("isAddLoad") != null &&
- operationContext.getProperty("isAddLoad").toString.toBoolean) {
- return
- }
if (CarbonProperties.getInstance().isSIRepairEnabled(carbonLoadModel.getDatabaseName,
carbonLoadModel.getTableName)) {
// when Si creation and load to main table are parallel, get the carbonTable from the
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
index cfb79b2..7fc29bd 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.index.status.IndexStatus
import org.apache.carbondata.core.locks.ICarbonLock
+import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.index.IndexType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
@@ -100,13 +101,6 @@ object Compactor {
return
}
allSegmentsLock ++= segmentLocks
- CarbonInternalLoaderUtil.updateLoadMetadataWithMergeStatus(
- indexCarbonTable,
- loadsToMerge,
- validSegments.head,
- segmentToSegmentTimestampMap,
- segmentIdToLoadStartTimeMapping(validSegments.head),
- SegmentStatus.INSERT_IN_PROGRESS, 0L, List.empty.asJava)
// merge index files
CarbonMergeFilesRDD.mergeIndexFiles(sqlContext.sparkSession,
@@ -132,7 +126,14 @@ object Compactor {
secondaryIndexModel.segmentIdToLoadStartTimeMapping,
indexCarbonTable,
loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles)(sqlContext)
-
+ if (rebuiltSegments.isEmpty) {
+ for (eachSegment <- secondaryIndexModel.validSegments) {
+ SegmentFileStore
+ .writeSegmentFile(indexCarbonTable,
+ eachSegment,
+ String.valueOf(carbonLoadModel.getFactTimeStamp))
+ }
+ }
CarbonInternalLoaderUtil.updateLoadMetadataWithMergeStatus(
indexCarbonTable,
loadsToMerge,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index e493d47..96cf8c7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -168,6 +168,7 @@ object SecondaryIndexCreator {
}
var successSISegments: List[String] = List()
var failedSISegments: List[String] = List()
+ val carbonLoadModel: CarbonLoadModel = getCopyObject(secondaryIndexModel)
val sort_scope = indexCarbonTable.getTableInfo.getFactTable.getTableProperties
.get("sort_scope")
if (sort_scope != null && sort_scope.equalsIgnoreCase("global_sort")) {
@@ -179,7 +180,6 @@ object SecondaryIndexCreator {
.submit(new Callable[Array[(String, (LoadMetadataDetails, ExecutionErrors))]] {
@throws(classOf[Exception])
override def call(): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
- val carbonLoadModel = getCopyObject(secondaryIndexModel)
// to load as a global sort SI segment,
// we need to query main table along with position reference projection
val projections = indexCarbonTable.getCreateOrderColumn
@@ -237,10 +237,6 @@ object SecondaryIndexCreator {
carbonLoadModel,
hadoopConf = configuration, segmentMetaDataAccumulator)
}
- SegmentFileStore
- .writeSegmentFile(indexCarbonTable,
- eachSegment,
- String.valueOf(carbonLoadModel.getFactTimeStamp))
segmentToLoadStartTimeMap
.put(eachSegment, String.valueOf(carbonLoadModel.getFactTimeStamp))
result
@@ -292,7 +288,6 @@ object SecondaryIndexCreator {
.put("carbonConf", SparkSQLUtil.sessionState(sc.sparkSession).newHadoopConf())
var eachSegmentSecondaryIndexCreationStatus: Array[(String, Boolean)] = Array.empty
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(segId, indexCarbonTable)
- val carbonLoadModel = getCopyObject(secondaryIndexModel)
carbonLoadModel
.setFactTimeStamp(secondaryIndexModel.segmentIdToLoadStartTimeMapping(eachSegment))
carbonLoadModel.setTablePath(secondaryIndexModel.carbonTable.getTablePath)
@@ -301,10 +296,6 @@ object SecondaryIndexCreator {
carbonLoadModel,
secondaryIndexModel.secondaryIndex,
segId, execInstance, indexCarbonTable, forceAccessSegment).collect()
- SegmentFileStore
- .writeSegmentFile(indexCarbonTable,
- segId,
- String.valueOf(carbonLoadModel.getFactTimeStamp))
segmentToLoadStartTimeMap.put(segId, carbonLoadModel.getFactTimeStamp.toString)
if (secondaryIndexCreationStatus.length > 0) {
eachSegmentSecondaryIndexCreationStatus = secondaryIndexCreationStatus
@@ -349,16 +340,6 @@ object SecondaryIndexCreator {
var tableStatusUpdateForFailure = false
if (successSISegments.nonEmpty && !isCompactionCall) {
- tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
- successSISegments,
- secondaryIndexModel.carbonLoadModel.getDatabaseName,
- secondaryIndexModel.secondaryIndex.indexName,
- segmentStatus,
- secondaryIndexModel.segmentIdToLoadStartTimeMapping,
- segmentToLoadStartTimeMap,
- indexCarbonTable,
- secondaryIndexModel.sqlContext.sparkSession)
-
// merge index files for success segments in case of only load
CarbonMergeFilesRDD.mergeIndexFiles(secondaryIndexModel.sqlContext.sparkSession,
successSISegments,
@@ -384,18 +365,6 @@ object SecondaryIndexCreator {
indexCarbonTable,
loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles)(sc)
- tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
- successSISegments,
- secondaryIndexModel.carbonLoadModel.getDatabaseName,
- secondaryIndexModel.secondaryIndex.indexName,
- SegmentStatus.SUCCESS,
- secondaryIndexModel.segmentIdToLoadStartTimeMapping,
- segmentToLoadStartTimeMap,
- indexCarbonTable,
- secondaryIndexModel.sqlContext.sparkSession,
- carbonLoadModelForMergeDataFiles.getFactTimeStamp,
- rebuiltSegments)
-
if (isInsertOverwrite) {
val overriddenSegments = SegmentStatusManager
.readLoadMetadata(indexCarbonTable.getMetadataPath)
@@ -412,6 +381,24 @@ object SecondaryIndexCreator {
indexTable,
secondaryIndexModel.sqlContext.sparkSession)
}
+ if (rebuiltSegments.isEmpty) {
+ for (loadMetadata <- loadMetadataDetails) {
+ SegmentFileStore
+ .writeSegmentFile(indexCarbonTable, loadMetadata.getLoadName,
+ String.valueOf(loadMetadata.getLoadStartTime))
+ }
+ tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
+ successSISegments,
+ secondaryIndexModel.carbonLoadModel.getDatabaseName,
+ secondaryIndexModel.secondaryIndex.indexName,
+ SegmentStatus.SUCCESS,
+ secondaryIndexModel.segmentIdToLoadStartTimeMapping,
+ segmentToLoadStartTimeMap,
+ indexCarbonTable,
+ secondaryIndexModel.sqlContext.sparkSession,
+ carbonLoadModelForMergeDataFiles.getFactTimeStamp,
+ rebuiltSegments)
+ }
}
if (!isCompactionCall) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 0977b77..63bd726 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.log4j.Logger
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.CarbonMergeFilesRDD
+import org.apache.spark.rdd.CarbonMergeFilesRDD.isPropertySet
import org.apache.spark.sql.{CarbonEnv, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel}
@@ -45,17 +45,18 @@ import org.apache.carbondata.core.datastore.block.{TableBlockInfo, TaskBlockInfo
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
-import org.apache.carbondata.core.locks.CarbonLockUtil
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
import org.apache.carbondata.core.metadata.datatype.{DataType, StructField, StructType}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
import org.apache.carbondata.hadoop.CarbonInputSplit
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableOutputFormat}
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -63,7 +64,6 @@ import org.apache.carbondata.indexserver.IndexServer
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.MergeResultImpl
import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
import org.apache.carbondata.spark.rdd.CarbonSparkPartition
@@ -217,8 +217,17 @@ object SecondaryIndexUtil {
}
if (finalMergeStatus) {
if (null != mergeStatus && mergeStatus.length != 0) {
+ // Segment file is not yet written during SI load, in such case we can delete old index
+ // files immediately. If segment file is already present and SI refresh is triggered, then
+ // do not delete immediately to avoid failures during parallel queries.
val validSegmentsToUse = validSegments.asScala
- .filter(segment => mergeStatus.map(_._2).toSet.contains(segment.getSegmentNo))
+ .filter(segment => {
+ val segmentFilePath = CarbonTablePath.getSegmentFilesLocation(tablePath) +
+ CarbonCommonConstants.FILE_SEPARATOR +
+ segment.getSegmentFileName
+ mergeStatus.map(_._2).toSet.contains(segment.getSegmentNo) &&
+ !FileFactory.isFileExist(segmentFilePath)
+ })
deleteOldIndexOrMergeIndexFiles(
carbonLoadModel.getFactTimeStamp,
validSegmentsToUse.toList.asJava,
@@ -231,73 +240,36 @@ object SecondaryIndexUtil {
} else {
siRebuildRDD.partitions.foreach { partition =>
val carbonSparkPartition = partition.asInstanceOf[CarbonSparkPartition]
- deleteOldCarbonDataFiles(carbonSparkPartition)
+ deleteOldCarbonDataFiles(carbonSparkPartition, validSegmentsToUse.toList)
}
}
+ val segmentToLoadStartTimeMap: scala.collection.mutable.Map[String, java.lang.Long] =
+ scala.collection.mutable.Map()
+ // merge index files and write segment file for merged segments
mergedSegments.asScala.map { seg =>
+ val segmentPath = CarbonTablePath.getSegmentPath(tablePath, seg.getLoadName)
+ try {
+ new CarbonIndexFileMergeWriter(indexCarbonTable)
+ .writeMergeIndexFileBasedOnSegmentFolder(null, false, segmentPath,
+ seg.getLoadName, carbonLoadModel.getFactTimeStamp.toString,
+ true)
+ } catch {
+ case e: IOException =>
+ val message =
+ s"Failed to merge index files in path: $segmentPath. ${ e.getMessage() } "
+ LOGGER.error(message)
+ throw new RuntimeException(message, e)
+ }
val file = SegmentFileStore.writeSegmentFile(
indexCarbonTable,
seg.getLoadName,
carbonLoadModel.getFactTimeStamp.toString,
null,
null)
- val segment = new Segment(seg.getLoadName, file)
- SegmentFileStore.updateTableStatusFile(indexCarbonTable,
- seg.getLoadName,
- file,
- indexCarbonTable.getCarbonTableIdentifier.getTableId,
- new SegmentFileStore(tablePath, segment.getSegmentFileName))
- segment
- }
-
- val statusLock =
- new SegmentStatusManager(indexCarbonTable.getAbsoluteTableIdentifier).getTableStatusLock
- try {
- val retryCount = CarbonLockUtil.getLockProperty(CarbonCommonConstants
- .NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
- CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT)
- val maxTimeout = CarbonLockUtil.getLockProperty(CarbonCommonConstants
- .MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
- CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT)
- if (statusLock.lockWithRetries(retryCount, maxTimeout)) {
- val endTime = System.currentTimeMillis()
- val loadMetadataDetails = SegmentStatusManager
- .readLoadMetadata(indexCarbonTable.getMetadataPath)
- loadMetadataDetails.foreach(loadMetadataDetail => {
- if (rebuiltSegments.contains(loadMetadataDetail.getLoadName)) {
- loadMetadataDetail.setLoadStartTime(carbonLoadModel.getFactTimeStamp)
- loadMetadataDetail.setLoadEndTime(endTime)
- CarbonLoaderUtil
- .addDataIndexSizeIntoMetaEntry(loadMetadataDetail,
- loadMetadataDetail.getLoadName,
- indexCarbonTable)
- }
- })
- SegmentStatusManager
- .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(tablePath),
- loadMetadataDetails)
- } else {
- throw new RuntimeException(
- "Not able to acquire the lock for table status updation for table " + databaseName +
- "." + indexCarbonTable.getTableName)
- }
- } finally {
- if (statusLock != null) {
- statusLock.unlock()
- }
- }
- // clear the indexSchema cache for the merged segments, as the index files and
- // data files are rewritten after compaction
- if (mergedSegments.size > 0) {
-
- // merge index files for merged segments
- CarbonMergeFilesRDD.mergeIndexFiles(sc.sparkSession,
- rebuiltSegments.toSeq,
- segmentIdToLoadStartTimeMap,
- indexCarbonTable.getTablePath,
- indexCarbonTable, mergeIndexProperty = false
- )
-
+ segmentToLoadStartTimeMap.put(seg.getLoadName,
+ carbonLoadModel.getFactTimeStamp)
+ // clear the indexSchema cache for the merged segments, as the index files and
+ // data files are rewritten after compaction
if (CarbonProperties.getInstance()
.isDistributedPruningEnabled(indexCarbonTable.getDatabaseName,
indexCarbonTable.getTableName)) {
@@ -310,7 +282,24 @@ object SecondaryIndexUtil {
case _: Exception =>
}
}
-
+ val segment = new Segment(seg.getLoadName, file)
+ segment
+ }
+ // Skip the table status update here for compaction calls. For a compaction call,
+ // the table status is updated directly with Compacted/Success by the logic
+ // present in CarbonTableCompactor.
+ if (compactionType == null) {
+ FileInternalUtil.updateTableStatus(
+ rebuiltSegments.toList,
+ carbonLoadModel.getDatabaseName,
+ indexCarbonTable.getTableName,
+ SegmentStatus.SUCCESS,
+ segmentToLoadStartTimeMap,
+ new java.util.HashMap[String, String],
+ indexCarbonTable,
+ sc.sparkSession,
+ carbonLoadModel.getFactTimeStamp,
+ rebuiltSegments)
IndexStoreManager.getInstance
.clearInvalidSegments(indexCarbonTable, rebuiltSegments.toList.asJava)
}
@@ -343,7 +332,7 @@ object SecondaryIndexUtil {
indexCarbonTable: CarbonTable): Unit = {
// delete the index/merge index carbonFile of old data files
validSegments.asScala.foreach { segment =>
- SegmentFileStore.getIndexFilesListForSegment(segment, indexCarbonTable.getTablePath)
+ getIndexFilesListForSegment(segment, indexCarbonTable.getTablePath)
.asScala
.foreach { indexFile =>
if (DataFileUtil.getTimeStampFromFileName(indexFile).toLong < factTimeStamp) {
@@ -354,15 +343,36 @@ object SecondaryIndexUtil {
}
/**
+ * Returns the names of the index and merge index files that belong to the given segment.
+ * They are taken from the segment file when one is available; otherwise they are looked
+ * up from the segment path under tablePath.
+ */
+ @throws[IOException]
+ private def getIndexFilesListForSegment(segment: Segment, tablePath: String): util.Set[String] = {
+ val segmentFileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName)
+ if (segmentFileStore.getSegmentFile == null) {
+ // no segment file is available for this segment, so look in the segment path instead
+ val segmentPath = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo)
+ new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(segmentPath).keySet
+ } else {
+ // the segment file is available, so use the index and merge index files it reports
+ segmentFileStore.getIndexAndMergeFiles.keySet
+ }
+ }
+
+ /**
* This method deletes the carbondata files present in the partition during small
* datafile merge after loading a segment to SI table. They should be deleted only after
* the data file merge operation, else concurrency can cause file not found issues.
*/
- private def deleteOldCarbonDataFiles(partition: CarbonSparkPartition): Unit = {
+ private def deleteOldCarbonDataFiles(partition: CarbonSparkPartition,
+ validSegmentsToUse: List[Segment]): Unit = {
val splitList = partition.split.value.getAllSplits
splitList.asScala.foreach { split =>
- val carbonFile = FileFactory.getCarbonFile(split.getFilePath)
- carbonFile.delete()
+ if (validSegmentsToUse.contains(split.getSegment)) {
+ val mergedCarbonDataFile = FileFactory.getCarbonFile(split.getFilePath)
+ mergedCarbonDataFile.delete()
+ }
}
}
/**
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index f909537..e4be592 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -17,10 +17,12 @@
package org.apache.carbondata.spark.testsuite.datacompaction
+import java.io.IOException
import java.util
import scala.collection.JavaConverters._
+import mockit.{Mock, MockUp}
import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.test.util.QueryTest
import org.junit.Assert
@@ -32,11 +34,13 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier
import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTestUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
class CarbonIndexFileMergeTestCase
extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
@@ -69,7 +73,7 @@ class CarbonIndexFileMergeTestCase
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='20')")
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
sql("DROP TABLE IF EXISTS indexmerge")
@@ -81,7 +85,7 @@ class CarbonIndexFileMergeTestCase
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='20')")
- assert(getIndexFileCount("default_indexmerge", "0") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_indexmerge", "0") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""),
sql("""Select count(*) from indexmerge"""))
}
@@ -101,13 +105,16 @@ class CarbonIndexFileMergeTestCase
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='20')")
val rows = sql("""Select count(*) from nonindexmerge""").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getSegmentFileCount("default_nonindexmerge") == 2)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
+ // creates new segment file instead of updating
+ assert(CarbonTestUtil.getSegmentFileCount("default_nonindexmerge") == 4)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
}
@@ -126,11 +133,11 @@ class CarbonIndexFileMergeTestCase
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='20')")
val rows = sql("""Select count(*) from nonindexmerge""").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
}
@@ -150,12 +157,12 @@ class CarbonIndexFileMergeTestCase
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='20')")
val rows = sql("""Select count(*) from nonindexmerge""").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.1") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
@@ -178,13 +185,13 @@ class CarbonIndexFileMergeTestCase
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='20')")
val rows = sql("""Select count(*) from nonindexmerge""").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
sql("ALTER TABLE nonindexmerge COMPACT 'segment_index'").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.1") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
@@ -223,19 +230,19 @@ class CarbonIndexFileMergeTestCase
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='20')")
val rows = sql("""Select count(*) from nonindexmerge""").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
- assert(getIndexFileCount("default_nonindexmerge", "2.1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2.1") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
}
@@ -291,20 +298,20 @@ class CarbonIndexFileMergeTestCase
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='20')")
val rows = sql("""Select count(*) from nonindexmerge""").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
- assert(getIndexFileCount("default_nonindexmerge", "2.1") == 0)
- assert(getIndexFileCount("default_nonindexmerge", "0.2") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2.1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.2") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
}
@@ -329,24 +336,24 @@ class CarbonIndexFileMergeTestCase
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='20')")
val rows = sql("""Select count(*) from nonindexmerge""").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE, "true")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='20')"
)
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "4") == 0)
- assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
- assert(getIndexFileCount("default_nonindexmerge", "2.1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "4") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2.1") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), Seq(Row(300000)))
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE, "false")
@@ -373,24 +380,24 @@ class CarbonIndexFileMergeTestCase
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='20')")
val rows = sql("""Select count(*) from nonindexmerge""").collect()
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
.addProperty(CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE, "true")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='20')"
)
- assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
- assert(getIndexFileCount("default_nonindexmerge", "4") == 0)
- assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
- assert(getIndexFileCount("default_nonindexmerge", "2.1") == 0)
- assert(getIndexFileCount("default_nonindexmerge", "0.2") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "4") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2.1") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.2") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), Seq(Row(300000)))
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE, "false")
@@ -411,10 +418,32 @@ class CarbonIndexFileMergeTestCase
| TBLPROPERTIES('SORT_COLUMNS'='city,name')
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE partitionTable OPTIONS('header'='false')")
- assert(getIndexFileCount("default_partitionTable", "0") == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_partitionTable", "0") == 0)
sql("DROP TABLE IF EXISTS partitionTable")
}
+ // Checks the "COMPACT 'SEGMENT_INDEX'" command on a partitioned table: one load is done with
+ // merge index disabled, then the command is run with it enabled and the file counts compared.
+ test("Verify command of index merge for partition table") {
+ // Disable merge index for the load below.
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+ sql("DROP TABLE IF EXISTS nonindexmerge")
+ sql(
+ """
+ | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING)
+ | PARTITIONED BY(age INT)
+ | STORED AS carbondata
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge " +
+ s"partition(age='20') OPTIONS('header'='false')")
+ // Row count captured before compaction; compared again at the end of the test.
+ val rows = sql("""Select count(*) from nonindexmerge""").collect()
+ // Uses the helper's default extension (presumably the plain index file extension - confirm
+ // against CarbonTestUtil.getIndexFileCount).
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 1)
+ // Re-enable merge index before issuing the explicit segment index compaction.
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+ sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
+ // Exactly one merge index file is expected after the command.
+ assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0",
+ CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
+ // The command must not change the query result.
+ checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
+ }
+
test("Verify index merge for streaming table") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
@@ -428,7 +457,8 @@ class CarbonIndexFileMergeTestCase
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
// check for one merge index file
assert(
- getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
+ CarbonTestUtil.getIndexFileCount("default_streamingTable",
+ "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
sql("DROP TABLE IF EXISTS streamingTable")
}
@@ -444,15 +474,15 @@ class CarbonIndexFileMergeTestCase
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
// check for zero merge index file
- assert(
- getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_streamingTable",
+ "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 0)
// check for one index file
- assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.INDEX_FILE_EXT) == 1)
+ assert(CarbonTestUtil.getIndexFileCount("default_streamingTable", "0") == 1)
sql("alter table streamingTable compact 'segment_index'")
sql("alter table streamingTable compact 'segment_index' where segment.id in (0)")
// check for one merge index file
- assert(
- getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
+ assert(CarbonTestUtil.getIndexFileCount("default_streamingTable",
+ "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
sql("DROP TABLE IF EXISTS streamingTable")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
@@ -470,14 +500,14 @@ class CarbonIndexFileMergeTestCase
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
// check for zero merge index file
- assert(
- getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 0)
+ assert(CarbonTestUtil.getIndexFileCount("default_streamingTable",
+ "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 0)
// check for one index file
- assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.INDEX_FILE_EXT) == 1)
+ assert(CarbonTestUtil.getIndexFileCount("default_streamingTable", "0") == 1)
sql("alter table streamingTable compact 'segment_index' where segment.id in (0)")
// check for one merge index file
- assert(
- getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
+ assert(CarbonTestUtil.getIndexFileCount("default_streamingTable",
+ "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
sql("DROP TABLE IF EXISTS streamingTable")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
@@ -506,6 +536,60 @@ class CarbonIndexFileMergeTestCase
assert(!mergeFileNameIsNull("0", "default", "merge_index_cache"))
}
+  // Verifies that a load fails, and leaves no visible rows, when writing the merge index file
+  // throws an IOException.
+  test("verify load when merge index fails") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    // Replace CarbonIndexFileMergeWriter.writeMergeIndexFileBasedOnSegmentFolder with a version
+    // that always fails.
+    val mockMethod = new MockUp[CarbonIndexFileMergeWriter]() {
+      @Mock
+      def writeMergeIndexFileBasedOnSegmentFolder
+      (indexFileNamesTobeAdded: util.List[String], isOldStoreIndexFilesPresent: Boolean,
+          segmentPath: String, segmentId: String, uuid: String, readBasedOnUUID: Boolean): String = {
+        throw new IOException("mock failure reason")
+      }
+    }
+    try {
+      sql("DROP TABLE IF EXISTS indexmerge")
+      sql(
+        """
+          | CREATE TABLE indexmerge(id INT, name STRING, city STRING, age INT)
+          | STORED AS carbondata
+          | TBLPROPERTIES('SORT_COLUMNS'='city,name')
+        """.stripMargin)
+      // The merge index failure is expected to surface from the load as a RuntimeException.
+      intercept[RuntimeException] {
+        sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false')")
+      }
+      // The failed load must not expose any rows.
+      checkAnswer(sql("Select count(*) from indexmerge"), Seq(Row(0)))
+    } finally {
+      // Remove the mock even when an assertion above fails, so that it cannot stay installed
+      // for the tests that run afterwards; then clean up the table.
+      mockMethod.tearDown()
+      sql("DROP TABLE IF EXISTS indexmerge")
+    }
+  }
+
+  // Verifies that a load into a partitioned table fails, and leaves no visible rows, when
+  // merging the index files of the temporary partition segment throws an IOException.
+  test("verify load when merge index fails for partition table") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    // Replace CarbonLoaderUtil.mergeIndexFilesInPartitionedTempSegment with a version that
+    // always fails.
+    val mockMethod = new MockUp[CarbonLoaderUtil]() {
+      @Mock
+      def mergeIndexFilesInPartitionedTempSegment
+      (table: CarbonTable, segmentId: String, partitionPath: String,
+          partitionInfo: util.List[String], uuid: String, tempFolderPath: String,
+          currPartitionSpec: String): SegmentFileStore.FolderDetails = {
+        throw new IOException("mock failure reason")
+      }
+    }
+    try {
+      sql("DROP TABLE IF EXISTS indexmergePartition")
+      sql(
+        """
+          | CREATE TABLE indexmergePartition(id INT, name STRING, city STRING)
+          | STORED AS carbondata partitioned by(age INT)
+          | TBLPROPERTIES('SORT_COLUMNS'='city,name')
+        """.stripMargin)
+      // The merge index failure is expected to surface from the load as a RuntimeException.
+      intercept[RuntimeException] {
+        sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmergePartition " +
+            s"OPTIONS('header'='false')")
+      }
+      // The failed load must not expose any rows.
+      checkAnswer(sql("Select count(*) from indexmergePartition"), Seq(Row(0)))
+    } finally {
+      // Remove the mock even when an assertion above fails, so that it cannot stay installed
+      // for the tests that run afterwards; then clean up the table.
+      mockMethod.tearDown()
+      sql("DROP TABLE IF EXISTS indexmergePartition")
+    }
+  }
+
private def mergeFileNameIsNull(segmentId: String, dbName: String, tableName: String): Boolean = {
val carbonTable = CarbonEnv.getCarbonTable(Option(dbName), tableName)(sqlContext.sparkSession)
val indexFactory = IndexStoreManager.getInstance().getDefaultIndex(carbonTable)
@@ -520,33 +604,6 @@ class CarbonIndexFileMergeTestCase
identifiers.forall(identifier => identifier.getMergeIndexFileName == null)
}
- private def getIndexFileCount(tableName: String,
- segment: String,
- extension: String = CarbonTablePath.INDEX_FILE_EXT): Int = {
- val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
- val path = CarbonTablePath
- .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
- val carbonFiles = if (table.isHivePartitionTable) {
- FileFactory.getCarbonFile(table.getAbsoluteTableIdentifier.getTablePath)
- .listFiles(true, new CarbonFileFilter {
- override def accept(file: CarbonFile): Boolean = {
- file.getName.endsWith(extension)
- }
- })
- } else {
- FileFactory.getCarbonFile(path).listFiles(true, new CarbonFileFilter {
- override def accept(file: CarbonFile): Boolean = {
- file.getName.endsWith(extension)
- }
- })
- }
- if (carbonFiles != null) {
- carbonFiles.size()
- } else {
- 0
- }
- }
-
private def getIndexOrMergeIndexFileSize(carbonTable: CarbonTable,
segmentId: String,
fileExtension: String): Long = {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index c418e49..4ad3167 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTestUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
@@ -84,13 +84,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
)
sql("""update zerorows d set (d.c2) = (d.c2 + 1) where d.c1 = 'e'""").collect()
sql("clean files for table iud.zerorows options('force'='true')")
- val carbonTable = CarbonEnv.getCarbonTable(Some("iud"), "zerorows")(sqlContext.sparkSession)
- val segmentPath = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(carbonTable
- .getTablePath, "2"))
- assert(!segmentPath.exists())
- val segmentFileLocation = FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilesLocation(
- carbonTable.getTablePath))
- assert(segmentFileLocation.listFiles().length == 3)
+ assert(CarbonTestUtil.getSegmentFileCount("iud_zerorows") == 3)
CarbonProperties.getInstance().addProperty("carbon.clean.file.force.allowed", "true")
sql("""drop table iud.zerorows""")
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 2efd56b..9867099 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -196,11 +196,12 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
// mergeIndex is true, the segment file not need to be written
// and will be written during merging index
if (partitionSpec != null && !isMergeIndex) {
+ // By default carbon.merge.index.in.segment is true, so this code path is used only for
+ // developer debugging purposes.
try {
- SegmentFileStore
- .writeSegmentFile(carbonLoadModel.getTablePath(), carbonLoadModel.getTaskNo(),
- partitionSpec.getLocation().toString(), carbonLoadModel.getFactTimeStamp() + "",
- partitionSpec.getPartitions());
+ SegmentFileStore.writeSegmentFileForPartitionTable(carbonLoadModel.getTablePath(),
+ carbonLoadModel.getTaskNo(), partitionSpec.getLocation().toString(),
+ carbonLoadModel.getFactTimeStamp() + "", partitionSpec.getPartitions());
} catch (IOException e) {
throw e;
}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 6d8cae7..6464306 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -192,9 +192,11 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
// mergeIndex is true, the segment file not need to be written
// and will be written during merging index
if (partitionSpec != null && !isMergeIndex) {
- SegmentFileStore.writeSegmentFile(loadModel.getTablePath(), loadModel.getTaskNo(),
- partitionSpec.getLocation().toString(), loadModel.getFactTimeStamp() + "",
- partitionSpec.getPartitions());
+ // By default carbon.merge.index.in.segment is true, so this code path is used only for
+ // developer debugging purposes.
+ SegmentFileStore.writeSegmentFileForPartitionTable(loadModel.getTablePath(),
+ loadModel.getTaskNo(), partitionSpec.getLocation().toString(),
+ loadModel.getFactTimeStamp() + "", partitionSpec.getPartitions());
}
} catch (CarbonDataWriterException | IOException e) {
mergeStatus = false;
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 1d35502..919742d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -1148,10 +1148,11 @@ public final class CarbonLoaderUtil {
* @return
*/
public static String mergeIndexFilesInPartitionedSegment(CarbonTable table, String segmentId,
- String uuid, String partitionPath) {
+ String uuid, String partitionPath, boolean isOldStoreIndexFilesPresent) {
String tablePath = table.getTablePath();
return new CarbonIndexFileMergeWriter(table)
- .mergeCarbonIndexFilesOfSegment(segmentId, uuid, tablePath, partitionPath);
+ .mergeCarbonIndexFilesOfSegment(segmentId, uuid, tablePath, partitionPath,
+ isOldStoreIndexFilesPresent);
}
public static SegmentFileStore.FolderDetails mergeIndexFilesInPartitionedTempSegment(
@@ -1163,6 +1164,23 @@ public final class CarbonLoaderUtil {
tempFolderPath, currPartitionSpec);
}
+  /**
+   * Merges the index files of a segment that is located at an external path.
+   *
+   * @param table       table the segment belongs to
+   * @param segmentId   id of the segment whose index files are merged
+   * @param segmentPath path of the folder that holds the segment's index files
+   * @param uuid        identifier handed through to the merge index writer
+   * @return the value returned by
+   *         {@code CarbonIndexFileMergeWriter#writeMergeIndexFileBasedOnSegmentFolder}
+   * @throws RuntimeException wrapping the {@link IOException} when the merge index file cannot
+   *                          be written
+   */
+  public static String mergeIndexFilesInAddLoadSegment(CarbonTable table, String segmentId,
+      String segmentPath, String uuid) {
+    try {
+      // No explicit index file list is passed (null); both boolean flags are false.
+      return new CarbonIndexFileMergeWriter(table)
+          .writeMergeIndexFileBasedOnSegmentFolder(null, false, segmentPath, segmentId, uuid,
+              false);
+    } catch (IOException e) {
+      String message =
+          "Failed to merge index files in path: " + segmentPath + ". " + e.getMessage();
+      // Hand the exception to the logger too, so the stack trace is kept in the log.
+      LOGGER.error(message, e);
+      throw new RuntimeException(message, e);
+    }
+  }
+
private static void deleteFiles(List<String> filesToBeDeleted) throws IOException {
for (String filePath : filesToBeDeleted) {
FileFactory.deleteFile(filePath);