You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/11/13 14:45:02 UTC

carbondata git commit: [CARBONDATA-3065]make inverted index false by default

Repository: carbondata
Updated Branches:
  refs/heads/master 0ccbe1b4d -> c94c8ce56


[CARBONDATA-3065]make inverted index false by default

Why this PR?
Bottleneck with invertedIndex:
As for each page we first sort the data and generate the inverted index, so data loading performance gets impacted because of this.
Store size is larger because of storing the inverted index for each dimension column, which results in more IO and impacts query performance.
One extra lookup happens during query due to the presence of the inverted index, which causes many cache line misses and impacts query performance.
Changes Proposed in this PR:
By default, the inverted index will be disabled for all the dimension columns; only row-level sorting happens in the sort step, and column-level sorting will be skipped.
new table property will be added called INVERTED_INDEX which takes comma separated column names as value, and only for these columns inverted index will be generated.
NO_INVERTED_INDEX table property will be deprecated, and it will be kept only for compatibility.
Handled in describe formatted for inverted index columns
How was this tested?
Existing UTs will take care of it, and new test cases are added.

This closes #2886


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c94c8ce5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c94c8ce5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c94c8ce5

Branch: refs/heads/master
Commit: c94c8ce56052cae48d42b49fe660c19795eba0e4
Parents: 0ccbe1b
Author: akashrn5 <ak...@gmail.com>
Authored: Wed Oct 31 14:13:48 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Nov 13 20:14:51 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  7 +-
 .../schema/table/TableSchemaBuilder.java        | 23 +++----
 .../schema/table/TableSchemaBuilderSuite.java   | 10 +--
 docs/ddl-of-carbondata.md                       |  4 +-
 docs/sdk-guide.md                               | 13 ++++
 .../TestNoInvertedIndexLoadAndQuery.scala       | 71 +++++++++++++++++++-
 .../TestNonTransactionalCarbonTable.scala       | 29 +++++++-
 .../partition/TestDDLForPartitionTable.scala    |  6 +-
 ...ForPartitionTableWithDefaultProperties.scala |  9 +--
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 53 ++++++++++++++-
 .../command/carbonTableSchemaCommon.scala       | 26 +++++--
 .../datasources/CarbonSparkDataSourceUtil.scala | 10 +++
 .../datasource/SparkCarbonDataSourceTest.scala  |  4 +-
 .../table/CarbonDescribeFormattedCommand.scala  |  4 +-
 .../vectorreader/AddColumnTestCases.scala       | 24 ++++++-
 .../CarbonGetTableDetailComandTestCase.scala    |  4 +-
 .../sdk/file/CarbonWriterBuilder.java           | 70 ++++++++++++++++---
 .../apache/carbondata/tool/CarbonCliTest.java   | 34 +++++-----
 18 files changed, 329 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6edfd66..259f84e 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -440,8 +440,13 @@ public final class CarbonCommonConstants {
   public static final String TABLE_BLOCKSIZE = "table_blocksize";
   // table blocklet size in MB
   public static final String TABLE_BLOCKLET_SIZE = "table_blocklet_size";
-  // set in column level to disable inverted index
+  /**
+   * set in column level to disable inverted index
+   * @Deprecated :This property is deprecated, it is kept just for compatibility
+   */
   public static final String NO_INVERTED_INDEX = "no_inverted_index";
+  // set in column level to enable inverted index
+  public static final String INVERTED_INDEX = "inverted_index";
   // table property name of major compaction size
   public static final String TABLE_MAJOR_COMPACTION_SIZE = "major_compaction_size";
   // table property name of auto load merge

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index b5ce725..3c290af 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -149,12 +149,13 @@ public class TableSchemaBuilder {
     this.sortColumns = sortColumns;
   }
 
-  public ColumnSchema addColumn(StructField field, AtomicInteger valIndex, boolean isSortColumn) {
-    return addColumn(field, null, valIndex, isSortColumn, false);
+  public ColumnSchema addColumn(StructField field, AtomicInteger valIndex, boolean isSortColumn,
+      boolean isInvertedIdxColumn) {
+    return addColumn(field, null, valIndex, isSortColumn, false, isInvertedIdxColumn);
   }
 
   private ColumnSchema addColumn(StructField field, String parentName, AtomicInteger valIndex,
-      boolean isSortColumn, boolean isComplexChild) {
+      boolean isSortColumn, boolean isComplexChild, boolean isInvertedIdxColumn) {
     Objects.requireNonNull(field);
     if (isComplexChild) {
       // if field is complex then append parent name to the child field to check
@@ -196,7 +197,8 @@ public class TableSchemaBuilder {
     // SO, this will not have any impact.
     newColumn.setColumnUniqueId(field.getFieldName());
     newColumn.setColumnReferenceId(newColumn.getColumnUniqueId());
-    newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn, isComplexChild));
+    newColumn
+        .setEncodingList(createEncoding(field.getDataType(), isInvertedIdxColumn, isComplexChild));
     if (field.getDataType().isComplexType()) {
       if (DataTypes.isArrayType(field.getDataType()) || DataTypes.isMapType(field.getDataType())) {
         newColumn.setNumberOfChild(1);
@@ -224,26 +226,23 @@ public class TableSchemaBuilder {
         }
       }
     }
-    if (newColumn.isDimensionColumn() && newColumn.isSortColumn()) {
-      newColumn.setUseInvertedIndex(true);
-    }
     if (field.getDataType().isComplexType()) {
       String parentFieldName = newColumn.getColumnName();
       if (DataTypes.isArrayType(field.getDataType())) {
         String colName = getColNameForArray(valIndex);
         addColumn(new StructField(colName, ((ArrayType) field.getDataType()).getElementType()),
-            field.getFieldName(), valIndex, false, true);
+            field.getFieldName(), valIndex, false, true, isInvertedIdxColumn);
       } else if (DataTypes.isStructType(field.getDataType())
           && ((StructType) field.getDataType()).getFields().size() > 0) {
         // This field has children.
         List<StructField> fields = ((StructType) field.getDataType()).getFields();
         for (int i = 0; i < fields.size(); i++) {
-          addColumn(fields.get(i), parentFieldName, valIndex, false, true);
+          addColumn(fields.get(i), parentFieldName, valIndex, false, true, isInvertedIdxColumn);
         }
       } else if (DataTypes.isMapType(field.getDataType())) {
         String colName = getColNameForArray(valIndex);
         addColumn(new StructField(colName, ((MapType) field.getDataType()).getValueType()),
-            parentFieldName, valIndex, false, true);
+            parentFieldName, valIndex, false, true, isInvertedIdxColumn);
       }
     }
     // todo: need more information such as long_string_columns
@@ -290,14 +289,14 @@ public class TableSchemaBuilder {
     }
   }
 
-  private List<Encoding> createEncoding(DataType dataType, boolean isSortColumn,
+  private List<Encoding> createEncoding(DataType dataType, boolean isInvertedIdxColumn,
       boolean isComplexChild) {
     List<Encoding> encodings = new LinkedList<>();
     if (dataType == DataTypes.DATE && !isComplexChild) {
       encodings.add(Encoding.DIRECT_DICTIONARY);
       encodings.add(Encoding.DICTIONARY);
     }
-    if (isSortColumn) {
+    if (isInvertedIdxColumn) {
       encodings.add(Encoding.INVERTED_INDEX);
     }
     return encodings;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilderSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilderSuite.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilderSuite.java
index 48e5d1b..2cbe69b 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilderSuite.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilderSuite.java
@@ -33,16 +33,16 @@ public class TableSchemaBuilderSuite {
   @Test(expected = NullPointerException.class)
   public void testNullField() {
     TableSchemaBuilder builder = TableSchema.builder();
-    builder.addColumn(null, new AtomicInteger(0), true);
+    builder.addColumn(null, new AtomicInteger(0), true, true);
   }
 
   @Test
   public void testBuilder() {
     TableSchemaBuilder builder = TableSchema.builder();
     ColumnSchema columnSchema =
-        builder.addColumn(new StructField("a", DataTypes.INT), new AtomicInteger(0), true);
+        builder.addColumn(new StructField("a", DataTypes.INT), new AtomicInteger(0), true, false);
     builder.setSortColumns(Arrays.asList(columnSchema));
-    builder.addColumn(new StructField("b", DataTypes.DOUBLE), new AtomicInteger(0), false);
+    builder.addColumn(new StructField("b", DataTypes.DOUBLE), new AtomicInteger(0), false, false);
     TableSchema schema = builder.build();
     Assert.assertEquals(2, schema.getListOfColumns().size());
     List<ColumnSchema> columns = schema.getListOfColumns();
@@ -54,9 +54,9 @@ public class TableSchemaBuilderSuite {
   public void testRepeatedColumn() {
     TableSchemaBuilder builder = TableSchema.builder();
     ColumnSchema columnSchema =
-        builder.addColumn(new StructField("a", DataTypes.INT), new AtomicInteger(0), true);
+        builder.addColumn(new StructField("a", DataTypes.INT), new AtomicInteger(0), true, false);
     builder.setSortColumns(Arrays.asList(columnSchema));
-    builder.addColumn(new StructField("a", DataTypes.DOUBLE), new AtomicInteger(0), false);
+    builder.addColumn(new StructField("a", DataTypes.DOUBLE), new AtomicInteger(0), false, false);
     TableSchema schema = builder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/docs/ddl-of-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 6ec4929..7e7b210 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -123,11 +123,11 @@ CarbonData DDL statements are documented here,which includes:
 
    - ##### Inverted Index Configuration
 
-     By default inverted index is enabled, it might help to improve compression ratio and query speed, especially for low cardinality columns which are in reward position.
+     By default inverted index is disabled as store size will be reduced, it can be enabled by using a table property. It might help to improve compression ratio and query speed, especially for low cardinality columns which are in reward position.
      Suggested use cases : For high cardinality columns, you can disable the inverted index for improving the data loading performance.
 
      ```
-     TBLPROPERTIES ('NO_INVERTED_INDEX'='column1, column3')
+     TBLPROPERTIES ('NO_INVERTED_INDEX'='column1', 'INVERTED_INDEX'='column2, column3')
      ```
 
    - ##### Sort Columns Configuration

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index 6f5c58d..c32477a 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -375,6 +375,8 @@ public CarbonWriterBuilder withLoadOptions(Map<String, String> options);
 * j. sort_scope -- "local_sort", "no_sort", "batch_sort". default value is "local_sort"
 * k. long_string_columns -- comma separated string columns which are more than 32k length. 
 *                           default value is null.
+* l. inverted_index -- comma separated string columns for which inverted index needs to be
+*                      generated
 *
 * @return updated CarbonWriterBuilder
 */
@@ -445,6 +447,17 @@ public CarbonWriterBuilder writtenBy(String appName) {
 
 ```
 /**
+* sets the list of columns for which inverted index needs to generated
+* @param invertedIndexColumns is a string array of columns for which inverted index needs to
+* generated.
+* If it is null or an empty array, inverted index will be generated for none of the columns
+* @return updated CarbonWriterBuilder
+*/
+public CarbonWriterBuilder invertedIndexFor(String[] invertedIndexColumns);
+```
+
+```
+/**
 * Build a {@link CarbonWriter}
 * This writer is not thread safe,
 * use withThreadSafe() configuration in multi thread environment

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestNoInvertedIndexLoadAndQuery.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestNoInvertedIndexLoadAndQuery.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestNoInvertedIndexLoadAndQuery.scala
index e8affc4..0033556 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestNoInvertedIndexLoadAndQuery.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestNoInvertedIndexLoadAndQuery.scala
@@ -17,11 +17,15 @@
 
 package org.apache.carbondata.integration.spark.testsuite.dataload
 
-import org.apache.spark.sql.Row
 import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.spark.sql.test.util.QueryTest
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.encoder.Encoding
 
 /**
  * Test Class for no inverted index load and query
@@ -289,6 +293,69 @@ class TestNoInvertedIndexLoadAndQuery extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("""select c2 from testNull where c2 is null"""), Seq(Row(null), Row(null), Row(null), Row(null), Row(null), Row(null)))
   }
 
+  test("inverted index with Dictionary_INCLUDE and INVERTED_INDEX") {
+    sql("drop table if exists index1")
+    sql(
+      """
+           CREATE TABLE IF NOT EXISTS index1
+           (id Int, name String, city String)
+           STORED BY 'org.apache.carbondata.format'
+           TBLPROPERTIES('DICTIONARY_INCLUDE'='id','INVERTED_INDEX'='city,name')
+      """)
+    sql(
+      s"""
+           LOAD DATA LOCAL INPATH '$testData1' into table index1
+           """)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "index1")
+    assert(carbonTable.getColumnByName("index1", "city").getColumnSchema.getEncodingList
+      .contains(Encoding.INVERTED_INDEX))
+    assert(carbonTable.getColumnByName("index1", "name").getColumnSchema.getEncodingList
+      .contains(Encoding.INVERTED_INDEX))
+    assert(!carbonTable.getColumnByName("index1", "id").getColumnSchema.getEncodingList
+      .contains(Encoding.INVERTED_INDEX))
+    checkAnswer(
+      sql(
+        """
+           SELECT * FROM index1 WHERE city = "Bangalore"
+        """),
+      Seq(Row(19.0, "Emily", "Bangalore")))
+  }
+
+  test("inverted index with measure column in INVERTED_INDEX") {
+    sql("drop table if exists index1")
+    sql(
+      """
+           CREATE TABLE IF NOT EXISTS index1
+           (id Int, name String, city String)
+           STORED BY 'org.apache.carbondata.format'
+           TBLPROPERTIES('INVERTED_INDEX'='city,name,id')
+      """)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "index1")
+    assert(carbonTable.getColumnByName("index1", "city").getColumnSchema.getEncodingList
+      .contains(Encoding.INVERTED_INDEX))
+    assert(carbonTable.getColumnByName("index1", "name").getColumnSchema.getEncodingList
+      .contains(Encoding.INVERTED_INDEX))
+    assert(!carbonTable.getColumnByName("index1", "id").getColumnSchema.getEncodingList
+      .contains(Encoding.INVERTED_INDEX))
+  }
+
+  test("test same column configured in inverted and no inverted index"){
+    sql("drop table if exists index1")
+    val exception = intercept[MalformedCarbonCommandException] {
+      sql(
+        """
+           CREATE TABLE IF NOT EXISTS index1
+           (id Int, name String, city String)
+           STORED BY 'org.apache.carbondata.format'
+           TBLPROPERTIES('NO_INVERTED_INDEX'='city','INVERTED_INDEX'='city')
+      """)
+    }
+    assert(exception.getMessage
+      .contains(
+        "Column ambiguity as duplicate column(s):city is present in INVERTED_INDEX and " +
+        "NO_INVERTED_INDEX. Duplicate columns are not allowed."))
+  }
+
   override def afterAll {
     sql("drop table if exists index1")
     sql("drop table if exists index2")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 5b93553..cce4ff0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -2419,7 +2419,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       Map("table_blocksize" -> "12",
         "sort_columns" -> "name",
         "local_dictionary_threshold" -> "200",
-        "local_dictionary_enable" -> "true").asJava
+        "local_dictionary_enable" -> "true",
+        "inverted_index" -> "name").asJava
     val builder = CarbonWriter.builder
       .withTableProperties(tablePropertiesMap)
       .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath).writtenBy("TestNonTransactionalCarbonTable")
@@ -2435,6 +2436,10 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       case Some(row) => assert(row.get(1).toString.contains("true"))
       case None => assert(false)
     }
+    descLoc.find(_.get(0).toString.contains("name")) match {
+      case Some(row) => assert(row.get(2).toString.contains("INVERTEDINDEX"))
+      case None => assert(false)
+    }
     FileUtils.deleteDirectory(new File(writerPath))
   }
 
@@ -2485,6 +2490,28 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     FileUtils.deleteDirectory(new File(writerPath))
   }
 
+  test("test inverted index column by API") {
+    FileUtils.deleteDirectory(new File(writerPath))
+    val builder = CarbonWriter.builder
+      .sortBy(Array[String]("name")).withBlockSize(12).enableLocalDictionary(true)
+      .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath)
+      .invertedIndexFor(Array[String]("name")).writtenBy("TestNonTransactionalCarbonTable")
+    generateCarbonData(builder)
+    sql("DROP TABLE IF EXISTS sdkTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    FileUtils.deleteDirectory(new File(writerPath))
+    sql("insert into sdkTable select 's1','s2',23 ")
+    val descLoc = sql("describe formatted sdkTable").collect
+    descLoc.find(_.get(0).toString.contains("name")) match {
+      case Some(row) => assert(row.get(2).toString.contains("INVERTEDINDEX"))
+      case None => assert(false)
+    }
+    checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(1)))
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
   def generateCarbonData(builder :CarbonWriterBuilder): Unit ={
     val fields = new Array[Field](3)
     fields(0) = new Field("name", DataTypes.STRING)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
index cafd465..d5673bf 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
@@ -78,10 +78,9 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("doj"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP)
-    assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 3)
+    assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 2)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(0) == Encoding.DICTIONARY)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(1) == Encoding.DIRECT_DICTIONARY)
-    assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(2) == Encoding.INVERTED_INDEX)
     assert(partitionInfo.getPartitionType == PartitionType.RANGE)
     assert(partitionInfo.getRangeInfo.size == 2)
     assert(partitionInfo.getRangeInfo.get(0).equals("2017-06-11 00:00:02"))
@@ -105,8 +104,7 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("workgroupcategory"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.STRING)
-    assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 1)
-    assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(0) == Encoding.INVERTED_INDEX)
+    assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 0)
     assert(partitionInfo.getPartitionType == PartitionType.LIST)
     assert(partitionInfo.getListInfo.size == 3)
     assert(partitionInfo.getListInfo.get(0).size == 1)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
index 88f4487..ac2376a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
@@ -73,10 +73,9 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("doj"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP)
-    assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 3)
+    assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 2)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(0) == Encoding.DICTIONARY)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(1) == Encoding.DIRECT_DICTIONARY)
-    assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(2) == Encoding.INVERTED_INDEX)
     assert(partitionInfo.getPartitionType == PartitionType.RANGE)
     assert(partitionInfo.getRangeInfo.size == 2)
     assert(partitionInfo.getRangeInfo.get(0).equals("2017-06-11 00:00:02"))
@@ -101,10 +100,9 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("projectenddate"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP)
-    assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 3)
+    assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 2)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(0) == Encoding.DICTIONARY)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(1) == Encoding.DIRECT_DICTIONARY)
-    assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(2) == Encoding.INVERTED_INDEX)
     assert(partitionInfo.getPartitionType == PartitionType.LIST)
     assert(partitionInfo.getListInfo.size == 2)
     assert(partitionInfo.getListInfo.get(0).size == 1)
@@ -133,10 +131,9 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("projectenddate"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.DATE)
-    assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 3)
+    assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 2)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(0) == Encoding.DICTIONARY)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(1) == Encoding.DIRECT_DICTIONARY)
-    assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(2) == Encoding.INVERTED_INDEX)
     assert(partitionInfo.getPartitionType == PartitionType.LIST)
     assert(partitionInfo.getListInfo.size == 2)
     assert(partitionInfo.getListInfo.get(0).size == 1)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 4a37a20..3ac2d2b 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -370,6 +370,24 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
 
     // get no inverted index columns from table properties.
     val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
+    // get inverted index columns from table properties
+    val invertedIdxCols = extractInvertedIndexColumns(fields, tableProperties)
+
+    // check for any duplicate columns in inverted and noinverted columns defined in tblproperties
+    if (invertedIdxCols.nonEmpty && noInvertedIdxCols.nonEmpty) {
+      invertedIdxCols.foreach { distCol =>
+        if (noInvertedIdxCols.exists(x => x.equalsIgnoreCase(distCol.trim))) {
+          val duplicateColumns = (invertedIdxCols ++ noInvertedIdxCols)
+            .diff((invertedIdxCols ++ noInvertedIdxCols).distinct).distinct
+          val errMsg = "Column ambiguity as duplicate column(s):" +
+                       duplicateColumns.mkString(",") +
+                       " is present in INVERTED_INDEX " +
+                       "and NO_INVERTED_INDEX. Duplicate columns are not allowed."
+          throw new MalformedCarbonCommandException(errMsg)
+        }
+      }
+    }
+
     // get partitionInfo
     val partitionInfo = getPartitionInfo(partitionCols, tableProperties)
     if (tableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) {
@@ -444,6 +462,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       Option(varcharColumns),
       Option(noDictionaryDims),
       Option(noInvertedIdxCols),
+      Option(invertedIdxCols),
       Some(colProps),
       bucketFields: Option[BucketFields],
       partitionInfo,
@@ -623,7 +642,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
 
   /**
    * This will extract the no inverted columns fields.
-   * By default all dimensions use inverted index.
+   * This is still kept for backward compatibility, from carbondata-1.6 onwards,  by default all the
+   * dimensions will be no inverted index only
    *
    * @param fields
    * @param tableProperties
@@ -637,7 +657,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
 
     if (tableProperties.get(CarbonCommonConstants.NO_INVERTED_INDEX).isDefined) {
       noInvertedIdxColsProps =
-        tableProperties.get(CarbonCommonConstants.NO_INVERTED_INDEX).get.split(',').map(_.trim)
+        tableProperties(CarbonCommonConstants.NO_INVERTED_INDEX).split(',').map(_.trim)
       noInvertedIdxColsProps.foreach { noInvertedIdxColProp =>
         if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
           val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
@@ -658,6 +678,35 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     noInvertedIdxCols
   }
 
+  protected def extractInvertedIndexColumns(fields: Seq[Field],
+      tableProperties: Map[String, String]): Seq[String] = {
+    // check whether the column name is in fields
+    var invertedIdxColsProps: Array[String] = Array[String]()
+    var invertedIdxCols: Seq[String] = Seq[String]()
+
+    if (tableProperties.get(CarbonCommonConstants.INVERTED_INDEX).isDefined) {
+      invertedIdxColsProps =
+        tableProperties(CarbonCommonConstants.INVERTED_INDEX).split(',').map(_.trim)
+      invertedIdxColsProps.foreach { invertedIdxColProp =>
+        if (!fields.exists(x => x.column.equalsIgnoreCase(invertedIdxColProp))) {
+          val errormsg = "INVERTED_INDEX column: " + invertedIdxColProp +
+                         " does not exist in table. Please check create table statement."
+          throw new MalformedCarbonCommandException(errormsg)
+        }
+      }
+    }
+    // check duplicate columns and only 1 col left
+    val distinctCols = invertedIdxColsProps.toSet
+    // extract the inverted index columns
+    fields.foreach(field => {
+      if (distinctCols.exists(x => x.equalsIgnoreCase(field.column))) {
+        invertedIdxCols :+= field.column
+      }
+    }
+    )
+    invertedIdxCols
+  }
+
   /**
    * This will extract the Dimensions and NoDictionary Dimensions fields.
    * By default all string cols are dimensions.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 3bb7731..f5149e8 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -58,6 +58,7 @@ case class TableModel(
     varcharCols: Option[Seq[String]],
     highcardinalitydims: Option[Seq[String]],
     noInvertedIdxCols: Option[Seq[String]],
+    innvertedIdxCols: Option[Seq[String]],
     colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None,
     bucketFields: Option[BucketFields],
     partitionInfo: Option[PartitionInfo],
@@ -236,6 +237,11 @@ class AlterTableColumnSchemaGenerator(
     val existingColsSize = tableCols.size
     var allColumns = tableCols.filter(x => x.isDimensionColumn)
     var newCols = Seq[ColumnSchema]()
+    var invertedIndxCols: Array[String] = Array[String]()
+    if (alterTableModel.tableProperties.get(CarbonCommonConstants.INVERTED_INDEX).isDefined) {
+      invertedIndxCols = alterTableModel.tableProperties(CarbonCommonConstants.INVERTED_INDEX)
+        .split(',').map(_.trim)
+    }
 
     alterTableModel.dimCols.foreach(field => {
       val encoders = new java.util.ArrayList[Encoding]()
@@ -271,6 +277,14 @@ class AlterTableColumnSchemaGenerator(
       newCols ++= Seq(columnSchema)
     })
 
+    if (invertedIndxCols.nonEmpty) {
+      for (column <- newCols) {
+        if (invertedIndxCols.contains(column.getColumnName) && column.isDimensionColumn) {
+          column.setUseInvertedIndex(true)
+        }
+      }
+    }
+
     // Check if there is any duplicate measures or dimensions.
     // Its based on the dimension name and measure name
     allColumns.filter(x => !x.isInvisible).groupBy(_.getColumnName)
@@ -537,7 +551,6 @@ object TableNewProcessor {
     columnSchema.setPrecision(precision)
     columnSchema.setScale(scale)
     columnSchema.setSchemaOrdinal(schemaOrdinal)
-    columnSchema.setUseInvertedIndex(isDimensionCol)
     columnSchema.setSortColumn(isSortColumn)
     columnSchema
   }
@@ -773,19 +786,20 @@ class TableNewProcessor(cm: TableModel) {
 
     val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
 
-    // Setting the boolean value of useInvertedIndex in column schema, if Paranet table is defined
+    // Setting the boolean value of useInvertedIndex in column schema, if Parent table is defined
     // Encoding is already decided above
     if (!cm.parentTable.isDefined) {
       val noInvertedIndexCols = cm.noInvertedIdxCols.getOrElse(Seq())
+      val invertedIndexCols = cm.innvertedIdxCols.getOrElse(Seq())
       LOGGER.info("NoINVERTEDINDEX columns are : " + noInvertedIndexCols.mkString(","))
       for (column <- allColumns) {
         // When the column is measure or the specified no inverted index column in DDL,
         // set useInvertedIndex to false, otherwise true.
-        if (noInvertedIndexCols.contains(column.getColumnName) ||
-            cm.msrCols.exists(_.column.equalsIgnoreCase(column.getColumnName))) {
-          column.setUseInvertedIndex(false)
-        } else {
+        if (invertedIndexCols.contains(column.getColumnName) &&
+            !cm.msrCols.exists(_.column.equalsIgnoreCase(column.getColumnName))) {
           column.setUseInvertedIndex(true)
+        } else {
+          column.setUseInvertedIndex(false)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
index 59cf2d8..0d7ddae 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
@@ -244,6 +244,16 @@ object CarbonSparkDataSourceUtil {
       case _ => null
     }
     builder.sortBy(sortCols)
+    val invertedIdxCols = options.get(CarbonCommonConstants.INVERTED_INDEX) match {
+      case Some(cols) =>
+        if (cols.trim.isEmpty) {
+          Array[String]()
+        } else {
+          cols.split(",").map(_.trim)
+        }
+      case _ => null
+    }
+    builder.invertedIndexFor(invertedIdxCols)
     val longStringColumns: String = options
       .getOrElse(CarbonCommonConstants.LONG_STRING_COLUMNS, null)
     if (longStringColumns != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 7564158..937f0d9 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -386,8 +386,10 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     // Saves dataframe to carbon file
     df.write
       .format("parquet").saveAsTable("testparquet")
-    spark.sql("create table carbon_table(c1 string, c2 string, number int) using carbon options('table_blocksize'='256')")
+    spark.sql("create table carbon_table(c1 string, c2 string, number int) using carbon options('table_blocksize'='256','inverted_index'='c1')")
+    spark.sql("describe formatted carbon_table").show()
     TestUtil.checkExistence(spark.sql("describe formatted carbon_table"), true, "table_blocksize")
+    TestUtil.checkExistence(spark.sql("describe formatted carbon_table"), true, "inverted_index")
     spark.sql("insert into carbon_table select * from testparquet")
     spark.sql("select * from carbon_table").show()
     spark.sql("drop table if exists carbon_table")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 8edc854..ebef152 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -63,13 +63,13 @@ private[sql] case class CarbonDescribeFormattedCommand(
         if (dimension.hasEncoding(Encoding.DICTIONARY) &&
             !dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
           "DICTIONARY, KEY COLUMN" + (if (dimension.hasEncoding(Encoding.INVERTED_INDEX)) {
-            "".concat(",").concat(colComment)
+            ",INVERTEDINDEX".concat(",").concat(colComment)
           } else {
             ",NOINVERTEDINDEX".concat(",").concat(colComment)
           })
         } else {
           "KEY COLUMN" + (if (dimension.hasEncoding(Encoding.INVERTED_INDEX)) {
-            "".concat(",").concat(colComment)
+            ",INVERTEDINDEX".concat(",").concat(colComment)
           } else {
             ",NOINVERTEDINDEX".concat(",").concat(colComment)
           })

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
index 363e526..625fb89 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -641,7 +641,29 @@ class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll {
       """)
 
     sql("alter table NO_INVERTED_CARBON add columns(col1 string,col2 string) tblproperties('NO_INVERTED_INDEX'='col2')")
-    checkExistenceCount(sql("desc formatted NO_INVERTED_CARBON"),2,"NOINVERTEDINDEX")
+    checkExistenceCount(sql("desc formatted NO_INVERTED_CARBON"),4,"NOINVERTEDINDEX")
+  }
+
+  test("inverted index after alter command") {
+    sql("drop table if exists NO_INVERTED_CARBON")
+    sql(
+      """
+           CREATE TABLE IF NOT EXISTS NO_INVERTED_CARBON
+           (id Int, name String, city String)
+           STORED BY 'org.apache.carbondata.format'
+           TBLPROPERTIES('INVERTED_INDEX'='city')
+      """)
+
+    sql("alter table NO_INVERTED_CARBON add columns(col1 string,col2 string) tblproperties('INVERTED_INDEX'='col2')")
+    val descLoc = sql("describe formatted NO_INVERTED_CARBON").collect
+    descLoc.find(_.get(0).toString.contains("name")) match {
+      case Some(row) => assert(row.get(2).toString.contains("INVERTEDINDEX"))
+      case None => assert(false)
+    }
+    descLoc.find(_.get(0).toString.contains("col2")) match {
+      case Some(row) => assert(row.get(2).toString.contains("INVERTEDINDEX"))
+      case None => assert(false)
+    }
   }
 
   test("test if adding column in pre-aggregate table throws exception") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
index ce57e46..51be5a8 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
@@ -44,9 +44,9 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA
     assertResult("table_info1")(result(0).getString(0))
     // 2282 is the size of carbon table. Note that since 1.5.0, we add additional compressor name in metadata
     // and more metadata like written_by and version details are added
-    assertResult(2282)(result(0).getLong(1))
+    assertResult(2267)(result(0).getLong(1))
     assertResult("table_info2")(result(1).getString(0))
-    assertResult(2282)(result(1).getLong(1))
+    assertResult(2267)(result(1).getLong(1))
   }
 
   override def afterAll: Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 877777f..d3aaf3b 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -68,6 +68,7 @@ public class CarbonWriterBuilder {
   private short numOfThreads;
   private Configuration hadoopConf;
   private String writtenByApp;
+  private String[] invertedIndexColumns;
   private enum WRITER_TYPE {
     CSV, AVRO, JSON
   }
@@ -104,6 +105,23 @@ public class CarbonWriterBuilder {
   }
 
   /**
+   * sets the list of columns for which inverted index needs to be generated
+   * @param invertedIndexColumns is a string array of columns for which inverted index needs
+   * to be generated.
+   * If it is null or an empty array, inverted index will be generated for none of the columns
+   * @return updated CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder invertedIndexFor(String[] invertedIndexColumns) {
+    if (invertedIndexColumns != null) {
+      for (int i = 0; i < invertedIndexColumns.length; i++) {
+        invertedIndexColumns[i] = invertedIndexColumns[i].toLowerCase();
+      }
+    }
+    this.invertedIndexColumns = invertedIndexColumns;
+    return this;
+  }
+
+  /**
    * sets the taskNo for the writer. SDKs concurrently running
    * will set taskNo in order to avoid conflicts in file's name during write.
    * @param taskNo is the TaskNo user wants to specify.
@@ -195,6 +213,8 @@ public class CarbonWriterBuilder {
    * j. sort_scope -- "local_sort", "no_sort", "batch_sort". default value is "local_sort"
    * k. long_string_columns -- comma separated string columns which are more than 32k length.
    *                           default value is null.
+   * l. inverted_index -- comma separated string columns for which inverted index needs to be
+   *                      generated
    *
    * @return updated CarbonWriterBuilder
    */
@@ -202,7 +222,8 @@ public class CarbonWriterBuilder {
     Objects.requireNonNull(options, "Table properties should not be null");
     Set<String> supportedOptions = new HashSet<>(Arrays
         .asList("table_blocksize", "table_blocklet_size", "local_dictionary_threshold",
-            "local_dictionary_enable", "sort_columns", "sort_scope", "long_string_columns"));
+            "local_dictionary_enable", "sort_columns", "sort_scope", "long_string_columns",
+            "inverted_index"));
 
     for (String key : options.keySet()) {
       if (!supportedOptions.contains(key.toLowerCase())) {
@@ -233,6 +254,15 @@ public class CarbonWriterBuilder {
         this.withSortScope(entry);
       } else if (entry.getKey().equalsIgnoreCase("long_string_columns")) {
         updateToLoadOptions(entry);
+      } else if (entry.getKey().equalsIgnoreCase("inverted_index")) {
+        //inverted index columns
+        String[] invertedIndexColumns;
+        if (entry.getValue().trim().isEmpty()) {
+          invertedIndexColumns = new String[0];
+        } else {
+          invertedIndexColumns = entry.getValue().split(",");
+        }
+        this.invertedIndexFor(invertedIndexColumns);
       }
     }
     return this;
@@ -535,8 +565,13 @@ public class CarbonWriterBuilder {
       sortColumnsList = Arrays.asList(sortColumns);
     }
     ColumnSchema[] sortColumnsSchemaList = new ColumnSchema[sortColumnsList.size()];
+    List<String> invertedIdxColumnsList = new ArrayList<>();
+    if (null != invertedIndexColumns) {
+      invertedIdxColumnsList = Arrays.asList(invertedIndexColumns);
+    }
     Field[] fields = schema.getFields();
-    buildTableSchema(fields, tableSchemaBuilder, sortColumnsList, sortColumnsSchemaList);
+    buildTableSchema(fields, tableSchemaBuilder, sortColumnsList, sortColumnsSchemaList,
+        invertedIdxColumnsList);
 
     tableSchemaBuilder.setSortColumns(Arrays.asList(sortColumnsSchemaList));
     String tableName;
@@ -552,7 +587,8 @@ public class CarbonWriterBuilder {
   }
 
   private void buildTableSchema(Field[] fields, TableSchemaBuilder tableSchemaBuilder,
-      List<String> sortColumnsList, ColumnSchema[] sortColumnsSchemaList) {
+      List<String> sortColumnsList, ColumnSchema[] sortColumnsSchemaList,
+      List<String> invertedIdxColumnsList) {
     Set<String> uniqueFields = new HashSet<>();
     // a counter which will be used in case of complex array type. This valIndex will be assigned
     // to child of complex array type in the order val1, val2 so that each array type child is
@@ -572,6 +608,20 @@ public class CarbonWriterBuilder {
             "column: " + sortColumn + " specified in sort columns does not exist in schema");
       }
     }
+    // Check if any of the columns specified in inverted index are missing from schema.
+    for (String invertedIdxColumn: invertedIdxColumnsList) {
+      boolean exists = false;
+      for (Field field : fields) {
+        if (field.getFieldName().equalsIgnoreCase(invertedIdxColumn)) {
+          exists = true;
+          break;
+        }
+      }
+      if (!exists) {
+        throw new RuntimeException(
+            "column: " + invertedIdxColumn + " specified in sort columns does not exist in schema");
+      }
+    }
     int i = 0;
     for (Field field : fields) {
       if (null != field) {
@@ -580,6 +630,7 @@ public class CarbonWriterBuilder {
               "Duplicate column " + field.getFieldName() + " found in table schema");
         }
         int isSortColumn = sortColumnsList.indexOf(field.getFieldName());
+        int isInvertedIdxColumn = invertedIdxColumnsList.indexOf(field.getFieldName());
         if (isSortColumn > -1) {
           // unsupported types for ("array", "struct", "double", "float", "decimal")
           if (field.getDataType() == DataTypes.DOUBLE || field.getDataType() == DataTypes.FLOAT
@@ -597,7 +648,8 @@ public class CarbonWriterBuilder {
             DataType complexType =
                 DataTypes.createArrayType(field.getChildren().get(0).getDataType());
             tableSchemaBuilder
-                .addColumn(new StructField(field.getFieldName(), complexType), valIndex, false);
+                .addColumn(new StructField(field.getFieldName(), complexType), valIndex, false,
+                    isInvertedIdxColumn > -1);
           } else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")) {
             // Loop through the inner columns and for a StructData
             List<StructField> structFieldsArray =
@@ -608,18 +660,20 @@ public class CarbonWriterBuilder {
             }
             DataType complexType = DataTypes.createStructType(structFieldsArray);
             tableSchemaBuilder
-                .addColumn(new StructField(field.getFieldName(), complexType), valIndex, false);
+                .addColumn(new StructField(field.getFieldName(), complexType), valIndex, false,
+                    isInvertedIdxColumn > -1);
           } else if (field.getDataType().getName().equalsIgnoreCase("MAP")) {
             // Loop through the inner columns for MapType
             DataType mapType = DataTypes.createMapType(((MapType) field.getDataType()).getKeyType(),
                 field.getChildren().get(0).getDataType());
             tableSchemaBuilder
-                .addColumn(new StructField(field.getFieldName(), mapType), valIndex, false);
+                .addColumn(new StructField(field.getFieldName(), mapType), valIndex, false,
+                    isInvertedIdxColumn > -1);
           }
         } else {
           ColumnSchema columnSchema = tableSchemaBuilder
-              .addColumn(new StructField(field.getFieldName(), field.getDataType()),
-                  valIndex, isSortColumn > -1);
+              .addColumn(new StructField(field.getFieldName(), field.getDataType()), valIndex,
+                  isSortColumn > -1, isInvertedIdxColumn > -1);
           if (isSortColumn > -1) {
             columnSchema.setSortColumn(true);
             sortColumnsSchemaList[isSortColumn] = columnSchema;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c94c8ce5/tools/cli/src/test/java/org/apache/carbondata/tool/CarbonCliTest.java
----------------------------------------------------------------------
diff --git a/tools/cli/src/test/java/org/apache/carbondata/tool/CarbonCliTest.java b/tools/cli/src/test/java/org/apache/carbondata/tool/CarbonCliTest.java
index e2db442..e929b50 100644
--- a/tools/cli/src/test/java/org/apache/carbondata/tool/CarbonCliTest.java
+++ b/tools/cli/src/test/java/org/apache/carbondata/tool/CarbonCliTest.java
@@ -94,7 +94,7 @@ public class CarbonCliTest {
     String expectedOutput = buildLines(
         "Input Folder: ./CarbonCliTest",
         "## Summary",
-        "total: 6 blocks, 2 shards, 14 blocklets, 314 pages, 10,000,000 rows, 32.27MB",
+        "total: 6 blocks, 2 shards, 14 blocklets, 314 pages, 10,000,000 rows, 32.26MB",
         "avg: 5.38MB/block, 2.30MB/blocklet, 1,666,666 rows/block, 714,285 rows/blocklet");
     Assert.assertTrue(output.contains(expectedOutput));
 
@@ -105,9 +105,9 @@ public class CarbonCliTest {
     output = new String(out.toByteArray());
 
     expectedOutput = buildLines(
-        "Column Name  Data Type  Column Type  SortColumn  Encoding          Ordinal  Id  ",
-        "name         STRING     dimension    true        [INVERTED_INDEX]  0        NA  ",
-        "age          INT        measure      false       []                1        NA  ");
+        "Column Name  Data Type  Column Type  SortColumn  Encoding  Ordinal  Id  ",
+        "name         STRING     dimension    true        []        0        NA  ",
+        "age          INT        measure      false       []        1        NA  ");
     Assert.assertTrue(output.contains(expectedOutput));
 
     String[] args3 = {"-cmd", "summary", "-p", path, "-t"};
@@ -135,7 +135,7 @@ public class CarbonCliTest {
         "1    1      25        800,000  2.58MB    ",
         "2    0      25        800,000  2.58MB    ",
         "2    1      25        800,000  2.58MB    ",
-        "2    2      7         200,000  660.79KB  ");
+        "2    2      7         200,000  660.70KB  ");
     Assert.assertTrue(output.contains(expectedOutput));
 
     String[] args5 = {"-cmd", "summary", "-p", path, "-c", "name"};
@@ -146,13 +146,13 @@ public class CarbonCliTest {
 
     expectedOutput = buildLines(
         "BLK  BLKLT  Meta Size  Data Size  LocalDict  DictEntries  DictSize  AvgPageSize  Min%  Max%  Min     Max     ",
-        "0    0      1.81KB     295.98KB   false      0            0.0B      11.77KB      NA    NA    robot0  robot1  ",
-        "0    1      1.81KB     295.99KB   false      0            0.0B      11.77KB      NA    NA    robot1  robot3  ",
-        "1    0      1.81KB     295.98KB   false      0            0.0B      11.77KB      NA    NA    robot3  robot4  ",
-        "1    1      1.81KB     295.99KB   false      0            0.0B      11.77KB      NA    NA    robot4  robot6  ",
-        "2    0      1.81KB     295.98KB   false      0            0.0B      11.77KB      NA    NA    robot6  robot7  ",
-        "2    1      1.81KB     295.98KB   false      0            0.0B      11.77KB      NA    NA    robot8  robot9  ",
-        "2    2      519.0B     74.06KB    false      0            0.0B      10.51KB      NA    NA    robot9  robot9  ");
+        "0    0      1.74KB     295.67KB   false      0            0.0B      11.76KB      NA    NA    robot0  robot1  ",
+        "0    1      1.74KB     295.67KB   false      0            0.0B      11.76KB      NA    NA    robot1  robot3  ",
+        "1    0      1.74KB     295.67KB   false      0            0.0B      11.76KB      NA    NA    robot3  robot4  ",
+        "1    1      1.74KB     295.67KB   false      0            0.0B      11.76KB      NA    NA    robot4  robot6  ",
+        "2    0      1.74KB     295.67KB   false      0            0.0B      11.76KB      NA    NA    robot6  robot7  ",
+        "2    1      1.74KB     295.67KB   false      0            0.0B      11.76KB      NA    NA    robot8  robot9  ",
+        "2    2      498.0B     73.97KB    false      0            0.0B      10.50KB      NA    NA    robot9  robot9  ");
     Assert.assertTrue(output.contains(expectedOutput));
   }
 
@@ -167,15 +167,15 @@ public class CarbonCliTest {
     String expectedOutput = buildLines(
         "Input Folder: ./CarbonCliTest",
         "## Summary",
-        "total: 6 blocks, 2 shards, 14 blocklets, 314 pages, 10,000,000 rows, 32.27MB",
+        "total: 6 blocks, 2 shards, 14 blocklets, 314 pages, 10,000,000 rows, 32.26MB",
         "avg: 5.38MB/block, 2.30MB/blocklet, 1,666,666 rows/block, 714,285 rows/blocklet");
 
     Assert.assertTrue(output.contains(expectedOutput));
 
     expectedOutput = buildLines(
-        "Column Name  Data Type  Column Type  SortColumn  Encoding          Ordinal  Id  ",
-        "name         STRING     dimension    true        [INVERTED_INDEX]  0        NA  ",
-        "age          INT        measure      false       []                1        NA  ");
+        "Column Name  Data Type  Column Type  SortColumn  Encoding  Ordinal  Id  ",
+        "name         STRING     dimension    true        []        0        NA  ",
+        "age          INT        measure      false       []        1        NA  ");
     Assert.assertTrue(output.contains(expectedOutput));
 
     expectedOutput = buildLines(
@@ -219,7 +219,7 @@ public class CarbonCliTest {
     System.out.println(output);
     String expectedOutput = buildLines(
         "Blocklet 0:",
-        "Page 0 (offset 0, length 12049): DataChunk2(chunk_meta:ChunkCompressionMeta(compression_codec:DEPRECATED, total_uncompressed_size:256000, total_compressed_size:12049, compressor_name:snappy), rowMajor:false, data_page_length:12039, rowid_page_length:10, presence:PresenceMeta(represents_presence:false, present_bit_stream:00), sort_state:SORT_EXPLICIT, encoders:[INVERTED_INDEX], encoder_meta:[], min_max:BlockletMinMaxIndex(min_values:[72 6F 62 6F 74 30], max_values:[72 6F 62 6F 74 30], min_max_presence:[true]), numberOfRowsInpage:32000)");
+        "Page 0 (offset 0, length 12039): DataChunk2(chunk_meta:ChunkCompressionMeta(compression_codec:DEPRECATED, total_uncompressed_size:256000, total_compressed_size:12039, compressor_name:snappy), rowMajor:false, data_page_length:12039, presence:PresenceMeta(represents_presence:false, present_bit_stream:00), sort_state:SORT_NATIVE, encoders:[], encoder_meta:[], min_max:BlockletMinMaxIndex(min_values:[72 6F 62 6F 74 30], max_values:[72 6F 62 6F 74 30], min_max_presence:[true]), numberOfRowsInpage:32000)");
     Assert.assertTrue(output.contains(expectedOutput));
   }