You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/02/27 08:19:12 UTC
[15/16] carbondata git commit: [CARBONDATA-2187][PARTITION] Partition
restructure for new folder structure and supporting partition location
feature
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
deleted file mode 100644
index a0ce24a..0000000
--- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.metadata;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Serializable;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.core.fileoperations.FileWriteOperation;
-import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatus;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
-
-import com.google.gson.Gson;
-
-/**
- * Provide read and write support for partition mapping file in each segment
- */
-public class PartitionMapFileStore {
-
- private Map<String, List<String>> partitionMap = new HashMap<>();
-
- private boolean partionedSegment = false;
- /**
- * Write partitionmapp file to the segment folder with indexfilename and corresponding partitions.
- *
- * @param segmentPath
- * @param taskNo
- * @param partionNames
- * @throws IOException
- */
- public void writePartitionMapFile(String segmentPath, final String taskNo,
- List<String> partionNames) throws IOException {
- CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
- // write partition info to new file.
- if (carbonFile.exists() && partionNames.size() > 0) {
- CarbonFile[] carbonFiles = carbonFile.listFiles(new CarbonFileFilter() {
- @Override public boolean accept(CarbonFile file) {
- return file.getName().startsWith(taskNo) && file.getName()
- .endsWith(CarbonTablePath.INDEX_FILE_EXT);
- }
- });
- if (carbonFiles != null && carbonFiles.length > 0) {
- PartitionMapper partitionMapper = new PartitionMapper();
- Map<String, List<String>> partitionMap = new HashMap<>();
- partitionMap.put(carbonFiles[0].getName(), partionNames);
- partitionMapper.setPartitionMap(partitionMap);
- String path = segmentPath + "/" + taskNo + CarbonTablePath.PARTITION_MAP_EXT;
- writePartitionFile(partitionMapper, path);
- }
- }
- }
-
- private void writePartitionFile(PartitionMapper partitionMapper, String path) throws IOException {
- AtomicFileOperations fileWrite =
- new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
- BufferedWriter brWriter = null;
- DataOutputStream dataOutputStream = null;
- Gson gsonObjectToWrite = new Gson();
- try {
- dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
- brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
- Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
- String metadataInstance = gsonObjectToWrite.toJson(partitionMapper);
- brWriter.write(metadataInstance);
- } finally {
- if (null != brWriter) {
- brWriter.flush();
- }
- CarbonUtil.closeStreams(brWriter);
- fileWrite.close();
- }
- }
-
- /**
- * Merge all partition files in a segment to single file.
- *
- * @param segmentPath
- * @throws IOException
- */
- public void mergePartitionMapFiles(String segmentPath, String mergeFileName) throws IOException {
- CarbonFile[] partitionFiles = getPartitionFiles(segmentPath);
- if (partitionFiles != null && partitionFiles.length > 0) {
- PartitionMapper partitionMapper = null;
- for (CarbonFile file : partitionFiles) {
- PartitionMapper localMapper = readPartitionMap(file.getAbsolutePath());
- if (partitionMapper == null && localMapper != null) {
- partitionMapper = localMapper;
- }
- if (localMapper != null) {
- partitionMapper = partitionMapper.merge(localMapper);
- }
- }
- if (partitionMapper != null) {
- String path = segmentPath + "/" + mergeFileName + CarbonTablePath.PARTITION_MAP_EXT;
- writePartitionFile(partitionMapper, path);
- for (CarbonFile file : partitionFiles) {
- if (!FileFactory.deleteAllCarbonFilesOfDir(file)) {
- throw new IOException("Old partition map files cannot be deleted");
- }
- }
- }
- }
- }
-
- private String getPartitionFilePath(String segmentPath) {
- CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
- if (carbonFile.exists()) {
- CarbonFile[] partitionFiles = carbonFile.listFiles(new CarbonFileFilter() {
- @Override public boolean accept(CarbonFile file) {
- return file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT);
- }
- });
- if (partitionFiles != null && partitionFiles.length > 0) {
- partionedSegment = true;
- int i = 0;
- // Get the latest partition map file based on the timestamp of that file.
- long[] partitionTimestamps = new long[partitionFiles.length];
- for (CarbonFile file : partitionFiles) {
- partitionTimestamps[i++] = Long.parseLong(file.getName()
- .substring(0, file.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length()));
- }
- Arrays.sort(partitionTimestamps);
- return segmentPath + "/" + partitionTimestamps[partitionTimestamps.length - 1]
- + CarbonTablePath.PARTITION_MAP_EXT;
- }
- }
- return null;
- }
-
- private String getPartitionFilePath(CarbonFile[] carbonFiles, String segmentPath) {
-
- List<CarbonFile> partitionFiles = new ArrayList<>();
- for (CarbonFile file : carbonFiles) {
- if (file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT)) {
- partitionFiles.add(file);
- }
- }
- if (partitionFiles.size() > 0) {
- partionedSegment = true;
- int i = 0;
- // Get the latest partition map file based on the timestamp of that file.
- long[] partitionTimestamps = new long[partitionFiles.size()];
- for (CarbonFile file : partitionFiles) {
- partitionTimestamps[i++] = Long.parseLong(file.getName()
- .substring(0, file.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length()));
- }
- Arrays.sort(partitionTimestamps);
- return segmentPath + "/" + partitionTimestamps[partitionTimestamps.length - 1]
- + CarbonTablePath.PARTITION_MAP_EXT;
- }
- return null;
- }
-
- private CarbonFile[] getPartitionFiles(String segmentPath) {
- CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
- if (carbonFile.exists()) {
- return carbonFile.listFiles(new CarbonFileFilter() {
- @Override public boolean accept(CarbonFile file) {
- return file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT);
- }
- });
- }
- return null;
- }
-
- /**
- * This method reads the partition file
- *
- * @param partitionMapPath
- * @return
- */
- private PartitionMapper readPartitionMap(String partitionMapPath) throws IOException {
- Gson gsonObjectToRead = new Gson();
- DataInputStream dataInputStream = null;
- BufferedReader buffReader = null;
- InputStreamReader inStream = null;
- PartitionMapper partitionMapper;
- AtomicFileOperations fileOperation =
- new AtomicFileOperationsImpl(partitionMapPath, FileFactory.getFileType(partitionMapPath));
-
- try {
- if (!FileFactory.isFileExist(partitionMapPath, FileFactory.getFileType(partitionMapPath))) {
- return null;
- }
- dataInputStream = fileOperation.openForRead();
- inStream = new InputStreamReader(dataInputStream,
- Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
- buffReader = new BufferedReader(inStream);
- partitionMapper = gsonObjectToRead.fromJson(buffReader, PartitionMapper.class);
- } finally {
- if (inStream != null) {
- CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
- }
- }
-
- return partitionMapper;
- }
-
- /**
- * Reads all partitions which existed inside the passed segment path
- * @param segmentPath
- */
- public void readAllPartitionsOfSegment(String segmentPath) throws IOException {
- String partitionFilePath = getPartitionFilePath(segmentPath);
- if (partitionFilePath != null) {
- partionedSegment = true;
- PartitionMapper partitionMapper = readPartitionMap(partitionFilePath);
- partitionMap.putAll(partitionMapper.getPartitionMap());
- }
- }
-
- /**
- * Reads all partitions which existed inside the passed segment path
- * @param carbonFiles
- */
- public void readAllPartitionsOfSegment(CarbonFile[] carbonFiles, String segmentPath)
- throws IOException {
- String partitionFilePath = getPartitionFilePath(carbonFiles, segmentPath);
- if (partitionFilePath != null) {
- partionedSegment = true;
- PartitionMapper partitionMapper = readPartitionMap(partitionFilePath);
- partitionMap.putAll(partitionMapper.getPartitionMap());
- }
- }
-
- public boolean isPartionedSegment() {
- return partionedSegment;
- }
-
- /**
- * Drops the partitions from the partition mapper file of the segment and writes to a new file.
- * @param segmentPath
- * @param partitionsToDrop
- * @param uniqueId
- * @param partialMatch If it is true then even the partial partition spec matches also can be
- * dropped
- * @throws IOException
- */
- public void dropPartitions(String segmentPath, List<List<String>> partitionsToDrop,
- String uniqueId, boolean partialMatch) throws IOException {
- readAllPartitionsOfSegment(segmentPath);
- List<String> indexesToDrop = new ArrayList<>();
- for (Map.Entry<String, List<String>> entry: partitionMap.entrySet()) {
- for (List<String> partitions: partitionsToDrop) {
- if (partialMatch) {
- if (entry.getValue().containsAll(partitions)) {
- indexesToDrop.add(entry.getKey());
- }
- } else {
- if (partitions.containsAll(entry.getValue())) {
- indexesToDrop.add(entry.getKey());
- }
- }
- }
- }
- if (indexesToDrop.size() > 0) {
- // Remove the indexes from partition map
- for (String indexToDrop : indexesToDrop) {
- partitionMap.remove(indexToDrop);
- }
- PartitionMapper mapper = new PartitionMapper();
- mapper.setPartitionMap(partitionMap);
- String path = segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT + ".tmp";
- writePartitionFile(mapper, path);
- }
- }
-
- /**
- * It deletes the old partition mapper files in case of success. And in case of failure it removes
- * the old new file.
- * @param segmentPath
- * @param uniqueId
- * @param success
- */
- public void commitPartitions(String segmentPath, final String uniqueId, boolean success,
- String tablePath, List<String> partitionsToDrop) {
- CarbonFile carbonFile = FileFactory
- .getCarbonFile(segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT + ".tmp");
- CarbonFile carbonPartFile = FileFactory
- .getCarbonFile(tablePath + "/" + partitionsToDrop.get(0));
- // write partition info to new file.
- if (carbonFile.exists()) {
- if (success) {
- carbonFile.renameForce(segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT);
- } else {
- carbonFile.delete();
- }
- }
- //Remove the partition directory from table path
- carbonPartFile.delete();
- }
-
- /**
- * Clean up invalid data after drop partition in all segments of table
- * @param table
- * @param currentPartitions Current partitions of table
- * @param forceDelete Whether it should be deleted force or check the time for an hour creation
- * to delete data.
- * @throws IOException
- */
- public void cleanSegments(
- CarbonTable table,
- List<String> currentPartitions,
- boolean forceDelete) throws IOException {
- SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());
-
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(table.getAbsoluteTableIdentifier().getTablePath(),
- table.getAbsoluteTableIdentifier().getCarbonTableIdentifier());
-
- LoadMetadataDetails[] details = ssm.readLoadMetadata(table.getMetaDataFilepath());
- // scan through each segment.
- List<String> segmentsNeedToBeDeleted = new ArrayList<>();
- for (LoadMetadataDetails segment : details) {
-
- // if this segment is valid then only we will go for deletion of related
- // dropped partition files. if the segment is mark for delete or compacted then any way
- // it will get deleted.
-
- if (segment.getSegmentStatus() == SegmentStatus.SUCCESS
- || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
- List<String> toBeDeletedIndexFiles = new ArrayList<>();
- List<String> toBeDeletedDataFiles = new ArrayList<>();
- // take the list of files from this segment.
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());
- String partitionFilePath = getPartitionFilePath(segmentPath);
- if (partitionFilePath != null) {
- PartitionMapper partitionMapper = readPartitionMap(partitionFilePath);
- if (partitionMapper.partitionMap.size() == 0) {
- // There is no partition information, it means all partitions are dropped.
- // So segment need to be marked as delete.
- segmentsNeedToBeDeleted.add(segment.getLoadName());
- continue;
- }
- DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
- SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
- indexFileStore.readAllIIndexOfSegment(segmentPath);
- Set<String> indexFilesFromSegment = indexFileStore.getCarbonIndexMap().keySet();
- for (String indexFile : indexFilesFromSegment) {
- // Check the partition information in the partiton mapper
- List<String> indexPartitions = partitionMapper.partitionMap.get(indexFile);
- if (indexPartitions == null || !currentPartitions.containsAll(indexPartitions)) {
- Long fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile
- .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
- indexFile.length() - CarbonTablePath.INDEX_FILE_EXT.length()));
- if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) {
- toBeDeletedIndexFiles.add(indexFile);
- // Add the corresponding carbondata files to the delete list.
- byte[] fileData = indexFileStore.getFileData(indexFile);
- List<DataFileFooter> indexInfo =
- fileFooterConverter.getIndexInfo(segmentPath + "/" + indexFile, fileData);
- for (DataFileFooter footer : indexInfo) {
- toBeDeletedDataFiles.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
- }
- }
- }
- }
-
- if (toBeDeletedIndexFiles.size() > 0) {
- indexFilesFromSegment.removeAll(toBeDeletedIndexFiles);
- new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath,
- new ArrayList<String>(indexFilesFromSegment));
- for (String dataFile : toBeDeletedDataFiles) {
- FileFactory.deleteFile(dataFile, FileFactory.getFileType(dataFile));
- }
- }
- CarbonFile[] partitionFiles = getPartitionFiles(segmentPath);
- CarbonFile currentPartitionFile = FileFactory.getCarbonFile(partitionFilePath);
- if (partitionFiles != null) {
- // Delete all old partition files
- for (CarbonFile partitionFile : partitionFiles) {
- if (!currentPartitionFile.getName().equalsIgnoreCase(partitionFile.getName())) {
- long fileTimeStamp = Long.parseLong(partitionFile.getName().substring(0,
- partitionFile.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length()));
- if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimeStamp) || forceDelete) {
- partitionFile.delete();
- }
- }
- }
- }
- partitionMapper = readPartitionMap(partitionFilePath);
- if (partitionMapper != null) {
- // delete partition map if there is no partition files exist
- if (partitionMapper.partitionMap.size() == 0) {
- currentPartitionFile.delete();
- }
- }
- }
- }
- }
- // If any segments that are required to delete
- if (segmentsNeedToBeDeleted.size() > 0) {
- try {
- // Mark the segments as delete.
- SegmentStatusManager.updateDeletionStatus(
- table.getAbsoluteTableIdentifier(),
- segmentsNeedToBeDeleted,
- table.getMetaDataFilepath());
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- }
-
- public List<String> getPartitions(String indexFileName) {
- return partitionMap.get(indexFileName);
- }
-
- public Map<String, List<String>> getPartitionMap() {
- return partitionMap;
- }
-
- public static class PartitionMapper implements Serializable {
-
- private static final long serialVersionUID = 3582245668420401089L;
-
- private Map<String, List<String>> partitionMap;
-
- public PartitionMapper merge(PartitionMapper mapper) {
- if (this == mapper) {
- return this;
- }
- if (partitionMap != null && mapper.partitionMap != null) {
- partitionMap.putAll(mapper.partitionMap);
- }
- if (partitionMap == null) {
- partitionMap = mapper.partitionMap;
- }
- return this;
- }
-
- public Map<String, List<String>> getPartitionMap() {
- return partitionMap;
- }
-
- public void setPartitionMap(Map<String, List<String>> partitionMap) {
- this.partitionMap = partitionMap;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
new file mode 100644
index 0000000..b5f5a25
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
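+ /**
+ * Key is the full path of an index file; value is the list of data file (block) paths
+ * recorded in that index file. Stays null until the index files have been read.
+ */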
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+  * Creates a store bound to one table and loads the named segment file.
+  *
+  * @param tablePath       path of the table; kept to resolve relative folder locations later
+  * @param segmentFileName name of the segment file, looked up under the table's segment files
+  *                        location
+  * @throws IOException if reading the segment file fails
+  */
+ public SegmentFileStore(String tablePath, String segmentFileName) throws IOException {
+   this.segmentFile = readSegment(tablePath, segmentFileName);
+   this.tablePath = tablePath;
+ }
+
+ /**
+  * Writes a per-task segment file describing the index files a task produced in one partition
+  * location.
+  *
+  * <p>The file goes to {@code <segment files location>/<timeStamp>.tmp/<taskNo>} plus the
+  * segment extension. Nothing is written when the {@code <timeStamp>.tmp} folder under
+  * {@code location} is missing, when {@code partionNames} is empty, or when that folder holds
+  * no index file whose name starts with {@code taskNo}.
+  *
+  * @param tablePath    table path; a {@code location} starting with it is stored with that
+  *                     prefix removed and flagged as relative
+  * @param taskNo       name prefix selecting this task's index files; also the output file name
+  * @param location     partition folder containing the {@code <timeStamp>.tmp} folder
+  * @param timeStamp    name stem of the temporary folders
+  * @param partionNames partitions recorded for the location
+  * @throws IOException if the segment file cannot be written
+  */
+ public static void writeSegmentFile(String tablePath, final String taskNo, String location,
+     String timeStamp, List<String> partionNames) throws IOException {
+   String tempFolderName = timeStamp + ".tmp";
+   String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderName;
+   CarbonFile writeFolder = FileFactory.getCarbonFile(writePath);
+   if (!writeFolder.exists()) {
+     writeFolder.mkdirs(writePath, FileFactory.getFileType(writePath));
+   }
+
+   CarbonFile tempFolder =
+       FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderName);
+   if (!tempFolder.exists() || partionNames.isEmpty()) {
+     return;
+   }
+   CarbonFile[] taskIndexFiles = tempFolder.listFiles(new CarbonFileFilter() {
+     @Override public boolean accept(CarbonFile file) {
+       String name = file.getName();
+       return name.startsWith(taskNo) && name.endsWith(CarbonTablePath.INDEX_FILE_EXT);
+     }
+   });
+   if (taskIndexFiles == null || taskIndexFiles.length == 0) {
+     return;
+   }
+
+   // Locations under the table path are stored relative to it.
+   boolean isRelative = location.startsWith(tablePath);
+   String locationKey = isRelative ? location.substring(tablePath.length()) : location;
+
+   FolderDetails folderDetails = new FolderDetails();
+   folderDetails.setRelative(isRelative);
+   folderDetails.setPartitions(partionNames);
+   folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+   for (CarbonFile indexFile : taskIndexFiles) {
+     folderDetails.getFiles().add(indexFile.getName());
+   }
+
+   Map<String, FolderDetails> locationMap = new HashMap<>();
+   locationMap.put(locationKey, folderDetails);
+   SegmentFile segmentFile = new SegmentFile();
+   segmentFile.setLocationMap(locationMap);
+   // write segment info to new file.
+   writeSegmentFile(segmentFile, writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT);
+ }
+
+ /**
+  * Serializes the segment file to JSON and writes it to the given path, overwriting any
+  * existing content.
+  *
+  * @param segmentFile segment file content to serialize
+  * @param path        destination path of the segment file
+  * @throws IOException if opening, writing or closing the file fails
+  */
+ public static void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+   AtomicFileOperations atomicWrite =
+       new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+   Gson gson = new Gson();
+   BufferedWriter writer = null;
+   try {
+     DataOutputStream out = atomicWrite.openForWrite(FileWriteOperation.OVERWRITE);
+     Charset charset = Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET);
+     writer = new BufferedWriter(new OutputStreamWriter(out, charset));
+     writer.write(gson.toJson(segmentFile));
+     writer.flush();
+   } finally {
+     // The writer is closed before the atomic file operation is closed.
+     CarbonUtil.closeStreams(writer);
+     atomicWrite.close();
+   }
+ }
+
+ /**
+  * Merges all segment files found in a folder into a single segment file.
+  *
+  * <p>Every file in {@code readPath} ending with the segment extension is read and merged. If
+  * at least one of them could be read, the merged content is written to
+  * {@code writePath/<mergeFileName>} plus the segment extension, and FileFactory is then asked
+  * to delete the carbon files of {@code readPath}. The result of that deletion is not checked.
+  *
+  * @param readPath      folder containing the segment files to merge
+  * @param mergeFileName name of the merged file, without extension
+  * @param writePath     folder the merged file is written to
+  * @return the merged segment file, or null when {@code readPath} does not exist, holds no
+  *         segment files, or none of them could be read
+  * @throws IOException if reading or writing a segment file fails
+  */
+ public static SegmentFile mergeSegmentFiles(String readPath, String mergeFileName,
+     String writePath)
+     throws IOException {
+   CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+   if (segmentFiles == null || segmentFiles.length == 0) {
+     return null;
+   }
+   SegmentFile merged = null;
+   for (CarbonFile file : segmentFiles) {
+     SegmentFile current = readSegmentFile(file.getAbsolutePath());
+     if (current == null) {
+       continue;
+     }
+     // The first readable file seeds the result; only later files are merged into it, so the
+     // seed is never merged with itself.
+     merged = (merged == null) ? current : merged.merge(current);
+   }
+   if (merged != null) {
+     String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+     writeSegmentFile(merged, path);
+     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
+   }
+   return merged;
+ }
+
+ /**
+  * Lists the files in a folder whose names end with the segment extension.
+  *
+  * @param segmentPath folder to scan
+  * @return the matching files, or null when the folder does not exist
+  */
+ private static CarbonFile[] getSegmentFiles(String segmentPath) {
+   CarbonFile folder = FileFactory.getCarbonFile(segmentPath);
+   if (!folder.exists()) {
+     return null;
+   }
+   CarbonFileFilter segmentFileFilter = new CarbonFileFilter() {
+     @Override public boolean accept(CarbonFile file) {
+       return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
+     }
+   };
+   return folder.listFiles(segmentFileFilter);
+ }
+
+ /**
+  * Builds a segment file covering only those partitions whose location physically contains
+  * carbon index files.
+  *
+  * <p>For each such partition one folder entry with success status is created. A file ending
+  * with the merge index extension is recorded as the folder's merge file; every other index
+  * file is added to the folder's file list. Locations starting with {@code tablePath} are
+  * stored with that prefix removed and flagged as relative.
+  *
+  * @param tablePath      table path used to detect relative locations
+  * @param partitionSpecs partitions to inspect
+  * @return the merged segment file, or null when no partition location holds an index file
+  */
+ public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
+     List<PartitionSpec> partitionSpecs) {
+   SegmentFile merged = null;
+   for (PartitionSpec spec : partitionSpecs) {
+     String location = spec.getLocation().toString();
+     CarbonFile[] indexFiles =
+         FileFactory.getCarbonFile(location).listFiles(new CarbonFileFilter() {
+           @Override public boolean accept(CarbonFile file) {
+             return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
+           }
+         });
+     if (indexFiles == null || indexFiles.length == 0) {
+       // No physical index files for this partition.
+       continue;
+     }
+
+     boolean isRelative = location.startsWith(tablePath);
+     String locationKey = isRelative ? location.substring(tablePath.length()) : location;
+
+     FolderDetails folderDetails = new FolderDetails();
+     folderDetails.setRelative(isRelative);
+     folderDetails.setPartitions(spec.getPartitions());
+     folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+     for (CarbonFile indexFile : indexFiles) {
+       String name = indexFile.getName();
+       if (name.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+         folderDetails.setMergeFileName(name);
+       } else {
+         folderDetails.getFiles().add(name);
+       }
+     }
+
+     Map<String, FolderDetails> locationMap = new HashMap<>();
+     locationMap.put(locationKey, folderDetails);
+     SegmentFile local = new SegmentFile();
+     local.setLocationMap(locationMap);
+     merged = (merged == null) ? local : merged.merge(local);
+   }
+   return merged;
+ }
+
+ /**
+  * Reads a segment file, which is stored in JSON format.
+  *
+  * @param segmentFilePath full path of the segment file
+  * @return the parsed segment file, or null when no file exists at the path
+  * @throws IOException if checking for or opening the file fails
+  */
+ private static SegmentFile readSegmentFile(String segmentFilePath) throws IOException {
+   Gson gson = new Gson();
+   AtomicFileOperations fileOperation =
+       new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath));
+   if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) {
+     return null;
+   }
+
+   DataInputStream dataIn = null;
+   InputStreamReader streamReader = null;
+   BufferedReader bufferedReader = null;
+   try {
+     dataIn = fileOperation.openForRead();
+     streamReader = new InputStreamReader(dataIn,
+         Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+     bufferedReader = new BufferedReader(streamReader);
+     return gson.fromJson(bufferedReader, SegmentFile.class);
+   } finally {
+     // The streams are closed only once the stream reader has been created.
+     if (streamReader != null) {
+       CarbonUtil.closeStreams(bufferedReader, streamReader, dataIn);
+     }
+   }
+ }
+
+ /**
+  * Reads the named segment file from the segment files location of the given table.
+  *
+  * @param tablePath       path of the table
+  * @param segmentFileName name of the segment file inside the segment files location
+  * @return the parsed segment file, or null when the file does not exist
+  * @throws IOException if reading the file fails
+  */
+ private SegmentFile readSegment(String tablePath, String segmentFileName) throws IOException {
+   String segmentFilesLocation = CarbonTablePath.getSegmentFilesLocation(tablePath);
+   return readSegmentFile(
+       segmentFilesLocation + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName);
+ }
+
+ /**
+  * @return the table path this store was created with
+  */
+ public String getTablePath() {
+   return this.tablePath;
+ }
+
+ /**
+  * Gets the index files of this segment together with the carbondata files listed in each of
+  * them. {@link #readIndexFiles()} has to be called first.
+  *
+  * @return map from index file path to the data file paths it lists, or null when the index
+  *         files have not been read yet
+  */
+ public Map<String, List<String>> getIndexFilesMap() {
+   return this.indexFilesMap;
+ }
+
+ /**
+  * Reads the index files of this segment using success status with status filtering enabled,
+  * and makes the result available through {@link #getIndexFilesMap()}.
+  *
+  * @throws IOException if reading the index files fails
+  */
+ public void readIndexFiles() throws IOException {
+   final boolean ignoreStatus = false;
+   readIndexFiles(SegmentStatus.SUCCESS, ignoreStatus);
+ }
+
+ /**
+  * Reads the index files of this segment and caches, per index file, the paths of the data
+  * files it lists.
+  *
+  * <p>{@code status} and {@code ignoreStatus} are passed through to
+  * {@code SegmentIndexFileStore.readAllIIndexOfSegment}, which selects the index files to read.
+  * The call is a no-op when the index files have already been read, whatever arguments are
+  * given.
+  *
+  * <p>The cache is published only after everything was read successfully. A failed read
+  * therefore leaves it unset, and a later call retries instead of returning an empty or
+  * partial map.
+  *
+  * @param status       status handed to the index file store
+  * @param ignoreStatus ignore-status flag handed to the index file store
+  * @throws IOException if reading or converting an index file fails
+  */
+ private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException {
+   if (indexFilesMap != null) {
+     return;
+   }
+   SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+   indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus);
+   Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
+   DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+   Map<String, List<String>> loadedIndexFiles = new HashMap<>();
+   for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
+     List<DataFileFooter> indexInfo =
+         fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
+     List<String> blocks = new ArrayList<>();
+     for (DataFileFooter footer : indexInfo) {
+       blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
+     }
+     loadedIndexFiles.put(entry.getKey(), blocks);
+   }
+   // Assign last so that an exception above cannot leave a half-filled cache behind.
+   indexFilesMap = loadedIndexFiles;
+ }
+
+ /**
+  * Gets the index files of all folders of this segment that have success status.
+  *
+  * @return map from index file path (folder location plus file name, with relative locations
+  *         resolved against the table path) to the merge file name recorded for its folder;
+  *         empty when no segment file is loaded
+  */
+ public Map<String, String> getIndexFiles() {
+   Map<String, String> indexFiles = new HashMap<>();
+   if (segmentFile == null) {
+     return indexFiles;
+   }
+   String successStatus = SegmentStatus.SUCCESS.getMessage();
+   for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
+     FolderDetails details = entry.getValue();
+     if (!details.status.equals(successStatus)) {
+       continue;
+     }
+     String location = details.isRelative
+         ? tablePath + CarbonCommonConstants.FILE_SEPARATOR + entry.getKey()
+         : entry.getKey();
+     for (String indexFile : details.getFiles()) {
+       indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
+           details.mergeFileName);
+     }
+   }
+   return indexFiles;
+ }
+
+ /**
+  * Drops the given partitions from the segment file of the segment. The segment file is read
+  * again through {@code readSegment}, every folder whose location equals the location of one of
+  * the partition specs gets the status MARKED_FOR_DELETE, and the changed content is written to
+  * a new segment file named {@code <segmentNo>_<uniqueId>} plus the segment file extension.
+  * <p>
+  * The new file is written only when at least one folder was marked. Only segments reported
+  * through {@code toBeUpdatedSegments} are handed on as having a new segment file, so a file
+  * written for an unchanged segment would never be referenced.
+  *
+  * @param segment             segment whose segment file is processed
+  * @param partitionSpecs      partitions to drop, matched by location
+  * @param uniqueId            suffix used in the name of the new segment file
+  * @param toBeDeletedSegments receives the segment number when no folder of the segment is left
+  *                            in SUCCESS status
+  * @param toBeUpdatedSegments receives the segment number when at least one folder was marked
+  * @throws IOException if reading or writing the segment file fails
+  */
+ public void dropPartitions(Segment segment, List<PartitionSpec> partitionSpecs,
+     String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments)
+     throws IOException {
+   readSegment(tablePath, segment.getSegmentFileName());
+   boolean updateSegment = false;
+   for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+     String location = entry.getKey();
+     if (entry.getValue().isRelative) {
+       location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+     }
+     Path path = new Path(location);
+     // Update the status to delete if the folder is the location of a dropped partition
+     for (PartitionSpec spec : partitionSpecs) {
+       if (path.equals(spec.getLocation())) {
+         entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage());
+         updateSegment = true;
+         break;
+       }
+     }
+   }
+   if (updateSegment) {
+     // Persist the changed statuses under a new, uniquely named segment file.
+     String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
+     writePath =
+         writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentNo() + "_" + uniqueId
+             + CarbonTablePath.SEGMENT_EXT;
+     writeSegmentFile(segmentFile, writePath);
+   }
+   // Check whether we can completely remove the segment: no folder is left in SUCCESS status.
+   boolean deleteSegment = true;
+   for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+     if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) {
+       deleteSegment = false;
+       break;
+     }
+   }
+   if (deleteSegment) {
+     toBeDeletedSegments.add(segment.getSegmentNo());
+   }
+   if (updateSegment) {
+     toBeUpdatedSegments.add(segment.getSegmentNo());
+   }
+ }
+
+ /**
+  * Writes the outcome of a drop partition operation to the table status. Nothing is done when
+  * both segment lists are empty.
+  *
+  * @param carbonTable         table whose status is updated
+  * @param uniqueId            value passed on as the update timestamp
+  * @param toBeUpdatedSegments segment numbers passed on as segments whose segment file changed
+  * @param toBeDeleteSegments  segment numbers passed on as segments to be deleted
+  * @throws IOException if the valid segments of the table cannot be read
+  */
+ public static void commitDropPartitions(CarbonTable carbonTable, String uniqueId,
+     List<String> toBeUpdatedSegments, List<String> toBeDeleteSegments) throws IOException {
+   if (toBeDeleteSegments.isEmpty() && toBeUpdatedSegments.isEmpty()) {
+     return;
+   }
+   SegmentStatusManager statusManager =
+       new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
+   Set<Segment> validSegments =
+       new HashSet<>(statusManager.getValidAndInvalidSegments().getValidSegments());
+   List<Segment> deletedSegments = Segment.toSegmentList(toBeDeleteSegments);
+   List<Segment> updatedSegments = Segment.toSegmentList(toBeUpdatedSegments);
+   CarbonUpdateUtil.updateTableMetadataStatus(validSegments, carbonTable, uniqueId, true,
+       deletedSegments, updatedSegments);
+ }
+
+ /**
+  * Cleans up the files of dropped partitions in all segments of the table. Only segments in
+  * SUCCESS or LOAD_PARTIAL_SUCCESS status that have a segment file are processed.
+  *
+  * @param table          table to clean
+  * @param partitionSpecs partitions whose folders are kept when folders are removed in force mode
+  * @param forceDelete    when true every index file read for a segment is deleted together with
+  *                       its data files; otherwise only those for which
+  *                       {@code CarbonUpdateUtil.isMaxQueryTimeoutExceeded} returns true
+  * @throws IOException if reading the metadata or index files, or deleting a file, fails
+  */
+ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitionSpecs,
+     boolean forceDelete) throws IOException {
+   LoadMetadataDetails[] loads =
+       SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath());
+   for (LoadMetadataDetails load : loads) {
+     // Segments in any other status are skipped here.
+     boolean loaded = load.getSegmentStatus() == SegmentStatus.SUCCESS
+         || load.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS;
+     if (!loaded || load.getSegmentFile() == null) {
+       continue;
+     }
+     // Index files are requested with the MARKED_FOR_DELETE status.
+     SegmentFileStore store = new SegmentFileStore(table.getTablePath(), load.getSegmentFile());
+     store.readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false);
+     if (forceDelete) {
+       deletePhysicalPartition(partitionSpecs, store.getIndexFilesMap());
+     }
+     // First decide what has to go, then delete: index files first, data files afterwards.
+     List<String> staleIndexFiles = new ArrayList<>();
+     List<String> staleDataFiles = new ArrayList<>();
+     for (Map.Entry<String, List<String>> indexEntry : store.indexFilesMap.entrySet()) {
+       String indexPath = indexEntry.getKey();
+       // The timestamp is the part between the last hyphen and the index file extension.
+       int timestampStart = indexPath.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1;
+       int timestampEnd = indexPath.length() - CarbonTablePath.INDEX_FILE_EXT.length();
+       Long fileTimestamp =
+           CarbonUpdateUtil.getTimeStampAsLong(indexPath.substring(timestampStart, timestampEnd));
+       if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) {
+         staleIndexFiles.add(indexPath);
+         staleDataFiles.addAll(indexEntry.getValue());
+       }
+     }
+     for (String indexFile : staleIndexFiles) {
+       FileFactory.deleteFile(indexFile, FileFactory.getFileType(indexFile));
+     }
+     for (String dataFile : staleDataFiles) {
+       FileFactory.deleteFile(dataFile, FileFactory.getFileType(dataFile));
+     }
+   }
+ }
+
+ /**
+  * Removes a segment from disk: every index file of the segment (read regardless of status),
+  * the data files listed for each index file, the carbon files of folders that are not
+  * locations of the given partition specs, and finally the segment file itself.
+  *
+  * @param tablePath      path of the table the segment belongs to
+  * @param segmentFile    name of the segment file inside the segment files location
+  * @param partitionSpecs partitions whose folders must not be cleared
+  * @throws IOException if reading the index files or deleting a file fails
+  */
+ public static void deleteSegment(String tablePath, String segmentFile,
+     List<PartitionSpec> partitionSpecs) throws IOException {
+   SegmentFileStore store = new SegmentFileStore(tablePath, segmentFile);
+   // ignoreStatus is true, so all index files are read.
+   store.readIndexFiles(SegmentStatus.SUCCESS, true);
+   Map<String, List<String>> dataFilesByIndexFile = store.getIndexFilesMap();
+   for (Map.Entry<String, List<String>> indexEntry : dataFilesByIndexFile.entrySet()) {
+     String indexFilePath = indexEntry.getKey();
+     FileFactory.deleteFile(indexFilePath, FileFactory.getFileType(indexFilePath));
+     for (String dataFilePath : indexEntry.getValue()) {
+       FileFactory.deleteFile(dataFilePath, FileFactory.getFileType(dataFilePath));
+     }
+   }
+   deletePhysicalPartition(partitionSpecs, dataFilesByIndexFile);
+   // Last step: remove the physical segment file.
+   String segmentFilesDir = CarbonTablePath.getSegmentFilesLocation(tablePath);
+   String segmentFilePath = segmentFilesDir + CarbonCommonConstants.FILE_SEPARATOR + segmentFile;
+   FileFactory.deleteFile(segmentFilePath, FileFactory.getFileType(segmentFilePath));
+ }
+
+ /**
+  * Deletes all carbon files of the parent folder of every key of {@code locationMap}, unless
+  * that folder is the location of one of the given partition specs.
+  *
+  * @param partitionSpecs partitions whose folders are left untouched
+  * @param locationMap    map keyed by file path; only the keys are used
+  */
+ private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs,
+     Map<String, List<String>> locationMap) {
+   for (String filePath : locationMap.keySet()) {
+     Path folder = new Path(filePath).getParent();
+     if (pathExistsInPartitionSpec(partitionSpecs, folder)) {
+       continue;
+     }
+     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(folder.toString()));
+   }
+ }
+
+ /**
+  * Tells whether the given path is the location of one of the partition specs.
+  *
+  * @param partitionSpecs partition specs to search
+  * @param partitionPath  path to look for
+  * @return true when the location of at least one spec equals {@code partitionPath}
+  */
+ private static boolean pathExistsInPartitionSpec(List<PartitionSpec> partitionSpecs,
+     Path partitionPath) {
+   boolean found = false;
+   for (PartitionSpec candidate : partitionSpecs) {
+     if (candidate.getLocation().equals(partitionPath)) {
+       found = true;
+       break;
+     }
+   }
+   return found;
+ }
+
+ /**
+  * Gets the partition specs of a segment. The first load metadata entry whose load name equals
+  * {@code segmentId} is used; every returned spec gets the uuid
+  * {@code segmentId + "_" + loadStartTime} of that entry.
+  *
+  * @param segmentId load name of the segment
+  * @param tablePath path of the table
+  * @return partition specs of the segment, or null when no entry matches or the matching entry
+  *         has no segment file
+  * @throws IOException if the segment file cannot be read
+  */
+ public static List<PartitionSpec> getPartitionSpecs(String segmentId, String tablePath)
+     throws IOException {
+   LoadMetadataDetails[] loads =
+       SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath));
+   for (LoadMetadataDetails load : loads) {
+     if (!load.getLoadName().equals(segmentId)) {
+       continue;
+     }
+     // Only the first matching entry counts.
+     if (load.getSegmentFile() == null) {
+       return null;
+     }
+     SegmentFileStore store = new SegmentFileStore(tablePath, load.getSegmentFile());
+     List<PartitionSpec> specs = store.getPartitionSpecs();
+     for (PartitionSpec spec : specs) {
+       spec.setUuid(segmentId + "_" + load.getLoadStartTime());
+     }
+     return specs;
+   }
+   return null;
+ }
+
+ /**
+  * Moves the loaded files out of the temp folder of each partition folder into the partition
+  * folder itself and then deletes the temp folder.
+  *
+  * @param segmentFile segment file listing the partition folders
+  * @param tmpFolder   name of the temp folder inside each partition folder
+  * @param tablePath   table path against which relative locations are resolved
+  */
+ public static void moveFromTempFolder(SegmentFile segmentFile, String tmpFolder,
+     String tablePath) {
+   for (Map.Entry<String, FolderDetails> folderEntry : segmentFile.getLocationMap().entrySet()) {
+     String partitionDir = folderEntry.getValue().isRelative()
+         ? tablePath + CarbonCommonConstants.FILE_SEPARATOR + folderEntry.getKey()
+         : folderEntry.getKey();
+     String partitionPrefix = partitionDir + CarbonCommonConstants.FILE_SEPARATOR;
+     CarbonFile stagingDir = FileFactory.getCarbonFile(partitionPrefix + tmpFolder);
+     for (CarbonFile stagedFile : stagingDir.listFiles()) {
+       stagedFile.renameForce(partitionPrefix + stagedFile.getName());
+     }
+     stagingDir.delete();
+   }
+ }
+
+ /**
+  * Removes the temp stage folder of every partition folder, e.g. after an aborted job. Folders
+  * that do not exist are skipped; a null {@code locationMap} is a no-op.
+  *
+  * @param locationMap partition folders keyed by location, may be null
+  * @param tmpFolder   name of the temp folder inside each partition folder
+  * @param tablePath   table path against which relative locations are resolved
+  */
+ public static void removeTempFolder(Map<String, FolderDetails> locationMap, String tmpFolder,
+     String tablePath) {
+   if (locationMap != null) {
+     for (Map.Entry<String, FolderDetails> folderEntry : locationMap.entrySet()) {
+       String partitionDir = folderEntry.getKey();
+       if (folderEntry.getValue().isRelative()) {
+         partitionDir = tablePath + CarbonCommonConstants.FILE_SEPARATOR + partitionDir;
+       }
+       String stagingPath = partitionDir + CarbonCommonConstants.FILE_SEPARATOR + tmpFolder;
+       CarbonFile stagingDir = FileFactory.getCarbonFile(stagingPath);
+       if (stagingDir.exists()) {
+         FileFactory.deleteAllCarbonFilesOfDir(stagingDir);
+       }
+     }
+   }
+ }
+
+ /**
+  * Returns the folders of this segment keyed by location.
+  *
+  * @return the location map of the segment file, or a new empty map when there is no segment
+  *         file
+  */
+ public Map<String, FolderDetails> getLocationMap() {
+   return segmentFile == null
+       ? new HashMap<String, FolderDetails>()
+       : segmentFile.getLocationMap();
+ }
+
+ /**
+  * Returns the current partition specs of this segment: one spec per folder whose status is
+  * SUCCESS, built from the partitions of the folder and its location resolved against the
+  * table path when relative.
+  *
+  * @return partition specs of the segment; empty when there is no segment file
+  */
+ public List<PartitionSpec> getPartitionSpecs() {
+   List<PartitionSpec> specs = new ArrayList<>();
+   if (segmentFile == null) {
+     return specs;
+   }
+   for (Map.Entry<String, FolderDetails> folderEntry : segmentFile.getLocationMap().entrySet()) {
+     FolderDetails folder = folderEntry.getValue();
+     if (!folder.getStatus().equals(SegmentStatus.SUCCESS.getMessage())) {
+       continue;
+     }
+     String folderPath = folderEntry.getKey();
+     if (folder.isRelative) {
+       folderPath = tablePath + CarbonCommonConstants.FILE_SEPARATOR + folderPath;
+     }
+     specs.add(new PartitionSpec(folder.partitions, folderPath));
+   }
+   return specs;
+ }
+
+ /**
+  * Content of a segment file: the folders of the segment keyed by location, each with its
+  * partitions and related index files.
+  */
+ public static class SegmentFile implements Serializable {
+
+   private static final long serialVersionUID = 3582245668420401089L;
+
+   /** Folder details keyed by folder location; may be null until set or merged. */
+   private Map<String, FolderDetails> locationMap;
+
+   /**
+    * Merges the folders of another segment file into this one. Folders present in both are
+    * merged through {@link FolderDetails#merge(FolderDetails)}, folders only present in
+    * {@code mapper} are added as they are. When this instance has no location map yet, the map
+    * of {@code mapper} is adopted (the same map instance, not a copy).
+    *
+    * @param mapper segment file to merge in; null or this instance leaves this object unchanged
+    * @return this instance
+    */
+   public SegmentFile merge(SegmentFile mapper) {
+     // Tolerate null like FolderDetails.merge does instead of failing on mapper.locationMap.
+     if (mapper == null || this == mapper) {
+       return this;
+     }
+     if (locationMap == null) {
+       locationMap = mapper.locationMap;
+       return this;
+     }
+     if (mapper.locationMap != null) {
+       for (Map.Entry<String, FolderDetails> entry : mapper.locationMap.entrySet()) {
+         FolderDetails folderDetails = locationMap.get(entry.getKey());
+         if (folderDetails != null) {
+           folderDetails.merge(entry.getValue());
+         } else {
+           locationMap.put(entry.getKey(), entry.getValue());
+         }
+       }
+     }
+     return this;
+   }
+
+   public Map<String, FolderDetails> getLocationMap() {
+     return locationMap;
+   }
+
+   public void setLocationMap(Map<String, FolderDetails> locationMap) {
+     this.locationMap = locationMap;
+   }
+ }
+
+ /**
+  * Represents one partition folder of a segment.
+  */
+ public static class FolderDetails implements Serializable {
+
+   private static final long serialVersionUID = 501021868886928553L;
+
+   /** Names of the files recorded for this folder. */
+   private Set<String> files = new HashSet<>();
+
+   /** Partitions of this folder. */
+   private List<String> partitions = new ArrayList<>();
+
+   /** Status message of this folder; compared against {@code SegmentStatus} messages. */
+   private String status;
+
+   /** Merge file name recorded for this folder. */
+   private String mergeFileName;
+
+   /** Whether the folder location has to be resolved against the table path. */
+   private boolean isRelative;
+
+   /**
+    * Merges another folder description into this one: the files of {@code folderDetails} are
+    * added to the files of this folder and the partitions of this folder are replaced by those
+    * of {@code folderDetails}. Status, merge file name and the relative flag are not touched.
+    *
+    * @param folderDetails folder to merge in; null or this instance leaves this object unchanged
+    * @return this instance
+    */
+   public FolderDetails merge(FolderDetails folderDetails) {
+     if (this == folderDetails || folderDetails == null) {
+       return this;
+     }
+     if (folderDetails.files != null) {
+       if (files == null) {
+         // files can be null after setFiles(null); start from a copy so the two folder
+         // descriptions do not share one mutable set.
+         files = new HashSet<>(folderDetails.files);
+       } else {
+         files.addAll(folderDetails.files);
+       }
+     }
+     partitions = folderDetails.partitions;
+     return this;
+   }
+
+   public Set<String> getFiles() {
+     return files;
+   }
+
+   public void setFiles(Set<String> files) {
+     this.files = files;
+   }
+
+   public List<String> getPartitions() {
+     return partitions;
+   }
+
+   public void setPartitions(List<String> partitions) {
+     this.partitions = partitions;
+   }
+
+   public boolean isRelative() {
+     return isRelative;
+   }
+
+   public void setRelative(boolean relative) {
+     isRelative = relative;
+   }
+
+   public String getMergeFileName() {
+     return mergeFileName;
+   }
+
+   public void setMergeFileName(String mergeFileName) {
+     this.mergeFileName = mergeFileName;
+   }
+
+   public String getStatus() {
+     return status;
+   }
+
+   public void setStatus(String status) {
+     this.status = status;
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index c5f61c2..de98fa8 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -82,14 +83,16 @@ public class CarbonUpdateUtil {
* @param factPath
* @return
*/
- public static String getTableBlockPath(String tid, String factPath) {
- String part =
- CarbonTablePath.addPartPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID));
+ public static String getTableBlockPath(String tid, String factPath, boolean isPartitionTable) {
+ String partField = getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID);
+ if (isPartitionTable) {
+ return factPath + CarbonCommonConstants.FILE_SEPARATOR + partField;
+ }
+ String part = CarbonTablePath.addPartPrefix(partField);
String segment =
CarbonTablePath.addSegmentPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID));
return factPath + CarbonCommonConstants.FILE_SEPARATOR + part
+ CarbonCommonConstants.FILE_SEPARATOR + segment;
-
}
/**
@@ -172,6 +175,22 @@ public class CarbonUpdateUtil {
}
/**
+ * Update table status
+ * @param updatedSegmentsList
+ * @param table
+ * @param updatedTimeStamp
+ * @param isTimestampUpdationRequired
+ * @param segmentsToBeDeleted
+ * @return
+ */
+ public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList,
+ CarbonTable table, String updatedTimeStamp, boolean isTimestampUpdationRequired,
+ List<Segment> segmentsToBeDeleted) {
+ return updateTableMetadataStatus(updatedSegmentsList, table, updatedTimeStamp,
+ isTimestampUpdationRequired, segmentsToBeDeleted, new ArrayList<Segment>());
+ }
+
+ /**
*
* @param updatedSegmentsList
* @param table
@@ -180,10 +199,9 @@ public class CarbonUpdateUtil {
* @param segmentsToBeDeleted
* @return
*/
- public static boolean updateTableMetadataStatus(Set<String> updatedSegmentsList,
- CarbonTable table, String updatedTimeStamp,
- boolean isTimestampUpdationRequired,
- List<String> segmentsToBeDeleted) {
+ public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList,
+ CarbonTable table, String updatedTimeStamp, boolean isTimestampUpdationRequired,
+ List<Segment> segmentsToBeDeleted, List<Segment> segmentFilesTobeUpdated) {
boolean status = false;
@@ -221,13 +239,13 @@ public class CarbonUpdateUtil {
}
// if the segments is in the list of marked for delete then update the status.
- if (segmentsToBeDeleted.contains(loadMetadata.getLoadName())) {
+ if (segmentsToBeDeleted.contains(new Segment(loadMetadata.getLoadName(), null))) {
loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
loadMetadata.setModificationOrdeletionTimesStamp(Long.parseLong(updatedTimeStamp));
}
}
- for (String segName : updatedSegmentsList) {
- if (loadMetadata.getLoadName().equalsIgnoreCase(segName)) {
+ for (Segment segName : updatedSegmentsList) {
+ if (loadMetadata.getLoadName().equalsIgnoreCase(segName.getSegmentNo())) {
// if this call is coming from the delete delta flow then the time stamp
// String will come empty then no need to write into table status file.
if (isTimestampUpdationRequired) {
@@ -240,6 +258,10 @@ public class CarbonUpdateUtil {
// update end timestamp for each time.
loadMetadata.setUpdateDeltaEndTimestamp(updatedTimeStamp);
}
+ if (segmentFilesTobeUpdated.contains(Segment.toSegment(loadMetadata.getLoadName()))) {
+ loadMetadata.setSegmentFile(loadMetadata.getLoadName() + "_" + updatedTimeStamp
+ + CarbonTablePath.SEGMENT_EXT);
+ }
}
}
}
@@ -696,14 +718,14 @@ public class CarbonUpdateUtil {
* @param segmentBlockCount
* @return
*/
- public static List<String> getListOfSegmentsToMarkDeleted(Map<String, Long> segmentBlockCount) {
- List<String> segmentsToBeDeleted =
+ public static List<Segment> getListOfSegmentsToMarkDeleted(Map<String, Long> segmentBlockCount) {
+ List<Segment> segmentsToBeDeleted =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
for (Map.Entry<String, Long> eachSeg : segmentBlockCount.entrySet()) {
if (eachSeg.getValue() == 0) {
- segmentsToBeDeleted.add(eachSeg.getKey());
+ segmentsToBeDeleted.add(new Segment(eachSeg.getKey(), null));
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 6490694..cc2e513 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -74,7 +74,6 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.commons.lang3.ArrayUtils;
@@ -269,7 +268,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
blockExecutionInfoList.add(getBlockExecutionInfoForBlock(queryModel, abstractIndex,
dataRefNode.getBlockInfos().get(0).getBlockletInfos().getStartBlockletNumber(),
dataRefNode.numberOfNodes(), dataRefNode.getBlockInfos().get(0).getFilePath(),
- dataRefNode.getBlockInfos().get(0).getDeletedDeltaFilePath()));
+ dataRefNode.getBlockInfos().get(0).getDeletedDeltaFilePath(),
+ dataRefNode.getBlockInfos().get(0).getSegmentId()));
}
if (null != queryModel.getStatisticsRecorder()) {
QueryStatistic queryStatistic = new QueryStatistic();
@@ -291,7 +291,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
*/
protected BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel,
AbstractIndex blockIndex, int startBlockletIndex, int numberOfBlockletToScan, String filePath,
- String[] deleteDeltaFiles)
+ String[] deleteDeltaFiles, String segmentId)
throws QueryExecutionException {
BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
SegmentProperties segmentProperties = blockIndex.getSegmentProperties();
@@ -304,11 +304,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
.createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo,
queryModel.getQueryDimension(), tableBlockDimensions,
segmentProperties.getComplexDimensions());
- int tableFactPathLength = CarbonStorePath
- .getCarbonTablePath(queryModel.getAbsoluteTableIdentifier().getTablePath(),
- queryModel.getAbsoluteTableIdentifier().getCarbonTableIdentifier()).getFactDir()
- .length() + 1;
- blockExecutionInfo.setBlockId(filePath.substring(tableFactPathLength));
+ blockExecutionInfo.setBlockId(
+ CarbonUtil.getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segmentId));
blockExecutionInfo.setDeleteDeltaFilePath(deleteDeltaFiles);
blockExecutionInfo.setStartBlockletIndex(startBlockletIndex);
blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan);
@@ -457,6 +454,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
return blockExecutionInfo;
}
+
+
/**
* This method will be used to get fixed key length size this will be used
* to create a row from column chunk
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 85602bc..a0fa67d 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -123,6 +123,11 @@ public class LoadMetadataDetails implements Serializable {
*/
private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
+ /**
+ * Name of the segment file which holds the partition information.
+ */
+ private String segmentFile;
+
public String getPartitionCount() {
return partitionCount;
}
@@ -417,4 +422,17 @@ public class LoadMetadataDetails implements Serializable {
public void setFileFormat(FileFormat fileFormat) {
this.fileFormat = fileFormat;
}
+
+ public String getSegmentFile() {
+ return segmentFile;
+ }
+
+ public void setSegmentFile(String segmentFile) {
+ this.segmentFile = segmentFile;
+ }
+
+ @Override public String toString() {
+ return "LoadMetadataDetails{" + "loadStatus=" + loadStatus + ", loadName='" + loadName + '\''
+ + ", loadStartTime='" + loadStartTime + '\'' + ", segmentFile='" + segmentFile + '\'' + '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 9d14c62..76c2dc7 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -33,6 +33,7 @@ import java.util.List;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
@@ -99,13 +100,14 @@ public class SegmentStatusManager {
public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
// @TODO: move reading LoadStatus file to separate class
- List<String> listOfValidSegments = new ArrayList<>(10);
- List<String> listOfValidUpdatedSegments = new ArrayList<>(10);
- List<String> listOfInvalidSegments = new ArrayList<>(10);
- List<String> listOfStreamSegments = new ArrayList<>(10);
+ List<Segment> listOfValidSegments = new ArrayList<>(10);
+ List<Segment> listOfValidUpdatedSegments = new ArrayList<>(10);
+ List<Segment> listOfInvalidSegments = new ArrayList<>(10);
+ List<Segment> listOfStreamSegments = new ArrayList<>(10);
+ List<Segment> listOfInProgressSegments = new ArrayList<>(10);
CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
+ .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
+ absoluteTableIdentifier.getCarbonTableIdentifier());
String dataPath = carbonTablePath.getTableStatusFilePath();
DataInputStream dataInputStream = null;
@@ -113,7 +115,7 @@ public class SegmentStatusManager {
Gson gson = new Gson();
AtomicFileOperations fileOperation =
- new AtomicFileOperationsImpl(dataPath, FileFactory.getFileType(dataPath));
+ new AtomicFileOperationsImpl(dataPath, FileFactory.getFileType(dataPath));
LoadMetadataDetails[] loadFolderDetailsArray;
try {
if (FileFactory.isFileExist(dataPath, FileFactory.getFileType(dataPath))) {
@@ -127,37 +129,44 @@ public class SegmentStatusManager {
}
//just directly iterate Array
for (LoadMetadataDetails segment : loadFolderDetailsArray) {
- if (SegmentStatus.SUCCESS == segment.getSegmentStatus() ||
- SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus() ||
- SegmentStatus.LOAD_PARTIAL_SUCCESS == segment.getSegmentStatus() ||
- SegmentStatus.STREAMING == segment.getSegmentStatus() ||
- SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
+ if (SegmentStatus.SUCCESS == segment.getSegmentStatus()
+ || SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()
+ || SegmentStatus.LOAD_PARTIAL_SUCCESS == segment.getSegmentStatus()
+ || SegmentStatus.STREAMING == segment.getSegmentStatus()
+ || SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
// check for merged loads.
if (null != segment.getMergedLoadName()) {
- if (!listOfValidSegments.contains(segment.getMergedLoadName())) {
- listOfValidSegments.add(segment.getMergedLoadName());
+ Segment seg = new Segment(segment.getMergedLoadName(), segment.getSegmentFile());
+ if (!listOfValidSegments.contains(seg)) {
+ listOfValidSegments.add(seg);
}
// if merged load is updated then put it in updated list
if (SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()) {
- listOfValidUpdatedSegments.add(segment.getMergedLoadName());
+ listOfValidUpdatedSegments.add(seg);
}
continue;
}
if (SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()) {
- listOfValidUpdatedSegments.add(segment.getLoadName());
+ listOfValidUpdatedSegments
+ .add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
}
- if (SegmentStatus.STREAMING == segment.getSegmentStatus() ||
- SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
- listOfStreamSegments.add(segment.getLoadName());
+ if (SegmentStatus.STREAMING == segment.getSegmentStatus()
+ || SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
+ listOfStreamSegments
+ .add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
continue;
}
- listOfValidSegments.add(segment.getLoadName());
- } else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus() ||
- SegmentStatus.COMPACTED == segment.getSegmentStatus() ||
- SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
- listOfInvalidSegments.add(segment.getLoadName());
+ listOfValidSegments.add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
+ } else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus()
+ || SegmentStatus.COMPACTED == segment.getSegmentStatus()
+ || SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
+ listOfInvalidSegments.add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
+ } else if (SegmentStatus.INSERT_IN_PROGRESS == segment.getSegmentStatus() ||
+ SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segment.getSegmentStatus()) {
+ listOfInProgressSegments
+ .add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
}
}
}
@@ -168,7 +177,7 @@ public class SegmentStatusManager {
CarbonUtil.closeStreams(dataInputStream);
}
return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments,
- listOfInvalidSegments, listOfStreamSegments);
+ listOfInvalidSegments, listOfStreamSegments, listOfInProgressSegments);
}
/**
@@ -688,29 +697,35 @@ public class SegmentStatusManager {
public static class ValidAndInvalidSegmentsInfo {
- private final List<String> listOfValidSegments;
- private final List<String> listOfValidUpdatedSegments;
- private final List<String> listOfInvalidSegments;
- private final List<String> listOfStreamSegments;
-
- private ValidAndInvalidSegmentsInfo(List<String> listOfValidSegments,
- List<String> listOfValidUpdatedSegments, List<String> listOfInvalidUpdatedSegments,
- List<String> listOfStreamSegments) {
+ private final List<Segment> listOfValidSegments;
+ private final List<Segment> listOfValidUpdatedSegments;
+ private final List<Segment> listOfInvalidSegments;
+ private final List<Segment> listOfStreamSegments;
+ private final List<Segment> listOfInProgressSegments;
+
+ private ValidAndInvalidSegmentsInfo(List<Segment> listOfValidSegments,
+ List<Segment> listOfValidUpdatedSegments, List<Segment> listOfInvalidUpdatedSegments,
+ List<Segment> listOfStreamSegments, List<Segment> listOfInProgressSegments) {
this.listOfValidSegments = listOfValidSegments;
this.listOfValidUpdatedSegments = listOfValidUpdatedSegments;
this.listOfInvalidSegments = listOfInvalidUpdatedSegments;
this.listOfStreamSegments = listOfStreamSegments;
+ this.listOfInProgressSegments = listOfInProgressSegments;
}
- public List<String> getInvalidSegments() {
+ public List<Segment> getInvalidSegments() {
return listOfInvalidSegments;
}
- public List<String> getValidSegments() {
+ public List<Segment> getValidSegments() {
return listOfValidSegments;
}
- public List<String> getStreamSegments() {
+ public List<Segment> getStreamSegments() {
return listOfStreamSegments;
}
+
+ public List<Segment> getListOfInProgressSegments() {
+ return listOfInProgressSegments;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index e0e7b70..71b6ba8 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -33,6 +33,7 @@ import java.util.Map;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -70,6 +71,7 @@ public class SegmentUpdateStatusManager {
private SegmentUpdateDetails[] updateDetails;
private CarbonTablePath carbonTablePath;
private Map<String, SegmentUpdateDetails> blockAndDetailsMap;
+ private boolean isPartitionTable;
/**
* @param absoluteTableIdentifier
@@ -83,6 +85,9 @@ public class SegmentUpdateStatusManager {
// on latest file status.
segmentDetails =
segmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
+ if (segmentDetails.length > 0) {
+ isPartitionTable = segmentDetails[0].getSegmentFile() != null;
+ }
updateDetails = readLoadMetadata();
populateMap();
}
@@ -268,12 +273,14 @@ public class SegmentUpdateStatusManager {
* @return all delete delta files
* @throws Exception
*/
- public String[] getDeleteDeltaFilePath(String blockFilePath) throws Exception {
- int tableFactPathLength = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier()).getFactDir().length() + 1;
- String blockame = blockFilePath.substring(tableFactPathLength);
- String tupleId = CarbonTablePath.getShortBlockId(blockame);
+ public String[] getDeleteDeltaFilePath(String blockFilePath, String segmentId) throws Exception {
+ String blockId = CarbonUtil.getBlockId(absoluteTableIdentifier, blockFilePath, segmentId);
+ String tupleId;
+ if (isPartitionTable) {
+ tupleId = CarbonTablePath.getShortBlockIdForPartitionTable(blockId);
+ } else {
+ tupleId = CarbonTablePath.getShortBlockId(blockId);
+ }
return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
.toArray(new String[0]);
}
@@ -292,12 +299,19 @@ public class SegmentUpdateStatusManager {
.getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
absoluteTableIdentifier.getCarbonTableIdentifier());
String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID);
- String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment);
String completeBlockName = CarbonTablePath.addDataPartPrefix(
CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID)
+ CarbonCommonConstants.FACT_FILE_EXT);
- String blockPath =
- carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
+ String blockPath;
+ if (isPartitionTable) {
+ blockPath = absoluteTableIdentifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID)
+ .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
+ } else {
+ String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment);
+ blockPath =
+ carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
+ }
CarbonFile file = FileFactory.getCarbonFile(blockPath, FileFactory.getFileType(blockPath));
if (!file.exists()) {
throw new Exception("Invalid tuple id " + tupleId);
@@ -306,8 +320,7 @@ public class SegmentUpdateStatusManager {
//blockName without timestamp
final String blockNameFromTuple =
blockNameWithoutExtn.substring(0, blockNameWithoutExtn.lastIndexOf("-"));
- return getDeltaFiles(file, blockNameFromTuple, extension,
- segment);
+ return getDeltaFiles(file, blockNameFromTuple, extension, segment);
} catch (Exception ex) {
String errorMsg = "Invalid tuple id " + tupleId;
LOG.error(errorMsg);
@@ -418,20 +431,20 @@ public class SegmentUpdateStatusManager {
* @param blockName
* @return
*/
- public CarbonFile[] getDeleteDeltaFilesList(final String segmentId, final String blockName) {
+ public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final String blockName) {
CarbonTablePath carbonTablePath = CarbonStorePath
.getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
absoluteTableIdentifier.getCarbonTableIdentifier());
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+ String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId.getSegmentNo());
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
for (SegmentUpdateDetails block : updateDetails) {
if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
- (block.getSegmentName().equalsIgnoreCase(segmentId))
+ (block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo()))
&& !CarbonUpdateUtil.isBlockInvalid((block.getSegmentStatus()))) {
final long deltaStartTimestamp =
getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index c208154..eb0a9d7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -47,6 +47,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.FileHolder;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -63,6 +64,7 @@ import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
@@ -1437,14 +1439,13 @@ public final class CarbonUtil {
* @param values
* @return comma separated segment string
*/
- public static String convertToString(List<String> values) {
+ public static String convertToString(List<Segment> values) {
if (values == null || values.isEmpty()) {
return "";
}
StringBuilder segmentStringbuilder = new StringBuilder();
for (int i = 0; i < values.size() - 1; i++) {
- String segmentNo = values.get(i);
- segmentStringbuilder.append(segmentNo);
+ segmentStringbuilder.append(values.get(i));
segmentStringbuilder.append(",");
}
segmentStringbuilder.append(values.get(values.size() - 1));
@@ -2145,7 +2146,8 @@ public final class CarbonUtil {
}
for (String value : values) {
if (!value.equalsIgnoreCase("*")) {
- Float aFloatValue = Float.parseFloat(value);
+ Segment segment = Segment.toSegment(value);
+ Float aFloatValue = Float.parseFloat(segment.getSegmentNo());
if (aFloatValue < 0 || aFloatValue > Float.MAX_VALUE) {
throw new InvalidConfigurationException(
"carbon.input.segments.<database_name>.<table_name> value range should be greater "
@@ -2298,7 +2300,7 @@ public final class CarbonUtil {
}
// Get the total size of carbon data and the total size of carbon index
- public static HashMap<String, Long> getDataSizeAndIndexSize(CarbonTablePath carbonTablePath,
+ private static HashMap<String, Long> getDataSizeAndIndexSize(CarbonTablePath carbonTablePath,
String segmentId) throws IOException {
long carbonDataSize = 0L;
long carbonIndexSize = 0L;
@@ -2350,6 +2352,51 @@ public final class CarbonUtil {
return dataAndIndexSize;
}
+ // Get the total size of carbon data and the total size of carbon index
+ private static HashMap<String, Long> getDataSizeAndIndexSize(SegmentFileStore fileStore)
+ throws IOException {
+ long carbonDataSize = 0L;
+ long carbonIndexSize = 0L;
+ HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>();
+ if (fileStore.getLocationMap() != null) {
+ fileStore.readIndexFiles();
+ Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+ for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+ carbonIndexSize += FileFactory.getCarbonFile(entry.getKey()).getSize();
+ for (String blockFile : entry.getValue()) {
+ carbonDataSize += FileFactory.getCarbonFile(blockFile).getSize();
+ }
+ }
+ }
+ dataAndIndexSize.put(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE, carbonDataSize);
+ dataAndIndexSize.put(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE, carbonIndexSize);
+ return dataAndIndexSize;
+ }
+
+ // Get the total size of carbon data and the total size of carbon index
+ public static HashMap<String, Long> getDataSizeAndIndexSize(CarbonTablePath carbonTablePath,
+ Segment segment) throws IOException {
+ if (segment.getSegmentFileName() != null) {
+ SegmentFileStore fileStore =
+ new SegmentFileStore(carbonTablePath.getPath(), segment.getSegmentFileName());
+ return getDataSizeAndIndexSize(fileStore);
+ } else {
+ return getDataSizeAndIndexSize(carbonTablePath, segment.getSegmentNo());
+ }
+ }
+
+ // Get the total size of segment.
+ public static long getSizeOfSegment(CarbonTablePath carbonTablePath,
+ Segment segment) throws IOException {
+ HashMap<String, Long> dataSizeAndIndexSize = getDataSizeAndIndexSize(carbonTablePath, segment);
+ long size = 0;
+ for (Long eachSize: dataSizeAndIndexSize.values()) {
+ size += eachSize;
+ }
+ return size;
+ }
+
+
/**
* Utility function to check whether table has timseries datamap or not
* @param carbonTable
@@ -2449,5 +2496,43 @@ public final class CarbonUtil {
return updatedMinMaxValues;
}
+ /**
+ * Generate the blockid as per the block path
+ *
+ * @param identifier
+ * @param filePath
+ * @param segmentId
+ * @return
+ */
+ public static String getBlockId(AbsoluteTableIdentifier identifier, String filePath,
+ String segmentId) {
+ String blockId;
+ String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length());
+ String tablePath = identifier.getTablePath();
+ if (filePath.startsWith(tablePath)) {
+ String factDir =
+ CarbonStorePath.getCarbonTablePath(tablePath, identifier.getCarbonTableIdentifier())
+ .getFactDir();
+ if (filePath.startsWith(factDir)) {
+ blockId = "Part0" + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
+ + CarbonCommonConstants.FILE_SEPARATOR + blockName;
+ } else {
+ // This is the case with partition table.
+ String partitionDir =
+ filePath.substring(tablePath.length() + 1, filePath.length() - blockName.length() - 1);
+
+ // Replace / with # on partition directory to support multi level partitioning. And access
+ // them all as a single entity.
+ blockId = partitionDir.replace("/", "#") + CarbonCommonConstants.FILE_SEPARATOR + "Segment_"
+ + segmentId + CarbonCommonConstants.FILE_SEPARATOR + blockName;
+ }
+ } else {
+ blockId = filePath.substring(0, filePath.length() - blockName.length()).replace("/", "#")
+ + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
+ + CarbonCommonConstants.FILE_SEPARATOR + blockName;
+ }
+ return blockId;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
index d4b328d..f1603dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
@@ -18,6 +18,7 @@
package org.apache.carbondata.core.util;
import java.io.Serializable;
+import java.math.BigDecimal;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -26,18 +27,33 @@ public class DataTypeConverterImpl implements DataTypeConverter, Serializable {
private static final long serialVersionUID = -1718154403432354200L;
public Object convertToDecimal(Object data) {
- return new java.math.BigDecimal(data.toString());
+ if (null == data) {
+ return null;
+ }
+ if (data instanceof BigDecimal) {
+ return data;
+ }
+ return new BigDecimal(data.toString());
}
public Object convertFromByteToUTF8String(Object data) {
+ if (null == data) {
+ return null;
+ }
return new String((byte[]) data, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
}
public byte[] convertFromStringToByte(Object data) {
+ if (null == data) {
+ return null;
+ }
return data.toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
}
public Object convertFromStringToUTF8String(Object data) {
+ if (null == data) {
+ return null;
+ }
return data.toString();
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index c370b14..4602cc4 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -83,25 +83,7 @@ public final class DataTypeUtil {
*/
public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType,
CarbonMeasure carbonMeasure) {
- if (dataType == DataTypes.BOOLEAN) {
- return BooleanConvert.parseBoolean(msrValue);
- } else if (DataTypes.isDecimal(dataType)) {
- BigDecimal bigDecimal =
- new BigDecimal(msrValue).setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP);
- return normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision());
- } else if (dataType == DataTypes.SHORT) {
- return Short.parseShort(msrValue);
- } else if (dataType == DataTypes.INT) {
- return Integer.parseInt(msrValue);
- } else if (dataType == DataTypes.LONG) {
- return Long.valueOf(msrValue);
- } else {
- Double parsedValue = Double.valueOf(msrValue);
- if (Double.isInfinite(parsedValue) || Double.isNaN(parsedValue)) {
- return null;
- }
- return parsedValue;
- }
+ return getMeasureValueBasedOnDataType(msrValue, dataType,carbonMeasure, false);
}
/**
@@ -112,15 +94,19 @@ public final class DataTypeUtil {
* @param carbonMeasure
* @return
*/
- public static Object getConvertedMeasureValueBasedOnDataType(String msrValue, DataType dataType,
- CarbonMeasure carbonMeasure) {
+ public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType,
+ CarbonMeasure carbonMeasure, boolean useConverter) {
if (dataType == DataTypes.BOOLEAN) {
return BooleanConvert.parseBoolean(msrValue);
} else if (DataTypes.isDecimal(dataType)) {
BigDecimal bigDecimal =
new BigDecimal(msrValue).setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP);
- return converter
- .convertToDecimal(normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision()));
+ BigDecimal decimal = normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision());
+ if (useConverter) {
+ return converter.convertToDecimal(decimal);
+ } else {
+ return decimal;
+ }
} else if (dataType == DataTypes.SHORT) {
return Short.parseShort(msrValue);
} else if (dataType == DataTypes.INT) {
@@ -815,6 +801,8 @@ public final class DataTypeUtil {
*/
public static void setDataTypeConverter(DataTypeConverter converterLocal) {
converter = converterLocal;
+ timeStampformatter.remove();
+ dateformatter.remove();
}
public static DataTypeConverter getDataTypeConverter() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 5a63d2f..d70d9ef 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -48,7 +48,7 @@ public class CarbonTablePath extends Path {
public static final String CARBON_DATA_EXT = ".carbondata";
public static final String INDEX_FILE_EXT = ".carbonindex";
public static final String MERGE_INDEX_FILE_EXT = ".carbonindexmerge";
- public static final String PARTITION_MAP_EXT = ".partitionmap";
+ public static final String SEGMENT_EXT = ".segment";
private static final String STREAMING_DIR = ".streaming";
private static final String STREAMING_LOG_DIR = "log";
@@ -111,17 +111,6 @@ public class CarbonTablePath extends Path {
}
/**
- * Return true if the fileNameWithPath ends with partition map file extension name
- */
- public static boolean isPartitionMapFile(String fileNameWithPath) {
- int pos = fileNameWithPath.lastIndexOf('.');
- if (pos != -1) {
- return fileNameWithPath.substring(pos).startsWith(PARTITION_MAP_EXT);
- }
- return false;
- }
-
- /**
* check if it is carbon index file matching extension
*
* @param fileNameWithPath
@@ -667,6 +656,18 @@ public class CarbonTablePath extends Path {
}
/**
+ * This method will remove strings in path and return short block id
+ *
+ * @param blockId
+ * @return shortBlockId
+ */
+ public static String getShortBlockIdForPartitionTable(String blockId) {
+ return blockId.replace(SEGMENT_PREFIX, "")
+ .replace(DATA_PART_PREFIX, "")
+ .replace(CARBON_DATA_EXT, "");
+ }
+
+ /**
* This method will append strings in path and return block id
*
* @param shortBlockId
@@ -735,4 +736,12 @@ public class CarbonTablePath extends Path {
public static String getSegmentPath(String tablePath, String segmentId) {
return tablePath + "/Fact/Part0/Segment_" + segmentId;
}
+
+ /**
+ * Get the segment file locations of table
+ */
+ public static String getSegmentFilesLocation(String tablePath) {
+ return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + "segments";
+ }
+
}
\ No newline at end of file