You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by jackylk <gi...@git.apache.org> on 2018/07/02 16:10:12 UTC
[GitHub] carbondata pull request #2429: [CARBONDATA-2674][Streaming]Streaming with me...
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2429#discussion_r199548100
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
@@ -341,21 +343,32 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
for (Segment segment : streamSegments) {
- String segmentDir = CarbonTablePath.getSegmentPath(
- identifier.getTablePath(), segment.getSegmentNo());
+ String segmentDir =
+ CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
if (FileFactory.isFileExist(segmentDir, fileType)) {
- String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
- String indexPath = segmentDir + File.separator + indexName;
- CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
- // index file exists
- if (index.exists()) {
- // data file exists
- CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+ SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
+ segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
+ Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
+ Set<Map.Entry<String, byte[]>> entries = carbonIndexMap.entrySet();
+ CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+ for (Map.Entry<String, byte[]> entry : entries) {
+ byte[] value = entry.getValue();
--- End diff --
give a better name, `value` is too generic
---