Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/01/29 06:21:12 UTC

[carbondata] branch master updated: [CARBONDATA-3272]fix ArrayIndexOutOfBoundsException of horizontal compaction during update, when cardinality changes within a segment

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new faa85fb  [CARBONDATA-3272]fix ArrayIndexOutOfBoundsException of horizontal compaction during update, when cardinality changes within a segment
faa85fb is described below

commit faa85fb93e1829be29df2349e9e55b90af5a5271
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Fri Jan 25 14:35:32 2019 +0530

    [CARBONDATA-3272]fix ArrayIndexOutOfBoundsException of horizontal compaction during update, when cardinality changes within a segment
    
    Problem:
    During horizontal compaction in update, we prepare a taskBlockMapping to get the
    result iterators. Horizontal compaction is done within a segment, and the source
    segment properties are always prepared from the file footer of the first block in
    the block list of a task. The source segment properties contain the
    dimensionKeyGenerator that is used to convert the rows. If the cardinality differs
    between two blocks of the same task, their dimensionKeyGenerators differ as well,
    which leads to an ArrayIndexOutOfBoundsException while converting the rows.
    
    Solution:
    Get all the blocks present in a task, split them into multiple lists of the same
    key length, and create a separate RawResultIterator for each such list. If all the
    blocks have the same key length, a single RawResultIterator is created for all of
    them. (A standalone sketch of this grouping follows the commit message.)
    
    This closes #3102
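
Below is a small, self-contained sketch of the grouping idea described above (not part
of the patch; the block names and int[] values are hypothetical stand-ins for
TableBlockInfo and its footer-derived column-value sizes). Blocks whose column-value-size
arrays match land in the same bucket, and each bucket then gets its own RawResultIterator.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class GroupBlocksByKeyLengthSketch {

      // mirrors IntArrayWrapper from the patch: content-based equality for int[]
      static final class Key {
        final int[] sizes;
        Key(int[] sizes) { this.sizes = sizes; }
        @Override public boolean equals(Object o) {
          return o instanceof Key && Arrays.equals(sizes, ((Key) o).sizes);
        }
        @Override public int hashCode() { return Arrays.hashCode(sizes); }
      }

      public static void main(String[] args) {
        // three blocks of one task; the last was written after an update that
        // changed the cardinality of the second column
        Map<String, int[]> blocks = new HashMap<>();
        blocks.put("block-0", new int[] { 3, 5, 2 });
        blocks.put("block-1", new int[] { 3, 5, 2 });
        blocks.put("block-2", new int[] { 3, 8, 2 });

        // bucket the blocks by their column-value-size fingerprint
        Map<Key, List<String>> buckets = new HashMap<>();
        for (Map.Entry<String, int[]> e : blocks.entrySet()) {
          buckets.computeIfAbsent(new Key(e.getValue()), k -> new ArrayList<>()).add(e.getKey());
        }

        // two buckets result (iteration order may vary): {block-0, block-1} and
        // {block-2}; the patch builds one RawResultIterator per bucket instead of
        // one per task
        System.out.println(buckets.values());
      }
    }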
---
 .../core/scan/wrappers/IntArrayWrapper.java        | 47 +++++++++++++
 .../merger/CarbonCompactionExecutor.java           | 77 +++++++++++++++++-----
 .../processing/merger/CarbonCompactionUtil.java    |  2 +-
 3 files changed, 109 insertions(+), 17 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/IntArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/IntArrayWrapper.java
new file mode 100644
index 0000000..c1a75d5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/IntArrayWrapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.wrappers;
+
+import java.util.Arrays;
+
+/**
+ * Wrapper class for int[] data
+ */
+public class IntArrayWrapper {
+
+  private final int[] data;
+
+  public IntArrayWrapper(int[] data) {
+    this.data = data;
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    IntArrayWrapper that = (IntArrayWrapper) o;
+    return Arrays.equals(data, that.data);
+  }
+
+  @Override public int hashCode() {
+    return Arrays.hashCode(data);
+  }
+}
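
A quick standalone check of why this wrapper is needed (not part of the patch; assumes
carbondata-core with the class above on the classpath): a bare int[] key relies on
identity equals/hashCode, so a content-equal array cannot find the entry again, whereas
IntArrayWrapper compares and hashes by content via Arrays.equals and Arrays.hashCode.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.carbondata.core.scan.wrappers.IntArrayWrapper;

    public class IntArrayWrapperCheck {
      public static void main(String[] args) {
        Map<int[], String> rawKey = new HashMap<>();
        rawKey.put(new int[] { 3, 5, 2 }, "bucket");
        // identity-based lookup misses even though the contents are equal
        System.out.println(rawKey.get(new int[] { 3, 5, 2 }));                           // null

        Map<IntArrayWrapper, String> wrappedKey = new HashMap<>();
        wrappedKey.put(new IntArrayWrapper(new int[] { 3, 5, 2 }), "bucket");
        // content-based lookup succeeds
        System.out.println(wrappedKey.get(new IntArrayWrapper(new int[] { 3, 5, 2 })));  // bucket
      }
    }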
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index 79b66e2..5961cd7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.core.scan.wrappers.IntArrayWrapper;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
@@ -117,7 +118,7 @@ public class CarbonCompactionExecutor {
     resultList.put(CarbonCompactionUtil.SORTED_IDX,
         new ArrayList<RawResultIterator>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE));
 
-    List<TableBlockInfo> list = null;
+    List<TableBlockInfo> tableBlockInfos = null;
     QueryModelBuilder builder = new QueryModelBuilder(carbonTable)
         .projectAllColumns()
         .dataConverter(dataTypeConverter)
@@ -130,7 +131,6 @@ public class CarbonCompactionExecutor {
     for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
       String segmentId = taskMap.getKey();
       List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
-      SegmentProperties sourceSegProperties = getSourceSegmentProperties(listMetadata);
       // for each segment get taskblock info
       TaskBlockInfo taskBlockInfo = taskMap.getValue();
       Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
@@ -139,26 +139,71 @@ public class CarbonCompactionExecutor {
           CarbonCompactionUtil.isRestructured(listMetadata, carbonTable.getTableLastUpdatedTime())
               || !CarbonCompactionUtil.isSorted(listMetadata.get(0));
       for (String task : taskBlockListMapping) {
-        list = taskBlockInfo.getTableBlockInfoList(task);
-        Collections.sort(list);
-        LOGGER.info(
-            "for task -" + task + "- in segment id -" + segmentId + "- block size is -" + list
-                .size());
-        queryModel.setTableBlockInfos(list);
-        if (sortingRequired) {
-          resultList.get(CarbonCompactionUtil.UNSORTED_IDX).add(
-              new RawResultIterator(executeBlockList(list, segmentId, task, configuration),
-                  sourceSegProperties, destinationSegProperties, false));
-        } else {
-          resultList.get(CarbonCompactionUtil.SORTED_IDX).add(
-              new RawResultIterator(executeBlockList(list, segmentId, task, configuration),
-                  sourceSegProperties, destinationSegProperties, false));
+        tableBlockInfos = taskBlockInfo.getTableBlockInfoList(task);
+        // during update there may be a chance that the cardinality may change within the segment
+        // which may lead to failure while converting the row, so get all the blocks present in a
+        // task and then split into multiple lists of same column values and create separate
+        // RawResultIterator for each tableBlockInfo of same column values. If all the blocks have
+        // same column values, then make a single RawResultIterator for all the blocks
+        List<List<TableBlockInfo>> listOfTableBlocksBasedOnKeyLength =
+            getListOfTableBlocksBasedOnColumnValueSize(tableBlockInfos);
+        for (List<TableBlockInfo> tableBlockInfoList : listOfTableBlocksBasedOnKeyLength) {
+          Collections.sort(tableBlockInfoList);
+          LOGGER.info("for task -" + task + "- in segment id -" + segmentId + "- block size is -"
+              + tableBlockInfos.size());
+          queryModel.setTableBlockInfos(tableBlockInfoList);
+          if (sortingRequired) {
+            resultList.get(CarbonCompactionUtil.UNSORTED_IDX).add(
+                getRawResultIterator(configuration, segmentId, task, tableBlockInfoList));
+          } else {
+            resultList.get(CarbonCompactionUtil.SORTED_IDX).add(
+                getRawResultIterator(configuration, segmentId, task, tableBlockInfoList));
+          }
         }
       }
     }
     return resultList;
   }
 
+  private RawResultIterator getRawResultIterator(Configuration configuration, String segmentId,
+      String task, List<TableBlockInfo> tableBlockInfoList)
+      throws QueryExecutionException, IOException {
+    return new RawResultIterator(
+        executeBlockList(tableBlockInfoList, segmentId, task, configuration),
+        getSourceSegmentProperties(
+            Collections.singletonList(tableBlockInfoList.get(0).getDataFileFooter())),
+        destinationSegProperties, false);
+  }
+
+  /**
+   * This method returns the List of TableBlockInfoList, where each listOfTableBlockInfos will have
+   * same columnvalues
+   * @param tableBlockInfos List of tableBlockInfos present in each task
+   */
+  private List<List<TableBlockInfo>> getListOfTableBlocksBasedOnColumnValueSize(
+      List<TableBlockInfo> tableBlockInfos) {
+    List<List<TableBlockInfo>> listOfTableBlockInfoListOnColumnvaluesSize = new ArrayList<>();
+    Map<IntArrayWrapper, List<TableBlockInfo>> columnvalueSizeToTableBlockInfoMap = new HashMap<>();
+    for (TableBlockInfo tableBlock : tableBlockInfos) {
+      // get the columnValueSize for the dataFileFooter
+      IntArrayWrapper columnValueSize = new IntArrayWrapper(
+          getSourceSegmentProperties(Collections.singletonList(tableBlock.getDataFileFooter()))
+              .getColumnsValueSize());
+      List<TableBlockInfo> tempBlockInfoList =
+          columnvalueSizeToTableBlockInfoMap.get(columnValueSize);
+      if (tempBlockInfoList == null) {
+        tempBlockInfoList = new ArrayList<>();
+        columnvalueSizeToTableBlockInfoMap.put(columnValueSize, tempBlockInfoList);
+      }
+      tempBlockInfoList.add(tableBlock);
+    }
+    for (Map.Entry<IntArrayWrapper, List<TableBlockInfo>> taskMap :
+        columnvalueSizeToTableBlockInfoMap.entrySet()) {
+      listOfTableBlockInfoListOnColumnvaluesSize.add(taskMap.getValue());
+    }
+    return listOfTableBlockInfoListOnColumnvaluesSize;
+  }
+
   /**
    * This method will create the source segment properties based on restructured block existence
    *
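
As a design note (not part of the patch): getListOfTableBlocksBasedOnColumnValueSize
populates the map with the get/null-check/put pattern and then only copies the map
values into a list. If the module can rely on Java 8, an equivalent method body could be
written more compactly, for example:

    Map<IntArrayWrapper, List<TableBlockInfo>> buckets = new HashMap<>();
    for (TableBlockInfo tableBlock : tableBlockInfos) {
      IntArrayWrapper columnValueSize = new IntArrayWrapper(
          getSourceSegmentProperties(Collections.singletonList(tableBlock.getDataFileFooter()))
              .getColumnsValueSize());
      // group blocks that share the same column-value-size fingerprint
      buckets.computeIfAbsent(columnValueSize, k -> new ArrayList<>()).add(tableBlock);
    }
    return new ArrayList<>(buckets.values());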
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 1bf30b5..ffcfe0c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -144,10 +144,10 @@ public class CarbonCompactionUtil {
         if (null == dataFileMatadata.isSorted()) {
           dataFileMatadata.setSorted(isSortedTable);
         }
-        blockInfo.setDataFileFooter(dataFileMatadata);
       } else {
         dataFileMatadata = CarbonUtil.readMetadataFile(blockInfo);
       }
+      blockInfo.setDataFileFooter(dataFileMatadata);
       if (null == metadataList) {
         // if it is not present
         eachSegmentBlocks.add(dataFileMatadata);
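
An editorial note on this last hunk (not part of the patch): moving the
blockInfo.setDataFileFooter(dataFileMatadata) call out of the if-branch means the block
carries its DataFileFooter whether the footer was already available or was read via
CarbonUtil.readMetadataFile. The grouping added in CarbonCompactionExecutor relies on
exactly that, since it derives each block's column-value sizes from the footer attached
here:

    // from getListOfTableBlocksBasedOnColumnValueSize in CarbonCompactionExecutor
    IntArrayWrapper columnValueSize = new IntArrayWrapper(
        getSourceSegmentProperties(Collections.singletonList(tableBlock.getDataFileFooter()))
            .getColumnsValueSize());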