Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/28 05:10:43 UTC

[carbondata] branch master updated: [CARBONDATA-3384] Fix NullPointerException for update/delete using index server

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new bd16325  [CARBONDATA-3384] Fix NullPointerException for update/delete using index server
bd16325 is described below

commit bd1632564acb248db7080b9fd5f76b8e8da79101
Author: kunal642 <ku...@gmail.com>
AuthorDate: Wed May 15 11:35:18 2019 +0530

    [CARBONDATA-3384] Fix NullPointerException for update/delete using index server
    
    Problem:
    After an update the segment cache is cleared from the executor; in any subsequent query only one index file is considered when creating the BlockUniqueIdentifier, so the query throws a NullPointerException when accessing the segmentProperties.
    
    Solution:
    Consider all index files of the segment when creating the identifiers.
    
    This closes #3218
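
The root cause is visible in the first hunk below: the identifier set was re-created inside the per-index-file loop, so each pass discarded the identifiers gathered so far and the segment map ended up holding only the last index file. A minimal Java sketch of the buggy versus fixed accumulation pattern (plain strings stand in for TableBlockIndexUniqueIdentifier; this is illustrative, not CarbonData code):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class SegmentMapSketch {
      public static void main(String[] args) {
        List<String> indexFiles =
            Arrays.asList("a.carbonindex", "b.carbonindex", "c.carbonindex");
        Map<String, Set<String>> segmentMap = new HashMap<>();

        // Buggy shape: the set is re-created on every loop pass, so the map
        // ends up holding only the identifier for the last index file.
        for (String indexFile : indexFiles) {
          Set<String> perPass = new HashSet<>();
          perPass.add(indexFile);
          segmentMap.put("0", perPass);
        }
        System.out.println(segmentMap.get("0").size()); // prints 1

        // Fixed shape: create the set once, accumulate every index file,
        // then publish it to the map a single time after the loop.
        Set<String> all = new HashSet<>();
        for (String indexFile : indexFiles) {
          all.add(indexFile);
        }
        segmentMap.put("0", all);
        System.out.println(segmentMap.get("0").size()); // prints 3
      }
    }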
---
 .../indexstore/blockletindex/BlockletDataMapFactory.java |  4 ++--
 .../carbondata/hadoop/api/CarbonTableInputFormat.java    |  4 +++-
 .../indexserver/InvalidateSegmentCacheRDD.scala          | 16 ++++++++++------
 3 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index e4a3ad8..446507f 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -344,6 +344,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         segmentMap.get(distributable.getSegment().getSegmentNo());
     if (tableBlockIndexUniqueIdentifiers == null) {
+      tableBlockIndexUniqueIdentifiers = new HashSet<>();
       Set<String> indexFiles = distributable.getSegment().getCommittedIndexFile().keySet();
       for (String indexFile : indexFiles) {
         CarbonFile carbonFile = FileFactory.getCarbonFile(indexFile);
@@ -363,10 +364,9 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
         identifiersWrapper.add(
             new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
                 this.getCarbonTable()));
-        tableBlockIndexUniqueIdentifiers = new HashSet<>();
         tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier);
-        segmentMap.put(distributable.getSegment().getSegmentNo(), tableBlockIndexUniqueIdentifiers);
       }
+      segmentMap.put(distributable.getSegment().getSegmentNo(), tableBlockIndexUniqueIdentifiers);
     } else {
       for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
           tableBlockIndexUniqueIdentifiers) {
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 458c95e..dd86dcb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -564,7 +564,9 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
                 allSegments.getInvalidSegments(), toBeCleanedSegments));
         for (InputSplit extendedBlocklet : extendedBlocklets) {
           CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet;
-          blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blocklet.getFilePath(),
+          String filePath = blocklet.getFilePath();
+          String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
+          blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blockName,
               (long) blocklet.getDetailInfo().getRowCount());
         }
       } else {
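
The CarbonTableInputFormat hunk above keys the row-count map by segment id plus the bare block file name instead of the full file path, so lookups no longer depend on the path prefix. A small hedged sketch of the name extraction (the path below is illustrative, not a real store path):

    public class BlockNameKeySketch {
      public static void main(String[] args) {
        // Illustrative path only; the real value comes from
        // CarbonInputSplit.getFilePath().
        String filePath = "/store/db/tbl/Fact/Part0/Segment_0/part-0-0.carbondata";
        // Keep only the file name, mirroring the hunk above.
        String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
        // Key format: segmentId + "," + blockName.
        String key = "0" + "," + blockName;
        System.out.println(key); // prints 0,part-0-0.carbondata
      }
    }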
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
index 1aa8cd9..bc83d2f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
@@ -43,12 +43,16 @@ class InvalidateSegmentCacheRDD(@transient private val ss: SparkSession, databas
   }
 
   override protected def internalGetPartitions: Array[Partition] = {
-    executorsList.zipWithIndex.map {
-      case (executor, idx) =>
-        // create a dummy split for each executor to accumulate the cache size.
-        val dummySplit = new CarbonInputSplit()
-        dummySplit.setLocation(Array(executor))
-        new DataMapRDDPartition(id, idx, dummySplit)
+    if (invalidSegmentIds.isEmpty) {
+      Array()
+    } else {
+      executorsList.zipWithIndex.map {
+        case (executor, idx) =>
+          // create a dummy split for each executor to accumulate the cache size.
+          val dummySplit = new CarbonInputSplit()
+          dummySplit.setLocation(Array(executor))
+          new DataMapRDDPartition(id, idx, dummySplit)
+      }
     }
   }
 }
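
Finally, the Scala hunk above makes internalGetPartitions return an empty partition array when there are no invalid segments, instead of scheduling a dummy split per executor for nothing. A rough Java analogue of the same guard (DummyPartition is a hypothetical stand-in for DataMapRDDPartition, and the method shape is illustrative only):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class InvalidateGuardSketch {
      // Hypothetical stand-in for DataMapRDDPartition; illustrative only.
      static final class DummyPartition {
        final int index;
        final String executor;
        DummyPartition(int index, String executor) {
          this.index = index;
          this.executor = executor;
        }
      }

      static DummyPartition[] getPartitions(List<String> invalidSegmentIds,
          List<String> executors) {
        // Guard: with nothing to invalidate, schedule no work at all.
        if (invalidSegmentIds.isEmpty()) {
          return new DummyPartition[0];
        }
        // Otherwise create one dummy partition per executor so every
        // executor gets a task that clears its segment cache.
        DummyPartition[] partitions = new DummyPartition[executors.size()];
        for (int i = 0; i < executors.size(); i++) {
          partitions[i] = new DummyPartition(i, executors.get(i));
        }
        return partitions;
      }

      public static void main(String[] args) {
        List<String> executors = Arrays.asList("exec-1", "exec-2");
        System.out.println(
            getPartitions(Collections.<String>emptyList(), executors).length); // 0
        System.out.println(
            getPartitions(Arrays.asList("1"), executors).length); // 2
      }
    }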