Posted to issues@carbondata.apache.org by jackylk <gi...@git.apache.org> on 2018/07/02 16:10:12 UTC

[GitHub] carbondata pull request #2429: [CARBONDATA-2674][Streaming]Streaming with me...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2429#discussion_r199548100
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -341,21 +343,32 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
           long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
           long maxSize = getMaxSplitSize(job);
           for (Segment segment : streamSegments) {
    -        String segmentDir = CarbonTablePath.getSegmentPath(
    -            identifier.getTablePath(), segment.getSegmentNo());
    +        String segmentDir =
    +            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
             FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
             if (FileFactory.isFileExist(segmentDir, fileType)) {
    -          String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
    -          String indexPath = segmentDir + File.separator + indexName;
    -          CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
    -          // index file exists
    -          if (index.exists()) {
    -            // data file exists
    -            CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    +          SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
    +          segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
    +          Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
    +          Set<Map.Entry<String, byte[]>> entries = carbonIndexMap.entrySet();
    +          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    +          for (Map.Entry<String, byte[]> entry : entries) {
    +            byte[] value = entry.getValue();
    --- End diff --
    
    Please give this variable a more descriptive name; `value` is too generic.
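
A rough sketch of the kind of rename meant here, assuming the keys of `carbonIndexMap` are index file names and the values are the raw bytes of each index file. The class `IndexEntryNaming`, the helper `describeIndexFiles`, and the names `indexEntry`, `indexFileName`, and `indexFileContent` are hypothetical and only illustrate the naming; the sketch does not call any CarbonData API and does not parse real index content.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class IndexEntryNaming {

  // Hypothetical stand-in for the loop in the diff: it only summarises each
  // entry, so the focus stays on the variable names rather than the parsing.
  public static List<String> describeIndexFiles(Map<String, byte[]> carbonIndexMap) {
    List<String> descriptions = new ArrayList<>();
    for (Map.Entry<String, byte[]> indexEntry : carbonIndexMap.entrySet()) {
      // Assumption: the key is the index file name, the value its raw content.
      String indexFileName = indexEntry.getKey();
      byte[] indexFileContent = indexEntry.getValue();
      descriptions.add(indexFileName + ": " + indexFileContent.length + " bytes");
    }
    return descriptions;
  }

  public static void main(String[] args) {
    // Made-up sample entry; real entries would come from the segment directory.
    Map<String, byte[]> carbonIndexMap = new LinkedHashMap<>();
    carbonIndexMap.put("sample.carbonindex", "abc".getBytes(StandardCharsets.UTF_8));
    System.out.println(describeIndexFiles(carbonIndexMap));
  }
}
```

Any name that says what the bytes hold would work equally well; the point is that a reader should not have to look up the map's declaration to know what `getValue()` returns.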


---