You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by QiangCai <gi...@git.apache.org> on 2018/08/17 11:25:12 UTC

[GitHub] carbondata pull request #2644: [WIP][CARBONDATA-2853] Implement file-level m...

GitHub user QiangCai opened a pull request:

    https://github.com/apache/carbondata/pull/2644

    [WIP][CARBONDATA-2853] Implement file-level min/max index for streaming segment

    Implement file-level min/max index to prune stream files in driver side.
    
     - [ ] Any interfaces changed?
     
     - [ ] Any backward compatibility impacted?
     
     - [ ] Document update required?
    
     - [ ] Testing done
            Please provide details on 
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. 
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/QiangCai/carbondata stream_minmax

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2644.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2644
    
----
commit c8422231a738da282042c01ce51fa2c9250d3247
Author: QiangCai <qi...@...>
Date:   2018-08-17T11:07:13Z

    add file-level min/max index for streaming segement

----


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r211859543
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---
    @@ -334,6 +334,10 @@ public TableDataMap getDataMap(CarbonTable table, DataMapSchema dataMapSchema) {
         return dataMap;
       }
     
    --- End diff --
    
    please add a testcase to test the bigint datatype, for overflow scenario


---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement min/max index for stream...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    LGTM


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214313170
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala ---
    @@ -205,8 +205,9 @@ class StreamHandoffRDD[K, V](
         segmentList.add(Segment.toSegment(handOffSegmentId, null))
         val splits = inputFormat.getSplitsOfStreaming(
           job,
    -      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier,
    -      segmentList
    +      segmentList,
    +      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
    +      null
    --- End diff --
    
    Once you add the overloaded method as explained in above comment you can call the method with 3 arguments from here


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214311953
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -342,60 +341,52 @@ public void refreshSegmentCacheIfRequired(JobContext job, CarbonTable carbonTabl
       /**
        * use file list in .carbonindex file to get the split of streaming.
        */
    -  public List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
    -      List<Segment> streamSegments) throws IOException {
    +  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
    +      CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException {
         List<InputSplit> splits = new ArrayList<InputSplit>();
         if (streamSegments != null && !streamSegments.isEmpty()) {
           numStreamSegments = streamSegments.size();
           long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
           long maxSize = getMaxSplitSize(job);
    -      for (Segment segment : streamSegments) {
    -        String segmentDir =
    -            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
    -        FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
    -        if (FileFactory.isFileExist(segmentDir, fileType)) {
    -          SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
    -          segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
    -          Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
    -          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    -          for (byte[] fileData : carbonIndexMap.values()) {
    -            indexReader.openThriftReader(fileData);
    -            try {
    -              // map block index
    -              while (indexReader.hasNext()) {
    -                BlockIndex blockIndex = indexReader.readBlockIndexInfo();
    -                String filePath = segmentDir + File.separator + blockIndex.getFile_name();
    -                Path path = new Path(filePath);
    -                long length = blockIndex.getFile_size();
    -                if (length != 0) {
    -                  BlockLocation[] blkLocations;
    -                  FileSystem fs = FileFactory.getFileSystem(path);
    -                  FileStatus file = fs.getFileStatus(path);
    -                  blkLocations = fs.getFileBlockLocations(path, 0, length);
    -                  long blockSize = file.getBlockSize();
    -                  long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    -                  long bytesRemaining = length;
    -                  while (((double) bytesRemaining) / splitSize > 1.1) {
    -                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
    -                    splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
    -                        splitSize, blkLocations[blkIndex].getHosts(),
    -                        blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
    -                    bytesRemaining -= splitSize;
    -                  }
    -                  if (bytesRemaining != 0) {
    -                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
    -                    splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
    -                        bytesRemaining, blkLocations[blkIndex].getHosts(),
    -                        blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
    -                  }
    -                } else {
    -                  //Create empty hosts array for zero length files
    -                  splits.add(makeSplit(segment.getSegmentNo(), path, 0, length, new String[0],
    -                      FileFormat.ROW_V1));
    -                }
    -              }
    -            } finally {
    -              indexReader.closeThriftReader();
    +
    +      if (filterResolverIntf == null) {
    +        if (carbonTable != null) {
    +          Expression filter = getFilterPredicates(job.getConfiguration());
    +          if (filter != null) {
    +            carbonTable.processFilterExpression(filter, null, null);
    +            filterResolverIntf = carbonTable.resolveFilter(filter);
    +          }
    +        }
    +      }
    +      StreamDataMap streamDataMap =
    +          DataMapStoreManager.getInstance().getStreamDataMap(carbonTable);
    +      streamDataMap.init(filterResolverIntf);
    +      List<StreamFile> streamFiles = streamDataMap.prune(streamSegments);
    +      for (StreamFile streamFile : streamFiles) {
    +        if (FileFactory.isFileExist(streamFile.getFilePath())) {
    +          Path path = new Path(streamFile.getFilePath());
    +          long length = streamFile.getFileSize();
    +          if (length != 0) {
    +            BlockLocation[] blkLocations;
    +            FileSystem fs = FileFactory.getFileSystem(path);
    +            FileStatus file = fs.getFileStatus(path);
    +            blkLocations = fs.getFileBlockLocations(path, 0, length);
    +            long blockSize = file.getBlockSize();
    +            long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    +            long bytesRemaining = length;
    +            while (((double) bytesRemaining) / splitSize > 1.1) {
    --- End diff --
    
    Please add a comment here to clearly explain the logic of using 1.1 and explain the size computation


---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement file-level min/max index...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/173/



---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214316607
  
    --- Diff: streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java ---
    @@ -212,9 +271,13 @@ private void initializeAtFirstRow() throws IOException, InterruptedException {
                 byte[] col = (byte[]) columnValue;
                 output.writeShort(col.length);
                 output.writeBytes(col);
    +            dimensionStatsCollectors[dimCount].update(col);
               } else {
                 output.writeInt((int) columnValue);
    +            dimensionStatsCollectors[dimCount].update(ByteUtil.toBytes((int) columnValue));
    --- End diff --
    
    For min/max comparison you are converting from Int to byte array for all the rows. This can impact the writing performance. Instead you can typecast into Int and do the comparison. After all the data is loaded then at the end you can convert all the values into byte array based on datatype. At that time it will be only one conversion for the final min/max values


---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement min/max index for stream...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8284/



---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement min/max index for...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214886142
  
    --- Diff: streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java ---
    @@ -212,9 +213,13 @@ private void initializeAtFirstRow() throws IOException, InterruptedException {
                 byte[] col = (byte[]) columnValue;
                 output.writeShort(col.length);
                 output.writeBytes(col);
    +            output.dimStatsCollectors[dimCount].update(col);
               } else {
                 output.writeInt((int) columnValue);
    +            output.dimStatsCollectors[dimCount].update(ByteUtil.toBytes((int) columnValue));
    --- End diff --
    
    update(int ) isn't be implemented in KeyPageStatsCollector


---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement file-level min/max index...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6405/



---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r211852629
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamFile.java ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap;
    +
    +import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
    +
    +public class StreamFile {
    --- End diff --
    
    For all public class, please add interface annotation


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement min/max index for...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214889120
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.BitSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    +import org.apache.carbondata.core.reader.CarbonIndexFileReader;
    +import org.apache.carbondata.core.scan.filter.FilterUtil;
    +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
    +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.util.CarbonMetadataUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.format.BlockIndex;
    +
    +@InterfaceAudience.Internal
    +public class StreamDataMap {
    +
    +  private CarbonTable carbonTable;
    +  private FilterExecuter filterExecuter;
    +
    +  public StreamDataMap(CarbonTable carbonTable) {
    +    this.carbonTable = carbonTable;
    +  }
    +
    +  public void init(FilterResolverIntf filterExp) {
    +    if (filterExp != null) {
    +      // cache all columns
    +      List<CarbonColumn> minMaxCacheColumns = new ArrayList<>();
    +      for (CarbonDimension dimension : carbonTable.getDimensions()) {
    +        if (!dimension.isComplex()) {
    +          minMaxCacheColumns.add(dimension);
    +        }
    +      }
    +      minMaxCacheColumns.addAll(carbonTable.getMeasures());
    +      // prepare cardinality of all dimensions
    +      List<ColumnSchema> listOfColumns =
    +          carbonTable.getTableInfo().getFactTable().getListOfColumns();
    +      int[] columnCardinality = new int[listOfColumns.size()];
    +      for (int index = 0; index < columnCardinality.length; index++) {
    +        columnCardinality[index] = Integer.MAX_VALUE;
    +      }
    +      // initial filter executor
    +      SegmentProperties segmentProperties =
    +          new SegmentProperties(listOfColumns, columnCardinality);
    +      filterExecuter = FilterUtil.getFilterExecuterTree(
    +          filterExp, segmentProperties, null, minMaxCacheColumns);
    +    }
    +  }
    +
    +  public List<StreamFile> prune(List<Segment> segments) throws IOException {
    +    if (filterExecuter == null) {
    +      // if filter is null, list all steam files
    +      return listAllStreamFiles(segments, false);
    +    } else {
    +      List<StreamFile> streamFileList = new ArrayList<>();
    +      for (StreamFile streamFile : listAllStreamFiles(segments, true)) {
    +        if (isScanRequire(streamFile)) {
    +          // if stream file is required to scan
    +          streamFileList.add(streamFile);
    +          streamFile.setMinMaxIndex(null);
    +        }
    +      }
    +      return streamFileList;
    +    }
    +  }
    +
    +  private boolean isScanRequire(StreamFile streamFile) {
    +    // backward compatibility, old stream file without min/max index
    +    if (streamFile.getMinMaxIndex() == null) {
    +      return true;
    +    }
    +    byte[][] maxValue = streamFile.getMinMaxIndex().getMaxValues();
    +    byte[][] minValue = streamFile.getMinMaxIndex().getMinValues();
    +    BitSet bitSet = filterExecuter.isScanRequired(maxValue, minValue);
    +    if (!bitSet.isEmpty()) {
    +      return true;
    +    } else {
    +      return false;
    +    }
    +  }
    +
    +  // TODO optimize and move the code to StreamSegment , but it's in the streaming module.
    +  private List<StreamFile> listAllStreamFiles(List<Segment> segments, boolean withMinMax)
    +      throws IOException {
    +    List<StreamFile> streamFileList = new ArrayList<>();
    +    for (Segment segment : segments) {
    +      String segmentDir = CarbonTablePath.getSegmentPath(
    +          carbonTable.getAbsoluteTableIdentifier().getTablePath(), segment.getSegmentNo());
    +      FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
    +      if (FileFactory.isFileExist(segmentDir, fileType)) {
    +        SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
    +        segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
    --- End diff --
    
    fixed


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement min/max index for...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214889642
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -339,63 +338,62 @@ public void refreshSegmentCacheIfRequired(JobContext job, CarbonTable carbonTabl
         return filteredSegmentToAccess;
       }
     
    +  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
    +      CarbonTable carbonTable) throws IOException {
    +    return getSplitsOfStreaming(job, streamSegments, carbonTable, null);
    +  }
    +
       /**
        * use file list in .carbonindex file to get the split of streaming.
        */
    -  public List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
    -      List<Segment> streamSegments) throws IOException {
    +  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
    +      CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException {
         List<InputSplit> splits = new ArrayList<InputSplit>();
         if (streamSegments != null && !streamSegments.isEmpty()) {
           numStreamSegments = streamSegments.size();
           long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
           long maxSize = getMaxSplitSize(job);
    -      for (Segment segment : streamSegments) {
    -        String segmentDir =
    -            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
    -        FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
    -        if (FileFactory.isFileExist(segmentDir, fileType)) {
    -          SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
    -          segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
    -          Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
    -          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    -          for (byte[] fileData : carbonIndexMap.values()) {
    -            indexReader.openThriftReader(fileData);
    -            try {
    -              // map block index
    -              while (indexReader.hasNext()) {
    -                BlockIndex blockIndex = indexReader.readBlockIndexInfo();
    -                String filePath = segmentDir + File.separator + blockIndex.getFile_name();
    -                Path path = new Path(filePath);
    -                long length = blockIndex.getFile_size();
    -                if (length != 0) {
    -                  BlockLocation[] blkLocations;
    -                  FileSystem fs = FileFactory.getFileSystem(path);
    -                  FileStatus file = fs.getFileStatus(path);
    -                  blkLocations = fs.getFileBlockLocations(path, 0, length);
    -                  long blockSize = file.getBlockSize();
    -                  long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    -                  long bytesRemaining = length;
    -                  while (((double) bytesRemaining) / splitSize > 1.1) {
    -                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
    -                    splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
    -                        splitSize, blkLocations[blkIndex].getHosts(),
    -                        blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
    -                    bytesRemaining -= splitSize;
    -                  }
    -                  if (bytesRemaining != 0) {
    -                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
    -                    splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
    -                        bytesRemaining, blkLocations[blkIndex].getHosts(),
    -                        blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
    -                  }
    -                } else {
    -                  //Create empty hosts array for zero length files
    -                  splits.add(makeSplit(segment.getSegmentNo(), path, 0, length, new String[0],
    -                      FileFormat.ROW_V1));
    -                }
    -              }
    -            } finally {
    -              indexReader.closeThriftReader();
    +      if (filterResolverIntf == null) {
    +        if (carbonTable != null) {
    +          Expression filter = getFilterPredicates(job.getConfiguration());
    +          if (filter != null) {
    +            carbonTable.processFilterExpression(filter, null, null);
    +            filterResolverIntf = carbonTable.resolveFilter(filter);
    +          }
    +        }
    +      }
    +      StreamDataMap streamDataMap =
    +          DataMapStoreManager.getInstance().getStreamDataMap(carbonTable);
    +      streamDataMap.init(filterResolverIntf);
    +      List<StreamFile> streamFiles = streamDataMap.prune(streamSegments);
    +
    +      for (StreamFile streamFile : streamFiles) {
    +        if (FileFactory.isFileExist(streamFile.getFilePath())) {
    --- End diff --
    
    fixed


---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement file-level min/max index...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7959/



---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement min/max index for...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214877452
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.BitSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    +import org.apache.carbondata.core.reader.CarbonIndexFileReader;
    +import org.apache.carbondata.core.scan.filter.FilterUtil;
    +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
    +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.util.CarbonMetadataUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.format.BlockIndex;
    +
    +@InterfaceAudience.Internal
    +public class StreamDataMap {
    --- End diff --
    
    I don't think it is inline with datamap interfaces. Either use datamap interface or remove the datamap name from class.


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement min/max index for...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214883334
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -339,63 +338,62 @@ public void refreshSegmentCacheIfRequired(JobContext job, CarbonTable carbonTabl
         return filteredSegmentToAccess;
       }
     
    +  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
    +      CarbonTable carbonTable) throws IOException {
    +    return getSplitsOfStreaming(job, streamSegments, carbonTable, null);
    +  }
    +
       /**
        * use file list in .carbonindex file to get the split of streaming.
        */
    -  public List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
    -      List<Segment> streamSegments) throws IOException {
    +  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
    +      CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException {
         List<InputSplit> splits = new ArrayList<InputSplit>();
         if (streamSegments != null && !streamSegments.isEmpty()) {
           numStreamSegments = streamSegments.size();
           long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
           long maxSize = getMaxSplitSize(job);
    -      for (Segment segment : streamSegments) {
    -        String segmentDir =
    -            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
    -        FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
    -        if (FileFactory.isFileExist(segmentDir, fileType)) {
    -          SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
    -          segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
    -          Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
    -          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    -          for (byte[] fileData : carbonIndexMap.values()) {
    -            indexReader.openThriftReader(fileData);
    -            try {
    -              // map block index
    -              while (indexReader.hasNext()) {
    -                BlockIndex blockIndex = indexReader.readBlockIndexInfo();
    -                String filePath = segmentDir + File.separator + blockIndex.getFile_name();
    -                Path path = new Path(filePath);
    -                long length = blockIndex.getFile_size();
    -                if (length != 0) {
    -                  BlockLocation[] blkLocations;
    -                  FileSystem fs = FileFactory.getFileSystem(path);
    -                  FileStatus file = fs.getFileStatus(path);
    -                  blkLocations = fs.getFileBlockLocations(path, 0, length);
    -                  long blockSize = file.getBlockSize();
    -                  long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    -                  long bytesRemaining = length;
    -                  while (((double) bytesRemaining) / splitSize > 1.1) {
    -                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
    -                    splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
    -                        splitSize, blkLocations[blkIndex].getHosts(),
    -                        blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
    -                    bytesRemaining -= splitSize;
    -                  }
    -                  if (bytesRemaining != 0) {
    -                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
    -                    splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
    -                        bytesRemaining, blkLocations[blkIndex].getHosts(),
    -                        blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
    -                  }
    -                } else {
    -                  //Create empty hosts array for zero length files
    -                  splits.add(makeSplit(segment.getSegmentNo(), path, 0, length, new String[0],
    -                      FileFormat.ROW_V1));
    -                }
    -              }
    -            } finally {
    -              indexReader.closeThriftReader();
    +      if (filterResolverIntf == null) {
    +        if (carbonTable != null) {
    +          Expression filter = getFilterPredicates(job.getConfiguration());
    +          if (filter != null) {
    +            carbonTable.processFilterExpression(filter, null, null);
    +            filterResolverIntf = carbonTable.resolveFilter(filter);
    +          }
    +        }
    +      }
    +      StreamDataMap streamDataMap =
    +          DataMapStoreManager.getInstance().getStreamDataMap(carbonTable);
    +      streamDataMap.init(filterResolverIntf);
    +      List<StreamFile> streamFiles = streamDataMap.prune(streamSegments);
    +
    +      for (StreamFile streamFile : streamFiles) {
    +        if (FileFactory.isFileExist(streamFile.getFilePath())) {
    --- End diff --
    
    Why do you need to do file exists check here? Please remove if not needed, will be costly for s3


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement min/max index for...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214887830
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.BitSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    +import org.apache.carbondata.core.reader.CarbonIndexFileReader;
    +import org.apache.carbondata.core.scan.filter.FilterUtil;
    +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
    +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.util.CarbonMetadataUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.format.BlockIndex;
    +
    +@InterfaceAudience.Internal
    +public class StreamDataMap {
    --- End diff --
    
    fixed


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by xuchuanyin <gi...@git.apache.org>.
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r213613888
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.BitSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    +import org.apache.carbondata.core.reader.CarbonIndexFileReader;
    +import org.apache.carbondata.core.scan.filter.FilterUtil;
    +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
    +import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
    +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.util.CarbonMetadataUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.format.BlockIndex;
    +
    +@InterfaceAudience.Internal
    +public class StreamDataMap {
    +
    +  private CarbonTable carbonTable;
    +
    +  private AbsoluteTableIdentifier identifier;
    +
    +  private FilterExecuter filterExecuter;
    +
    +  public StreamDataMap(CarbonTable carbonTable) {
    +    this.carbonTable = carbonTable;
    +    this.identifier = carbonTable.getAbsoluteTableIdentifier();
    +  }
    +
    +  public void init(FilterResolverIntf filterExp) {
    +    if (filterExp != null) {
    +
    +      List<CarbonColumn> minMaxCacheColumns = new ArrayList<>();
    +      for (CarbonDimension dimension : carbonTable.getDimensions()) {
    +        if (!dimension.isComplex()) {
    +          minMaxCacheColumns.add(dimension);
    +        }
    +      }
    +      minMaxCacheColumns.addAll(carbonTable.getMeasures());
    +
    +      List<ColumnSchema> listOfColumns =
    +          carbonTable.getTableInfo().getFactTable().getListOfColumns();
    +      int[] columnCardinality = new int[listOfColumns.size()];
    +      for (int index = 0; index < columnCardinality.length; index++) {
    +        columnCardinality[index] = Integer.MAX_VALUE;
    +      }
    +
    +      SegmentProperties segmentProperties =
    +          new SegmentProperties(listOfColumns, columnCardinality);
    +
    +      filterExecuter = FilterUtil.getFilterExecuterTree(
    +          filterExp, segmentProperties, null, minMaxCacheColumns);
    +    }
    +  }
    +
    +  public List<StreamFile> prune(List<Segment> segments) throws IOException {
    +    if (filterExecuter == null) {
    +      return listAllStreamFiles(segments, false);
    +    } else {
    +      List<StreamFile> streamFileList = new ArrayList<>();
    +      for (StreamFile streamFile : listAllStreamFiles(segments, true)) {
    +        if (isScanRequire(streamFile)) {
    +          streamFileList.add(streamFile);
    +          streamFile.setMinMaxIndex(null);
    +        }
    +      }
    +      return streamFileList;
    +    }
    +  }
    +
    +  private boolean isScanRequire(StreamFile streamFile) {
    +    // backward compatibility, old stream file without min/max index
    +    if (streamFile.getMinMaxIndex() == null) {
    +      return true;
    +    }
    +
    +    byte[][] maxValue = streamFile.getMinMaxIndex().getMaxValues();
    +    byte[][] minValue = streamFile.getMinMaxIndex().getMinValues();
    +    BitSet bitSet;
    +    if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
    --- End diff --
    
    What does this for?


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by xuchuanyin <gi...@git.apache.org>.
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r211868981
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.BitSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    +import org.apache.carbondata.core.reader.CarbonIndexFileReader;
    +import org.apache.carbondata.core.scan.filter.FilterUtil;
    +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
    +import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
    +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.util.CarbonMetadataUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.format.BlockIndex;
    +
    +public class StreamDataMap {
    +
    +  private CarbonTable carbonTable;
    +
    +  private AbsoluteTableIdentifier identifier;
    +
    +  private FilterExecuter filterExecuter;
    +
    +  public StreamDataMap(CarbonTable carbonTable) {
    +    this.carbonTable = carbonTable;
    +    this.identifier = carbonTable.getAbsoluteTableIdentifier();
    +  }
    +
    +  public void init(FilterResolverIntf filterExp) {
    +    if (filterExp != null) {
    +
    +      List<CarbonColumn> minMaxCacheColumns = new ArrayList<>();
    +      for (CarbonDimension dimension : carbonTable.getDimensions()) {
    +        if (!dimension.isComplex()) {
    +          minMaxCacheColumns.add(dimension);
    +        }
    +      }
    +      minMaxCacheColumns.addAll(carbonTable.getMeasures());
    +
    +      List<ColumnSchema> listOfColumns =
    +          carbonTable.getTableInfo().getFactTable().getListOfColumns();
    +      int[] columnCardinality = new int[listOfColumns.size()];
    +      for (int index = 0; index < columnCardinality.length; index++) {
    +        columnCardinality[index] = Integer.MAX_VALUE;
    +      }
    +
    +      SegmentProperties segmentProperties =
    +          new SegmentProperties(listOfColumns, columnCardinality);
    +
    +      filterExecuter = FilterUtil.getFilterExecuterTree(
    +          filterExp, segmentProperties, null, minMaxCacheColumns);
    +    }
    +  }
    +
    +  public List<StreamFile> prune(List<Segment> segments) throws IOException {
    +    if (filterExecuter == null) {
    +      return listAllStreamFiles(segments, false);
    +    } else {
    +      List<StreamFile> streamFileList = new ArrayList<>();
    +      for (StreamFile streamFile : listAllStreamFiles(segments, true)) {
    +        if (hitStreamFile(streamFile)) {
    +          streamFileList.add(streamFile);
    +          streamFile.setMinMaxIndex(null);
    +        }
    +      }
    +      return streamFileList;
    +    }
    +  }
    +
    +  private boolean hitStreamFile(StreamFile streamFile) {
    --- End diff --
    
    I think change the name to 'isScanRequired' is better.


---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement file-level min/max index...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8193/



---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214311329
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -342,60 +341,52 @@ public void refreshSegmentCacheIfRequired(JobContext job, CarbonTable carbonTabl
       /**
        * use file list in .carbonindex file to get the split of streaming.
        */
    -  public List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
    -      List<Segment> streamSegments) throws IOException {
    +  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
    +      CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException {
    --- End diff --
    
    You can write an overloaded method for getSplitsOfStreaming. One which accepts 3 parameters and one with 4 parameters.
    1.  getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,List<Segment> streamSegments)
    -- From this method you can the other method and pass null as the 4th argument. This will avoid passing null at all places above.
    2. getSplitsOfStreaming(JobContext job, List<Segment> streamSegments, CarbonTable carbonTable, FilterResolverIntf filterResolverIntf)


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r211852492
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.BitSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    +import org.apache.carbondata.core.reader.CarbonIndexFileReader;
    +import org.apache.carbondata.core.scan.filter.FilterUtil;
    +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
    +import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
    +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.util.CarbonMetadataUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.format.BlockIndex;
    +
    +public class StreamDataMap {
    --- End diff --
    
    add interface annotation


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r211855812
  
    --- Diff: streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.streaming.index;
    +
    +import java.io.Serializable;
    +
    +import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +
    +public class StreamFileIndex implements Serializable {
    +
    +  private String fileName;
    --- End diff --
    
    please describe the content of `fileName`, whether it includes the whole path


---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement min/max index for stream...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/232/



---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r211855246
  
    --- Diff: streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java ---
    @@ -171,9 +179,57 @@ private void initializeAtFirstRow() throws IOException, InterruptedException {
           writeFileHeader();
         }
     
    +    initializeStatsCollector();
    +
         isFirstRow = false;
       }
     
    +  private void initializeStatsCollector() {
    +    // initialize
    --- End diff --
    
    please explain why the length is isNoDictionaryDimensionColumn.length


---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement file-level min/max index...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8255/



---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement file-level min/max index...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/122/



---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement min/max index for stream...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/213/



---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement file-level min/max index...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8063/



---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r211853068
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java ---
    @@ -669,4 +666,44 @@ public static int putBytes(byte[] tgtBytes, int tgtOffset, byte[] srcBytes, int
         return flattenedData;
       }
     
    +  /**
    +   * perform XOR operation on the value, and convert it to byte array for sorting
    +   */
    +  public static byte[] toXorBytes(short val) {
    +    val = (short)(val ^ Short.MIN_VALUE);
    +    return toBytes(val);
    +  }
    +
    +  public static byte[] toXorBytes(int val) {
    +    val = val ^ Integer.MIN_VALUE;
    +    return toBytes(val);
    +  }
    +
    +  public static byte[] toXorBytes(long val) {
    +    val = val ^ Long.MIN_VALUE;
    +    return toBytes(val);
    +  }
    +
    +  public static byte[] toXorBytes(double val) {
    +    return toXorBytes(Double.doubleToLongBits(val));
    +  }
    +
    +  /**
    +   * convert byte array to the value, perform XOR operation on it to recover the real value
    +   */
    +  public static short toXorShort(byte[] bytes, int offset, final int length) {
    +    return (short)(toShort(bytes, offset, length) ^ Short.MIN_VALUE);
    +  }
    +
    +  public static int toXorInt(byte[] bytes, int offset, final int length) {
    --- End diff --
    
    please add comment


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement min/max index for...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214887850
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---
    @@ -360,6 +360,10 @@ private String getKeyUsingTablePath(String tablePath) {
         return null;
       }
     
    +  public StreamDataMap getStreamDataMap(CarbonTable table) {
    --- End diff --
    
    fixed


---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement file-level min/max index...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/5/



---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement file-level min/max index...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    @ravipesala 
    the blocklet level min/max will be added in another PR.


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by xuchuanyin <gi...@git.apache.org>.
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r211805649
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java ---
    @@ -324,13 +324,13 @@ public static Object getDataBasedOnDataType(String data, DataType actualDataType
         if (actualDataType == DataTypes.BOOLEAN) {
           return ByteUtil.toBytes(BooleanConvert.parseBoolean(dimensionValue));
         } else if (actualDataType == DataTypes.SHORT) {
    -      return ByteUtil.toBytes(Short.parseShort(dimensionValue));
    +      return ByteUtil.toXorBytes(Short.parseShort(dimensionValue));
    --- End diff --
    
    Will this affect the legacy store?
    It seems that a value will be encoded differently before and after this modification. If somewhere has used this method to encode and store data before, it will be a problem.


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement min/max index for...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214893284
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -342,60 +341,52 @@ public void refreshSegmentCacheIfRequired(JobContext job, CarbonTable carbonTabl
       /**
        * use file list in .carbonindex file to get the split of streaming.
        */
    -  public List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
    -      List<Segment> streamSegments) throws IOException {
    +  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
    +      CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException {
    --- End diff --
    
    fixed


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement min/max index for...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214881265
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.BitSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    +import org.apache.carbondata.core.reader.CarbonIndexFileReader;
    +import org.apache.carbondata.core.scan.filter.FilterUtil;
    +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
    +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.util.CarbonMetadataUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.format.BlockIndex;
    +
    +@InterfaceAudience.Internal
    +public class StreamDataMap {
    +
    +  private CarbonTable carbonTable;
    +  private FilterExecuter filterExecuter;
    +
    +  public StreamDataMap(CarbonTable carbonTable) {
    +    this.carbonTable = carbonTable;
    +  }
    +
    +  public void init(FilterResolverIntf filterExp) {
    +    if (filterExp != null) {
    +      // cache all columns
    +      List<CarbonColumn> minMaxCacheColumns = new ArrayList<>();
    +      for (CarbonDimension dimension : carbonTable.getDimensions()) {
    +        if (!dimension.isComplex()) {
    +          minMaxCacheColumns.add(dimension);
    +        }
    +      }
    +      minMaxCacheColumns.addAll(carbonTable.getMeasures());
    +      // prepare cardinality of all dimensions
    +      List<ColumnSchema> listOfColumns =
    +          carbonTable.getTableInfo().getFactTable().getListOfColumns();
    +      int[] columnCardinality = new int[listOfColumns.size()];
    +      for (int index = 0; index < columnCardinality.length; index++) {
    +        columnCardinality[index] = Integer.MAX_VALUE;
    +      }
    +      // initial filter executor
    +      SegmentProperties segmentProperties =
    +          new SegmentProperties(listOfColumns, columnCardinality);
    +      filterExecuter = FilterUtil.getFilterExecuterTree(
    +          filterExp, segmentProperties, null, minMaxCacheColumns);
    +    }
    +  }
    +
    +  public List<StreamFile> prune(List<Segment> segments) throws IOException {
    +    if (filterExecuter == null) {
    +      // if filter is null, list all steam files
    +      return listAllStreamFiles(segments, false);
    +    } else {
    +      List<StreamFile> streamFileList = new ArrayList<>();
    +      for (StreamFile streamFile : listAllStreamFiles(segments, true)) {
    +        if (isScanRequire(streamFile)) {
    +          // if stream file is required to scan
    +          streamFileList.add(streamFile);
    +          streamFile.setMinMaxIndex(null);
    +        }
    +      }
    +      return streamFileList;
    +    }
    +  }
    +
    +  private boolean isScanRequire(StreamFile streamFile) {
    +    // backward compatibility, old stream file without min/max index
    +    if (streamFile.getMinMaxIndex() == null) {
    +      return true;
    +    }
    +    byte[][] maxValue = streamFile.getMinMaxIndex().getMaxValues();
    +    byte[][] minValue = streamFile.getMinMaxIndex().getMinValues();
    +    BitSet bitSet = filterExecuter.isScanRequired(maxValue, minValue);
    +    if (!bitSet.isEmpty()) {
    +      return true;
    +    } else {
    +      return false;
    +    }
    +  }
    +
    +  // TODO optimize and move the code to StreamSegment , but it's in the streaming module.
    +  private List<StreamFile> listAllStreamFiles(List<Segment> segments, boolean withMinMax)
    +      throws IOException {
    +    List<StreamFile> streamFileList = new ArrayList<>();
    +    for (Segment segment : segments) {
    +      String segmentDir = CarbonTablePath.getSegmentPath(
    +          carbonTable.getAbsoluteTableIdentifier().getTablePath(), segment.getSegmentNo());
    +      FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
    +      if (FileFactory.isFileExist(segmentDir, fileType)) {
    +        SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
    +        segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
    --- End diff --
    
    It involves the listing of a folder and gets the index file. Its a costly operation for s3 file system as you are not even caching it. So it is better fix the carbonindex file name in case of streaming and read it directly instead of listing.


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214307411
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.BitSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    +import org.apache.carbondata.core.reader.CarbonIndexFileReader;
    +import org.apache.carbondata.core.scan.filter.FilterUtil;
    +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
    +import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
    +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.util.CarbonMetadataUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.format.BlockIndex;
    +
    +@InterfaceAudience.Internal
    +public class StreamDataMap {
    +
    +  private CarbonTable carbonTable;
    +
    +  private AbsoluteTableIdentifier identifier;
    +
    +  private FilterExecuter filterExecuter;
    +
    +  public StreamDataMap(CarbonTable carbonTable) {
    +    this.carbonTable = carbonTable;
    +    this.identifier = carbonTable.getAbsoluteTableIdentifier();
    +  }
    +
    +  public void init(FilterResolverIntf filterExp) {
    +    if (filterExp != null) {
    +
    +      List<CarbonColumn> minMaxCacheColumns = new ArrayList<>();
    +      for (CarbonDimension dimension : carbonTable.getDimensions()) {
    +        if (!dimension.isComplex()) {
    +          minMaxCacheColumns.add(dimension);
    +        }
    +      }
    +      minMaxCacheColumns.addAll(carbonTable.getMeasures());
    +
    +      List<ColumnSchema> listOfColumns =
    +          carbonTable.getTableInfo().getFactTable().getListOfColumns();
    +      int[] columnCardinality = new int[listOfColumns.size()];
    +      for (int index = 0; index < columnCardinality.length; index++) {
    +        columnCardinality[index] = Integer.MAX_VALUE;
    +      }
    +
    +      SegmentProperties segmentProperties =
    +          new SegmentProperties(listOfColumns, columnCardinality);
    +
    +      filterExecuter = FilterUtil.getFilterExecuterTree(
    +          filterExp, segmentProperties, null, minMaxCacheColumns);
    +    }
    +  }
    +
    +  public List<StreamFile> prune(List<Segment> segments) throws IOException {
    +    if (filterExecuter == null) {
    +      return listAllStreamFiles(segments, false);
    +    } else {
    +      List<StreamFile> streamFileList = new ArrayList<>();
    +      for (StreamFile streamFile : listAllStreamFiles(segments, true)) {
    +        if (isScanRequire(streamFile)) {
    +          streamFileList.add(streamFile);
    +          streamFile.setMinMaxIndex(null);
    +        }
    +      }
    +      return streamFileList;
    +    }
    +  }
    +
    +  private boolean isScanRequire(StreamFile streamFile) {
    +    // backward compatibility, old stream file without min/max index
    +    if (streamFile.getMinMaxIndex() == null) {
    +      return true;
    +    }
    +
    +    byte[][] maxValue = streamFile.getMinMaxIndex().getMaxValues();
    +    byte[][] minValue = streamFile.getMinMaxIndex().getMinValues();
    +    BitSet bitSet;
    +    if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
    +      String filePath = streamFile.getFilePath();
    +      String uniqueBlockPath = filePath.substring(filePath.lastIndexOf("/Part") + 1);
    +      bitSet = ((ImplicitColumnFilterExecutor) filterExecuter)
    +          .isFilterValuesPresentInBlockOrBlocklet(maxValue, minValue, uniqueBlockPath);
    +    } else {
    +      bitSet = filterExecuter.isScanRequired(maxValue, minValue);
    +    }
    --- End diff --
    
    No Need to check for if (filterExecuter instanceof ImplicitColumnFilterExecutor)
    You can directly call
    bitSet = filterExecuter.isScanRequired(maxValue, minValue);


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement min/max index for...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214882605
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamFile.java ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
    +
    +@InterfaceAudience.Internal
    +public class StreamFile {
    +
    +  private String segmentNo;
    +
    +  private String filePath;
    +
    +  private long fileSize;
    +
    +  private BlockletMinMaxIndex minMaxIndex;
    +
    +  public StreamFile(String segmentNo, String filePath, long fileSize) {
    +    this.segmentNo = segmentNo;
    +    this.filePath = filePath;
    +    this.fileSize = fileSize;
    +  }
    +
    +  public String getSegmentNo() {
    +    return segmentNo;
    +  }
    +
    +  public void setSegmentNo(String segmentNo) {
    --- End diff --
    
    Please remove unused setters


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by xuchuanyin <gi...@git.apache.org>.
Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r211854887
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java ---
    @@ -669,4 +666,44 @@ public static int putBytes(byte[] tgtBytes, int tgtOffset, byte[] srcBytes, int
         return flattenedData;
       }
     
    +  /**
    +   * perform XOR operation on the value, and convert it to byte array for sorting
    --- End diff --
    
    please add description for this method and the scenario


---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement file-level min/max index...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    retest this please


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214303472
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.BitSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    +import org.apache.carbondata.core.reader.CarbonIndexFileReader;
    +import org.apache.carbondata.core.scan.filter.FilterUtil;
    +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
    +import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
    +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.util.CarbonMetadataUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.format.BlockIndex;
    +
    +@InterfaceAudience.Internal
    +public class StreamDataMap {
    +
    +  private CarbonTable carbonTable;
    +
    +  private AbsoluteTableIdentifier identifier;
    --- End diff --
    
    If carbonTable is getting stored then no need to store identifier...you can get it from carbontable


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214310465
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java ---
    @@ -96,14 +96,35 @@ private static FileFooter3 getFileFooter3(List<BlockletInfo3> infoList,
         return footer;
       }
     
    -  public static BlockletIndex getBlockletIndex(
    -      org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex info) {
    +  public static org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex
    +      convertExternalMinMaxIndex(BlockletMinMaxIndex minMaxIndex) {
    --- End diff --
    
    please add a method comment to explain what is meaning of convertExternalMinMaxIndex


---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement file-level min/max index...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/184/



---

[GitHub] carbondata issue #2644: [WIP][CARBONDATA-2853] Implement file-level min/max ...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/7944/



---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement file-level min/max index...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6683/



---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement min/max index for...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214877247
  
    --- Diff: streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java ---
    @@ -212,9 +213,13 @@ private void initializeAtFirstRow() throws IOException, InterruptedException {
                 byte[] col = (byte[]) columnValue;
                 output.writeShort(col.length);
                 output.writeBytes(col);
    +            output.dimStatsCollectors[dimCount].update(col);
               } else {
                 output.writeInt((int) columnValue);
    +            output.dimStatsCollectors[dimCount].update(ByteUtil.toBytes((int) columnValue));
    --- End diff --
    
    I think dictionary case never come for streaming. Anyway please use `output.dimStatsCollectors[dimCount].update((int) columnValue)` instead of converting binary


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement min/max index for...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214889635
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamFile.java ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
    +
    +@InterfaceAudience.Internal
    +public class StreamFile {
    +
    +  private String segmentNo;
    +
    +  private String filePath;
    +
    +  private long fileSize;
    +
    +  private BlockletMinMaxIndex minMaxIndex;
    +
    +  public StreamFile(String segmentNo, String filePath, long fileSize) {
    +    this.segmentNo = segmentNo;
    +    this.filePath = filePath;
    +    this.fileSize = fileSize;
    +  }
    +
    +  public String getSegmentNo() {
    +    return segmentNo;
    +  }
    +
    +  public void setSegmentNo(String segmentNo) {
    --- End diff --
    
    fixed


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement min/max index for...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/carbondata/pull/2644


---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214305126
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.core.datamap;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.BitSet;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
    +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
    +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    +import org.apache.carbondata.core.reader.CarbonIndexFileReader;
    +import org.apache.carbondata.core.scan.filter.FilterUtil;
    +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
    +import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
    +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.util.CarbonMetadataUtil;
    +import org.apache.carbondata.core.util.path.CarbonTablePath;
    +import org.apache.carbondata.format.BlockIndex;
    +
    +@InterfaceAudience.Internal
    +public class StreamDataMap {
    --- End diff --
    
    Please check the feasibility if we can extend DataMap interface and implement all its method to keep it similar like BlockDataMap. I think it should be feasible


---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement file-level min/max index...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    @QiangCai ....In General I can see that you put empty lines at many places in the code. Please remove those empty lines everywhere and add some code comments for better understanding


---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement min/max index for stream...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8302/



---

[GitHub] carbondata issue #2644: [CARBONDATA-2853] Implement file-level min/max index...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/8244/



---

[GitHub] carbondata issue #2644: [WIP][CARBONDATA-2853] Implement file-level min/max ...

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/6667/



---

[GitHub] carbondata issue #2644: [WIP][CARBONDATA-2853] Implement file-level min/max ...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2644
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/6295/



---

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement min/max index for...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214877591
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---
    @@ -360,6 +360,10 @@ private String getKeyUsingTablePath(String tablePath) {
         return null;
       }
     
    +  public StreamDataMap getStreamDataMap(CarbonTable table) {
    --- End diff --
    
    Please remove from DataMap manager if you are not inline with datamap intefaces


---