Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/04/02 02:42:01 UTC

[carbondata] 41/41: [HOTFIX] Fixed data map loading issue when the number of segments is high

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 6a57b4b5884e2155b8b8f27fedf847a081c25c8f
Author: kumarvishal09 <ku...@gmail.com>
AuthorDate: Fri Mar 29 06:41:00 2019 +0530

    [HOTFIX] Fixed data map loading issue when the number of segments is high
    
    Problem:
    When the number of segments is high, data map loading sometimes throws an NPE.
    
    Solution:
    If two segments share the same schema, the first one can finish loading while the second
    is still in progress; clearing the first then wipes the shared segment properties cache,
    including the cached min/max columns the in-progress load still needs.
    A check is now added: if the min/max columns are not present, they are fetched again. Since
    each segment still clears the min/max cache after loading, there is no leak.
    
    This closes #3169
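
In essence, the fix makes the min/max column cache rebuild itself lazily instead
of failing when a concurrent load has already cleared it. A minimal sketch of the
pattern, using the names from the diff below (simplified; not the full class):

    // Simplified from SegmentPropertiesWrapper: double-checked lazy rebuild
    // of the min/max column cache.
    private static final Object minMaxLock = new Object();
    private List<CarbonColumn> minMaxCacheColumns;

    public List<CarbonColumn> getMinMaxCacheColumns() {
      if (null == minMaxCacheColumns) {        // fast path, no lock taken
        synchronized (minMaxLock) {
          if (null == minMaxCacheColumns) {    // re-check under the lock
            addMinMaxColumns(carbonTable);     // repopulates the cache
          }
        }
      }
      return minMaxCacheColumns;
    }
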
---
 .../block/SegmentPropertiesAndSchemaHolder.java    | 62 +++++++++++++---------
 .../indexstore/blockletindex/BlockDataMap.java     |  4 +-
 2 files changed, 39 insertions(+), 27 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
index 6f9a93d..34ce5d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
@@ -235,20 +235,20 @@ public class SegmentPropertiesAndSchemaHolder {
           segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper);
       synchronized (getOrCreateTableLock(segmentPropertiesWrapper.getTableIdentifier())) {
         segmentIdAndSegmentPropertiesIndexWrapper.removeSegmentId(segmentId);
-      }
-      // if after removal of given SegmentId, the segmentIdSet becomes empty that means this
-      // segmentPropertiesWrapper is not getting used at all. In that case this object can be
-      // removed from all the holders
-      if (clearSegmentWrapperFromMap && segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet
-          .isEmpty()) {
-        indexToSegmentPropertiesWrapperMapping.remove(segmentPropertiesIndex);
-        segmentPropWrapperToSegmentSetMap.remove(segmentPropertiesWrapper);
-      } else if (!clearSegmentWrapperFromMap
-          && segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet.isEmpty()) {
-        // min max columns can very when cache is modified. So even though entry is not required
-        // to be deleted from map clear the column cache so that it can filled again
-        segmentPropertiesWrapper.clear();
-        LOGGER.info("cleared min max for segmentProperties at index: " + segmentPropertiesIndex);
+        // if after removal of given SegmentId, the segmentIdSet becomes empty that means this
+        // segmentPropertiesWrapper is not getting used at all. In that case this object can be
+        // removed from all the holders
+        if (clearSegmentWrapperFromMap && segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet
+            .isEmpty()) {
+          indexToSegmentPropertiesWrapperMapping.remove(segmentPropertiesIndex);
+          segmentPropWrapperToSegmentSetMap.remove(segmentPropertiesWrapper);
+        } else if (!clearSegmentWrapperFromMap
+            && segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet.isEmpty()) {
+          // min max columns can vary when the cache is modified. So even though the entry need
+          // not be deleted from the map, clear the column cache so that it can be filled again
+          segmentPropertiesWrapper.clear();
+          LOGGER.info("cleared min max for segmentProperties at index: " + segmentPropertiesIndex);
+        }
       }
     }
   }
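
The hunk above also widens the synchronized block: previously only the segment id
removal ran under the lock, while the "is this wrapper still used?" check and the
cache clear ran outside it. A sketch of the racy interleaving that permitted
(simplified; the thread labels are illustrative):

    synchronized (lock) {
      wrapper.removeSegmentId(segmentId);    // thread A removes the last id
    }
    // window: thread B registers the same schema and starts relying on the
    // wrapper's cached min/max columns
    if (wrapper.segmentIdSet.isEmpty()) {    // still empty from A's view
      segmentPropertiesWrapper.clear();      // A clears the cache under B
    }
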
@@ -280,12 +280,13 @@ public class SegmentPropertiesAndSchemaHolder {
 
     private static final Object taskSchemaLock = new Object();
     private static final Object fileFooterSchemaLock = new Object();
+    private static final Object minMaxLock = new Object();
 
-    private AbsoluteTableIdentifier tableIdentifier;
     private List<ColumnSchema> columnsInTable;
     private int[] columnCardinality;
     private SegmentProperties segmentProperties;
     private List<CarbonColumn> minMaxCacheColumns;
+    private CarbonTable carbonTable;
     // in case of hybrid store we can have block as well as blocklet schema
     // Scenario: When there is a hybrid store in which few loads are from legacy store which do
     // not contain the blocklet information and hence they will be, by default have cache_level as
@@ -300,7 +301,7 @@ public class SegmentPropertiesAndSchemaHolder {
 
     public SegmentPropertiesWrapper(CarbonTable carbonTable,
         List<ColumnSchema> columnsInTable, int[] columnCardinality) {
-      this.tableIdentifier = carbonTable.getAbsoluteTableIdentifier();
+      this.carbonTable = carbonTable;
       this.columnsInTable = columnsInTable;
       this.columnCardinality = columnCardinality;
     }
@@ -320,8 +321,9 @@ public class SegmentPropertiesAndSchemaHolder {
      */
     public void clear() {
       if (null != minMaxCacheColumns) {
-        minMaxCacheColumns.clear();
+        minMaxCacheColumns = null;
       }
+
       taskSummarySchemaForBlock = null;
       taskSummarySchemaForBlocklet = null;
       fileFooterEntrySchemaForBlock = null;
@@ -334,7 +336,8 @@ public class SegmentPropertiesAndSchemaHolder {
       }
       SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper other =
           (SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper) obj;
-      return tableIdentifier.equals(other.tableIdentifier) && checkColumnSchemaEquality(
+      return carbonTable.getAbsoluteTableIdentifier()
+          .equals(other.carbonTable.getAbsoluteTableIdentifier()) && checkColumnSchemaEquality(
           columnsInTable, other.columnsInTable) && Arrays
           .equals(columnCardinality, other.columnCardinality);
     }
@@ -372,12 +375,12 @@ public class SegmentPropertiesAndSchemaHolder {
       for (ColumnSchema columnSchema: columnsInTable) {
         allColumnsHashCode = allColumnsHashCode + columnSchema.strictHashCode();
       }
-      return tableIdentifier.hashCode() + allColumnsHashCode + Arrays
+      return carbonTable.getAbsoluteTableIdentifier().hashCode() + allColumnsHashCode + Arrays
           .hashCode(columnCardinality);
     }
 
     public AbsoluteTableIdentifier getTableIdentifier() {
-      return tableIdentifier;
+      return carbonTable.getAbsoluteTableIdentifier();
     }
 
     public SegmentProperties getSegmentProperties() {
@@ -398,8 +401,8 @@ public class SegmentPropertiesAndSchemaHolder {
         synchronized (taskSchemaLock) {
           if (null == taskSummarySchemaForBlock) {
             taskSummarySchemaForBlock = SchemaGenerator
-                .createTaskSummarySchema(segmentProperties, minMaxCacheColumns, storeBlockletCount,
-                    filePathToBeStored);
+                .createTaskSummarySchema(segmentProperties, getMinMaxCacheColumns(),
+                    storeBlockletCount, filePathToBeStored);
           }
         }
       }
@@ -412,8 +415,8 @@ public class SegmentPropertiesAndSchemaHolder {
         synchronized (taskSchemaLock) {
           if (null == taskSummarySchemaForBlocklet) {
             taskSummarySchemaForBlocklet = SchemaGenerator
-                .createTaskSummarySchema(segmentProperties, minMaxCacheColumns, storeBlockletCount,
-                    filePathToBeStored);
+                .createTaskSummarySchema(segmentProperties, getMinMaxCacheColumns(),
+                    storeBlockletCount, filePathToBeStored);
           }
         }
       }
@@ -425,7 +428,7 @@ public class SegmentPropertiesAndSchemaHolder {
         synchronized (fileFooterSchemaLock) {
           if (null == fileFooterEntrySchemaForBlock) {
             fileFooterEntrySchemaForBlock =
-                SchemaGenerator.createBlockSchema(segmentProperties, minMaxCacheColumns);
+                SchemaGenerator.createBlockSchema(segmentProperties, getMinMaxCacheColumns());
           }
         }
       }
@@ -437,7 +440,7 @@ public class SegmentPropertiesAndSchemaHolder {
         synchronized (fileFooterSchemaLock) {
           if (null == fileFooterEntrySchemaForBlocklet) {
             fileFooterEntrySchemaForBlocklet =
-                SchemaGenerator.createBlockletSchema(segmentProperties, minMaxCacheColumns);
+                SchemaGenerator.createBlockletSchema(segmentProperties, getMinMaxCacheColumns());
           }
         }
       }
@@ -445,6 +448,13 @@ public class SegmentPropertiesAndSchemaHolder {
     }
 
     public List<CarbonColumn> getMinMaxCacheColumns() {
+      if (null == minMaxCacheColumns) {
+        synchronized (minMaxLock) {
+          if (null == minMaxCacheColumns) {
+            addMinMaxColumns(carbonTable);
+          }
+        }
+      }
       return minMaxCacheColumns;
     }
 
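A note on the new double-checked locking in getMinMaxCacheColumns(): under the
Java Memory Model the unsynchronized fast-path read is only guaranteed safe when
the checked field is volatile. A defensive variant (not part of this patch) would
declare:

    // Not in the patch: volatile makes the lock-free first read safe under
    // the JMM's happens-before rules.
    private volatile List<CarbonColumn> minMaxCacheColumns;

Storing the whole CarbonTable instead of only its AbsoluteTableIdentifier is what
makes the rebuild possible: addMinMaxColumns(carbonTable) needs the table, while
equals() and hashCode() still derive from getAbsoluteTableIdentifier(), so map
lookups behave exactly as before.
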
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 4b32688..5b2132c 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -460,7 +460,9 @@ public class BlockDataMap extends CoarseGrainDataMap
       addMinMaxFlagValues(row, schema[ordinal], minMaxFlagValuesForColumnsToBeCached, ordinal);
       memoryDMStore.addIndexRow(schema, row);
     } catch (Exception e) {
-      throw new RuntimeException(e);
+      String message = "Load to unsafe failed for block: " + filePath;
+      LOGGER.error(message, e);
+      throw new RuntimeException(message, e);
     }
     return summaryRow;
   }
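
The BlockDataMap change is a small diagnosability fix: instead of rethrowing a
bare RuntimeException, the failing block's file path is attached to the message
and logged. The same wrap-with-context idiom in isolation (a minimal sketch;
loadBlock and parseFooterAndAddRows are illustrative names, not CarbonData API):

    static void loadBlock(String filePath) {
      try {
        parseFooterAndAddRows(filePath);   // hypothetical loading step
      } catch (Exception e) {
        // Name the failing block and keep the original cause for the trace.
        String message = "Load to unsafe failed for block: " + filePath;
        throw new RuntimeException(message, e);
      }
    }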