You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/14 17:50:31 UTC

[09/18] carbondata git commit: [CARBONDATA-1617] Fixed atomic file operations and output stream close in Carbon index merge files

[CARBONDATA-1617] Fixed atomic file operations and output stream close in Carbon index merge files

The merge file should be written to temp file and then renamed to actual after success.
And output stream close to Carbon index merge files

This closes #1461


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/17d07319
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/17d07319
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/17d07319

Branch: refs/heads/fgdatamap
Commit: 17d07319c4174b007a65f9a95f0ddd0ada57798d
Parents: b2b030e
Author: ravipesala <ra...@gmail.com>
Authored: Thu Nov 2 19:54:40 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Tue Nov 14 22:04:18 2017 +0800

----------------------------------------------------------------------
 .../blockletindex/SegmentIndexFileStore.java    |  1 +
 .../core/writer/CarbonIndexFileMergeWriter.java | 27 +++++++++++---------
 2 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/17d07319/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 22d3d43..244e8bb 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -133,6 +133,7 @@ public class SegmentIndexFileStore {
     byte[] bytes = new byte[(int) indexFile.getSize()];
     dataInputStream.readFully(bytes);
     carbonIndexMap.put(indexFile.getName(), bytes);
+    dataInputStream.close();
   }
 
   private MergedBlockIndexHeader readMergeBlockIndexHeader(ThriftReader thriftReader)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17d07319/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index e19ab24..85f08cc 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -23,8 +23,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.MergedBlockIndex;
@@ -43,17 +42,13 @@ public class CarbonIndexFileMergeWriter {
    * @throws IOException
    */
   public void mergeCarbonIndexFilesOfSegment(String segmentPath) throws IOException {
-    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
-    CarbonFile[] indexFiles = carbonFile.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
-        return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT);
-      }
-    });
-    if (indexFiles != null && indexFiles.length > 0) {
+    CarbonFile[] indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
+    if (isCarbonIndexFilePresent(indexFiles)) {
       SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
       fileStore.readAllIIndexOfSegment(segmentPath);
-      openThriftWriter(segmentPath + "/" +
-          System.currentTimeMillis() + CarbonTablePath.MERGE_INDEX_FILE_EXT);
+      openThriftWriter(
+          segmentPath + "/" +
+              System.currentTimeMillis() + CarbonTablePath.MERGE_INDEX_FILE_EXT);
       Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
       MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader();
       MergedBlockIndex mergedBlockIndex = new MergedBlockIndex();
@@ -72,7 +67,15 @@ public class CarbonIndexFileMergeWriter {
         indexFile.delete();
       }
     }
+  }
 
+  private boolean isCarbonIndexFilePresent(CarbonFile[] indexFiles) {
+    for (CarbonFile file : indexFiles) {
+      if (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        return true;
+      }
+    }
+    return false;
   }
 
   /**
@@ -103,7 +106,7 @@ public class CarbonIndexFileMergeWriter {
     // create thrift writer instance
     thriftWriter = new ThriftWriter(filePath, false);
     // open the file stream
-    thriftWriter.open();
+    thriftWriter.open(FileWriteOperation.OVERWRITE);
   }
 
   /**