Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/28 14:09:37 UTC

[carbondata] branch master updated: [CARBONDATA-3395] Fix Exception when concurrent readers built with same split object

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 36ee528  [CARBONDATA-3395] Fix Exception when concurrent readers built with same split object
36ee528 is described below

commit 36ee52836c7bb7bc8e7a4cc6c294d7b77fdba2ee
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Fri May 24 19:50:57 2019 +0530

    [CARBONDATA-3395] Fix Exception when concurrent readers built with same split object
    
    problem: An exception is thrown when concurrent readers are built with the same split object.
    
    cause: In CarbonInputSplit, BlockletDetailInfo and BlockletInfo are initialized lazily, so BlockletInfo
    is prepared while the reader is being built. When two readers work on the same split object, that
    object's state is mutated concurrently, leading to an array-index-out-of-bounds failure.
    
    solution: a) synchronize BlockletInfo creation (a sketch of the pattern follows this message),
    b) load BlockletDetailInfo inside the getSplits() API itself, before the split is passed to a reader,
    c) on failure, obtain the proper table identifier so the datamaps can be cleaned up,
    d) in build_with_splits, fill the default projection when none is configured.
    
    This closes #3232
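
    A minimal, self-contained sketch of the double-checked lazy initialization used for (a), so the
    expensive decode runs at most once even when several reader threads share one object. The class
    and member names below (LazyBlockletHolder, decode) are illustrative stand-ins, not the patched
    BlockletDetailInfo code itself.

        import java.nio.charset.StandardCharsets;

        public class LazyBlockletHolder {
          private final byte[] binary;
          // volatile so a fully built value is visible to every thread after publication
          private volatile String info;

          public LazyBlockletHolder(byte[] binary) {
            this.binary = binary;
          }

          public String getInfo() {
            if (info == null) {              // fast path: no lock once initialized
              synchronized (this) {
                if (info == null) {          // re-check inside the lock
                  info = decode(binary);     // stands in for setBlockletInfoFromBinary()
                }
              }
            }
            return info;
          }

          private String decode(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
          }
        }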
---
 .../carbondata/core/indexstore/BlockletDetailInfo.java   |  6 +++++-
 .../carbondata/hadoop/api/CarbonFileInputFormat.java     | 16 ++++++++++------
 .../apache/carbondata/sdk/file/CarbonReaderBuilder.java  | 14 ++++++++++----
 3 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
index a5aa899..af07f09 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -108,7 +108,11 @@ public class BlockletDetailInfo implements Serializable, Writable {
   public BlockletInfo getBlockletInfo() {
     if (null == blockletInfo) {
       try {
-        setBlockletInfoFromBinary();
+        synchronized (this) {
+          if (null == blockletInfo) {
+            setBlockletInfoFromBinary();
+          }
+        }
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index e83f898..1f34c4f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -200,17 +200,21 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
           }
         });
       }
-      if (getColumnProjection(job.getConfiguration()) == null) {
-        // If the user projection is empty, use default all columns as projections.
-        // All column name will be filled inside getSplits, so can update only here.
-        String[]  projectionColumns = projectAllColumns(carbonTable);
-        setColumnProjection(job.getConfiguration(), projectionColumns);
-      }
+      setAllColumnProjectionIfNotConfigured(job, carbonTable);
       return splits;
     }
     return null;
   }
 
+  public void setAllColumnProjectionIfNotConfigured(JobContext job, CarbonTable carbonTable) {
+    if (getColumnProjection(job.getConfiguration()) == null) {
+      // If the user projection is empty, use default all columns as projections.
+      // All column name will be filled inside getSplits, so can update only here.
+      String[]  projectionColumns = projectAllColumns(carbonTable);
+      setColumnProjection(job.getConfiguration(), projectionColumns);
+    }
+  }
+
   private List<CarbonFile> getAllCarbonDataFiles(String tablePath) {
     List<CarbonFile> carbonFiles;
     try {
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 6ead50d..2db92ea 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -358,8 +358,8 @@ public class CarbonReaderBuilder {
       }
     } catch (Exception ex) {
       // Clear the datamap cache as it can get added in getSplits() method
-      DataMapStoreManager.getInstance()
-          .clearDataMaps(format.getAbsoluteTableIdentifier(hadoopConf));
+      DataMapStoreManager.getInstance().clearDataMaps(
+          format.getOrCreateCarbonTable((job.getConfiguration())).getAbsoluteTableIdentifier());
       throw ex;
     }
   }
@@ -372,6 +372,8 @@ public class CarbonReaderBuilder {
     }
     final Job job = new Job(new JobConf(hadoopConf));
     CarbonFileInputFormat format = prepareFileInputFormat(job, false, true);
+    format.setAllColumnProjectionIfNotConfigured(job,
+        format.getOrCreateCarbonTable(job.getConfiguration()));
     try {
       List<RecordReader<Void, T>> readers = new ArrayList<>(1);
       RecordReader reader = getRecordReader(job, format, readers, inputSplit);
@@ -383,8 +385,8 @@ public class CarbonReaderBuilder {
       }
     } catch (Exception ex) {
       // Clear the datamap cache as it can get added in getSplits() method
-      DataMapStoreManager.getInstance()
-          .clearDataMaps(format.getAbsoluteTableIdentifier(hadoopConf));
+      DataMapStoreManager.getInstance().clearDataMaps(
+          format.getOrCreateCarbonTable((job.getConfiguration())).getAbsoluteTableIdentifier());
       throw ex;
     }
   }
@@ -407,6 +409,10 @@ public class CarbonReaderBuilder {
     CarbonFileInputFormat format = prepareFileInputFormat(job, enableBlockletDistribution, false);
     List<InputSplit> splits =
         format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
+    for (InputSplit split : splits) {
+      // Load the detailInfo
+      ((CarbonInputSplit) split).getDetailInfo();
+    }
     return splits.toArray(new InputSplit[splits.size()]);
   }
 }
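
For reference, a rough end-to-end sketch of the scenario this commit hardens: splits are planned once
and then handed to multiple reader threads. The builder entry points shown (CarbonReader.builder(path,
tableName), getSplits(boolean), build(InputSplit)) follow this patch's context and the SDK of that era;
treat the exact signatures, the "/tmp/carbon_output" path, and the "_temp" table name as assumptions
rather than a verified example.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.apache.carbondata.sdk.file.CarbonReader;
    import org.apache.hadoop.mapreduce.InputSplit;

    public class ConcurrentSplitReadSketch {
      public static void main(String[] args) throws Exception {
        String path = "/tmp/carbon_output";   // assumed location of SDK-written carbondata files
        // Plan the work once; after this patch the blocklet detail info is loaded eagerly here.
        InputSplit[] splits = CarbonReader.builder(path, "_temp").getSplits(true);

        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, splits.length));
        List<Future<Long>> results = new ArrayList<>();
        for (InputSplit split : splits) {
          results.add(pool.submit(() -> {
            // Each task builds its own reader; even if tasks receive the same split
            // object, the synchronized lazy init keeps the shared state consistent.
            CarbonReader reader = CarbonReader.builder(path, "_temp").build(split);
            long rows = 0;
            while (reader.hasNext()) {
              reader.readNextRow();
              rows++;
            }
            reader.close();
            return rows;
          }));
        }
        long total = 0;
        for (Future<Long> result : results) {
          total += result.get();
        }
        pool.shutdown();
        System.out.println("rows read: " + total);
      }
    }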