Posted to issues@carbondata.apache.org by ravipesala <gi...@git.apache.org> on 2018/02/18 03:03:08 UTC
[GitHub] carbondata pull request #1984: [WIP] Partition restructure
GitHub user ravipesala opened a pull request:
https://github.com/apache/carbondata/pull/1984
[WIP] Partition restructure
Be sure to do all of the following checklist to help us incorporate
your contribution quickly and easily:
- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [ ] Testing done
Please provide details on
- Whether new unit test cases have been added or why no new tests are required?
- How it is tested? Please attach test report.
- Is it a performance related change? Please attach the performance test report.
- Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ravipesala/incubator-carbondata partition-restructure
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/carbondata/pull/1984.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1984
----
commit 9d12e9a2b0e5e14d8f70cfb6238b091ea2384e78
Author: ravipesala <ra...@...>
Date: 2018-02-09T04:07:02Z
Support global sort for standard hive partitioning
commit 3e5a031d111db98befaefffd09243001e25ee2f8
Author: ravipesala <ra...@...>
Date: 2018-02-14T19:01:56Z
Partition restructure for new folder structure and supporting partition location feature
----
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169540997
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---
@@ -161,9 +212,13 @@ private void readMergeFile(String mergeFilePath) throws IOException {
MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
List<String> file_names = indexHeader.getFile_names();
List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
+ CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath);
assert (file_names.size() == fileData.size());
for (int i = 0; i < file_names.size(); i++) {
carbonIndexMap.put(file_names.get(i), fileData.get(i).array());
+ carbonIndexMapWithFullPath.put(
+ mergeFile.getParentFile().getAbsolutePath() + CarbonCommonConstants.FILE_SEPARATOR
+ + file_names.get(i), fileData.get(i).array());
}
thriftReader.close();
--- End diff --
Use a try/finally block and close the thrift reader inside the finally block, so that it is released even if reading the merge file throws.
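A minimal sketch of the pattern requested here, in plain Java. IndexReader is a hypothetical stand-in for the thrift reader and is not the CarbonData ThriftReader API; MergeFileReadSketch and readAll are likewise illustrative names. The only point is that close() runs whether the loop completes or throws.

```java
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the thrift reader used in readMergeFile.
class IndexReader implements Closeable {
  boolean closed = false;

  // Pretends to read one index entry; a null name simulates an I/O failure.
  byte[] read(String fileName) throws IOException {
    if (fileName == null) {
      throw new IOException("corrupt entry");
    }
    return fileName.getBytes(StandardCharsets.UTF_8);
  }

  @Override public void close() {
    closed = true;
  }
}

class MergeFileReadSketch {
  // Reads every entry and closes the reader in finally, so the reader is
  // released on both the normal path and the exception path.
  static Map<String, byte[]> readAll(IndexReader reader, List<String> fileNames)
      throws IOException {
    Map<String, byte[]> indexMap = new HashMap<>();
    try {
      for (String name : fileNames) {
        indexMap.put(name, reader.read(name));
      }
    } finally {
      reader.close();
    }
    return indexMap;
  }
}
```

If the real reader implements Closeable or AutoCloseable (an assumption; the diff does not show this), a try-with-resources statement would give the same guarantee with less code.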
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3620/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2627/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3767/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3599/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3581/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3551/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3636/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2521/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by gvramana <gi...@git.apache.org>.
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169641109
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ if (tempFolder.exists() && partionNames.size() > 0) {
+ CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskNo) && file.getName()
+ .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (carbonFiles != null && carbonFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(partionNames);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : carbonFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ locationMap.put(location, folderDetails);
+ segmentFile.setLocationMap(locationMap);
+ String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, path);
+ }
+ }
+ }
+
+ /**
+ * Writes the segment file in json format
+ * @param segmentFile
+ * @param path
+ * @throws IOException
+ */
+ public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+ AtomicFileOperations fileWrite =
+ new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+ BufferedWriter brWriter = null;
+ DataOutputStream dataOutputStream = null;
+ Gson gsonObjectToWrite = new Gson();
+ try {
+ dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+ brWriter.write(metadataInstance);
+ } finally {
+ if (null != brWriter) {
+ brWriter.flush();
+ }
+ CarbonUtil.closeStreams(brWriter);
+ fileWrite.close();
+ }
+ }
+
+ /**
+ * Merge all segment files in a segment to single file.
+ *
+ * @param writePath
+ * @throws IOException
+ */
+ public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath)
+ throws IOException {
+ CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+ if (segmentFiles != null && segmentFiles.length > 0) {
+ SegmentFile segmentFile = null;
+ for (CarbonFile file : segmentFiles) {
+ SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
+ if (segmentFile == null && localSegmentFile != null) {
+ segmentFile = localSegmentFile;
+ }
+ if (localSegmentFile != null) {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ if (segmentFile != null) {
+ String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, path);
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
--- End diff --
Cleanup is required for the temporary folders in case abortJob is not called, for example when the process is killed.
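One way such a cleanup could look, sketched with java.nio.file rather than CarbonData's FileFactory. It assumes the temporary folders are named <timeStamp>.tmp, as in writeSegmentFile above, and further assumes (the diff does not confirm this) that timeStamp is a numeric value in epoch milliseconds. StaleTempFolderCleaner, clean and maxAgeMillis are hypothetical names, not part of the pull request.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StaleTempFolderCleaner {
  private static final String TMP_SUFFIX = ".tmp";

  // Deletes "<timeStamp>.tmp" folders under segmentFilesDir whose timestamp is
  // more than maxAgeMillis older than nowMillis. Returns the number removed.
  static int clean(Path segmentFilesDir, long nowMillis, long maxAgeMillis)
      throws IOException {
    if (!Files.isDirectory(segmentFilesDir)) {
      return 0;
    }
    // Collect candidates first so nothing is deleted while the listing is open.
    List<Path> stale = new ArrayList<>();
    try (DirectoryStream<Path> dirs =
        Files.newDirectoryStream(segmentFilesDir, "*" + TMP_SUFFIX)) {
      for (Path dir : dirs) {
        if (Files.isDirectory(dir) && isStale(dir, nowMillis, maxAgeMillis)) {
          stale.add(dir);
        }
      }
    }
    for (Path dir : stale) {
      deleteRecursively(dir);
    }
    return stale.size();
  }

  // Assumption: the folder name prefix is a timestamp in epoch milliseconds.
  private static boolean isStale(Path dir, long nowMillis, long maxAgeMillis) {
    String name = dir.getFileName().toString();
    String stamp = name.substring(0, name.length() - TMP_SUFFIX.length());
    try {
      return nowMillis - Long.parseLong(stamp) > maxAgeMillis;
    } catch (NumberFormatException e) {
      return false; // not a folder this sketch recognises, so leave it alone
    }
  }

  // Deletes children before parents by visiting paths in reverse order.
  private static void deleteRecursively(Path root) throws IOException {
    List<Path> paths;
    try (Stream<Path> walk = Files.walk(root)) {
      paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
    }
    for (Path p : paths) {
      Files.delete(p);
    }
  }
}
```

The age threshold is there so that a load still in progress does not lose its temporary folder; choosing a safe value, or checking the table status instead, is left open here.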
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2586/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169635879
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ if (tempFolder.exists() && partionNames.size() > 0) {
+ CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskNo) && file.getName()
+ .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (carbonFiles != null && carbonFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(partionNames);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : carbonFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ locationMap.put(location, folderDetails);
+ segmentFile.setLocationMap(locationMap);
+ String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, path);
+ }
+ }
+ }
+
+ /**
+ * Writes the segment file in json format
+ * @param segmentFile
+ * @param path
+ * @throws IOException
+ */
+ public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+ AtomicFileOperations fileWrite =
+ new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+ BufferedWriter brWriter = null;
+ DataOutputStream dataOutputStream = null;
+ Gson gsonObjectToWrite = new Gson();
+ try {
+ dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+ brWriter.write(metadataInstance);
+ } finally {
+ if (null != brWriter) {
+ brWriter.flush();
+ }
+ CarbonUtil.closeStreams(brWriter);
+ fileWrite.close();
+ }
+ }
+
+ /**
+ * Merge all segment files in a segment to single file.
+ *
+ * @param writePath
+ * @throws IOException
+ */
+ public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath)
+ throws IOException {
+ CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+ if (segmentFiles != null && segmentFiles.length > 0) {
+ SegmentFile segmentFile = null;
+ for (CarbonFile file : segmentFiles) {
+ SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
+ if (segmentFile == null && localSegmentFile != null) {
+ segmentFile = localSegmentFile;
+ }
+ if (localSegmentFile != null) {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ if (segmentFile != null) {
+ String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, path);
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
+ }
+ return segmentFile;
+ }
+ return null;
+ }
+
+ private CarbonFile[] getSegmentFiles(String segmentPath) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+ if (carbonFile.exists()) {
+ return carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
+ }
+ });
+ }
+ return null;
+ }
+
+ /**
+ * It provides segment file only for the partitions which has physical index files.
+ *
+ * @param partitionSpecs
+ */
+ public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
+ List<PartitionSpec> partitionSpecs) {
+ SegmentFile segmentFile = null;
+ for (PartitionSpec spec : partitionSpecs) {
+ String location = spec.getLocation().toString();
+ CarbonFile carbonFile = FileFactory.getCarbonFile(location);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
+ }
+ });
+ if (listFiles != null && listFiles.length > 0) {
+ SegmentFile localSegmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(spec.getPartitions());
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : listFiles) {
+ if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ folderDetails.setMergeFileName(file.getName());
+ } else {
+ folderDetails.getFiles().add(file.getName());
+ }
+ }
+ locationMap.put(location, folderDetails);
+ localSegmentFile.setLocationMap(locationMap);
+ if (segmentFile == null) {
+ segmentFile = localSegmentFile;
+ } else {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ }
+ return segmentFile;
+ }
+
+ /**
+ * This method reads the segment file which is written in json format
+ *
+ * @param segmentFilePath
+ * @return
+ */
+ private SegmentFile readSegmentFile(String segmentFilePath) throws IOException {
+ Gson gsonObjectToRead = new Gson();
+ DataInputStream dataInputStream = null;
+ BufferedReader buffReader = null;
+ InputStreamReader inStream = null;
+ SegmentFile segmentFile;
+ AtomicFileOperations fileOperation =
+ new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath));
+
+ try {
+ if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) {
+ return null;
+ }
+ dataInputStream = fileOperation.openForRead();
+ inStream = new InputStreamReader(dataInputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+ buffReader = new BufferedReader(inStream);
+ segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class);
+ } finally {
+ if (inStream != null) {
+ CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+ }
+ }
+
+ return segmentFile;
+ }
+
+ /**
+ * Reads segment file.
+ */
+ public void readSegment(String tablePath, String segmentFileName) throws IOException {
+ String segmentFilePath =
+ CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+ + segmentFileName;
+ SegmentFile segmentFile = readSegmentFile(segmentFilePath);
+ this.tablePath = tablePath;
+ this.segmentFile = segmentFile;
+ }
+
+ public String getTablePath() {
+ return tablePath;
+ }
+
+ /**
+ * Gets all the index files and related carbondata files from this segment. First user needs to
+ * call @readIndexFiles method before calling it.
+ * @return
+ */
+ public Map<String, List<String>> getIndexFilesMap() {
+ return indexFilesMap;
+ }
+
+ /**
+ * Reads all index files which are located in this segment. First user needs to call
+ * @readSegment method before calling it.
+ * @throws IOException
+ */
+ public void readIndexFiles() throws IOException {
+ readIndexFiles(SegmentStatus.SUCCESS, false);
+ }
+
+ /**
+ * Reads all index files as per the status of the file. In case of @ignoreStatus is true it just
+ * reads all index files
+ * @param status
+ * @param ignoreStatus
+ * @throws IOException
+ */
+ private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException {
+ if (indexFilesMap != null) {
+ return;
+ }
+ SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+ indexFilesMap = new HashMap<>();
+ indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus);
+ Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
+ List<DataFileFooter> indexInfo =
+ fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
+ List<String> blocks = new ArrayList<>();
+ for (DataFileFooter footer : indexInfo) {
+ blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
+ }
+ indexFilesMap.put(entry.getKey(), blocks);
+ }
+ }
+
+ /**
+ * Gets all index files from this segment
+ * @return
+ */
+ public Map<String, String> getIndexFiles() {
+ Map<String, String> indexFiles = new HashMap<>();
+ if (segmentFile != null) {
+ for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
+ for (String indexFile : entry.getValue().getFiles()) {
+ indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
+ entry.getValue().mergeFileName);
+ }
+ }
+ }
+ }
+ return indexFiles;
+ }
+
+ /**
+ * Drops the partition related files from the segment file of the segment and writes
+ * to a new file. First iterator over segment file and check the path it needs to be dropped.
+ * And update the status with delete if it found.
+ *
+ * @param uniqueId
+ * @throws IOException
+ */
+ public void dropPartitions(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs,
+ String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments)
+ throws IOException {
+ readSegment(tablePath, segment.getSegmentFileName());
+ boolean updateSegment = false;
+ for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ Path path = new Path(location);
+ // Update the status to delete if path equals
+ for (PartitionSpec spec : partitionSpecs) {
+ if (path.equals(spec.getLocation())) {
+ entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage());
+ updateSegment = true;
+ break;
+ }
+ }
+ }
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
+ writePath =
+ writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentId() + "_" + uniqueId
+ + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, writePath);
+ // Check whether we can completly remove the segment.
+ boolean deleteSegment = true;
+ for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+ if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) {
+ deleteSegment = false;
+ }
+ }
+ if (deleteSegment) {
+ toBeDeletedSegments.add(segment.getSegmentId());
+ }
+ if (updateSegment) {
+ toBeUpdatedSegments.add(segment.getSegmentId());
+ }
+ }
+
+ /**
+ * Update the table status file with the dropped partitions information
+ *
+ * @param carbonTable
+ * @param uniqueId
+ * @param toBeUpdatedSegments
+ * @param toBeDeleteSegments
+ * @throws IOException
+ */
+ public static void commitDropPartitions(CarbonTable carbonTable, String uniqueId,
+ List<String> toBeUpdatedSegments, List<String> toBeDeleteSegments) throws IOException {
+ Set<Segment> segmentSet = new HashSet<>(
--- End diff --
ok
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3871/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by gvramana <gi...@git.apache.org>.
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169392444
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Holds partition information.
+ */
+public class PartitionSpec implements Serializable {
+
+ private static final long serialVersionUID = 4828007433384867678L;
+
+ private List<String> partitions;
--- End diff --
Does partitions signify the list of partition columns?
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169544845
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ if (tempFolder.exists() && partionNames.size() > 0) {
+ CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskNo) && file.getName()
+ .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (carbonFiles != null && carbonFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(partionNames);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : carbonFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ locationMap.put(location, folderDetails);
+ segmentFile.setLocationMap(locationMap);
+ String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, path);
+ }
+ }
+ }
+
+ /**
+ * Writes the segment file in json format
+ * @param segmentFile
+ * @param path
+ * @throws IOException
+ */
+ public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+ AtomicFileOperations fileWrite =
+ new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+ BufferedWriter brWriter = null;
+ DataOutputStream dataOutputStream = null;
+ Gson gsonObjectToWrite = new Gson();
+ try {
+ dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+ brWriter.write(metadataInstance);
+ } finally {
+ if (null != brWriter) {
+ brWriter.flush();
+ }
+ CarbonUtil.closeStreams(brWriter);
+ fileWrite.close();
+ }
+ }
+
+ /**
+ * Merge all segment files in a segment to single file.
+ *
+ * @param writePath
+ * @throws IOException
+ */
+ public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath)
+ throws IOException {
+ CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+ if (segmentFiles != null && segmentFiles.length > 0) {
+ SegmentFile segmentFile = null;
+ for (CarbonFile file : segmentFiles) {
+ SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
+ if (segmentFile == null && localSegmentFile != null) {
+ segmentFile = localSegmentFile;
+ }
+ if (localSegmentFile != null) {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ if (segmentFile != null) {
+ String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, path);
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
+ }
+ return segmentFile;
+ }
+ return null;
+ }
+
+ private CarbonFile[] getSegmentFiles(String segmentPath) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+ if (carbonFile.exists()) {
+ return carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
+ }
+ });
+ }
+ return null;
+ }
+
+ /**
+ * It provides segment file only for the partitions which has physical index files.
+ *
+ * @param partitionSpecs
+ */
+ public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
+ List<PartitionSpec> partitionSpecs) {
+ SegmentFile segmentFile = null;
+ for (PartitionSpec spec : partitionSpecs) {
+ String location = spec.getLocation().toString();
+ CarbonFile carbonFile = FileFactory.getCarbonFile(location);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
+ }
+ });
+ if (listFiles != null && listFiles.length > 0) {
+ SegmentFile localSegmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(spec.getPartitions());
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : listFiles) {
+ if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ folderDetails.setMergeFileName(file.getName());
+ } else {
+ folderDetails.getFiles().add(file.getName());
+ }
+ }
+ locationMap.put(location, folderDetails);
+ localSegmentFile.setLocationMap(locationMap);
+ if (segmentFile == null) {
+ segmentFile = localSegmentFile;
+ } else {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ }
+ return segmentFile;
+ }
+
+ /**
+ * This method reads the segment file which is written in json format
+ *
+ * @param segmentFilePath
+ * @return
+ */
+ private SegmentFile readSegmentFile(String segmentFilePath) throws IOException {
+ Gson gsonObjectToRead = new Gson();
+ DataInputStream dataInputStream = null;
+ BufferedReader buffReader = null;
+ InputStreamReader inStream = null;
+ SegmentFile segmentFile;
+ AtomicFileOperations fileOperation =
+ new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath));
+
+ try {
+ if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) {
+ return null;
+ }
+ dataInputStream = fileOperation.openForRead();
+ inStream = new InputStreamReader(dataInputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+ buffReader = new BufferedReader(inStream);
+ segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class);
+ } finally {
+ if (inStream != null) {
+ CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+ }
+ }
+
+ return segmentFile;
+ }
+
+ /**
+ * Reads segment file.
+ */
+ public void readSegment(String tablePath, String segmentFileName) throws IOException {
+ String segmentFilePath =
+ CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+ + segmentFileName;
+ SegmentFile segmentFile = readSegmentFile(segmentFilePath);
+ this.tablePath = tablePath;
+ this.segmentFile = segmentFile;
+ }
+
+ public String getTablePath() {
+ return tablePath;
+ }
+
+ /**
+ * Gets all the index files and related carbondata files from this segment. First user needs to
+ * call @readIndexFiles method before calling it.
+ * @return
+ */
+ public Map<String, List<String>> getIndexFilesMap() {
+ return indexFilesMap;
+ }
+
+ /**
+ * Reads all index files which are located in this segment. First user needs to call
+ * @readSegment method before calling it.
+ * @throws IOException
+ */
+ public void readIndexFiles() throws IOException {
+ readIndexFiles(SegmentStatus.SUCCESS, false);
+ }
+
+ /**
+ * Reads all index files as per the status of the file. In case of @ignoreStatus is true it just
+ * reads all index files
+ * @param status
+ * @param ignoreStatus
+ * @throws IOException
+ */
+ private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException {
+ if (indexFilesMap != null) {
+ return;
+ }
+ SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+ indexFilesMap = new HashMap<>();
+ indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus);
+ Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
+ List<DataFileFooter> indexInfo =
+ fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
+ List<String> blocks = new ArrayList<>();
+ for (DataFileFooter footer : indexInfo) {
+ blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
+ }
+ indexFilesMap.put(entry.getKey(), blocks);
+ }
+ }
+
+ /**
+ * Gets all index files from this segment
+ * @return
+ */
+ public Map<String, String> getIndexFiles() {
+ Map<String, String> indexFiles = new HashMap<>();
+ if (segmentFile != null) {
+ for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
+ for (String indexFile : entry.getValue().getFiles()) {
+ indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
+ entry.getValue().mergeFileName);
+ }
+ }
+ }
+ }
+ return indexFiles;
+ }
+
+ /**
+ * Drops the partition related files from the segment file of the segment and writes
+ * to a new file. First iterator over segment file and check the path it needs to be dropped.
+ * And update the status with delete if it found.
+ *
+ * @param uniqueId
+ * @throws IOException
+ */
+ public void dropPartitions(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs,
+ String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments)
+ throws IOException {
+ readSegment(tablePath, segment.getSegmentFileName());
+ boolean updateSegment = false;
+ for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ Path path = new Path(location);
+ // Update the status to delete if path equals
+ for (PartitionSpec spec : partitionSpecs) {
+ if (path.equals(spec.getLocation())) {
+ entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage());
+ updateSegment = true;
+ break;
+ }
+ }
+ }
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
+ writePath =
+ writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentId() + "_" + uniqueId
+ + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, writePath);
+ // Check whether we can completly remove the segment.
+ boolean deleteSegment = true;
+ for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+ if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) {
+ deleteSegment = false;
+ }
+ }
+ if (deleteSegment) {
+ toBeDeletedSegments.add(segment.getSegmentId());
+ }
+ if (updateSegment) {
+ toBeUpdatedSegments.add(segment.getSegmentId());
+ }
+ }
+
+ /**
+ * Update the table status file with the dropped partitions information
+ *
+ * @param carbonTable
+ * @param uniqueId
+ * @param toBeUpdatedSegments
+ * @param toBeDeleteSegments
+ * @throws IOException
+ */
+ public static void commitDropPartitions(CarbonTable carbonTable, String uniqueId,
+ List<String> toBeUpdatedSegments, List<String> toBeDeleteSegments) throws IOException {
+ Set<Segment> segmentSet = new HashSet<>(
--- End diff --
Move the segmentSet creation inside the if check below, so the set is built only when it is needed.
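A minimal sketch of this suggestion, not the actual patch. It assumes the check that follows in the diff tests whether either segment list is non-empty, and it uses hypothetical stand-ins (`loadValidSegments`, plain `String` segment ids) in place of the CarbonData classes:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the commit step; none of these names are CarbonData APIs.
class DeferredSetCreationSketch {

  // Counts how often the (assumed expensive) segment lookup runs.
  static int loadCount = 0;

  // Placeholder for reading the valid segments of a table.
  static Set<String> loadValidSegments() {
    loadCount++;
    return new HashSet<>(Arrays.asList("0", "1", "2"));
  }

  // Builds the set only when there is something to update or delete.
  static boolean commitDropPartitions(List<String> toBeUpdated, List<String> toBeDeleted) {
    if (!toBeUpdated.isEmpty() || !toBeDeleted.isEmpty()) {
      Set<String> segmentSet = loadValidSegments();
      segmentSet.removeAll(toBeDeleted);
      // ... the table status update would use segmentSet here ...
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    commitDropPartitions(Collections.<String>emptyList(), Collections.<String>emptyList());
    System.out.println("lookups after no-op call: " + loadCount);
    commitDropPartitions(Arrays.asList("1"), Arrays.asList("2"));
    System.out.println("lookups after real call: " + loadCount);
  }
}
```

With the creation inside the check, a call with two empty lists skips the lookup entirely.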
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3794/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3592/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3865/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2560/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2525/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3548/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3759/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2516/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3769/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by gvramana <gi...@git.apache.org>.
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169402456
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
--- End diff --
As the segment file already has the transactionTimestamp/LoadTimestamp, the temp folder is not required.
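One possible reading of this suggestion, as a rough sketch: carry the load timestamp in the per-task file name, so files from different loads cannot collide in a shared folder and no per-load temp folder is needed. The naming pattern, the `.segment` extension value, and both helper names are assumptions for illustration and are not taken from the patch:

```java
// Hypothetical naming helpers; not part of the CarbonData API.
class SegmentFileNameSketch {

  // Assumed extension value; the patch refers to it as CarbonTablePath.SEGMENT_EXT.
  static final String SEGMENT_EXT = ".segment";

  // Builds a per-task segment file path that embeds the load timestamp.
  static String taskSegmentFilePath(String segmentFilesDir, String taskNo, String timeStamp) {
    return segmentFilesDir + "/" + taskNo + "_" + timeStamp + SEGMENT_EXT;
  }

  // Tells whether a file name was written by the load with the given timestamp.
  static boolean belongsToLoad(String fileName, String timeStamp) {
    return fileName.endsWith("_" + timeStamp + SEGMENT_EXT);
  }

  public static void main(String[] args) {
    String path = taskSegmentFilePath("/store/t1/Metadata/segments", "0", "1518750000000");
    System.out.println(path);
    System.out.println(belongsToLoad("0_1518750000000.segment", "1518750000000"));
  }
}
```

A merge step could then select the files of one load by timestamp suffix instead of listing a temp folder.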
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2652/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2519/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169619680
--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Represents one load of carbondata
+ */
+public class Segment implements Serializable {
+
+ private static final long serialVersionUID = 7044555408162234064L;
+
+ private String segmentId;
+
+ private String segmentFileName;
+
+ public Segment(String segmentId, String segmentFileName) {
+ this.segmentId = segmentId;
+ this.segmentFileName = segmentFileName;
+ }
+
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+ public String getSegmentFileName() {
+ return segmentFileName;
+ }
+
+ public static List<Segment> toSegmentList(String[] segmentIds) {
--- End diff --
OK, renamed.
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169544627
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ if (tempFolder.exists() && partionNames.size() > 0) {
+ CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskNo) && file.getName()
+ .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (carbonFiles != null && carbonFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(partionNames);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : carbonFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ locationMap.put(location, folderDetails);
+ segmentFile.setLocationMap(locationMap);
+ String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, path);
+ }
+ }
+ }
+
+ /**
+ * Writes the segment file in json format
+ * @param segmentFile
+ * @param path
+ * @throws IOException
+ */
+ public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+ AtomicFileOperations fileWrite =
+ new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+ BufferedWriter brWriter = null;
+ DataOutputStream dataOutputStream = null;
+ Gson gsonObjectToWrite = new Gson();
+ try {
+ dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+ brWriter.write(metadataInstance);
+ } finally {
+ if (null != brWriter) {
+ brWriter.flush();
+ }
+ CarbonUtil.closeStreams(brWriter);
+ fileWrite.close();
+ }
+ }
+
+ /**
+ * Merge all segment files in a segment to single file.
+ *
+ * @param writePath
+ * @throws IOException
+ */
+ public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath)
+ throws IOException {
+ CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+ if (segmentFiles != null && segmentFiles.length > 0) {
+ SegmentFile segmentFile = null;
+ for (CarbonFile file : segmentFiles) {
+ SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
+ if (segmentFile == null && localSegmentFile != null) {
+ segmentFile = localSegmentFile;
+ }
+ if (localSegmentFile != null) {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ if (segmentFile != null) {
+ String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, path);
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
+ }
+ return segmentFile;
+ }
+ return null;
+ }
+
+ private CarbonFile[] getSegmentFiles(String segmentPath) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+ if (carbonFile.exists()) {
+ return carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
+ }
+ });
+ }
+ return null;
+ }
+
+ /**
+ * It provides segment file only for the partitions which has physical index files.
+ *
+ * @param partitionSpecs
+ */
+ public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
+ List<PartitionSpec> partitionSpecs) {
+ SegmentFile segmentFile = null;
+ for (PartitionSpec spec : partitionSpecs) {
+ String location = spec.getLocation().toString();
+ CarbonFile carbonFile = FileFactory.getCarbonFile(location);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
+ }
+ });
+ if (listFiles != null && listFiles.length > 0) {
+ SegmentFile localSegmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(spec.getPartitions());
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : listFiles) {
+ if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ folderDetails.setMergeFileName(file.getName());
+ } else {
+ folderDetails.getFiles().add(file.getName());
+ }
+ }
+ locationMap.put(location, folderDetails);
+ localSegmentFile.setLocationMap(locationMap);
+ if (segmentFile == null) {
+ segmentFile = localSegmentFile;
+ } else {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ }
+ return segmentFile;
+ }
+
+ /**
+ * This method reads the segment file which is written in json format
+ *
+ * @param segmentFilePath
+ * @return
+ */
+ private SegmentFile readSegmentFile(String segmentFilePath) throws IOException {
+ Gson gsonObjectToRead = new Gson();
+ DataInputStream dataInputStream = null;
+ BufferedReader buffReader = null;
+ InputStreamReader inStream = null;
+ SegmentFile segmentFile;
+ AtomicFileOperations fileOperation =
+ new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath));
+
+ try {
+ if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) {
+ return null;
+ }
+ dataInputStream = fileOperation.openForRead();
+ inStream = new InputStreamReader(dataInputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+ buffReader = new BufferedReader(inStream);
+ segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class);
+ } finally {
+ if (inStream != null) {
+ CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+ }
+ }
+
+ return segmentFile;
+ }
+
+ /**
+ * Reads segment file.
+ */
+ public void readSegment(String tablePath, String segmentFileName) throws IOException {
+ String segmentFilePath =
+ CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+ + segmentFileName;
+ SegmentFile segmentFile = readSegmentFile(segmentFilePath);
+ this.tablePath = tablePath;
+ this.segmentFile = segmentFile;
+ }
+
+ public String getTablePath() {
+ return tablePath;
+ }
+
+ /**
+ * Gets all the index files and related carbondata files from this segment. First user needs to
+ * call @readIndexFiles method before calling it.
+ * @return
+ */
+ public Map<String, List<String>> getIndexFilesMap() {
+ return indexFilesMap;
+ }
+
+ /**
+ * Reads all index files which are located in this segment. First user needs to call
+ * @readSegment method before calling it.
+ * @throws IOException
+ */
+ public void readIndexFiles() throws IOException {
+ readIndexFiles(SegmentStatus.SUCCESS, false);
+ }
+
+ /**
+ * Reads all index files as per the status of the file. In case of @ignoreStatus is true it just
+ * reads all index files
+ * @param status
+ * @param ignoreStatus
+ * @throws IOException
+ */
+ private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException {
+ if (indexFilesMap != null) {
+ return;
+ }
+ SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+ indexFilesMap = new HashMap<>();
+ indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus);
+ Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
+ List<DataFileFooter> indexInfo =
+ fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
+ List<String> blocks = new ArrayList<>();
+ for (DataFileFooter footer : indexInfo) {
+ blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
+ }
+ indexFilesMap.put(entry.getKey(), blocks);
+ }
+ }
+
+ /**
+ * Gets all index files from this segment
+ * @return
+ */
+ public Map<String, String> getIndexFiles() {
+ Map<String, String> indexFiles = new HashMap<>();
+ if (segmentFile != null) {
+ for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
+ for (String indexFile : entry.getValue().getFiles()) {
+ indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
+ entry.getValue().mergeFileName);
+ }
+ }
+ }
+ }
+ return indexFiles;
+ }
+
+ /**
+ * Drops the partition related files from the segment file of the segment and writes
+ * to a new file. First iterate over the segment file and check whether each path needs to be
+ * dropped, and update its status to delete if it does.
+ *
+ * @param uniqueId
+ * @throws IOException
+ */
+ public void dropPartitions(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs,
+ String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments)
+ throws IOException {
+ readSegment(tablePath, segment.getSegmentFileName());
+ boolean updateSegment = false;
+ for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ Path path = new Path(location);
+ // Update the status to delete if path equals
+ for (PartitionSpec spec : partitionSpecs) {
+ if (path.equals(spec.getLocation())) {
+ entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage());
+ updateSegment = true;
+ break;
+ }
+ }
+ }
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
+ writePath =
+ writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentId() + "_" + uniqueId
+ + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, writePath);
+ // Check whether we can completely remove the segment.
+ boolean deleteSegment = true;
+ for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+ if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) {
+ deleteSegment = false;
--- End diff --
Break out of the loop once deleteSegment is set to false.
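A minimal sketch of the suggested early exit, assuming the per-location statuses are available as plain strings. The class name `DeleteSegmentCheck`, the method `canDeleteSegment`, and the literal status strings are hypothetical stand-ins for illustration and are not taken from the patch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DeleteSegmentCheck {
  // Hypothetical stand-in for SegmentStatus.SUCCESS.getMessage().
  static final String SUCCESS = "Success";

  /** Returns true only when no location in the segment still has SUCCESS status. */
  static boolean canDeleteSegment(Map<String, String> statusByLocation) {
    boolean deleteSegment = true;
    for (Map.Entry<String, String> entry : statusByLocation.entrySet()) {
      if (SUCCESS.equals(entry.getValue())) {
        deleteSegment = false;
        // One live location is enough to keep the segment, so stop scanning.
        break;
      }
    }
    return deleteSegment;
  }

  public static void main(String[] args) {
    Map<String, String> statuses = new LinkedHashMap<>();
    statuses.put("/part=a", "Marked for Delete");
    statuses.put("/part=b", SUCCESS);
    System.out.println(canDeleteSegment(statuses));
  }
}
```

The result is the same with or without the `break`; the early exit only avoids visiting the remaining entries.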
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3597/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3785/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3809/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169543287
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ if (tempFolder.exists() && partionNames.size() > 0) {
+ CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskNo) && file.getName()
+ .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (carbonFiles != null && carbonFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(partionNames);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : carbonFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ locationMap.put(location, folderDetails);
+ segmentFile.setLocationMap(locationMap);
+ String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, path);
+ }
+ }
+ }
+
+ /**
+ * Writes the segment file in json format
+ * @param segmentFile
+ * @param path
+ * @throws IOException
+ */
+ public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+ AtomicFileOperations fileWrite =
+ new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+ BufferedWriter brWriter = null;
+ DataOutputStream dataOutputStream = null;
+ Gson gsonObjectToWrite = new Gson();
+ try {
+ dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+ brWriter.write(metadataInstance);
+ } finally {
+ if (null != brWriter) {
+ brWriter.flush();
+ }
+ CarbonUtil.closeStreams(brWriter);
+ fileWrite.close();
+ }
+ }
+
+ /**
+ * Merge all segment files in a segment to single file.
+ *
+ * @param writePath
+ * @throws IOException
+ */
+ public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath)
+ throws IOException {
+ CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+ if (segmentFiles != null && segmentFiles.length > 0) {
+ SegmentFile segmentFile = null;
+ for (CarbonFile file : segmentFiles) {
+ SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
+ if (segmentFile == null && localSegmentFile != null) {
+ segmentFile = localSegmentFile;
+ }
+ if (localSegmentFile != null) {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ if (segmentFile != null) {
+ String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, path);
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
+ }
+ return segmentFile;
+ }
+ return null;
+ }
+
+ private CarbonFile[] getSegmentFiles(String segmentPath) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+ if (carbonFile.exists()) {
+ return carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
+ }
+ });
+ }
+ return null;
+ }
+
+ /**
+ * It provides segment file only for the partitions which have physical index files.
+ *
+ * @param partitionSpecs
+ */
+ public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
+ List<PartitionSpec> partitionSpecs) {
+ SegmentFile segmentFile = null;
+ for (PartitionSpec spec : partitionSpecs) {
+ String location = spec.getLocation().toString();
+ CarbonFile carbonFile = FileFactory.getCarbonFile(location);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
--- End diff --
Move this if check inside the if (listFiles != null && listFiles.length > 0) block below, so that it runs only when index files are found.
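The reordering asked for here can be sketched as follows: the table-path prefix check is done only once it is known that the partition folder contains index files. The class `RelativeLocation`, the holder type `Resolved`, and the use of a plain list of file names are assumptions made for this illustration and do not appear in the patch.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class RelativeLocation {
  /** Hypothetical holder for the stored location and its relative flag. */
  static final class Resolved {
    final String location;
    final boolean relative;

    Resolved(String location, boolean relative) {
      this.location = location;
      this.relative = relative;
    }
  }

  /** Returns null when there are no index files, otherwise the location to record. */
  static Resolved resolve(String tablePath, String location, List<String> indexFiles) {
    if (indexFiles == null || indexFiles.isEmpty()) {
      // Nothing to record for this partition, so the prefix check is skipped.
      return null;
    }
    if (location.startsWith(tablePath)) {
      // Store the path relative to the table path.
      return new Resolved(location.substring(tablePath.length()), true);
    }
    return new Resolved(location, false);
  }

  public static void main(String[] args) {
    Resolved r = resolve("/store/t1", "/store/t1/c=1", Arrays.asList("0.carbonindex"));
    System.out.println(r.location + " " + r.relative);
    System.out.println(resolve("/store/t1", "/store/t1/c=2", Collections.<String>emptyList()));
  }
}
```

The file listing itself still has to use the absolute location, which is why the check can safely move after it.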
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/carbondata/pull/1984
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3833/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3614/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3584/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169520945
--- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java ---
@@ -217,6 +217,6 @@ public void convertValue(ColumnPageValueConverter codec) {
@Override
public void freeMemory() {
-
+ byteArrayData = null;
--- End diff --
Other types such as intData and longData can also be used, so please set the references to null for all of the types here.
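A rough sketch of what releasing every backing array could look like. The class `DecimalPageSketch` and the helper `isFreed` are hypothetical; only the field names intData, longData and byteArrayData come from this thread, and the real page class may hold further arrays that would need the same treatment.

```java
final class DecimalPageSketch {
  // Which array is actually populated depends on how the page stores its values.
  private int[] intData = new int[4];
  private long[] longData = new long[4];
  private byte[][] byteArrayData = new byte[4][];

  /** Drops every backing array so the garbage collector can reclaim it. */
  public void freeMemory() {
    intData = null;
    longData = null;
    byteArrayData = null;
  }

  /** Hypothetical helper used only to check the sketch. */
  boolean isFreed() {
    return intData == null && longData == null && byteArrayData == null;
  }

  public static void main(String[] args) {
    DecimalPageSketch page = new DecimalPageSketch();
    page.freeMemory();
    System.out.println(page.isFreed());
  }
}
```

Setting a reference that is already null to null is harmless, so there is no need to check which storage type the page used.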
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169628343
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---
@@ -79,6 +87,44 @@ public void readAllIIndexOfSegment(String segmentPath) throws IOException {
}
}
+ /**
+ * Read all index files and keep the cache in it.
+ *
+ * @param segmentFileStore
+ * @throws IOException
+ */
+ public void readAllIIndexOfSegment(SegmentFileStore segmentFileStore, SegmentStatus status,
+ boolean ignoreStatus) throws IOException {
+ List<CarbonFile> carbonIndexFiles = new ArrayList<>();
+ if (segmentFileStore.getLocationMap() == null) {
+ return;
+ }
+ for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFileStore
+ .getLocationMap().entrySet()) {
+ String location = locations.getKey();
+ if (locations.getValue().isRelative()) {
+ location =
--- End diff --
ok
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3553/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by gvramana <gi...@git.apache.org>.
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169638631
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ public SegmentFileStore(String tablePath, String segmentFileName) throws IOException {
+ this.tablePath = tablePath;
+ this.segmentFile = readSegment(tablePath, segmentFileName);
+ }
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public static void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+
+ if (tempFolder.exists() && partionNames.size() > 0) {
+ CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskNo) && file.getName()
+ .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (carbonFiles != null && carbonFiles.length > 0) {
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ SegmentFile segmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(partionNames);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : carbonFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ locationMap.put(location, folderDetails);
+ segmentFile.setLocationMap(locationMap);
+ String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, path);
+ }
+ }
+ }
+
+ /**
+ * Writes the segment file in json format
+ * @param segmentFile
+ * @param path
+ * @throws IOException
+ */
+ public static void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+ AtomicFileOperations fileWrite =
+ new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+ BufferedWriter brWriter = null;
+ DataOutputStream dataOutputStream = null;
+ Gson gsonObjectToWrite = new Gson();
+ try {
+ dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+ brWriter.write(metadataInstance);
+ brWriter.flush();
+ } finally {
+ CarbonUtil.closeStreams(brWriter);
+ fileWrite.close();
+ }
+ }
+
+ /**
+ * Merge all segment files in a segment to single file.
+ *
+ * @throws IOException
+ */
+ public static SegmentFile mergeSegmentFiles(String readPath, String mergeFileName,
+ String writePath)
+ throws IOException {
+ CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+ if (segmentFiles != null && segmentFiles.length > 0) {
+ SegmentFile segmentFile = null;
+ for (CarbonFile file : segmentFiles) {
+ SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
+ if (segmentFile == null && localSegmentFile != null) {
+ segmentFile = localSegmentFile;
+ }
+ if (localSegmentFile != null) {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ if (segmentFile != null) {
+ String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, path);
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
+ }
+ return segmentFile;
+ }
+ return null;
+ }
+
+ private static CarbonFile[] getSegmentFiles(String segmentPath) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+ if (carbonFile.exists()) {
+ return carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
+ }
+ });
+ }
+ return null;
+ }
+
+ /**
+ * It provides segment file only for the partitions which have physical index files.
+ *
+ * @param partitionSpecs
+ */
+ public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
+ List<PartitionSpec> partitionSpecs) {
+ SegmentFile segmentFile = null;
+ for (PartitionSpec spec : partitionSpecs) {
+ String location = spec.getLocation().toString();
+ CarbonFile carbonFile = FileFactory.getCarbonFile(location);
+
+ CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
+ }
+ });
+ if (listFiles != null && listFiles.length > 0) {
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ SegmentFile localSegmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(spec.getPartitions());
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : listFiles) {
+ if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ folderDetails.setMergeFileName(file.getName());
+ } else {
+ folderDetails.getFiles().add(file.getName());
+ }
+ }
+ locationMap.put(location, folderDetails);
+ localSegmentFile.setLocationMap(locationMap);
+ if (segmentFile == null) {
+ segmentFile = localSegmentFile;
+ } else {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ }
+ return segmentFile;
+ }
+
+ /**
+ * This method reads the segment file which is written in json format
+ *
+ * @param segmentFilePath
+ * @return
+ */
+ private static SegmentFile readSegmentFile(String segmentFilePath) throws IOException {
+ Gson gsonObjectToRead = new Gson();
+ DataInputStream dataInputStream = null;
+ BufferedReader buffReader = null;
+ InputStreamReader inStream = null;
+ SegmentFile segmentFile;
+ AtomicFileOperations fileOperation =
+ new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath));
+
+ try {
+ if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) {
+ return null;
+ }
+ dataInputStream = fileOperation.openForRead();
+ inStream = new InputStreamReader(dataInputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+ buffReader = new BufferedReader(inStream);
+ segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class);
+ } finally {
+ if (inStream != null) {
+ CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+ }
+ }
+
+ return segmentFile;
+ }
+
+ /**
+ * Reads segment file.
+ */
+ private SegmentFile readSegment(String tablePath, String segmentFileName) throws IOException {
+ String segmentFilePath =
+ CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+ + segmentFileName;
+ return readSegmentFile(segmentFilePath);
+ }
+
+ public String getTablePath() {
+ return tablePath;
+ }
+
+ /**
+ * Gets all the index files and related carbondata files from this segment. First user needs to
+ * call @readIndexFiles method before calling it.
+ * @return
+ */
+ public Map<String, List<String>> getIndexFilesMap() {
+ return indexFilesMap;
+ }
+
+ /**
+ * Reads all index files which are located in this segment. First user needs to call
+ * @readSegment method before calling it.
+ * @throws IOException
+ */
+ public void readIndexFiles() throws IOException {
+ readIndexFiles(SegmentStatus.SUCCESS, false);
+ }
+
+ /**
+ * Reads all index files as per the status of the file. In case of @ignoreStatus is true it just
+ * reads all index files
+ * @param status
+ * @param ignoreStatus
+ * @throws IOException
+ */
+ private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException {
+ if (indexFilesMap != null) {
+ return;
+ }
+ SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+ indexFilesMap = new HashMap<>();
+ indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus);
+ Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
+ List<DataFileFooter> indexInfo =
+ fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
+ List<String> blocks = new ArrayList<>();
+ for (DataFileFooter footer : indexInfo) {
+ blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
+ }
+ indexFilesMap.put(entry.getKey(), blocks);
+ }
+ }
+
+ /**
+ * Gets all index files from this segment
+ * @return
+ */
+ public Map<String, String> getIndexFiles() {
+ Map<String, String> indexFiles = new HashMap<>();
+ if (segmentFile != null) {
+ for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
+ for (String indexFile : entry.getValue().getFiles()) {
+ indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
+ entry.getValue().mergeFileName);
+ }
+ }
+ }
+ }
+ return indexFiles;
+ }
+
+ /**
+ * Drops the partition related files from the segment file of the segment and writes
+ * to a new file. First iterate over the segment file and check whether each path needs to be
+ * dropped, and update its status to delete if it does.
+ *
+ * @param uniqueId
+ * @throws IOException
+ */
+ public void dropPartitions(Segment segment, List<PartitionSpec> partitionSpecs,
+ String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments)
+ throws IOException {
+ readSegment(tablePath, segment.getSegmentFileName());
+ boolean updateSegment = false;
+ for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ Path path = new Path(location);
+ // Update the status to delete if path equals
+ for (PartitionSpec spec : partitionSpecs) {
+ if (path.equals(spec.getLocation())) {
+ entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage());
+ updateSegment = true;
+ break;
+ }
+ }
+ }
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
+ writePath =
+ writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentNo() + "_" + uniqueId
+ + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, writePath);
+ // Check whether we can completely remove the segment.
+ boolean deleteSegment = true;
+ for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+ if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) {
+ deleteSegment = false;
+ break;
+ }
+ }
+ if (deleteSegment) {
+ toBeDeletedSegments.add(segment.getSegmentNo());
+ }
+ if (updateSegment) {
+ toBeUpdatedSegments.add(segment.getSegmentNo());
+ }
+ }
+
+ /**
+ * Update the table status file with the dropped partitions information
+ *
+ * @param carbonTable
+ * @param uniqueId
+ * @param toBeUpdatedSegments
+ * @param toBeDeleteSegments
+ * @throws IOException
+ */
+ public static void commitDropPartitions(CarbonTable carbonTable, String uniqueId,
+ List<String> toBeUpdatedSegments, List<String> toBeDeleteSegments) throws IOException {
+ if (toBeDeleteSegments.size() > 0 || toBeUpdatedSegments.size() > 0) {
+ Set<Segment> segmentSet = new HashSet<>(
+ new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
+ .getValidAndInvalidSegments().getValidSegments());
+ CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId, true,
+ Segment.toSegmentList(toBeDeleteSegments), Segment.toSegmentList(toBeUpdatedSegments));
+ }
+ }
+
+ /**
+ * Clean up invalid data after drop partition in all segments of table
+ *
+ * @param table
+ * @param forceDelete Whether to delete the data forcibly, or to delete it only once an hour has
+ * passed since its creation.
+ * @throws IOException
+ */
+ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitionSpecs,
+ boolean forceDelete) throws IOException {
+
+ LoadMetadataDetails[] details =
+ SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath());
+ // scan through each segment.
+ for (LoadMetadataDetails segment : details) {
+ // Only if this segment is valid do we delete the related dropped
+ // partition files. If the segment is marked for delete or compacted, it will
+ // get deleted anyway.
+
+ if ((segment.getSegmentStatus() == SegmentStatus.SUCCESS
+ || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS)
+ && segment.getSegmentFile() != null) {
+ List<String> toBeDeletedIndexFiles = new ArrayList<>();
+ List<String> toBeDeletedDataFiles = new ArrayList<>();
+ // take the list of files from this segment.
+ SegmentFileStore fileStore =
+ new SegmentFileStore(table.getTablePath(), segment.getSegmentFile());
+ fileStore.readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false);
+ if (forceDelete) {
+ deletePhysicalPartition(table.getTablePath(), partitionSpecs,
+ fileStore.segmentFile.locationMap);
+ }
+ for (Map.Entry<String, List<String>> entry : fileStore.indexFilesMap.entrySet()) {
+ String indexFile = entry.getKey();
+ // Check the partition information in the partition mapper
+ Long fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile
+ .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+ indexFile.length() - CarbonTablePath.INDEX_FILE_EXT.length()));
+ if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) {
+ toBeDeletedIndexFiles.add(indexFile);
+ // Add the corresponding carbondata files to the delete list.
+ toBeDeletedDataFiles.addAll(entry.getValue());
+ }
+ }
+ if (toBeDeletedIndexFiles.size() > 0) {
+ for (String dataFile : toBeDeletedIndexFiles) {
+ FileFactory.deleteFile(dataFile, FileFactory.getFileType(dataFile));
+ }
+ for (String dataFile : toBeDeletedDataFiles) {
+ FileFactory.deleteFile(dataFile, FileFactory.getFileType(dataFile));
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Deletes the segment file and its physical files like partition folders from disk
+ * @param tablePath
+ * @param segmentFile
+ * @param partitionSpecs
+ * @throws IOException
+ */
+ public static void deleteSegment(String tablePath, String segmentFile,
+ List<PartitionSpec> partitionSpecs) throws IOException {
+ SegmentFileStore fileStore = new SegmentFileStore(tablePath, segmentFile);
+ fileStore.readIndexFiles(SegmentStatus.SUCCESS, true);
+ Map<String, FolderDetails> locationMap = fileStore.getLocationMap();
+ Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+ for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+ FileFactory.deleteFile(entry.getKey(), FileFactory.getFileType(entry.getKey()));
+ for (String file : entry.getValue()) {
+ FileFactory.deleteFile(file, FileFactory.getFileType(file));
+ }
+ }
+ deletePhysicalPartition(tablePath, partitionSpecs, locationMap);
+ String segmentFilePath =
+ CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+ + segmentFile;
+ // Deletes the physical segment file
+ FileFactory.deleteFile(segmentFilePath, FileFactory.getFileType(segmentFilePath));
+ }
+
+ private static void deletePhysicalPartition(String tablePath, List<PartitionSpec> partitionSpecs,
+ Map<String, FolderDetails> locationMap) {
+ for (Map.Entry<String, FolderDetails> entry : locationMap.entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ boolean exists = pathExistsInPartitionSpec(partitionSpecs, new Path(location));
+ if (!exists) {
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location));
+ }
+ }
+ }
+
+ private static boolean pathExistsInPartitionSpec(List<PartitionSpec> partitionSpecs,
+ Path partitionPath) {
+ for (PartitionSpec spec : partitionSpecs) {
+ if (spec.getLocation().equals(partitionPath)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Move the loaded data from temp folder to respective partition folder.
+ * @param segmentFile
+ * @param tmpFolder
+ * @param tablePath
+ */
+ public static void moveFromTempFolder(SegmentFile segmentFile, String tmpFolder,
+ String tablePath) {
+
+ for (Map.Entry<String, SegmentFileStore.FolderDetails> entry : segmentFile.getLocationMap()
+ .entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative()) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ CarbonFile oldFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tmpFolder);
+ CarbonFile[] oldFiles = oldFolder.listFiles();
+ for (CarbonFile file : oldFiles) {
+ file.renameForce(location + CarbonCommonConstants.FILE_SEPARATOR + file.getName());
+ }
+ oldFolder.delete();
+ }
+ }
+
+ /**
+ * Remove temp stage folder in case of job aborted.
+ *
+ * @param locationMap
+ * @param tmpFolder
+ * @param tablePath
+ */
+ public static void removeTempFolder(Map<String, FolderDetails> locationMap, String tmpFolder,
+ String tablePath) {
+ if (locationMap == null) {
+ return;
+ }
+ for (Map.Entry<String, SegmentFileStore.FolderDetails> entry : locationMap.entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative()) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ CarbonFile oldFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tmpFolder);
+ if (oldFolder.exists()) {
+ FileFactory.deleteAllCarbonFilesOfDir(oldFolder);
+ }
+ }
+ }
+
+ /**
+ * Returns content of segment
+ * @return
+ */
+ public Map<String, FolderDetails> getLocationMap() {
+ if (segmentFile == null) {
+ return new HashMap<>();
+ }
+ return segmentFile.getLocationMap();
+ }
+
+ /**
+ * Returns the current partition specs of this segment
+ * @return
+ */
+ public List<PartitionSpec> getPartitionSpecs() {
+ List<PartitionSpec> partitionSpecs = new ArrayList<>();
+ if (segmentFile != null) {
+ for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) {
+ partitionSpecs.add(new PartitionSpec(entry.getValue().partitions, location));
+ }
+ }
+ }
+ return partitionSpecs;
+ }
+
+ /**
+ * It contains the segment information like location, partitions and related index files
+ */
+ public static class SegmentFile implements Serializable {
+
+ private static final long serialVersionUID = 3582245668420401089L;
+
+ private Map<String, FolderDetails> locationMap;
+
+ public SegmentFile merge(SegmentFile mapper) {
+ if (this == mapper) {
+ return this;
+ }
+ if (locationMap != null && mapper.locationMap != null) {
+ for (Map.Entry<String, FolderDetails> entry : mapper.locationMap.entrySet()) {
+ FolderDetails folderDetails = locationMap.get(entry.getKey());
+ if (folderDetails != null) {
+ folderDetails.merge(entry.getValue());
+ } else {
+ locationMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ if (locationMap == null) {
+ locationMap = mapper.locationMap;
+ }
+ return this;
+ }
+
+ public Map<String, FolderDetails> getLocationMap() {
+ return locationMap;
+ }
+
+ public void setLocationMap(Map<String, FolderDetails> locationMap) {
+ this.locationMap = locationMap;
+ }
+ }
+
+ /**
+ * Represents one partition folder
+ */
+ public static class FolderDetails implements Serializable {
--- End diff --
Rename FolderDetails to PartitionFolderDetails.
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3625/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by gvramana <gi...@git.apache.org>.
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169636434
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,689 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
--- End diff --
Please add a comment noting that the key is the folder path.
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169629728
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ if (tempFolder.exists() && partionNames.size() > 0) {
+ CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskNo) && file.getName()
+ .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (carbonFiles != null && carbonFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(partionNames);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : carbonFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ locationMap.put(location, folderDetails);
+ segmentFile.setLocationMap(locationMap);
+ String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, path);
+ }
+ }
+ }
+
+ /**
+ * Writes the segment file in json format
+ * @param segmentFile
+ * @param path
+ * @throws IOException
+ */
+ public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+ AtomicFileOperations fileWrite =
+ new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+ BufferedWriter brWriter = null;
+ DataOutputStream dataOutputStream = null;
+ Gson gsonObjectToWrite = new Gson();
+ try {
+ dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+ brWriter.write(metadataInstance);
+ } finally {
+ if (null != brWriter) {
+ brWriter.flush();
+ }
+ CarbonUtil.closeStreams(brWriter);
+ fileWrite.close();
+ }
+ }
+
+ /**
+ * Merge all segment files in a segment to single file.
+ *
+ * @param writePath
+ * @throws IOException
+ */
+ public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath)
+ throws IOException {
+ CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+ if (segmentFiles != null && segmentFiles.length > 0) {
+ SegmentFile segmentFile = null;
+ for (CarbonFile file : segmentFiles) {
+ SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
+ if (segmentFile == null && localSegmentFile != null) {
+ segmentFile = localSegmentFile;
+ }
+ if (localSegmentFile != null) {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ if (segmentFile != null) {
+ String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, path);
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
--- End diff --
The failure case is handled in abortJob of the committer.
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3591/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3778/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2520/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3550/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169620760
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---
@@ -185,26 +212,27 @@ public void clear(String segmentId) {
@Override
public void clear() {
for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
- clear(segmentId);
+ clear(new Segment(segmentId, null));
}
}
@Override
public List<DataMap> getDataMaps(DataMapDistributable distributable) throws IOException {
BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>();
- if (mapDistributable.getFilePath().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
- identifiers.add(new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(),
- mapDistributable.getFilePath()));
- } else if (mapDistributable.getFilePath().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ String indexPath = mapDistributable.getFilePath();
+ if (indexPath.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+ String parent = indexPath.substring(0, indexPath.lastIndexOf("/"));
--- End diff --
OK, I have changed it to use the HDFS Path and get the parent from it.
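For readers following the thread, the difference between the two approaches can be sketched roughly as below. This is only an illustration under assumptions: the thread does not show the reviewer's original concern, the class ParentPathSketch and the sample file names are hypothetical, and java.nio.file.Path from the JDK is used purely as a stand-in for Hadoop's org.apache.hadoop.fs.Path (which also offers getParent()) so that the sketch runs without Hadoop on the classpath.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical sketch comparing string slicing with a path API. */
class ParentPathSketch {

  /** String-based approach as in the diff: assumes a "/" is present. */
  static String parentViaSubstring(String indexPath) {
    return indexPath.substring(0, indexPath.lastIndexOf("/"));
  }

  /** Path-based approach (JDK stand-in); returns null when there is no parent. */
  static Path parentViaPathApi(String indexPath) {
    return Paths.get(indexPath).getParent();
  }

  public static void main(String[] args) {
    // Made-up index file location, used only for illustration.
    String indexPath = "store/t1/part=1/0_1.carbonindex";
    System.out.println(parentViaSubstring(indexPath));
    System.out.println(parentViaPathApi(indexPath));
  }
}
```

With a bare file name and no separator, the substring version throws a StringIndexOutOfBoundsException, while the path-based version simply reports that there is no parent.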
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3627/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169518712
--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Represents one load of carbondata
+ */
+public class Segment implements Serializable {
+
+ private static final long serialVersionUID = 7044555408162234064L;
+
+ private String segmentId;
+
+ private String segmentFileName;
+
+ public Segment(String segmentId, String segmentFileName) {
+ this.segmentId = segmentId;
+ this.segmentFileName = segmentFileName;
+ }
+
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+ public String getSegmentFileName() {
+ return segmentFileName;
+ }
+
+ public static List<Segment> toSegmentList(String[] segmentIds) {
+ List<Segment> list = new ArrayList<>(segmentIds.length);
+ for (String segmentId : segmentIds) {
+ list.add(toSegment(segmentId));
+ }
+ return list;
+ }
+
+ public static List<Segment> toSegmentList(List<String> segmentIds) {
+ List<Segment> list = new ArrayList<>(segmentIds.size());
+ for (String segmentId : segmentIds) {
+ list.add(toSegment(segmentId));
+ }
+ return list;
+ }
+
+ public static Segment toSegment(String segmentId) {
+ String[] split = segmentId.split("#");
+ if (split.length > 1) {
+ return new Segment(split[0], split[1]);
+ } else if (split.length > 0) {
+ return new Segment(split[0], null);
+ }
+ return new Segment(segmentId, null);
--- End diff --
Please provide a comment for this method with an example covering the if, else if and else cases.
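A comment of the kind requested here could be illustrated roughly as follows. This is only a sketch: SegmentIdSketch and its nested Seg class are hypothetical stand-ins that copy the parsing logic of toSegment from the diff above so that it runs without CarbonData on the classpath, and the sample ids and file names are made up.

```java
import java.util.Arrays;

/** Hypothetical stand-in that mirrors the toSegment parsing shown in the diff. */
class SegmentIdSketch {

  /** Minimal holder for the two parsed fields (not the real Segment class). */
  static final class Seg {
    final String segmentId;
    final String segmentFileName;

    Seg(String segmentId, String segmentFileName) {
      this.segmentId = segmentId;
      this.segmentFileName = segmentFileName;
    }
  }

  static Seg toSegment(String segmentId) {
    String[] split = segmentId.split("#");
    if (split.length > 1) {
      // e.g. "0#0_1518922000000.segment": id and segment file name are both present.
      return new Seg(split[0], split[1]);
    } else if (split.length > 0) {
      // e.g. "0": no '#', so split returns the whole input as its only element.
      return new Seg(split[0], null);
    }
    // e.g. "#": String.split drops trailing empty strings, leaving an empty
    // array, so the raw input is kept as the id.
    return new Seg(segmentId, null);
  }

  public static void main(String[] args) {
    for (String id : Arrays.asList("0#0_1518922000000.segment", "0", "#")) {
      Seg s = toSegment(id);
      System.out.println(id + " -> id=" + s.segmentId + ", file=" + s.segmentFileName);
    }
  }
}
```

In this sketch the final branch is only reached for inputs made up solely of '#' characters, because String.split returns an empty array in that case; an empty string still takes the else-if branch.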
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169626625
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ if (tempFolder.exists() && partionNames.size() > 0) {
+ CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskNo) && file.getName()
+ .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (carbonFiles != null && carbonFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(partionNames);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : carbonFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ locationMap.put(location, folderDetails);
+ segmentFile.setLocationMap(locationMap);
+ String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, path);
+ }
+ }
+ }
+
+ /**
+ * Writes the segment file in json format
+ * @param segmentFile
+ * @param path
+ * @throws IOException
+ */
+ public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+ AtomicFileOperations fileWrite =
+ new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+ BufferedWriter brWriter = null;
+ DataOutputStream dataOutputStream = null;
+ Gson gsonObjectToWrite = new Gson();
+ try {
+ dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+ brWriter.write(metadataInstance);
+ } finally {
+ if (null != brWriter) {
+ brWriter.flush();
--- End diff --
ok
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2620/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169858244
--- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java ---
@@ -217,6 +217,6 @@ public void convertValue(ColumnPageValueConverter codec) {
@Override
public void freeMemory() {
-
+ byteArrayData = null;
--- End diff --
We implemented decimal compression, wherein, based on precision and scale, a decimal is stored as a short, integer or long. So other types can be in use, depending on the decimal's precision and scale.
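The idea described in this comment can be sketched as follows. This is an illustration only, not CarbonData's implementation: the class DecimalStorageSketch, the enum Storage and the digit thresholds (4, 9 and 18) are assumptions, chosen because every unscaled value with that many decimal digits fits the corresponding Java primitive.

```java
import java.math.BigDecimal;

/** Hypothetical sketch: pick a primitive storage type from the decimal precision. */
class DecimalStorageSketch {

  enum Storage { SHORT, INT, LONG, BYTES }

  /** Assumed thresholds: 9999 fits a short, 999999999 an int, 18 nines a long. */
  static Storage chooseStorage(int precision) {
    if (precision <= 4) {
      return Storage.SHORT;
    } else if (precision <= 9) {
      return Storage.INT;
    } else if (precision <= 18) {
      return Storage.LONG;
    }
    // Larger precisions fall back to a byte-array representation.
    return Storage.BYTES;
  }

  /** Encode as the unscaled integer; only valid for precision up to 18. */
  static long encode(BigDecimal value, int scale) {
    return value.setScale(scale).unscaledValue().longValueExact();
  }

  /** Rebuild the decimal from the unscaled integer and the column's scale. */
  static BigDecimal decode(long unscaled, int scale) {
    return BigDecimal.valueOf(unscaled, scale);
  }

  public static void main(String[] args) {
    BigDecimal price = new BigDecimal("123.45"); // precision 5, scale 2
    long stored = encode(price, 2);
    System.out.println(chooseStorage(price.precision()) + " " + stored
        + " " + decode(stored, 2));
  }
}
```

Read this way, the remark suggests that a decimal page may hold its values in something other than a byte array, although the thread does not show the rest of the class.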
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3638/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169627375
--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Represents one load of carbondata
+ */
+public class Segment implements Serializable {
+
+ private static final long serialVersionUID = 7044555408162234064L;
+
+ private String segmentId;
+
+ private String segmentFileName;
+
+ public Segment(String segmentId, String segmentFileName) {
+ this.segmentId = segmentId;
+ this.segmentFileName = segmentFileName;
+ }
+
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+ public String getSegmentFileName() {
+ return segmentFileName;
+ }
+
+ public static List<Segment> toSegmentList(String[] segmentIds) {
+ List<Segment> list = new ArrayList<>(segmentIds.length);
+ for (String segmentId : segmentIds) {
+ list.add(toSegment(segmentId));
+ }
+ return list;
+ }
+
+ public static List<Segment> toSegmentList(List<String> segmentIds) {
+ List<Segment> list = new ArrayList<>(segmentIds.size());
+ for (String segmentId : segmentIds) {
+ list.add(toSegment(segmentId));
+ }
+ return list;
+ }
+
+ public static Segment toSegment(String segmentId) {
+ String[] split = segmentId.split("#");
+ if (split.length > 1) {
+ return new Segment(split[0], split[1]);
+ } else if (split.length > 0) {
+ return new Segment(split[0], null);
+ }
+ return new Segment(segmentId, null);
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Segment segment = (Segment) o;
+ return Objects.equals(segmentId, segment.segmentId);
+ }
+
+ @Override public int hashCode() {
+
--- End diff --
ok
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2599/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3594/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2656/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3546/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2559/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169627883
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java ---
@@ -17,91 +17,59 @@
package org.apache.carbondata.core.indexstore;
+import java.util.Objects;
+
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
/**
- * Class holds the absoluteTableIdentifier and segmentId to uniquely identify a segment
+ * Class holds the absoluteTableIdentifier and segment to uniquely identify a segment
--- End diff --
ok
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3610/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3647/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3800/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169629127
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ if (tempFolder.exists() && partionNames.size() > 0) {
+ CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskNo) && file.getName()
+ .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (carbonFiles != null && carbonFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(partionNames);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : carbonFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ locationMap.put(location, folderDetails);
+ segmentFile.setLocationMap(locationMap);
+ String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, path);
+ }
+ }
+ }
+
+ /**
+ * Writes the segment file in json format
+ * @param segmentFile
+ * @param path
+ * @throws IOException
+ */
+ public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+ AtomicFileOperations fileWrite =
+ new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+ BufferedWriter brWriter = null;
+ DataOutputStream dataOutputStream = null;
+ Gson gsonObjectToWrite = new Gson();
+ try {
+ dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+ brWriter.write(metadataInstance);
+ } finally {
+ if (null != brWriter) {
+ brWriter.flush();
+ }
+ CarbonUtil.closeStreams(brWriter);
+ fileWrite.close();
+ }
+ }
+
+ /**
+ * Merge all segment files in a segment to single file.
+ *
+ * @param writePath
+ * @throws IOException
+ */
+ public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath)
+ throws IOException {
+ CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+ if (segmentFiles != null && segmentFiles.length > 0) {
+ SegmentFile segmentFile = null;
+ for (CarbonFile file : segmentFiles) {
+ SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
+ if (segmentFile == null && localSegmentFile != null) {
+ segmentFile = localSegmentFile;
+ }
+ if (localSegmentFile != null) {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ if (segmentFile != null) {
+ String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, path);
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
+ }
+ return segmentFile;
+ }
+ return null;
+ }
+
+ private CarbonFile[] getSegmentFiles(String segmentPath) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+ if (carbonFile.exists()) {
+ return carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
+ }
+ });
+ }
+ return null;
+ }
+
+ /**
+ * It provides segment file only for the partitions which has physical index files.
+ *
+ * @param partitionSpecs
+ */
+ public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
+ List<PartitionSpec> partitionSpecs) {
+ SegmentFile segmentFile = null;
+ for (PartitionSpec spec : partitionSpecs) {
+ String location = spec.getLocation().toString();
+ CarbonFile carbonFile = FileFactory.getCarbonFile(location);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
--- End diff --
ok
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3792/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by gvramana <gi...@git.apache.org>.
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169380463
--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Represents one load of carbondata
+ */
+public class Segment implements Serializable {
+
+ private static final long serialVersionUID = 7044555408162234064L;
+
+ private String segmentId;
+
+ private String segmentFileName;
+
+ public Segment(String segmentId, String segmentFileName) {
+ this.segmentId = segmentId;
+ this.segmentFileName = segmentFileName;
+ }
+
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+ public String getSegmentFileName() {
+ return segmentFileName;
+ }
+
+ public static List<Segment> toSegmentList(String[] segmentIds) {
--- End diff --
This segmentId parameter is different from this.segmentId. The parameter can be renamed to segmentFullId, since it has the form (segmentId#segmentFileName). Alternatively, this.segmentId can be renamed to segmentNo.
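
A rough sketch of how the suggested renaming could look, assuming both renames are applied together. The class name SegmentIdSketch and the sample file name used in the usage note are hypothetical; the parsing logic mirrors toSegment from the quoted diff.

```java
// Illustrative only: mirrors Segment.toSegment with the suggested names.
final class SegmentIdSketch {

  final String segmentNo;        // the part before '#'
  final String segmentFileName;  // the part after '#', or null if absent

  SegmentIdSketch(String segmentNo, String segmentFileName) {
    this.segmentNo = segmentNo;
    this.segmentFileName = segmentFileName;
  }

  // segmentFullId has the form "segmentNo#segmentFileName" or just "segmentNo".
  static SegmentIdSketch parse(String segmentFullId) {
    String[] split = segmentFullId.split("#");
    if (split.length > 1) {
      return new SegmentIdSketch(split[0], split[1]);
    } else if (split.length > 0) {
      return new SegmentIdSketch(split[0], null);
    }
    // split() yields an empty array for inputs such as "#"
    return new SegmentIdSketch(segmentFullId, null);
  }
}
```

For example, parse("2#2_1519.segment") would yield segmentNo "2" and segmentFileName "2_1519.segment", while parse("2") would leave segmentFileName as null.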
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3653/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2587/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3547/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2522/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3758/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3763/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by gvramana <gi...@git.apache.org>.
Github user gvramana commented on the issue:
https://github.com/apache/carbondata/pull/1984
LGTM
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3552/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3545/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by gvramana <gi...@git.apache.org>.
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169413537
--- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---
@@ -2449,5 +2496,40 @@ public static String encodeToString(byte[] bytes) throws UnsupportedEncodingExce
return updatedMinMaxValues;
}
+ /**
+ * Generate the blockid as per the block path
+ *
+ * @param identifier
+ * @param filePath
+ * @param segmentId
+ * @return
+ */
+ public static String getBlockId(AbsoluteTableIdentifier identifier, String filePath,
+ String segmentId) {
+ String blockId;
+ String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length());
+ String tablePath = identifier.getTablePath();
+ if (filePath.startsWith(tablePath)) {
+ String factDir =
+ CarbonStorePath.getCarbonTablePath(tablePath, identifier.getCarbonTableIdentifier())
+ .getFactDir();
+ if (filePath.startsWith(factDir)) {
+ blockId = "Part0" + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
+ + CarbonCommonConstants.FILE_SEPARATOR + blockName;
+ } else {
+ String partitionDir =
+ filePath.substring(tablePath.length() + 1, filePath.length() - blockName.length() - 1);
+
+ blockId = partitionDir.replace("/", "#") + CarbonCommonConstants.FILE_SEPARATOR + "Segment_"
--- End diff --
Why is / replaced with #? Please add a comment explaining it.
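
For reference, a small sketch of what the quoted branch produces for a file under a partition directory. The quoted line is cut off after "Segment_", so the remainder of the expression here is an assumption modelled on the fact-directory branch above it. The class name, the "/" separator value, and the sample paths are also assumptions. The thread does not state why the substitution is made; the sketch only shows its effect, which is that a multi-level partition directory becomes a single path component in the block id.

```java
// Illustrative only: reproduces the partition branch of getBlockId in isolation.
final class BlockIdSketch {

  // Assumed to match CarbonCommonConstants.FILE_SEPARATOR.
  static final String FILE_SEPARATOR = "/";

  static String partitionBlockId(String tablePath, String filePath, String segmentId) {
    // Last path component, e.g. "part-0-0.carbondata"
    String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
    // Directory between the table path and the block name, e.g. "country=US/city=NY"
    String partitionDir =
        filePath.substring(tablePath.length() + 1, filePath.length() - blockName.length() - 1);
    // Nested partition folders are flattened into one component with '#'
    return partitionDir.replace("/", "#") + FILE_SEPARATOR + "Segment_" + segmentId
        + FILE_SEPARATOR + blockName;
  }
}
```

With tablePath "/store/db/t1", filePath "/store/db/t1/country=US/city=NY/part-0-0.carbondata", and segmentId "3", this returns "country=US#city=NY/Segment_3/part-0-0.carbondata".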
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2530/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169632391
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ if (tempFolder.exists() && partionNames.size() > 0) {
+ CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskNo) && file.getName()
+ .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (carbonFiles != null && carbonFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(partionNames);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : carbonFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ locationMap.put(location, folderDetails);
+ segmentFile.setLocationMap(locationMap);
+ String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, path);
+ }
+ }
+ }
+
+ /**
+ * Writes the segment file in json format
+ * @param segmentFile
+ * @param path
+ * @throws IOException
+ */
+ public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+ AtomicFileOperations fileWrite =
+ new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+ BufferedWriter brWriter = null;
+ DataOutputStream dataOutputStream = null;
+ Gson gsonObjectToWrite = new Gson();
+ try {
+ dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+ brWriter.write(metadataInstance);
+ } finally {
+ if (null != brWriter) {
+ brWriter.flush();
+ }
+ CarbonUtil.closeStreams(brWriter);
+ fileWrite.close();
+ }
+ }
+
+ /**
+ * Merge all segment files in a segment to single file.
+ *
+ * @param writePath
+ * @throws IOException
+ */
+ public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath)
+ throws IOException {
+ CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+ if (segmentFiles != null && segmentFiles.length > 0) {
+ SegmentFile segmentFile = null;
+ for (CarbonFile file : segmentFiles) {
+ SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
+ if (segmentFile == null && localSegmentFile != null) {
+ segmentFile = localSegmentFile;
+ }
+ if (localSegmentFile != null) {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ if (segmentFile != null) {
+ String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, path);
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
+ }
+ return segmentFile;
+ }
+ return null;
+ }
+
+ private CarbonFile[] getSegmentFiles(String segmentPath) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+ if (carbonFile.exists()) {
+ return carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
+ }
+ });
+ }
+ return null;
+ }
+
+ /**
+ * It provides segment file only for the partitions which has physical index files.
+ *
+ * @param partitionSpecs
+ */
+ public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
+ List<PartitionSpec> partitionSpecs) {
+ SegmentFile segmentFile = null;
+ for (PartitionSpec spec : partitionSpecs) {
+ String location = spec.getLocation().toString();
+ CarbonFile carbonFile = FileFactory.getCarbonFile(location);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
+ }
+ });
+ if (listFiles != null && listFiles.length > 0) {
+ SegmentFile localSegmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(spec.getPartitions());
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : listFiles) {
+ if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ folderDetails.setMergeFileName(file.getName());
+ } else {
+ folderDetails.getFiles().add(file.getName());
+ }
+ }
+ locationMap.put(location, folderDetails);
+ localSegmentFile.setLocationMap(locationMap);
+ if (segmentFile == null) {
+ segmentFile = localSegmentFile;
+ } else {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ }
+ return segmentFile;
+ }
+
+ /**
+ * This method reads the segment file which is written in json format
+ *
+ * @param segmentFilePath
+ * @return
+ */
+ private SegmentFile readSegmentFile(String segmentFilePath) throws IOException {
+ Gson gsonObjectToRead = new Gson();
+ DataInputStream dataInputStream = null;
+ BufferedReader buffReader = null;
+ InputStreamReader inStream = null;
+ SegmentFile segmentFile;
+ AtomicFileOperations fileOperation =
+ new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath));
+
+ try {
+ if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) {
+ return null;
+ }
+ dataInputStream = fileOperation.openForRead();
+ inStream = new InputStreamReader(dataInputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+ buffReader = new BufferedReader(inStream);
+ segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class);
+ } finally {
+ if (inStream != null) {
+ CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+ }
+ }
+
+ return segmentFile;
+ }
+
+ /**
+ * Reads segment file.
+ */
+ public void readSegment(String tablePath, String segmentFileName) throws IOException {
+ String segmentFilePath =
+ CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+ + segmentFileName;
+ SegmentFile segmentFile = readSegmentFile(segmentFilePath);
+ this.tablePath = tablePath;
+ this.segmentFile = segmentFile;
--- End diff --
OK, moved to the constructor.
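
A minimal sketch of what moving the read into the constructor might look like. It assumes the goal is that a store instance is always initialised and its fields can be final. The class name, the use of java.nio instead of the CarbonData file APIs, and keeping the raw JSON text instead of a parsed SegmentFile are all simplifications made for this example.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Illustrative only: not the SegmentFileStore from this pull request.
final class SegmentFileStoreSketch {

  private final String segmentFilesLocation;
  private final String segmentFileJson;  // null when the segment file does not exist

  // Reading happens here, so callers never see a half-initialised store.
  SegmentFileStoreSketch(String segmentFilesLocation, String segmentFileName)
      throws IOException {
    this.segmentFilesLocation = segmentFilesLocation;
    Path path = Paths.get(segmentFilesLocation, segmentFileName);
    this.segmentFileJson = Files.exists(path)
        ? new String(Files.readAllBytes(path), StandardCharsets.UTF_8)
        : null;
  }

  String getSegmentFilesLocation() {
    return segmentFilesLocation;
  }

  String getSegmentFileJson() {
    return segmentFileJson;
  }
}
```

Compared with a separate readSegment call, this removes the window in which tablePath and segmentFile are still unset.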
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2594/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3804/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3618/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3760/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3782/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169544464
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ if (tempFolder.exists() && partionNames.size() > 0) {
+ CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskNo) && file.getName()
+ .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (carbonFiles != null && carbonFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(partionNames);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : carbonFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ locationMap.put(location, folderDetails);
+ segmentFile.setLocationMap(locationMap);
+ String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, path);
+ }
+ }
+ }
+
+ /**
+ * Writes the segment file in json format
+ * @param segmentFile
+ * @param path
+ * @throws IOException
+ */
+ public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+ AtomicFileOperations fileWrite =
+ new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+ BufferedWriter brWriter = null;
+ DataOutputStream dataOutputStream = null;
+ Gson gsonObjectToWrite = new Gson();
+ try {
+ dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+ brWriter.write(metadataInstance);
+ } finally {
+ if (null != brWriter) {
+ brWriter.flush();
+ }
+ CarbonUtil.closeStreams(brWriter);
+ fileWrite.close();
+ }
+ }
+
+ /**
+ * Merge all segment files in a segment to single file.
+ *
+ * @param writePath
+ * @throws IOException
+ */
+ public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath)
+ throws IOException {
+ CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+ if (segmentFiles != null && segmentFiles.length > 0) {
+ SegmentFile segmentFile = null;
+ for (CarbonFile file : segmentFiles) {
+ SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
+ if (segmentFile == null && localSegmentFile != null) {
+ segmentFile = localSegmentFile;
+ }
+ if (localSegmentFile != null) {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ if (segmentFile != null) {
+ String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, path);
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
+ }
+ return segmentFile;
+ }
+ return null;
+ }
+
+ private CarbonFile[] getSegmentFiles(String segmentPath) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+ if (carbonFile.exists()) {
+ return carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
+ }
+ });
+ }
+ return null;
+ }
+
+ /**
+ * It provides segment file only for the partitions which has physical index files.
+ *
+ * @param partitionSpecs
+ */
+ public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
+ List<PartitionSpec> partitionSpecs) {
+ SegmentFile segmentFile = null;
+ for (PartitionSpec spec : partitionSpecs) {
+ String location = spec.getLocation().toString();
+ CarbonFile carbonFile = FileFactory.getCarbonFile(location);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
+ }
+ });
+ if (listFiles != null && listFiles.length > 0) {
+ SegmentFile localSegmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(spec.getPartitions());
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : listFiles) {
+ if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ folderDetails.setMergeFileName(file.getName());
+ } else {
+ folderDetails.getFiles().add(file.getName());
+ }
+ }
+ locationMap.put(location, folderDetails);
+ localSegmentFile.setLocationMap(locationMap);
+ if (segmentFile == null) {
+ segmentFile = localSegmentFile;
+ } else {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ }
+ return segmentFile;
+ }
+
+ /**
+ * This method reads the segment file which is written in json format
+ *
+ * @param segmentFilePath
+ * @return
+ */
+ private SegmentFile readSegmentFile(String segmentFilePath) throws IOException {
+ Gson gsonObjectToRead = new Gson();
+ DataInputStream dataInputStream = null;
+ BufferedReader buffReader = null;
+ InputStreamReader inStream = null;
+ SegmentFile segmentFile;
+ AtomicFileOperations fileOperation =
+ new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath));
+
+ try {
+ if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) {
+ return null;
+ }
+ dataInputStream = fileOperation.openForRead();
+ inStream = new InputStreamReader(dataInputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+ buffReader = new BufferedReader(inStream);
+ segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class);
+ } finally {
+ if (inStream != null) {
+ CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+ }
+ }
+
+ return segmentFile;
+ }
+
+ /**
+ * Reads segment file.
+ */
+ public void readSegment(String tablePath, String segmentFileName) throws IOException {
+ String segmentFilePath =
+ CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+ + segmentFileName;
+ SegmentFile segmentFile = readSegmentFile(segmentFilePath);
+ this.tablePath = tablePath;
+ this.segmentFile = segmentFile;
--- End diff --
This method updates the member variables directly. If it is called from multiple places, concurrent operations can cause problems. Kindly re-verify and, if possible, avoid updating the member variables when the method is called from multiple places.
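One way to address this concern is to assign the state once in the constructor and keep the fields final, so no later call can change what another thread sees. The sketch below is only an illustration: `ImmutableSegmentStore` and its `Map<String, String>` payload are hypothetical stand-ins, not the CarbonData `SegmentFileStore` or `SegmentFile` classes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a segment file store; not the CarbonData class. */
public final class ImmutableSegmentStore {

  // Both fields are final: they are assigned exactly once, in the constructor.
  private final String tablePath;
  private final Map<String, String> segmentFile;

  public ImmutableSegmentStore(String tablePath, Map<String, String> loaded) {
    this.tablePath = tablePath;
    // Defensive copy, so later changes to the caller's map are not visible here.
    this.segmentFile = Collections.unmodifiableMap(new HashMap<>(loaded));
  }

  public String getTablePath() {
    return tablePath;
  }

  public Map<String, String> getSegmentFile() {
    return segmentFile;
  }
}
```

With this shape, a caller that needs a different segment file creates a new store instance instead of re-reading into a shared one.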
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by gvramana <gi...@git.apache.org>.
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169411226
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ if (tempFolder.exists() && partionNames.size() > 0) {
+ CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskNo) && file.getName()
+ .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (carbonFiles != null && carbonFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(partionNames);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : carbonFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ locationMap.put(location, folderDetails);
+ segmentFile.setLocationMap(locationMap);
+ String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, path);
+ }
+ }
+ }
+
+ /**
+ * Writes the segment file in json format
+ * @param segmentFile
+ * @param path
+ * @throws IOException
+ */
+ public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+ AtomicFileOperations fileWrite =
+ new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+ BufferedWriter brWriter = null;
+ DataOutputStream dataOutputStream = null;
+ Gson gsonObjectToWrite = new Gson();
+ try {
+ dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+ brWriter.write(metadataInstance);
+ } finally {
+ if (null != brWriter) {
+ brWriter.flush();
--- End diff --
flush() can throw an exception, in which case the file writer will not be closed. Move brWriter.flush() to just after the write call.
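A minimal sketch of the pattern suggested here, using only `java.io` rather than the CarbonData `AtomicFileOperations` API: the flush happens on the normal path inside the `try`, and closing is left to try-with-resources so it runs even if the write or flush fails. The class name `SafeJsonWriter` and the helper `toBytes` are assumptions made for illustration.

```java
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper; illustrates flush-inside-try, close-always. */
public final class SafeJsonWriter {

  private SafeJsonWriter() { }

  /** Writes the given JSON text to the stream and closes the stream. */
  public static void write(OutputStream out, String json) throws IOException {
    try (BufferedWriter writer =
        new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
      writer.write(json);
      // Flush on the normal path. If this throws, the exception propagates
      // and try-with-resources still closes the writer.
      writer.flush();
    }
  }

  /** Convenience wrapper that writes to memory and returns the bytes. */
  public static byte[] toBytes(String json) {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try {
      write(sink, json);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return sink.toByteArray();
  }
}
```

The same ordering applies with an explicit `finally` block: keep `flush()` next to `write()` and leave only the close calls in `finally`.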
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3757/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2611/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2558/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2565/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169518845
--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Represents one load of carbondata
+ */
+public class Segment implements Serializable {
+
+ private static final long serialVersionUID = 7044555408162234064L;
+
+ private String segmentId;
+
+ private String segmentFileName;
+
+ public Segment(String segmentId, String segmentFileName) {
+ this.segmentId = segmentId;
+ this.segmentFileName = segmentFileName;
+ }
+
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+ public String getSegmentFileName() {
+ return segmentFileName;
+ }
+
+ public static List<Segment> toSegmentList(String[] segmentIds) {
+ List<Segment> list = new ArrayList<>(segmentIds.length);
+ for (String segmentId : segmentIds) {
+ list.add(toSegment(segmentId));
+ }
+ return list;
+ }
+
+ public static List<Segment> toSegmentList(List<String> segmentIds) {
+ List<Segment> list = new ArrayList<>(segmentIds.size());
+ for (String segmentId : segmentIds) {
+ list.add(toSegment(segmentId));
+ }
+ return list;
+ }
+
+ public static Segment toSegment(String segmentId) {
+ String[] split = segmentId.split("#");
+ if (split.length > 1) {
+ return new Segment(split[0], split[1]);
+ } else if (split.length > 0) {
+ return new Segment(split[0], null);
+ }
+ return new Segment(segmentId, null);
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Segment segment = (Segment) o;
+ return Objects.equals(segmentId, segment.segmentId);
+ }
+
+ @Override public int hashCode() {
+
--- End diff --
Remove this extra line
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2557/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169626278
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
--- End diff --
Added in CarbonOutputCommitter abortJob
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by gvramana <gi...@git.apache.org>.
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169399178
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---
@@ -185,26 +212,27 @@ public void clear(String segmentId) {
@Override
public void clear() {
for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
- clear(segmentId);
+ clear(new Segment(segmentId, null));
}
}
@Override
public List<DataMap> getDataMaps(DataMapDistributable distributable) throws IOException {
BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>();
- if (mapDistributable.getFilePath().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
- identifiers.add(new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(),
- mapDistributable.getFilePath()));
- } else if (mapDistributable.getFilePath().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ String indexPath = mapDistributable.getFilePath();
+ if (indexPath.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+ String parent = indexPath.substring(0, indexPath.lastIndexOf("/"));
--- End diff --
Need to use the file separator here, as this also has to work on Windows paths
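As an illustration of the concern, the sketch below derives the parent folder of an index path while accepting either separator. `IndexPaths.parentOf` is a hypothetical helper, not part of CarbonData; the reviewer may equally have meant a shared constant such as `CarbonCommonConstants.FILE_SEPARATOR`, which is used elsewhere in this pull request.

```java
/** Hypothetical helper; not part of the CarbonData code base. */
public final class IndexPaths {

  private IndexPaths() { }

  /**
   * Returns everything before the last path separator, treating both
   * '/' and '\\' as separators. Returns an empty string if none is found.
   */
  public static String parentOf(String path) {
    int idx = Math.max(path.lastIndexOf('/'), path.lastIndexOf('\\'));
    return idx < 0 ? "" : path.substring(0, idx);
  }
}
```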
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3756/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169537887
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---
@@ -660,22 +650,23 @@ public boolean isScanRequired(FilterResolverIntf filterExp) {
return blocklets;
}
- @Override public List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions) {
+ @Override
+ public List<Blocklet> prune(FilterResolverIntf filterExp, List<PartitionSpec> partitions) {
if (unsafeMemoryDMStore.getRowCount() == 0) {
return new ArrayList<>();
}
- // First get the partitions which are stored inside datamap.
- List<String> storedPartitions = getPartitions();
// if it has partitioned datamap but there is no partitioned information stored, it means
// partitions are dropped so return empty list.
- if (isPartitionedSegment && (storedPartitions == null || storedPartitions.size() == 0)) {
- return new ArrayList<>();
- }
- if (storedPartitions != null && storedPartitions.size() > 0) {
+ if (partitions != null) {
+ // First get the partitions which are stored inside datamap.
+ String[] fileDetails = getFileDetails();
// Check the exact match of partition information inside the stored partitions.
boolean found = false;
- if (partitions != null && partitions.size() > 0) {
- found = partitions.containsAll(storedPartitions);
+ Path folderPath = new Path(fileDetails[0]);
+ for (PartitionSpec spec : partitions) {
+ if (folderPath.equals(spec.getLocation())) {
+ found = true;
--- End diff --
Break out of the loop once found
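A small sketch of the early exit being asked for, with plain strings standing in for `Path` and `PartitionSpec` (the class and method names here are hypothetical):

```java
import java.util.List;

/** Hypothetical helper; strings stand in for Path and PartitionSpec. */
public final class PartitionMatch {

  private PartitionMatch() { }

  /** Returns true if folderPath equals any of the given partition locations. */
  public static boolean containsLocation(List<String> locations, String folderPath) {
    boolean found = false;
    for (String location : locations) {
      if (folderPath.equals(location)) {
        found = true;
        // Stop scanning: the remaining locations cannot change the result.
        break;
      }
    }
    return found;
  }
}
```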
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3803/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169860093
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ public SegmentFileStore(String tablePath, String segmentFileName) throws IOException {
+ this.tablePath = tablePath;
+ this.segmentFile = readSegment(tablePath, segmentFileName);
+ }
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public static void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+
+ if (tempFolder.exists() && partionNames.size() > 0) {
+ CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskNo) && file.getName()
+ .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (carbonFiles != null && carbonFiles.length > 0) {
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ SegmentFile segmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(partionNames);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : carbonFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ locationMap.put(location, folderDetails);
+ segmentFile.setLocationMap(locationMap);
+ String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, path);
+ }
+ }
+ }
+
+ /**
+ * Writes the segment file in json format
+ * @param segmentFile
+ * @param path
+ * @throws IOException
+ */
+ public static void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+ AtomicFileOperations fileWrite =
+ new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+ BufferedWriter brWriter = null;
+ DataOutputStream dataOutputStream = null;
+ Gson gsonObjectToWrite = new Gson();
+ try {
+ dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+ brWriter.write(metadataInstance);
+ brWriter.flush();
+ } finally {
+ CarbonUtil.closeStreams(brWriter);
+ fileWrite.close();
+ }
+ }
+
+ /**
+ * Merge all segment files in a segment to single file.
+ *
+ * @throws IOException
+ */
+ public static SegmentFile mergeSegmentFiles(String readPath, String mergeFileName,
+ String writePath)
+ throws IOException {
+ CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+ if (segmentFiles != null && segmentFiles.length > 0) {
+ SegmentFile segmentFile = null;
+ for (CarbonFile file : segmentFiles) {
+ SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
+ if (segmentFile == null && localSegmentFile != null) {
+ segmentFile = localSegmentFile;
+ }
+ if (localSegmentFile != null) {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ if (segmentFile != null) {
+ String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, path);
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
+ }
+ return segmentFile;
+ }
+ return null;
+ }
+
+ private static CarbonFile[] getSegmentFiles(String segmentPath) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+ if (carbonFile.exists()) {
+ return carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
+ }
+ });
+ }
+ return null;
+ }
+
+ /**
+ * It provides segment file only for the partitions which have physical index files.
+ *
+ * @param partitionSpecs
+ */
+ public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
+ List<PartitionSpec> partitionSpecs) {
+ SegmentFile segmentFile = null;
+ for (PartitionSpec spec : partitionSpecs) {
+ String location = spec.getLocation().toString();
+ CarbonFile carbonFile = FileFactory.getCarbonFile(location);
+
+ CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
+ }
+ });
+ if (listFiles != null && listFiles.length > 0) {
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ SegmentFile localSegmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(spec.getPartitions());
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : listFiles) {
+ if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ folderDetails.setMergeFileName(file.getName());
+ } else {
+ folderDetails.getFiles().add(file.getName());
+ }
+ }
+ locationMap.put(location, folderDetails);
+ localSegmentFile.setLocationMap(locationMap);
+ if (segmentFile == null) {
+ segmentFile = localSegmentFile;
+ } else {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ }
+ return segmentFile;
+ }
+
+ /**
+ * This method reads the segment file which is written in json format
+ *
+ * @param segmentFilePath
+ * @return
+ */
+ private static SegmentFile readSegmentFile(String segmentFilePath) throws IOException {
+ Gson gsonObjectToRead = new Gson();
+ DataInputStream dataInputStream = null;
+ BufferedReader buffReader = null;
+ InputStreamReader inStream = null;
+ SegmentFile segmentFile;
+ AtomicFileOperations fileOperation =
+ new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath));
+
+ try {
+ if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) {
+ return null;
+ }
+ dataInputStream = fileOperation.openForRead();
+ inStream = new InputStreamReader(dataInputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+ buffReader = new BufferedReader(inStream);
+ segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class);
+ } finally {
+ if (inStream != null) {
+ CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+ }
+ }
+
+ return segmentFile;
+ }
+
+ /**
+ * Reads segment file.
+ */
+ private SegmentFile readSegment(String tablePath, String segmentFileName) throws IOException {
+ String segmentFilePath =
+ CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+ + segmentFileName;
+ return readSegmentFile(segmentFilePath);
+ }
+
+ public String getTablePath() {
+ return tablePath;
+ }
+
+ /**
+ * Gets all the index files and related carbondata files from this segment. First user needs to
+ * call @readIndexFiles method before calling it.
+ * @return
+ */
+ public Map<String, List<String>> getIndexFilesMap() {
+ return indexFilesMap;
+ }
+
+ /**
+ * Reads all index files which are located in this segment. First user needs to call
+ * @readSegment method before calling it.
+ * @throws IOException
+ */
+ public void readIndexFiles() throws IOException {
+ readIndexFiles(SegmentStatus.SUCCESS, false);
+ }
+
+ /**
+ * Reads all index files as per the status of the file. In case of @ignoreStatus is true it just
+ * reads all index files
+ * @param status
+ * @param ignoreStatus
+ * @throws IOException
+ */
+ private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException {
+ if (indexFilesMap != null) {
+ return;
+ }
+ SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+ indexFilesMap = new HashMap<>();
+ indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus);
+ Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
+ List<DataFileFooter> indexInfo =
+ fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
+ List<String> blocks = new ArrayList<>();
+ for (DataFileFooter footer : indexInfo) {
+ blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
+ }
+ indexFilesMap.put(entry.getKey(), blocks);
+ }
+ }
+
+ /**
+ * Gets all index files from this segment
+ * @return
+ */
+ public Map<String, String> getIndexFiles() {
+ Map<String, String> indexFiles = new HashMap<>();
+ if (segmentFile != null) {
+ for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
+ for (String indexFile : entry.getValue().getFiles()) {
+ indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
+ entry.getValue().mergeFileName);
+ }
+ }
+ }
+ }
+ return indexFiles;
+ }
+
+ /**
+ * Drops the partition related files from the segment file of the segment and writes
+ * to a new file. First iterate over the segment file and check whether each path needs to be
+ * dropped, then mark the status of each matching path for delete.
+ *
+ * @param uniqueId
+ * @throws IOException
+ */
+ public void dropPartitions(Segment segment, List<PartitionSpec> partitionSpecs,
+ String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments)
+ throws IOException {
+ readSegment(tablePath, segment.getSegmentFileName());
+ boolean updateSegment = false;
+ for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ Path path = new Path(location);
+ // Update the status to delete if path equals
+ for (PartitionSpec spec : partitionSpecs) {
+ if (path.equals(spec.getLocation())) {
+ entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage());
+ updateSegment = true;
+ break;
+ }
+ }
+ }
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
+ writePath =
+ writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentNo() + "_" + uniqueId
+ + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, writePath);
+ // Check whether we can completely remove the segment.
+ boolean deleteSegment = true;
+ for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+ if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) {
+ deleteSegment = false;
+ break;
+ }
+ }
+ if (deleteSegment) {
+ toBeDeletedSegments.add(segment.getSegmentNo());
+ }
+ if (updateSegment) {
+ toBeUpdatedSegments.add(segment.getSegmentNo());
+ }
+ }
+
+ /**
+ * Update the table status file with the dropped partitions information
+ *
+ * @param carbonTable
+ * @param uniqueId
+ * @param toBeUpdatedSegments
+ * @param toBeDeleteSegments
+ * @throws IOException
+ */
+ public static void commitDropPartitions(CarbonTable carbonTable, String uniqueId,
+ List<String> toBeUpdatedSegments, List<String> toBeDeleteSegments) throws IOException {
+ if (toBeDeleteSegments.size() > 0 || toBeUpdatedSegments.size() > 0) {
+ Set<Segment> segmentSet = new HashSet<>(
+ new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
+ .getValidAndInvalidSegments().getValidSegments());
+ CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId, true,
+ Segment.toSegmentList(toBeDeleteSegments), Segment.toSegmentList(toBeUpdatedSegments));
+ }
+ }
+
+ /**
+ * Clean up invalid data after drop partition in all segments of table
+ *
+ * @param table
+ * @param forceDelete Whether it should be deleted force or check the time for an hour creation
+ * to delete data.
+ * @throws IOException
+ */
+ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitionSpecs,
+ boolean forceDelete) throws IOException {
+
+ LoadMetadataDetails[] details =
+ SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath());
+ // scan through each segment.
+ for (LoadMetadataDetails segment : details) {
+ // if this segment is valid then only we will go for deletion of related
+ // dropped partition files. if the segment is mark for delete or compacted then any way
+ // it will get deleted.
+
+ if ((segment.getSegmentStatus() == SegmentStatus.SUCCESS
+ || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS)
+ && segment.getSegmentFile() != null) {
+ List<String> toBeDeletedIndexFiles = new ArrayList<>();
+ List<String> toBeDeletedDataFiles = new ArrayList<>();
+ // take the list of files from this segment.
+ SegmentFileStore fileStore =
+ new SegmentFileStore(table.getTablePath(), segment.getSegmentFile());
+ fileStore.readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false);
+ if (forceDelete) {
+ deletePhysicalPartition(table.getTablePath(), partitionSpecs,
+ fileStore.segmentFile.locationMap);
+ }
+ for (Map.Entry<String, List<String>> entry : fileStore.indexFilesMap.entrySet()) {
+ String indexFile = entry.getKey();
+ // Check the partition information in the partition mapper
+ Long fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile
+ .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+ indexFile.length() - CarbonTablePath.INDEX_FILE_EXT.length()));
+ if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) {
+ toBeDeletedIndexFiles.add(indexFile);
+ // Add the corresponding carbondata files to the delete list.
+ toBeDeletedDataFiles.addAll(entry.getValue());
+ }
+ }
+ if (toBeDeletedIndexFiles.size() > 0) {
+ for (String dataFile : toBeDeletedIndexFiles) {
+ FileFactory.deleteFile(dataFile, FileFactory.getFileType(dataFile));
+ }
+ for (String dataFile : toBeDeletedDataFiles) {
+ FileFactory.deleteFile(dataFile, FileFactory.getFileType(dataFile));
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Deletes the segment file and its physical files like partition folders from disk
+ * @param tablePath
+ * @param segmentFile
+ * @param partitionSpecs
+ * @throws IOException
+ */
+ public static void deleteSegment(String tablePath, String segmentFile,
+ List<PartitionSpec> partitionSpecs) throws IOException {
+ SegmentFileStore fileStore = new SegmentFileStore(tablePath, segmentFile);
+ fileStore.readIndexFiles(SegmentStatus.SUCCESS, true);
+ Map<String, FolderDetails> locationMap = fileStore.getLocationMap();
+ Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+ for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+ FileFactory.deleteFile(entry.getKey(), FileFactory.getFileType(entry.getKey()));
+ for (String file : entry.getValue()) {
+ FileFactory.deleteFile(file, FileFactory.getFileType(file));
+ }
+ }
+ deletePhysicalPartition(tablePath, partitionSpecs, locationMap);
+ String segmentFilePath =
+ CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+ + segmentFile;
+ // Deletes the physical segment file
+ FileFactory.deleteFile(segmentFilePath, FileFactory.getFileType(segmentFilePath));
+ }
+
+ private static void deletePhysicalPartition(String tablePath, List<PartitionSpec> partitionSpecs,
+ Map<String, FolderDetails> locationMap) {
+ for (Map.Entry<String, FolderDetails> entry : locationMap.entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ boolean exists = pathExistsInPartitionSpec(partitionSpecs, new Path(location));
+ if (!exists) {
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location));
+ }
+ }
+ }
+
+ private static boolean pathExistsInPartitionSpec(List<PartitionSpec> partitionSpecs,
+ Path partitionPath) {
+ for (PartitionSpec spec : partitionSpecs) {
+ if (spec.getLocation().equals(partitionPath)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Move the loaded data from temp folder to respective partition folder.
+ * @param segmentFile
+ * @param tmpFolder
+ * @param tablePath
+ */
+ public static void moveFromTempFolder(SegmentFile segmentFile, String tmpFolder,
+ String tablePath) {
+
+ for (Map.Entry<String, SegmentFileStore.FolderDetails> entry : segmentFile.getLocationMap()
+ .entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative()) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ CarbonFile oldFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tmpFolder);
+ CarbonFile[] oldFiles = oldFolder.listFiles();
+ for (CarbonFile file : oldFiles) {
+ file.renameForce(location + CarbonCommonConstants.FILE_SEPARATOR + file.getName());
+ }
+ oldFolder.delete();
+ }
+ }
+
+ /**
+ * Remove temp stage folder in case of job aborted.
+ *
+ * @param locationMap
+ * @param tmpFolder
+ * @param tablePath
+ */
+ public static void removeTempFolder(Map<String, FolderDetails> locationMap, String tmpFolder,
+ String tablePath) {
+ if (locationMap == null) {
+ return;
+ }
+ for (Map.Entry<String, SegmentFileStore.FolderDetails> entry : locationMap.entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative()) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ CarbonFile oldFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tmpFolder);
+ if (oldFolder.exists()) {
+ FileFactory.deleteAllCarbonFilesOfDir(oldFolder);
+ }
+ }
+ }
+
+ /**
+ * Returns content of segment
+ * @return
+ */
+ public Map<String, FolderDetails> getLocationMap() {
+ if (segmentFile == null) {
+ return new HashMap<>();
+ }
+ return segmentFile.getLocationMap();
+ }
+
+ /**
+ * Returns the current partition specs of this segment
+ * @return
+ */
+ public List<PartitionSpec> getPartitionSpecs() {
+ List<PartitionSpec> partitionSpecs = new ArrayList<>();
+ if (segmentFile != null) {
+ for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) {
+ partitionSpecs.add(new PartitionSpec(entry.getValue().partitions, location));
+ }
+ }
+ }
+ return partitionSpecs;
+ }
+
+ /**
+ * It contains the segment information like location, partitions and related index files
+ */
+ public static class SegmentFile implements Serializable {
+
+ private static final long serialVersionUID = 3582245668420401089L;
+
+ private Map<String, FolderDetails> locationMap;
+
+ public SegmentFile merge(SegmentFile mapper) {
+ if (this == mapper) {
+ return this;
+ }
+ if (locationMap != null && mapper.locationMap != null) {
+ for (Map.Entry<String, FolderDetails> entry : mapper.locationMap.entrySet()) {
+ FolderDetails folderDetails = locationMap.get(entry.getKey());
+ if (folderDetails != null) {
+ folderDetails.merge(entry.getValue());
+ } else {
+ locationMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ if (locationMap == null) {
+ locationMap = mapper.locationMap;
+ }
+ return this;
+ }
+
+ public Map<String, FolderDetails> getLocationMap() {
+ return locationMap;
+ }
+
+ public void setLocationMap(Map<String, FolderDetails> locationMap) {
+ this.locationMap = locationMap;
+ }
+ }
+
+ /**
+ * Represents one partition folder
+ */
+ public static class FolderDetails implements Serializable {
--- End diff --
I think it is better to keep the name FolderDetails, because this concept can also be extended to normal tables.
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2589/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169628603
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---
@@ -161,9 +212,13 @@ private void readMergeFile(String mergeFilePath) throws IOException {
MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
List<String> file_names = indexHeader.getFile_names();
List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
+ CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath);
assert (file_names.size() == fileData.size());
for (int i = 0; i < file_names.size(); i++) {
carbonIndexMap.put(file_names.get(i), fileData.get(i).array());
+ carbonIndexMapWithFullPath.put(
+ mergeFile.getParentFile().getAbsolutePath() + CarbonCommonConstants.FILE_SEPARATOR
+ + file_names.get(i), fileData.get(i).array());
}
thriftReader.close();
--- End diff --
ok
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2563/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169543473
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ if (tempFolder.exists() && partionNames.size() > 0) {
+ CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskNo) && file.getName()
+ .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (carbonFiles != null && carbonFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(partionNames);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : carbonFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ locationMap.put(location, folderDetails);
+ segmentFile.setLocationMap(locationMap);
+ String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, path);
+ }
+ }
+ }
+
+ /**
+ * Writes the segment file in json format
+ * @param segmentFile
+ * @param path
+ * @throws IOException
+ */
+ public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+ AtomicFileOperations fileWrite =
+ new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+ BufferedWriter brWriter = null;
+ DataOutputStream dataOutputStream = null;
+ Gson gsonObjectToWrite = new Gson();
+ try {
+ dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+ brWriter.write(metadataInstance);
+ } finally {
+ if (null != brWriter) {
+ brWriter.flush();
+ }
+ CarbonUtil.closeStreams(brWriter);
+ fileWrite.close();
+ }
+ }
+
+ /**
+ * Merge all segment files in a segment to single file.
+ *
+ * @param writePath
+ * @throws IOException
+ */
+ public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath)
+ throws IOException {
+ CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+ if (segmentFiles != null && segmentFiles.length > 0) {
+ SegmentFile segmentFile = null;
+ for (CarbonFile file : segmentFiles) {
+ SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
+ if (segmentFile == null && localSegmentFile != null) {
+ segmentFile = localSegmentFile;
+ }
+ if (localSegmentFile != null) {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ if (segmentFile != null) {
+ String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, path);
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
--- End diff --
Use try/finally here, and call FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath)) inside the finally block.
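A minimal sketch of the try/finally shape suggested here, for illustration only. It assumes plain java.nio in place of CarbonData's FileFactory/CarbonFile, and the class and method names (MergeWithCleanup, mergeAndCleanup, deleteRecursively) are hypothetical and not part of this PR. The point is only that the temporary read folder is removed whether or not the merge succeeds:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MergeWithCleanup {

  // Hypothetical stand-in for mergeSegmentFiles: concatenates every *.segment
  // file found in readDir into a single file under writeDir.
  // Returns the merged file, or null when readDir holds no segment files.
  static Path mergeAndCleanup(Path readDir, Path writeDir, String mergeFileName)
      throws IOException {
    try {
      List<Path> parts = new ArrayList<>();
      try (DirectoryStream<Path> stream = Files.newDirectoryStream(readDir, "*.segment")) {
        for (Path p : stream) {
          parts.add(p);
        }
      }
      if (parts.isEmpty()) {
        return null;
      }
      Collections.sort(parts); // deterministic merge order
      StringBuilder merged = new StringBuilder();
      for (Path p : parts) {
        merged.append(new String(Files.readAllBytes(p), StandardCharsets.UTF_8));
      }
      Files.createDirectories(writeDir);
      Path out = writeDir.resolve(mergeFileName + ".segment");
      Files.write(out, merged.toString().getBytes(StandardCharsets.UTF_8));
      return out;
    } finally {
      // Runs on success, on early return, and on failure, so the temp folder
      // never outlives the merge attempt. Note that an exception thrown here
      // would mask an exception thrown from the try block.
      deleteRecursively(readDir);
    }
  }

  // Deletes dir and everything below it; does nothing if dir is absent.
  static void deleteRecursively(Path dir) throws IOException {
    if (!Files.exists(dir)) {
      return;
    }
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
      for (Path p : stream) {
        if (Files.isDirectory(p)) {
          deleteRecursively(p);
        } else {
          Files.delete(p);
        }
      }
    }
    Files.delete(dir);
  }
}
```

One trade-off with this shape: if the write fails, the per-task segment files are gone as well, so the load would have to be retried rather than re-merged.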
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3802/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2581/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169627320
--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Represents one load of carbondata
+ */
+public class Segment implements Serializable {
+
+ private static final long serialVersionUID = 7044555408162234064L;
+
+ private String segmentId;
+
+ private String segmentFileName;
+
+ public Segment(String segmentId, String segmentFileName) {
+ this.segmentId = segmentId;
+ this.segmentFileName = segmentFileName;
+ }
+
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+ public String getSegmentFileName() {
+ return segmentFileName;
+ }
+
+ public static List<Segment> toSegmentList(String[] segmentIds) {
+ List<Segment> list = new ArrayList<>(segmentIds.length);
+ for (String segmentId : segmentIds) {
+ list.add(toSegment(segmentId));
+ }
+ return list;
+ }
+
+ public static List<Segment> toSegmentList(List<String> segmentIds) {
+ List<Segment> list = new ArrayList<>(segmentIds.size());
+ for (String segmentId : segmentIds) {
+ list.add(toSegment(segmentId));
+ }
+ return list;
+ }
+
+ public static Segment toSegment(String segmentId) {
+ String[] split = segmentId.split("#");
+ if (split.length > 1) {
+ return new Segment(split[0], split[1]);
+ } else if (split.length > 0) {
+ return new Segment(split[0], null);
+ }
+ return new Segment(segmentId, null);
--- End diff --
Added
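For readers following the quoted toSegment logic, here is a small standalone sketch of how String.split("#") drives the three branches. SegmentIdSketch and the sample file name are illustrative assumptions; only the branching mirrors the quoted diff:

```java
import java.util.Arrays;

public class SegmentIdSketch {

  // Returns {segmentId, segmentFileName}; the file name is null when the
  // input carries no "#<fileName>" suffix.
  public static String[] parse(String segmentId) {
    String[] split = segmentId.split("#");
    if (split.length > 1) {
      return new String[] {split[0], split[1]};
    } else if (split.length > 0) {
      return new String[] {split[0], null};
    }
    // Reached only when split() yields an empty array, e.g. for the input "#",
    // because String.split drops trailing empty strings.
    return new String[] {segmentId, null};
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(parse("2#2_1518000000000.segment")));
    System.out.println(Arrays.toString(parse("2")));
    System.out.println(Arrays.toString(parse("#")));
  }
}
```

The final fallback is not dead code: an input consisting only of "#" splits into an empty array, so neither of the first two branches applies.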
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3568/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2659/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3561/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3629/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3572/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2533/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169628065
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---
@@ -660,22 +650,23 @@ public boolean isScanRequired(FilterResolverIntf filterExp) {
return blocklets;
}
- @Override public List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions) {
+ @Override
+ public List<Blocklet> prune(FilterResolverIntf filterExp, List<PartitionSpec> partitions) {
if (unsafeMemoryDMStore.getRowCount() == 0) {
return new ArrayList<>();
}
- // First get the partitions which are stored inside datamap.
- List<String> storedPartitions = getPartitions();
// if it has partitioned datamap but there is no partitioned information stored, it means
// partitions are dropped so return empty list.
- if (isPartitionedSegment && (storedPartitions == null || storedPartitions.size() == 0)) {
- return new ArrayList<>();
- }
- if (storedPartitions != null && storedPartitions.size() > 0) {
+ if (partitions != null) {
+ // First get the partitions which are stored inside datamap.
+ String[] fileDetails = getFileDetails();
// Check the exact match of partition information inside the stored partitions.
boolean found = false;
- if (partitions != null && partitions.size() > 0) {
- found = partitions.containsAll(storedPartitions);
+ Path folderPath = new Path(fileDetails[0]);
+ for (PartitionSpec spec : partitions) {
+ if (folderPath.equals(spec.getLocation())) {
+ found = true;
--- End diff --
ok
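The location-based match in the quoted prune change can be sketched roughly as below. This assumes java.nio.file.Path as a stand-in for Hadoop's Path and plain strings for PartitionSpec locations; PartitionPruneSketch and matchesAnyPartition are hypothetical names, and what the real method does after the match is not visible in the quoted hunk:

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

public class PartitionPruneSketch {

  // Returns true when the datamap's folder should be kept for this query.
  // A null partition list means "no partition filter", so nothing is pruned.
  public static boolean matchesAnyPartition(String indexFolder,
      List<String> partitionLocations) {
    if (partitionLocations == null) {
      return true;
    }
    Path folderPath = Paths.get(indexFolder).normalize();
    for (String location : partitionLocations) {
      if (folderPath.equals(Paths.get(location).normalize())) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    List<String> wanted = Arrays.asList("/store/t1/country=IN", "/store/t1/country=US");
    System.out.println(matchesAnyPartition("/store/t1/country=US", wanted));
    System.out.println(matchesAnyPartition("/store/t1/country=UK", wanted));
  }
}
```

Comparing folder locations instead of partition name strings is what lets a partition live outside the table path, which matches the "partition location" feature named in the commit message.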
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3564/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3823/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2610/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169628918
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
--- End diff --
ok
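A rough sketch of the relative-location bookkeeping that the quoted writeSegmentFile and getIndexFiles code appears to perform. RelativeLocationSketch, StoredLocation, toStored, and resolve are hypothetical names, and "/" is assumed as the file separator:

```java
public class RelativeLocationSketch {

  // What gets stored in the segment file for one folder.
  public static final class StoredLocation {
    public final String location;
    public final boolean isRelative;

    StoredLocation(String location, boolean isRelative) {
      this.location = location;
      this.isRelative = isRelative;
    }
  }

  // Strips the table path prefix when the folder lives under the table.
  public static StoredLocation toStored(String tablePath, String location) {
    if (location.startsWith(tablePath)) {
      return new StoredLocation(location.substring(tablePath.length()), true);
    }
    return new StoredLocation(location, false);
  }

  // Rebuilds the absolute location the way the quoted getIndexFiles does.
  public static String resolve(String tablePath, StoredLocation stored) {
    return stored.isRelative ? tablePath + "/" + stored.location : stored.location;
  }

  public static void main(String[] args) {
    StoredLocation inside = toStored("/store/t1", "/store/t1/country=US");
    System.out.println(inside.location + " relative=" + inside.isRelative);
    System.out.println(resolve("/store/t1", inside));
    StoredLocation outside = toStored("/store/t1", "/external/country=US");
    System.out.println(resolve("/store/t1", outside));
  }
}
```

Note that the stored relative value keeps its leading separator, so the rebuilt string contains a doubled separator; the quoted code wraps such strings in a Path before comparing them, which presumably hides the difference.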
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3637/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3766/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3825/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3556/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3830/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3821/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3554/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3854/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2600/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3628/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3560/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169635618
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ if (tempFolder.exists() && partionNames.size() > 0) {
+ CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskNo) && file.getName()
+ .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (carbonFiles != null && carbonFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(partionNames);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : carbonFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ locationMap.put(location, folderDetails);
+ segmentFile.setLocationMap(locationMap);
+ String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, path);
+ }
+ }
+ }
+
+ /**
+ * Writes the segment file in json format
+ * @param segmentFile
+ * @param path
+ * @throws IOException
+ */
+ public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+ AtomicFileOperations fileWrite =
+ new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+ BufferedWriter brWriter = null;
+ DataOutputStream dataOutputStream = null;
+ Gson gsonObjectToWrite = new Gson();
+ try {
+ dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+ brWriter.write(metadataInstance);
+ } finally {
+ if (null != brWriter) {
+ brWriter.flush();
+ }
+ CarbonUtil.closeStreams(brWriter);
+ fileWrite.close();
+ }
+ }
+
+ /**
+ * Merge all segment files in a segment to single file.
+ *
+ * @param writePath
+ * @throws IOException
+ */
+ public SegmentFile mergeSegmentFiles(String readPath, String mergeFileName, String writePath)
+ throws IOException {
+ CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+ if (segmentFiles != null && segmentFiles.length > 0) {
+ SegmentFile segmentFile = null;
+ for (CarbonFile file : segmentFiles) {
+ SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
+ if (segmentFile == null && localSegmentFile != null) {
+ segmentFile = localSegmentFile;
+ }
+ if (localSegmentFile != null) {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ if (segmentFile != null) {
+ String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, path);
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
+ }
+ return segmentFile;
+ }
+ return null;
+ }
+
+ private CarbonFile[] getSegmentFiles(String segmentPath) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+ if (carbonFile.exists()) {
+ return carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
+ }
+ });
+ }
+ return null;
+ }
+
+ /**
+ * It provides segment file only for the partitions which has physical index files.
+ *
+ * @param partitionSpecs
+ */
+ public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
+ List<PartitionSpec> partitionSpecs) {
+ SegmentFile segmentFile = null;
+ for (PartitionSpec spec : partitionSpecs) {
+ String location = spec.getLocation().toString();
+ CarbonFile carbonFile = FileFactory.getCarbonFile(location);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
+ }
+ });
+ if (listFiles != null && listFiles.length > 0) {
+ SegmentFile localSegmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(spec.getPartitions());
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : listFiles) {
+ if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ folderDetails.setMergeFileName(file.getName());
+ } else {
+ folderDetails.getFiles().add(file.getName());
+ }
+ }
+ locationMap.put(location, folderDetails);
+ localSegmentFile.setLocationMap(locationMap);
+ if (segmentFile == null) {
+ segmentFile = localSegmentFile;
+ } else {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ }
+ return segmentFile;
+ }
+
+ /**
+ * This method reads the segment file which is written in json format
+ *
+ * @param segmentFilePath
+ * @return
+ */
+ private SegmentFile readSegmentFile(String segmentFilePath) throws IOException {
+ Gson gsonObjectToRead = new Gson();
+ DataInputStream dataInputStream = null;
+ BufferedReader buffReader = null;
+ InputStreamReader inStream = null;
+ SegmentFile segmentFile;
+ AtomicFileOperations fileOperation =
+ new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath));
+
+ try {
+ if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) {
+ return null;
+ }
+ dataInputStream = fileOperation.openForRead();
+ inStream = new InputStreamReader(dataInputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+ buffReader = new BufferedReader(inStream);
+ segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class);
+ } finally {
+ if (inStream != null) {
+ CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+ }
+ }
+
+ return segmentFile;
+ }
+
+ /**
+ * Reads segment file.
+ */
+ public void readSegment(String tablePath, String segmentFileName) throws IOException {
+ String segmentFilePath =
+ CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+ + segmentFileName;
+ SegmentFile segmentFile = readSegmentFile(segmentFilePath);
+ this.tablePath = tablePath;
+ this.segmentFile = segmentFile;
+ }
+
+ public String getTablePath() {
+ return tablePath;
+ }
+
+ /**
+ * Gets all the index files and related carbondata files from this segment. First user needs to
+ * call @readIndexFiles method before calling it.
+ * @return
+ */
+ public Map<String, List<String>> getIndexFilesMap() {
+ return indexFilesMap;
+ }
+
+ /**
+ * Reads all index files which are located in this segment. First user needs to call
+ * @readSegment method before calling it.
+ * @throws IOException
+ */
+ public void readIndexFiles() throws IOException {
+ readIndexFiles(SegmentStatus.SUCCESS, false);
+ }
+
+ /**
+ * Reads all index files as per the status of the file. In case of @ignoreStatus is true it just
+ * reads all index files
+ * @param status
+ * @param ignoreStatus
+ * @throws IOException
+ */
+ private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException {
+ if (indexFilesMap != null) {
+ return;
+ }
+ SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+ indexFilesMap = new HashMap<>();
+ indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus);
+ Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
+ List<DataFileFooter> indexInfo =
+ fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
+ List<String> blocks = new ArrayList<>();
+ for (DataFileFooter footer : indexInfo) {
+ blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
+ }
+ indexFilesMap.put(entry.getKey(), blocks);
+ }
+ }
+
+ /**
+ * Gets all index files from this segment
+ * @return
+ */
+ public Map<String, String> getIndexFiles() {
+ Map<String, String> indexFiles = new HashMap<>();
+ if (segmentFile != null) {
+ for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
+ for (String indexFile : entry.getValue().getFiles()) {
+ indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
+ entry.getValue().mergeFileName);
+ }
+ }
+ }
+ }
+ return indexFiles;
+ }
+
+ /**
+ * Drops the partition related files from the segment file of the segment and writes
+ * to a new file. First iterate over the segment file and check whether each path needs to be
+ * dropped, and update its status to delete if it does.
+ *
+ * @param uniqueId
+ * @throws IOException
+ */
+ public void dropPartitions(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs,
+ String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments)
+ throws IOException {
+ readSegment(tablePath, segment.getSegmentFileName());
+ boolean updateSegment = false;
+ for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+ String location = entry.getKey();
+ if (entry.getValue().isRelative) {
+ location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ Path path = new Path(location);
+ // Update the status to delete if path equals
+ for (PartitionSpec spec : partitionSpecs) {
+ if (path.equals(spec.getLocation())) {
+ entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage());
+ updateSegment = true;
+ break;
+ }
+ }
+ }
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
+ writePath =
+ writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentId() + "_" + uniqueId
+ + CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, writePath);
+ // Check whether we can completely remove the segment.
+ boolean deleteSegment = true;
+ for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+ if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) {
+ deleteSegment = false;
--- End diff --
ok
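The two passes in the quoted dropPartitions hunk (mark matching folders, then check whether anything is still live) can be sketched as follows. This is a simplification under stated assumptions: a Map from location to status string replaces the segment file's location map, the status literals are illustrative, and DropPartitionSketch is a hypothetical name:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class DropPartitionSketch {

  // Illustrative status values standing in for SegmentStatus messages.
  public static final String SUCCESS = "Success";
  public static final String MARKED_FOR_DELETE = "Marked for Delete";

  // Marks every dropped location, then reports whether the whole segment
  // can be deleted, i.e. no folder is left in SUCCESS state.
  public static boolean markDropped(Map<String, String> statusByLocation,
      Set<String> droppedLocations) {
    for (Map.Entry<String, String> entry : statusByLocation.entrySet()) {
      if (droppedLocations.contains(entry.getKey())) {
        entry.setValue(MARKED_FOR_DELETE);
      }
    }
    for (String status : statusByLocation.values()) {
      if (SUCCESS.equals(status)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, String> segment = new LinkedHashMap<>();
    segment.put("/store/t1/country=US", SUCCESS);
    segment.put("/store/t1/country=IN", SUCCESS);
    Set<String> dropped = new HashSet<>(Arrays.asList("/store/t1/country=US"));
    System.out.println(markDropped(segment, dropped));
    System.out.println(segment);
  }
}
```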
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3839/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2537/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169627561
--- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java ---
@@ -217,6 +217,6 @@ public void convertValue(ColumnPageValueConverter codec) {
@Override
public void freeMemory() {
-
+ byteArrayData = null;
--- End diff --
The decimal page uses only byteArrayData, so this is not required.
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3855/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by gvramana <gi...@git.apache.org>.
Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169392124
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Holds partition information.
+ */
+public class PartitionSpec implements Serializable {
+
+ private static final long serialVersionUID = 4828007433384867678L;
+
+ private List<String> partitions;
+
+ private transient Path locationPath;
+
+ private String location;
--- End diff --
What is the difference between location and locationPath?
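This excerpt does not include an answer to the question. One common reason for pairing a String field with a transient path object, shown here purely as an assumption about the intent, is that the string travels through Java serialization while the path object is rebuilt lazily on the receiving side. PartitionLocationSketch and roundTrip are hypothetical names, and java.nio.file.Path stands in for Hadoop's Path:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Path;
import java.nio.file.Paths;

public class PartitionLocationSketch implements Serializable {

  private static final long serialVersionUID = 1L;

  // Serialized form of the location.
  private final String location;

  // Skipped by serialization; rebuilt from 'location' on first use.
  private transient Path locationPath;

  public PartitionLocationSketch(String location) {
    this.location = location;
  }

  public Path getLocationPath() {
    if (locationPath == null) {
      locationPath = Paths.get(location);
    }
    return locationPath;
  }

  // Serializes and deserializes the object in memory, for the demo only.
  public static PartitionLocationSketch roundTrip(PartitionLocationSketch in) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(in);
      }
      try (ObjectInputStream oin =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (PartitionLocationSketch) oin.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    PartitionLocationSketch original = new PartitionLocationSketch("/store/t1/country=US");
    original.getLocationPath(); // populate the transient field before serializing
    PartitionLocationSketch copy = roundTrip(original);
    System.out.println(copy.getLocationPath().equals(original.getLocationPath()));
  }
}
```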
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3831/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3796/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3674/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3761/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3619/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3897/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3679/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3775/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3582/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3791/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3598/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169542459
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo, String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
--- End diff --
Move this entire if check inside the if check below, i.e. if (carbonFiles != null && carbonFiles.length > 0).
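A minimal sketch of the suggested reordering, for illustration only. The body of the original if block is not shown in the diff, so the prefix stripping below is an assumption, and SegmentLocationSketch, FolderLocation and resolve are hypothetical names rather than part of the PR:

```java
public class SegmentLocationSketch {

  /** Hypothetical holder for a folder location and whether it is table-relative. */
  static final class FolderLocation {
    final String path;
    final boolean relative;

    FolderLocation(String path, boolean relative) {
      this.path = path;
      this.relative = relative;
    }
  }

  /**
   * Decide relative vs. absolute only when index files are present,
   * mirroring the reviewer's suggestion to nest the startsWith check.
   */
  static FolderLocation resolve(String tablePath, String location, String[] indexFiles) {
    if (indexFiles == null || indexFiles.length == 0) {
      // Nothing to record for this folder, so skip the path comparison entirely.
      return null;
    }
    if (location.startsWith(tablePath)) {
      // Assumption: a location under the table path is stored relative to it.
      return new FolderLocation(location.substring(tablePath.length()), true);
    }
    return new FolderLocation(location, false);
  }

  public static void main(String[] args) {
    FolderLocation loc =
        resolve("/store/t1", "/store/t1/country=US", new String[] {"0.carbonindex"});
    System.out.println(loc.path + " relative=" + loc.relative);
  }
}
```

With this shape the startsWith comparison runs only for folders that actually contain index files.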
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2621/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169627117
--- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---
@@ -2449,5 +2496,40 @@ public static String encodeToString(byte[] bytes) throws UnsupportedEncodingExce
return updatedMinMaxValues;
}
+ /**
+ * Generate the blockid as per the block path
+ *
+ * @param identifier
+ * @param filePath
+ * @param segmentId
+ * @return
+ */
+ public static String getBlockId(AbsoluteTableIdentifier identifier, String filePath,
+ String segmentId) {
+ String blockId;
+ String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length());
+ String tablePath = identifier.getTablePath();
+ if (filePath.startsWith(tablePath)) {
+ String factDir =
+ CarbonStorePath.getCarbonTablePath(tablePath, identifier.getCarbonTableIdentifier())
+ .getFactDir();
+ if (filePath.startsWith(factDir)) {
+ blockId = "Part0" + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
+ + CarbonCommonConstants.FILE_SEPARATOR + blockName;
+ } else {
+ String partitionDir =
+ filePath.substring(tablePath.length() + 1, filePath.length() - blockName.length() - 1);
+
+ blockId = partitionDir.replace("/", "#") + CarbonCommonConstants.FILE_SEPARATOR + "Segment_"
--- End diff --
Added a comment.
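For readers following the partition branch above, a minimal sketch of how the block id is derived from a file path. The diff is cut off after "Segment_", so the tail of the concatenation is assumed to mirror the Part0 branch. BlockIdSketch and partitionBlockId are hypothetical names, and FILE_SEPARATOR is a local stand-in assumed to equal "/":

```java
public class BlockIdSketch {

  // Assumed stand-in for CarbonCommonConstants.FILE_SEPARATOR.
  static final String FILE_SEPARATOR = "/";

  /**
   * Builds a block id for a file that lives in a partition folder directly
   * under the table path, e.g. <tablePath>/country=US/city=NY/<blockName>.
   */
  static String partitionBlockId(String tablePath, String filePath, String segmentId) {
    String blockName = filePath.substring(filePath.lastIndexOf('/') + 1);
    // Everything between the table path and the block name is the partition directory.
    String partitionDir =
        filePath.substring(tablePath.length() + 1, filePath.length() - blockName.length() - 1);
    // Nested partition folders are flattened by replacing "/" with "#".
    return partitionDir.replace("/", "#") + FILE_SEPARATOR + "Segment_" + segmentId
        + FILE_SEPARATOR + blockName;
  }

  public static void main(String[] args) {
    // prints country=US#city=NY/Segment_2/part-0-0.carbondata
    System.out.println(partitionBlockId(
        "/store/db/t1", "/store/db/t1/country=US/city=NY/part-0-0.carbondata", "2"));
  }
}
```

Flattening with "#" keeps the partition part of the id free of path separators, so the id still splits into three components on "/".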
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2550/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3765/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169859849
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,689 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
--- End diff --
ok
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2577/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2548/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3681/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3901/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2523/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3807/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169619961
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Holds partition information.
+ */
+public class PartitionSpec implements Serializable {
+
+ private static final long serialVersionUID = 4828007433384867678L;
+
+ private List<String> partitions;
--- End diff --
It signifies column=value; I have added a comment.
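As an illustration of the column=value convention, a small sketch that splits such entries into a column-to-value map. PartitionEntriesSketch and toColumnValues are hypothetical helpers, not part of PartitionSpec, and the sample entries are made up:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionEntriesSketch {

  /** Splits entries such as "country=US" into an ordered column-to-value map. */
  static Map<String, String> toColumnValues(List<String> partitions) {
    Map<String, String> result = new LinkedHashMap<>();
    for (String entry : partitions) {
      int idx = entry.indexOf('=');
      if (idx < 0) {
        throw new IllegalArgumentException("Expected column=value but got: " + entry);
      }
      // Split on the first '=' only, so values may themselves contain '='.
      result.put(entry.substring(0, idx), entry.substring(idx + 1));
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(toColumnValues(Arrays.asList("country=US", "city=NY")));
  }
}
```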
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3904/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2609/
---
[GitHub] carbondata issue #1984: [WIP] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2540/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169540773
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---
@@ -79,6 +87,44 @@ public void readAllIIndexOfSegment(String segmentPath) throws IOException {
}
}
+ /**
+ * Read all index files and keep the cache in it.
+ *
+ * @param segmentFileStore
+ * @throws IOException
+ */
+ public void readAllIIndexOfSegment(SegmentFileStore segmentFileStore, SegmentStatus status,
+ boolean ignoreStatus) throws IOException {
+ List<CarbonFile> carbonIndexFiles = new ArrayList<>();
+ if (segmentFileStore.getLocationMap() == null) {
+ return;
+ }
+ for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFileStore
+ .getLocationMap().entrySet()) {
+ String location = locations.getKey();
+ if (locations.getValue().isRelative()) {
+ location =
--- End diff --
Fetching the location variable and modifying its value can be done after the if check below.
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3866/
---
[GitHub] carbondata pull request #1984: [CARBONDATA-2187] Partition restructure
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169522942
--- Diff: core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java ---
@@ -17,91 +17,59 @@
package org.apache.carbondata.core.indexstore;
+import java.util.Objects;
+
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
/**
- * Class holds the absoluteTableIdentifier and segmentId to uniquely identify a segment
+ * Class holds the absoluteTableIdentifier and segment to uniquely identify a segment
--- End diff --
Modify the comment to remove the reference to absoluteTableIdentifier.
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3845/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/3856/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1984
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/2552/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3593/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3646/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3586/
---
[GitHub] carbondata issue #1984: [CARBONDATA-2187] Partition restructure
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1984
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/3612/
---