You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/04/28 05:57:07 UTC

[carbondata] branch master updated: [CARBONDATA-3779]BlockletIndexInputFormat object instantiation failed due to mismatch in constructor params

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f179f1  [CARBONDATA-3779]BlockletIndexInputFormat object instantiation failed due to mismatch in constructor params
6f179f1 is described below

commit 6f179f1c97e8cbda9230c34baf620905d48c9486
Author: Venu Reddy <ve...@huawei.com>
AuthorDate: Wed Apr 22 12:39:42 2020 +0530

    [CARBONDATA-3779]BlockletIndexInputFormat object instantiation failed due to mismatch in constructor params
    
    Why is this PR needed?
    BlockletIndexInputFormat object instantiation failed due to a mismatch between the parameters passed to the
    reflective constructor call and the actual parameters of the BlockletIndexInputFormat constructor.
    
    What changes were proposed in this PR?
    1. Modified to pass the correct parameters while instantiating the BlockletIndexInputFormat through reflection.
    2. Segment min-max based pruning now happens when CARBON_LOAD_INDEXES_PARALLEL is enabled.
    
    This closes #3723
---
 .../core/constants/CarbonCommonConstants.java      |  3 ++
 .../blockletindex/BlockletIndexFactory.java        | 11 +++----
 .../carbondata/core/util/CarbonProperties.java     |  6 ++--
 .../Jobs/BlockletIndexInputFormat.java             | 12 +++++--
 ...ryWithColumnMetCacheAndCacheLevelProperty.scala | 37 ++++++++++++++--------
 5 files changed, 44 insertions(+), 25 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 732d8d1..b5e7f0d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1561,6 +1561,9 @@ public final class CarbonCommonConstants {
   @CarbonProperty(dynamicConfigurable = true)
   public static final String CARBON_LOAD_INDEXES_PARALLEL = "carbon.load.indexes.parallel.";
 
+  // Default value for parallel index loading
+  public static final String CARBON_LOAD_INDEXES_PARALLEL_DEFAULT = "false";
+
   // by default lucene will not store or create index for stop words like "is","the", if this
   // property is set to true lucene will index for stop words also and gives result for the filter
   // with stop words(example: TEXT_MATCH('description':'the'))
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexFactory.java
index 77251c4..c62b644 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletIndexFactory.java
@@ -352,7 +352,7 @@ public class BlockletIndexFactory extends CoarseGrainIndexFactory
       throws IOException {
     SegmentBlockIndexInfo segmentBlockIndexInfo = segmentMap.get(segment.getSegmentNo());
     Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = null;
-    if (null != segmentBlockIndexInfo) {
+    if (null != segmentBlockIndexInfo && null != segmentBlockIndexInfo.getSegmentMetaDataInfo()) {
       segment.setSegmentMetaDataInfo(
           segmentMap.get(segment.getSegmentNo()).getSegmentMetaDataInfo());
       return segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers();
@@ -698,12 +698,11 @@ public class BlockletIndexFactory extends CoarseGrainIndexFactory
 
   private Set<TableBlockIndexUniqueIdentifier> getTableSegmentUniqueIdentifiers(Segment segment)
       throws IOException {
-    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
-        segmentMap.get(segment.getSegmentNo()).getTableBlockIndexUniqueIdentifiers();
-    if (tableBlockIndexUniqueIdentifiers == null) {
-      tableBlockIndexUniqueIdentifiers = BlockletIndexUtil.getSegmentUniqueIdentifiers(segment);
+    SegmentBlockIndexInfo segmentBlockIndexInfo = segmentMap.get(segment.getSegmentNo());
+    if (segmentBlockIndexInfo == null) {
+      return BlockletIndexUtil.getSegmentUniqueIdentifiers(segment);
     }
-    return tableBlockIndexUniqueIdentifiers;
+    return segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers();
   }
 
   public void updateSegmentIndex(
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index c813569..1d8e6ae 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -624,12 +624,12 @@ public final class CarbonProperties {
   public boolean isIndexParallelLoadingEnabled(String databaseName, String tableName) {
     // Check for propertyKey.dbname.table name for session based set for a specific table.
     String loadIndexParallel = getSessionPropertyValue(
-        CarbonCommonConstants.CARBON_LOAD_INDEXES_PARALLEL + "." + databaseName + "." + tableName);
+        CarbonCommonConstants.CARBON_LOAD_INDEXES_PARALLEL + databaseName + "." + tableName);
     // If table table property is not specified then check for session for all the tables
     // otherwise check in carbon.properties
     if (loadIndexParallel == null) {
-      loadIndexParallel =
-          getProperty(CarbonCommonConstants.CARBON_LOAD_INDEXES_PARALLEL, "false");
+      loadIndexParallel = getProperty(CarbonCommonConstants.CARBON_LOAD_INDEXES_PARALLEL,
+          CarbonCommonConstants.CARBON_LOAD_INDEXES_PARALLEL_DEFAULT);
     }
     boolean configuredValue = Boolean.parseBoolean(loadIndexParallel);
     if (configuredValue) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/Jobs/BlockletIndexInputFormat.java b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/Jobs/BlockletIndexInputFormat.java
index bd5310e..71f445d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/Jobs/BlockletIndexInputFormat.java
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/Jobs/BlockletIndexInputFormat.java
@@ -21,9 +21,11 @@ package org.apache.spark.sql.secondaryindex.Jobs;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -37,6 +39,8 @@ import org.apache.carbondata.core.index.dev.CacheableIndex;
 import org.apache.carbondata.core.index.dev.IndexFactory;
 import org.apache.carbondata.core.index.dev.expr.IndexExprWrapper;
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
+import org.apache.carbondata.core.indexstore.BlockMetaInfo;
+import org.apache.carbondata.core.indexstore.BlockletIndexStore;
 import org.apache.carbondata.core.indexstore.BlockletIndexWrapper;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper;
@@ -123,8 +127,11 @@ public class BlockletIndexInputFormat
       Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletIndexWrapper> cache =
           CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_INDEX);
       private Iterator<TableBlockIndexUniqueIdentifier> iterator;
+      // Cache to avoid multiple times listing of files
+      private Map<String, Map<String, BlockMetaInfo>> segInfoCache = new HashMap<>();
 
-      @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      @Override
+      public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
           throws IOException, InterruptedException {
         BlockletIndexInputSplit segmentDistributable =
             (BlockletIndexInputSplit) inputSplit;
@@ -144,7 +151,8 @@ public class BlockletIndexInputFormat
               new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, table,
                   false, true, true);
           this.tableBlockIndexUniqueIdentifierWrapper = tableBlockIndexUniqueIdentifierWrapper;
-          wrapper = cache.get(tableBlockIndexUniqueIdentifierWrapper);
+          wrapper = ((BlockletIndexStore) cache)
+              .get(tableBlockIndexUniqueIdentifierWrapper, segInfoCache);
           return true;
         }
         return false;
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index 995d9bf..ee2b62e 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -23,13 +23,13 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{CarbonEnv, Row}
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.index.dev.Index
-import org.apache.carbondata.core.index.{IndexStoreManager, IndexChooser, IndexFilter, Segment, TableIndex}
+import org.apache.carbondata.core.index.{IndexChooser, IndexFilter, IndexStoreManager, Segment, TableIndex}
 import org.apache.carbondata.core.indexstore.Blocklet
-import org.apache.carbondata.core.indexstore.blockletindex.{BlockIndex, BlockletIndexRowIndexes, BlockletIndex}
+import org.apache.carbondata.core.indexstore.blockletindex.{BlockIndex, BlockletIndex, BlockletIndexRowIndexes}
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
@@ -43,14 +43,20 @@ import org.apache.carbondata.core.util.CarbonProperties
 /**
  * test class for validating COLUMN_META_CACHE and CACHE_LEVEL
  */
-class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with BeforeAndAfterAll {
+class TestQueryWithColumnMetCacheAndCacheLevelProperty
+  extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
 
   override def beforeAll(): Unit = {
     dropSchema
   }
 
-  override def afterAll(): Unit = {
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT,CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT_DEFAULT)
+  override def afterEach(): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT,
+        CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT_DEFAULT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_LOAD_INDEXES_PARALLEL,
+        CarbonCommonConstants.CARBON_LOAD_INDEXES_PARALLEL_DEFAULT)
     dropSchema
   }
 
@@ -58,6 +64,7 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
     sql("drop table if exists metaCache")
     sql("drop table if exists column_min_max_cache_test")
     sql("drop table if exists minMaxSerialize")
+    sql("drop table if exists parallel_index_load")
   }
 
   private def createAndLoadTable(cacheLevel: String): Unit = {
@@ -96,7 +103,6 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
   }
 
   test("verify if number of columns cached are as per the COLUMN_META_CACHE property index instance is as per CACHE_LEVEL property") {
-    sql("drop table if exists metaCache")
     sql("create table metaCache(name string, c1 string, c2 string) STORED AS carbondata")
     sql("insert into metaCache select 'a','aa','aaa'")
     checkAnswer(sql("select * from metaCache"), Row("a", "aa", "aaa"))
@@ -144,7 +150,6 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
   }
 
   test("test UPDATE scenario after column_meta_cache") {
-    sql("drop table if exists metaCache")
     sql("create table metaCache(name string, c1 string, c2 string) STORED AS carbondata TBLPROPERTIES('COLUMN_META_CACHE'='')")
     sql("insert into metaCache select 'a','aa','aaa'")
     sql("insert into metaCache select 'b','bb','bbb'")
@@ -153,7 +158,6 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
   }
 
   test("test queries with column_meta_cache and cache_level='BLOCK'") {
-    dropSchema
     // set cache_level
     createAndLoadTable("BLOCK")
     // check count(*)
@@ -189,7 +193,6 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
   }
 
   test("test queries with column_meta_cache and cache_level='BLOCKLET'") {
-    dropSchema
     // set cache_level
     createAndLoadTable("BLOCKLET")
     // check count(*)
@@ -225,7 +228,6 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
   }
 
   test("test update on column cached") {
-    dropSchema
     // set cache_level
     createAndLoadTable("BLOCKLET")
     sql("update column_min_max_cache_test set (designation)=('SEG') where empname='ayushi'").show()
@@ -236,7 +238,6 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
   }
 
   test("test update on column not cached") {
-    dropSchema
     // set cache_level
     createAndLoadTable("BLOCKLET")
     sql(
@@ -261,7 +262,6 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
   }
 
   test("verify min/max getting serialized to executor when cache_level = blocklet") {
-    sql("drop table if exists minMaxSerialize")
     sql("create table minMaxSerialize(name string, c1 string, c2 string) STORED AS carbondata TBLPROPERTIES('CACHE_LEVEL'='BLOCKLET', 'COLUMN_META_CACHE'='c1,c2')")
     sql("insert into minMaxSerialize select 'a','aa','aaa'")
     checkAnswer(sql("select * from minMaxSerialize where name='a'"), Row("a", "aa", "aaa"))
@@ -340,5 +340,14 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
     sql("DROP table IF EXISTS carbonCahe")
   }
 
-
+  test("Test query with parallel index load") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_LOAD_INDEXES_PARALLEL, "true")
+    sql("CREATE table parallel_index_load (a STRING, b STRING, c INT) STORED AS carbondata")
+    sql("insert into parallel_index_load select 'aa', 'bb', 1")
+    sql("insert into parallel_index_load select 'cc', 'dd', 2")
+    sql("insert into parallel_index_load select 'ee', 'ff', 3")
+    sql("select a, b from parallel_index_load").collect()
+    assert(sql("select a, b from parallel_index_load").count() == 3)
+  }
 }