Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/16 19:05:35 UTC

[carbondata] 02/22: [CARBONDATA-3001] configurable page size in MB

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit eeb7e3a9b52f07e8298091252638175328a7aa9c
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Mon Oct 15 18:49:33 2018 +0530

    [CARBONDATA-3001] configurable page size in MB
    
    Changes proposed in this PR:
    
    Supported a table property table_page_size_inmb (1 MB to 1755 MB), a configurable page size for each table, applicable in the below scenarios:
    
    TBLProperties in creating table
    API in SdkWriter
    Options in creating table using spark file format
    Options in DataFrameWriter
    If this table property is not configured, the default value (1 MB) will be taken [currently there is no default value; it will be set in the next version].
    Based on this property value, a page will be cut if it crosses the configured size before 32000 rows. This helps pages fit into the cache.
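    
    For illustration, a minimal sketch of the SQL and DataFrameWriter scenarios
    (table, column and dataframe names are hypothetical; the "carbondata" datasource
    name and the "tableName" option are assumed from the spark2 integration):
    
        // TBLPROPERTIES while creating a table
        sql("CREATE TABLE page_size_demo(name STRING, note STRING) STORED AS CARBONDATA " +
            "TBLPROPERTIES('table_page_size_inmb'='2')")
    
        // Option in DataFrameWriter, forwarded to TBLPROPERTIES by CarbonDataFrameWriter
        df.write.format("carbondata")
          .option("tableName", "page_size_demo_df")
          .option("table_page_size_inmb", "2")
          .save()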
    
    This closes #2814
---
 .../core/constants/CarbonCommonConstants.java      |  14 ++
 .../core/datastore/blocklet/EncodedBlocklet.java   |  19 +++
 .../blockletindex/BlockletDataRefNode.java         |   6 +-
 .../core/metadata/blocklet/BlockletInfo.java       |  10 ++
 .../metadata/schema/table/TableSchemaBuilder.java  |  10 ++
 .../carbondata/core/util/CarbonMetadataUtil.java   |  15 +-
 .../core/util/DataFileFooterConverterV3.java       |   8 +
 docs/carbon-as-spark-datasource-guide.md           |   1 +
 docs/ddl-of-carbondata.md                          |  13 ++
 docs/sdk-guide.md                                  |   1 +
 format/src/main/thrift/carbondata.thrift           |   1 +
 .../TestCreateTableWithPageSizeInMb.scala          |  67 ++++++++
 .../TestNonTransactionalCarbonTable.scala          |  49 ++++++
 .../org/apache/carbondata/spark/CarbonOption.scala |   2 +
 .../apache/carbondata/spark/util/CommonUtil.scala  |  32 ++++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    |   3 +-
 .../datasources/CarbonSparkDataSourceUtil.scala    |   4 +
 .../apache/spark/sql/CarbonDataFrameWriter.scala   |   1 +
 .../table/CarbonDescribeFormattedCommand.scala     |   9 +-
 .../sql/CarbonGetTableDetailComandTestCase.scala   |   0
 .../processing/datatypes/ArrayDataType.java        |  15 ++
 .../processing/datatypes/GenericDataType.java      |   5 +
 .../processing/datatypes/PrimitiveDataType.java    |   6 +
 .../processing/datatypes/StructDataType.java       |  14 ++
 .../store/CarbonFactDataHandlerColumnar.java       | 190 ++++++++++++++++-----
 .../store/CarbonFactDataHandlerModel.java          | 106 ++++++------
 .../carbondata/processing/store/TablePage.java     |  36 +---
 .../writer/v3/CarbonFactDataWriterImplV3.java      |   7 +
 .../carbondata/sdk/file/CarbonWriterBuilder.java   |  26 ++-
 29 files changed, 540 insertions(+), 130 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 69374ad..e02241e 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1999,6 +1999,20 @@ public final class CarbonCommonConstants {
    */
   public static final int CARBON_ALLOW_DIRECT_FILL_DICT_COLS_LIMIT = 100;
 
+  /**
+   * Page size in MB. If the page size exceeds this value before the 32000 row count, the page
+   * will be cut and the remaining rows will be written in the next page.
+   */
+  public static final String TABLE_PAGE_SIZE_INMB = "table_page_size_inmb";
+
+  public static final int TABLE_PAGE_SIZE_MIN_INMB = 1;
+
+  // default 1 MB
+  public static final int TABLE_PAGE_SIZE_INMB_DEFAULT = 1;
+
+  // Upper limit, as SnappyCompressor.MAX_BYTE_TO_COMPRESS is 1.75 GB
+  public static final int TABLE_PAGE_SIZE_MAX_INMB = 1755;
+
   //////////////////////////////////////////////////////////////////////////////////////////
   // Unused constants and parameters start here
   //////////////////////////////////////////////////////////////////////////////////////////
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java
index d017145..8a19522 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/blocklet/EncodedBlocklet.java
@@ -63,6 +63,11 @@ public class EncodedBlocklet {
   private int numberOfPages;
 
   /**
+   * row count in each page
+   */
+  private List<Short> rowCountInPage;
+
+  /**
    * is decoder based fallback is enabled or not
    */
   private boolean isDecoderBasedFallBackEnabled;
@@ -77,6 +82,7 @@ public class EncodedBlocklet {
     this.executorService = executorService;
     this.isDecoderBasedFallBackEnabled = isDecoderBasedFallBackEnabled;
     this.localDictionaryGeneratorMap = localDictionaryGeneratorMap;
+    this.rowCountInPage = new ArrayList<>();
   }
 
   /**
@@ -90,10 +96,14 @@ public class EncodedBlocklet {
     if (null == pageMetadataList) {
       pageMetadataList = new ArrayList<>();
     }
+    if (null == rowCountInPage) {
+      rowCountInPage = new ArrayList<>();
+    }
     // update details
     blockletSize += encodedTablePage.getPageSize();
     pageMetadataList.add(encodedTablePage.getPageKey());
     this.numberOfPages++;
+    rowCountInPage.add((short)encodedTablePage.getPageSize());
   }
 
   /**
@@ -187,11 +197,20 @@ public class EncodedBlocklet {
     return this.numberOfPages;
   }
 
+  public List<Short> getRowCountInPage() {
+    return rowCountInPage;
+  }
+
+  public void setRowCountInPage(List<Short> rowCountInPage) {
+    this.rowCountInPage = rowCountInPage;
+  }
+
   public void clear() {
     this.numberOfPages = 0;
     this.encodedDimensionColumnPages = null;
     this.blockletSize = 0;
     this.encodedMeasureColumnPages = null;
     this.pageMetadataList = null;
+    this.rowCountInPage = null;
   }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
index 9046ade..fe372e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
@@ -85,7 +85,11 @@ public class BlockletDataRefNode implements DataRefNode {
       if (lastPageRowCount > 0) {
         pageRowCount[pageRowCount.length - 1] = lastPageRowCount;
       }
-      detailInfo.getBlockletInfo().setNumberOfRowsPerPage(pageRowCount);
+      // V3 old store to V3 new store compatibility. V3 new store will get this info in thrift.
+      // so don't overwrite it with hardcoded values.
+      if (detailInfo.getBlockletInfo().getNumberOfRowsPerPage() == null) {
+        detailInfo.getBlockletInfo().setNumberOfRowsPerPage(pageRowCount);
+      }
     }
     this.index = index;
     this.dimensionLens = dimensionLens;
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
index 104ef1a..717bdbf 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
@@ -230,6 +230,12 @@ public class BlockletInfo implements Serializable, Writable {
     if (isSortedPresent) {
       output.writeBoolean(isSorted);
     }
+    if (null != getNumberOfRowsPerPage()) {
+      output.writeShort(getNumberOfRowsPerPage().length);
+      for (int i = 0; i < getNumberOfRowsPerPage().length; i++) {
+        output.writeInt(getNumberOfRowsPerPage()[i]);
+      }
+    }
   }
 
   /**
@@ -301,6 +307,10 @@ public class BlockletInfo implements Serializable, Writable {
     if (isSortedPresent) {
       this.isSorted = input.readBoolean();
     }
+    numberOfRowsPerPage = new int[input.readShort()];
+    for (int i = 0; i < numberOfRowsPerPage.length; i++) {
+      numberOfRowsPerPage[i] = input.readInt();
+    }
   }
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index 3c290af..53542d5 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -60,6 +60,8 @@ public class TableSchemaBuilder {
 
   private int blockletSize;
 
+  private int pageSizeInMb;
+
   private String tableName;
   private boolean isLocalDictionaryEnabled;
   private String localDictionaryThreshold;
@@ -80,6 +82,11 @@ public class TableSchemaBuilder {
     return this;
   }
 
+  public TableSchemaBuilder pageSizeInMb(int pageSizeInMb) {
+    this.pageSizeInMb = pageSizeInMb;
+    return this;
+  }
+
   public TableSchemaBuilder localDictionaryThreshold(int localDictionaryThreshold) {
     this.localDictionaryThreshold = String.valueOf(localDictionaryThreshold);
     return this;
@@ -121,6 +128,9 @@ public class TableSchemaBuilder {
     if (blockletSize > 0) {
       property.put(CarbonCommonConstants.TABLE_BLOCKLET_SIZE, String.valueOf(blockletSize));
     }
+    if (pageSizeInMb > 0) {
+      property.put(CarbonCommonConstants.TABLE_PAGE_SIZE_INMB, String.valueOf(pageSizeInMb));
+    }
 
     // Adding local dictionary, applicable only for String(dictionary exclude)
     if (isLocalDictionaryEnabled) {
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 0fe33b0..f35afc0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -403,9 +403,18 @@ public class CarbonMetadataUtil {
     dimensionChunkOffsets.addAll(blockletInfo.getMeasureChunkOffsets());
     List<Integer> dimensionChunksLength = blockletInfo.getDimensionChunksLength();
     dimensionChunksLength.addAll(blockletInfo.getMeasureChunksLength());
-    return new BlockletInfo3(blockletInfo.getNumberOfRows(), dimensionChunkOffsets,
-        dimensionChunksLength, blockletInfo.getDimensionOffset(), blockletInfo.getMeasureOffsets(),
-        blockletInfo.getNumberOfPages());
+    BlockletInfo3 blockletInfo3 =
+        new BlockletInfo3(blockletInfo.getNumberOfRows(), dimensionChunkOffsets,
+            dimensionChunksLength, blockletInfo.getDimensionOffset(),
+            blockletInfo.getMeasureOffsets(), blockletInfo.getNumberOfPages());
+    List<Integer> rowsPerPage = new ArrayList<>();
+    if (null != blockletInfo.getNumberOfRowsPerPage()) {
+      for (int i = 0; i < blockletInfo.getNumberOfRowsPerPage().length; i++) {
+        rowsPerPage.add(blockletInfo.getNumberOfRowsPerPage()[i]);
+      }
+      blockletInfo3.setRow_count_in_page(rowsPerPage);
+    }
+    return blockletInfo3;
   }
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
index d6d91ed..d278f7a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -144,6 +144,14 @@ public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
     blockletInfo.setDimensionOffset(blockletInfoThrift.getDimension_offsets());
     blockletInfo.setMeasureOffsets(blockletInfoThrift.getMeasure_offsets());
     blockletInfo.setNumberOfPages(blockletInfoThrift.getNumber_number_of_pages());
+    if (blockletInfoThrift.getRow_count_in_page() != null
+        && blockletInfoThrift.getRow_count_in_page().size() != 0) {
+      int[] rowCountInPages = new int[blockletInfoThrift.getRow_count_in_page().size()];
+      for (int i = 0; i < blockletInfoThrift.getRow_count_in_page().size(); i++) {
+        rowCountInPages[i] = blockletInfoThrift.getRow_count_in_page().get(i);
+      }
+      blockletInfo.setNumberOfRowsPerPage(rowCountInPages);
+    }
     return blockletInfo;
   }
 
diff --git a/docs/carbon-as-spark-datasource-guide.md b/docs/carbon-as-spark-datasource-guide.md
index bc56a54..598acb0 100644
--- a/docs/carbon-as-spark-datasource-guide.md
+++ b/docs/carbon-as-spark-datasource-guide.md
@@ -44,6 +44,7 @@ Now you can create Carbon table using Spark's datasource DDL syntax.
 |-----------|--------------|------------|
 | table_blocksize | 1024 | Size of blocks to write onto hdfs. For  more details, see [Table Block Size Configuration](./ddl-of-carbondata.md#table-block-size-configuration). |
 | table_blocklet_size | 64 | Size of blocklet to write. |
+| table_page_size_inmb | 0 | Size of each page in the carbon table, in MB. If the page size crosses this value before 32000 rows, the page is cut at that many rows. Helps in keeping the page size small enough to fit in the cache. |
 | local_dictionary_threshold | 10000 | Cardinality upto which the local dictionary can be generated. For  more details, see [Local Dictionary Configuration](./ddl-of-carbondata.md#local-dictionary-configuration). |
 | local_dictionary_enable | false | Enable local dictionary generation. For  more details, see [Local Dictionary Configuration](./ddl-of-carbondata.md#local-dictionary-configuration). |
 | sort_columns | all dimensions are sorted | Columns to include in sort and its order of sort. For  more details, see [Sort Columns Configuration](./ddl-of-carbondata.md#sort-columns-configuration). |
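
For reference, a minimal sketch of passing this option through Spark's datasource DDL
(assuming the USING carbon syntax of this guide; table and column names are illustrative).
CarbonSparkDataSourceUtil forwards the option to CarbonWriterBuilder.withPageSizeInMb:

    spark.sql(
      """CREATE TABLE datasource_page_size(name STRING, note STRING)
        |USING carbon
        |OPTIONS('table_page_size_inmb'='2')""".stripMargin)
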
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 07a2670..88615a2 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -94,6 +94,7 @@ CarbonData DDL statements are documented here,which includes:
 | [SORT_SCOPE](#sort-scope-configuration)                      | Sort scope of the load.Options include no sort, local sort ,batch sort and global sort |
 | [TABLE_BLOCKSIZE](#table-block-size-configuration)           | Size of blocks to write onto hdfs                            |
 | [TABLE_BLOCKLET_SIZE](#table-blocklet-size-configuration)    | Size of blocklet to write in the file                        |
+| [TABLE_PAGE_SIZE_INMB](#table-page-size-configuration)       | Size of a page in MB; if the page size crosses this value before 32000 rows, the page is cut at that many rows and the remaining rows are processed in subsequent pages. This helps in keeping the page size within the CPU cache size |
 | [MAJOR_COMPACTION_SIZE](#table-compaction-configuration)     | Size upto which the segments can be combined into one        |
 | [AUTO_LOAD_MERGE](#table-compaction-configuration)           | Whether to auto compact the segments                         |
 | [COMPACTION_LEVEL_THRESHOLD](#table-compaction-configuration) | Number of segments to compact into one segment               |
@@ -283,6 +284,18 @@ CarbonData DDL statements are documented here,which includes:
      TBLPROPERTIES ('TABLE_BLOCKLET_SIZE'='8')
      ```
 
+   - ##### Table Page Size Configuration
+
+     This property sets the page size in the carbondata file
+     and supports a range of 1 MB to 1755 MB.
+     If the page size crosses this value before 32000 rows, the page is cut at that many rows.
+     This helps in keeping the page size within the CPU cache size.
+
+     Example usage:
+     ```
+     TBLPROPERTIES ('TABLE_PAGE_SIZE_INMB'='5')
+     ```
+
    - ##### Table Compaction Configuration
    
      These properties are table level compaction configurations, if not specified, system level configurations in carbon.properties will be used.
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index 573b595..e040e64 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -382,6 +382,7 @@ public CarbonWriterBuilder withLoadOptions(Map<String, String> options);
  *                           default value is null.
  * l. inverted_index -- comma separated string columns for which inverted index needs to be
  *                      generated
+ * m. table_page_size_inmb -- page size of the table in MB; valid range is 1 to 1755 MB.
  *
  * @return updated CarbonWriterBuilder
  */
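
A minimal sketch of setting this from the SDK writer builder (the output path and schema
are illustrative; withPageSizeInMb is the builder API added by this change):

    import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

    val schema = "[{\"name\":\"string\"}, {\"note\":\"varchar\"}]"
    val writer = CarbonWriter.builder()
      .outputPath("/tmp/sdk_output")          // hypothetical output location
      .withCsvInput(Schema.parseJson(schema))
      .withPageSizeInMb(2)                    // valid range is 1 to 1755 MB
      .writtenBy("sdk-guide-example")
      .build()
    writer.write(Array("a", "b"))
    writer.close()
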
diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift
index 5cad5ac..7dcd4d3 100644
--- a/format/src/main/thrift/carbondata.thrift
+++ b/format/src/main/thrift/carbondata.thrift
@@ -180,6 +180,7 @@ struct BlockletInfo3{
     4: required i64 dimension_offsets;
     5: required i64 measure_offsets;
     6: required i32 number_number_of_pages; // This is rquired for alter table, in case of alter table when filter is only selected on new added column this will help
+    7: optional list<i32> row_count_in_page; // This will contain the row count in each page.
   }
 
 /**
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithPageSizeInMb.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithPageSizeInMb.scala
new file mode 100644
index 0000000..ce374eb
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithPageSizeInMb.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+
+/**
+ * Test functionality of create table with page size
+ */
+class TestCreateTableWithPageSizeInMb extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("use default")
+    sql("drop table if exists source")
+  }
+
+  test("test create table with invalid page size") {
+    val ex = intercept[MalformedCarbonCommandException] {
+      sql(
+        "CREATE TABLE T1(name String) STORED AS CARBONDATA TBLPROPERTIES" +
+        "('table_page_size_inmb'='3X')")
+    }
+    assert(ex.getMessage.toLowerCase.contains("invalid table_page_size_inmb"))
+    val ex1 = intercept[MalformedCarbonCommandException] {
+      sql(
+        "CREATE TABLE T1(name String) STORED AS CARBONDATA TBLPROPERTIES" +
+        "('table_page_size_inmb'='0')")
+    }
+    assert(ex1.getMessage.toLowerCase.contains("invalid table_page_size_inmb"))
+    val ex2 = intercept[MalformedCarbonCommandException] {
+      sql(
+        "CREATE TABLE T1(name String) STORED AS CARBONDATA TBLPROPERTIES" +
+        "('table_page_size_inmb'='-1')")
+    }
+    assert(ex2.getMessage.toLowerCase.contains("invalid table_page_size_inmb"))
+    val ex3 = intercept[MalformedCarbonCommandException] {
+      sql(
+        "CREATE TABLE T1(name String) STORED AS CARBONDATA TBLPROPERTIES" +
+        "('table_page_size_inmb'='1999')")
+    }
+    assert(ex3.getMessage.toLowerCase.contains("invalid table_page_size_inmb"))
+  }
+
+  override def afterAll {
+    sql("use default")
+    sql("drop table if exists source")
+  }
+
+}
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 06d41b1..274cca9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -33,6 +33,7 @@ import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
 import org.apache.avro.io.{DecoderFactory, Encoder}
 import org.apache.commons.io.FileUtils
+import org.apache.commons.lang.RandomStringUtils
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
 import org.junit.Assert
@@ -51,6 +52,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion
 import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.reader.CarbonFooterReaderV3
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.util.{CarbonMetadataUtil, CarbonProperties, CarbonUtil, DataFileFooterConverterV3}
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.sdk.file._
@@ -2537,6 +2540,52 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     FileUtils.deleteDirectory(new File(writerPath))
   }
 
+  test("Test with long string columns with 1 MB pageSize") {
+    FileUtils.deleteDirectory(new File(writerPath))
+    // here we specify the long string column as varchar
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"  address    \":\"varchar\"},\n")
+      .append("   {\"age\":\"int\"},\n")
+      .append("   {\"note\":\"varchar\"}\n")
+      .append("]")
+      .toString()
+    val builder = CarbonWriter.builder()
+    val writer = builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema))
+      .withPageSizeInMb(1)
+      .writtenBy("TestCreateTableUsingSparkCarbonFileFormat").build()
+    val totalRecordsNum = 10
+    for (i <- 0 until totalRecordsNum) {
+      // write a varchar with 250,000 length
+      writer
+        .write(Array[String](s"name_$i",
+          RandomStringUtils.randomAlphabetic(250000),
+          i.toString,
+          RandomStringUtils.randomAlphabetic(250000)))
+    }
+    writer.close()
+
+    // read footer and verify number of pages
+    val folder = FileFactory.getCarbonFile(writerPath)
+    val files = folder.listFiles(true)
+    import scala.collection.JavaConverters._
+    val dataFiles = files.asScala.filter(_.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT))
+    dataFiles.foreach { dataFile =>
+      val fileReader = FileFactory
+        .getFileHolder(FileFactory.getFileType(dataFile.getPath))
+      val buffer = fileReader
+        .readByteBuffer(FileFactory.getUpdatedFilePath(dataFile.getPath), dataFile.getSize - 8, 8)
+      val footerReader = new CarbonFooterReaderV3(
+        dataFile.getAbsolutePath,
+        buffer.getLong)
+      val footer = footerReader.readFooterVersion3
+      // without page_size configuration there will be only 1 page, now it will be more.
+      assert(footer.getBlocklet_info_list3.get(0).number_number_of_pages != 1)
+    }
+
+  }
+
   def generateCarbonData(builder :CarbonWriterBuilder): Unit ={
     val fields = new Array[Field](3)
     fields(0) = new Field("name", DataTypes.STRING)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index fbbca56..a22c8cb 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -62,6 +62,8 @@ class CarbonOption(options: Map[String, String]) {
 
   lazy val tableBlockletSize: Option[String] = options.get("table_blocklet_size")
 
+  lazy val tablePageSizeInMb: Option[String] = options.get("table_page_size_inmb")
+
   lazy val bucketNumber: Int = options.getOrElse("bucketnumber", "0").toInt
 
   lazy val bucketColumns: String = options.getOrElse("bucketcolumns", "")
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 7887d87..d90c6b2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -543,6 +543,38 @@ object CommonUtil {
   }
 
   /**
+   * This method will validate the table page size
+   *
+   * @param tableProperties table property specified by user
+   * @param propertyName property name
+   */
+  def validatePageSizeInmb(tableProperties: Map[String, String], propertyName: String): Unit = {
+    var size: Integer = 0
+    if (tableProperties.get(propertyName).isDefined) {
+      val pageSize: String =
+        parsePropertyValueStringInMB(tableProperties(propertyName))
+      val minPageSize = CarbonCommonConstants.TABLE_PAGE_SIZE_MIN_INMB
+      val maxPageSize = CarbonCommonConstants.TABLE_PAGE_SIZE_MAX_INMB
+      try {
+        size = Integer.parseInt(pageSize)
+      } catch {
+        case e: NumberFormatException =>
+          throw new MalformedCarbonCommandException(s"Invalid $propertyName value found: " +
+                                                    s"$pageSize, only int value from $minPageSize" +
+                                                    s" to " +
+                                                    s"$maxPageSize is supported.")
+      }
+      if (size < minPageSize || size > maxPageSize) {
+        throw new MalformedCarbonCommandException(s"Invalid $propertyName value found: " +
+                                                  s"$pageSize, only int value from $minPageSize " +
+                                                  s"to " +
+                                                  s"$maxPageSize is supported.")
+      }
+      tableProperties.put(propertyName, pageSize)
+    }
+  }
+
+  /**
    * This method will parse the configure string from 'XX MB/M' to 'XX'
    *
    * @param propertyValueString
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index e03bebd..3cb068f 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -450,9 +450,10 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
           partitionColIntersecLongStrCols.mkString(",")
         } both in partition and long_string_columns which is not allowed.")
     }
-    // validate the block size and blocklet size in table properties
+    // validate the block size and blocklet size, page size in table properties
     CommonUtil.validateSize(tableProperties, CarbonCommonConstants.TABLE_BLOCKSIZE)
     CommonUtil.validateSize(tableProperties, CarbonCommonConstants.TABLE_BLOCKLET_SIZE)
+    CommonUtil.validatePageSizeInmb(tableProperties, CarbonCommonConstants.TABLE_PAGE_SIZE_INMB)
     // validate table level properties for compaction
     CommonUtil.validateTableLevelCompactionProperties(tableProperties)
     // validate flat folder property.
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
index 1649afd..71dba3d 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
@@ -229,6 +229,10 @@ object CarbonSparkDataSourceUtil {
     if (blockletSize.isDefined) {
       builder.withBlockletSize(blockletSize.get)
     }
+    val pageSizeInMb = options.get("table_page_size_inmb").map(_.toInt)
+    if (pageSizeInMb.isDefined) {
+      builder.withPageSizeInMb(pageSizeInMb.get)
+    }
     builder.enableLocalDictionary(options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
       CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT).toBoolean)
     builder.localDictionaryThreshold(
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index f335509..8885f4a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -87,6 +87,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       "LONG_STRING_COLUMNS" -> options.longStringColumns,
       "TABLE_BLOCKSIZE" -> options.tableBlockSize,
       "TABLE_BLOCKLET_SIZE" -> options.tableBlockletSize,
+      "TABLE_PAGE_SIZE_INMB" -> options.tablePageSizeInMb,
       "STREAMING" -> Option(options.isStreaming.toString)
     ).filter(_._2.isDefined)
       .map(property => s"'${property._1}' = '${property._2.get}'").mkString(",")
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index e2a2451..e8f0f23 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -83,6 +83,12 @@ private[sql] case class CarbonDescribeFormattedCommand(
     val catalog = sparkSession.sessionState.catalog
     val catalogTable = catalog.getTableMetadata(tblIdentifier)
 
+    val pageSizeInMb: String = if (tblProps.get(CarbonCommonConstants.TABLE_PAGE_SIZE_INMB)
+      .isDefined) {
+      tblProps(CarbonCommonConstants.TABLE_PAGE_SIZE_INMB)
+    } else {
+      ""
+    }
     //////////////////////////////////////////////////////////////////////////////
     // Table Basic Information
     //////////////////////////////////////////////////////////////////////////////
@@ -122,7 +128,8 @@ private[sql] case class CarbonDescribeFormattedCommand(
         carbonTable.getMinMaxCachedColumnsInCreateOrder.asScala.mkString(", "), ""),
       ("Min/Max Index Cache Level",
         tblProps.getOrElse(CarbonCommonConstants.CACHE_LEVEL,
-          CarbonCommonConstants.CACHE_LEVEL_DEFAULT_VALUE), "")
+          CarbonCommonConstants.CACHE_LEVEL_DEFAULT_VALUE), ""),
+      ("Table page size in mb", pageSizeInMb, "")
     )
 
     //////////////////////////////////////////////////////////////////////////////
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
new file mode 100644
index 0000000..e69de29
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index 4cdd2b9..b64e2a8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -72,6 +72,9 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
    */
   private int dataCounter;
 
+  /* flat complex datatype length, including the children*/
+  private int depth;
+
   private ArrayDataType(int outputArrayIndex, int dataCounter, GenericDataType children,
       String name) {
     this.outputArrayIndex = outputArrayIndex;
@@ -322,4 +325,16 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
             name, false));
     children.getComplexColumnInfo(columnInfoList);
   }
+
+  @Override
+  public int getDepth() {
+    if (depth == 0) {
+      // calculate only one time
+      List<ComplexColumnInfo> complexColumnInfoList = new ArrayList<>();
+      getComplexColumnInfo(complexColumnInfoList);
+      depth = complexColumnInfoList.size();
+    }
+    return depth;
+  }
+
 }
\ No newline at end of file
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
index 8fe4923..fb8b513 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -158,4 +158,9 @@ public interface GenericDataType<T> {
   GenericDataType<T> deepCopy();
 
   void getComplexColumnInfo(List<ComplexColumnInfo> columnInfoList);
+
+  /**
+   * @return depth of the complex column; this is the length of the flattened complex data.
+   */
+  int getDepth();
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 18dc89d..200a9f6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -581,4 +581,10 @@ public class PrimitiveDataType implements GenericDataType<Object> {
             name, !isDictionary));
   }
 
+  @Override
+  public int getDepth() {
+    // primitive type has no children
+    return 1;
+  }
+
 }
\ No newline at end of file
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index 3697a09..76ccf17 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -68,6 +68,9 @@ public class StructDataType implements GenericDataType<StructObject> {
    */
   private int dataCounter;
 
+  /* flat complex datatype length, including the children*/
+  private int depth;
+
   private StructDataType(List<GenericDataType> children, int outputArrayIndex, int dataCounter,
       String name) {
     this.children = children;
@@ -357,4 +360,15 @@ public class StructDataType implements GenericDataType<StructObject> {
       children.get(i).getComplexColumnInfo(columnInfoList);
     }
   }
+
+  @Override
+  public int getDepth() {
+    if (depth == 0) {
+      // calculate only one time
+      List<ComplexColumnInfo> complexColumnInfoList = new ArrayList<>();
+      getComplexColumnInfo(complexColumnInfoList);
+      depth = complexColumnInfoList.size();
+    }
+    return depth;
+  }
 }
\ No newline at end of file
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 1270b1f..76c5613 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -17,9 +17,14 @@
 
 package org.apache.carbondata.processing.store;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -47,7 +52,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
 
@@ -88,7 +92,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private ExecutorService consumerExecutorService;
   private List<Future<Void>> consumerExecutorServiceTaskList;
   private List<CarbonRow> dataRows;
-  private int[] varcharColumnSizeInByte;
+  private int[] noDictColumnPageSize;
   /**
    * semaphore which will used for managing node holder objects
    */
@@ -120,6 +124,16 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private ColumnarFormatVersion version;
 
+  /*
+  * cannot use the indexMap of model directly,
+  * modifying map in model directly will create problem if accessed later,
+  * Hence take a copy and work on it.
+  * */
+  private Map<Integer, GenericDataType> complexIndexMapCopy = null;
+
+  /* configured page size, converted to bytes */
+  private int configuredPageSizeInBytes = 0;
+
   /**
    * CarbonFactDataHandler constructor
    */
@@ -137,6 +151,16 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Columns considered as NoInverted Index are " + noInvertedIdxCol.toString());
     }
+    this.complexIndexMapCopy = new HashMap<>();
+    for (Map.Entry<Integer, GenericDataType> entry: model.getComplexIndexMap().entrySet()) {
+      this.complexIndexMapCopy.put(entry.getKey(), entry.getValue().deepCopy());
+    }
+    String pageSizeStrInBytes =
+        model.getTableSpec().getCarbonTable().getTableInfo().getFactTable().getTableProperties()
+            .get(CarbonCommonConstants.TABLE_PAGE_SIZE_INMB);
+    if (pageSizeStrInBytes != null) {
+      configuredPageSizeInBytes = Integer.parseInt(pageSizeStrInBytes) * 1024 * 1024;
+    }
   }
 
   private void initParameters(CarbonFactDataHandlerModel model) {
@@ -196,11 +220,21 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * @throws CarbonDataWriterException
    */
   public void addDataToStore(CarbonRow row) throws CarbonDataWriterException {
+    int totalComplexColumnDepth = setFlatCarbonRowForComplex(row);
+    if (noDictColumnPageSize == null) {
+      // initialization using first row.
+      model.setNoDictAllComplexColumnDepth(totalComplexColumnDepth);
+      if (model.getNoDictDataTypesList().size() + model.getNoDictAllComplexColumnDepth() > 0) {
+        noDictColumnPageSize =
+            new int[model.getNoDictDataTypesList().size() + model.getNoDictAllComplexColumnDepth()];
+      }
+    }
+
     dataRows.add(row);
     this.entryCount++;
     // if entry count reaches to leaf node size then we are ready to write
     // this to leaf node file and update the intermediate files
-    if (this.entryCount == this.pageSize || isVarcharColumnFull(row)) {
+    if (this.entryCount == this.pageSize || needToCutThePage(row)) {
       try {
         semaphore.acquire();
 
@@ -218,6 +252,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         }
         dataRows = new ArrayList<>(this.pageSize);
         this.entryCount = 0;
+        // re-init the complexIndexMap
+        this.complexIndexMapCopy = new HashMap<>();
+        for (Map.Entry<Integer, GenericDataType> entry : model.getComplexIndexMap().entrySet()) {
+          this.complexIndexMapCopy.put(entry.getKey(), entry.getValue().deepCopy());
+        }
+        noDictColumnPageSize =
+            new int[model.getNoDictDataTypesList().size() + model.getNoDictAllComplexColumnDepth()];
       } catch (InterruptedException e) {
         LOGGER.error(e.getMessage(), e);
         throw new CarbonDataWriterException(e);
@@ -227,50 +268,120 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
   /**
    * Check if column page can be added more rows after adding this row to page.
+   * Only a few no-dictionary dimension columns (string, varchar,
+   * complex columns) can grow huge in size.
    *
-   * A varchar column page uses SafeVarLengthColumnPage/UnsafeVarLengthColumnPage to store data
-   * and encoded using HighCardDictDimensionIndexCodec which will call getByteArrayPage() from
-   * column page and flatten into byte[] for compression.
-   * Limited by the index of array, we can only put number of Integer.MAX_VALUE bytes in a page.
-   *
-   * Another limitation is from Compressor. Currently we use snappy as default compressor,
-   * and it will call MaxCompressedLength method to estimate the result size for preparing output.
-   * For safety, the estimate result is oversize: `32 + source_len + source_len/6`.
-   * So the maximum bytes to compress by snappy is (2GB-32)*6/7≈1.71GB.
    *
-   * Size of a row does not exceed 2MB since UnsafeSortDataRows uses 2MB byte[] as rowBuffer.
-   * Such that we can stop adding more row here if any long string column reach this limit.
-   *
-   * If use unsafe column page, please ensure the memory configured is enough.
-   * @param row
-   * @return false if any varchar column page cannot add one more value(2MB)
+   * @param row carbonRow
+   * @return false if next rows can be added to same page.
+   * true if next rows cannot be added to same page
    */
-  private boolean isVarcharColumnFull(CarbonRow row) {
-    //TODO: test and remove this as now  UnsafeSortDataRows can exceed 2MB
-    if (model.getVarcharDimIdxInNoDict().size() > 0) {
+  private boolean needToCutThePage(CarbonRow row) {
+    List<DataType> noDictDataTypesList = model.getNoDictDataTypesList();
+    int totalNoDictPageCount = noDictDataTypesList.size() + model.getNoDictAllComplexColumnDepth();
+    if (totalNoDictPageCount > 0) {
+      int currentElementLength;
+      int bucketCounter = 0;
+      if (configuredPageSizeInBytes == 0) {
+        // no need to cut the page
+        // use default value
+        /*configuredPageSizeInBytes =
+            CarbonCommonConstants.TABLE_PAGE_SIZE_INMB_DEFAULT * 1024 * 1024;*/
+        return false;
+      }
       Object[] nonDictArray = WriteStepRowUtil.getNoDictAndComplexDimension(row);
-      for (int i = 0; i < model.getVarcharDimIdxInNoDict().size(); i++) {
-        if (DataTypeUtil
-            .isPrimitiveColumn(model.getNoDictAndComplexColumns()[i].getDataType())) {
-          // get the size from the data type
-          varcharColumnSizeInByte[i] +=
-              model.getNoDictAndComplexColumns()[i].getDataType().getSizeInBytes();
-        } else {
-          varcharColumnSizeInByte[i] +=
-              ((byte[]) nonDictArray[model.getVarcharDimIdxInNoDict().get(i)]).length;
-        }
-        if (SnappyCompressor.MAX_BYTE_TO_COMPRESS -
-                (varcharColumnSizeInByte[i] + dataRows.size() * 4) < (2 << 20)) {
-          LOGGER.debug("Limited by varchar column, page size is " + dataRows.size());
-          // re-init for next page
-          varcharColumnSizeInByte = new int[model.getVarcharDimIdxInNoDict().size()];
-          return true;
+      for (int i = 0; i < noDictDataTypesList.size(); i++) {
+        DataType columnType = noDictDataTypesList.get(i);
+        if ((columnType == DataTypes.STRING) || (columnType == DataTypes.VARCHAR)) {
+          currentElementLength = ((byte[]) nonDictArray[i]).length;
+          noDictColumnPageSize[bucketCounter] += currentElementLength;
+          canSnappyHandleThisRow(noDictColumnPageSize[bucketCounter]);
+          // If current page size is more than configured page size, cut the page here.
+          if (noDictColumnPageSize[bucketCounter] + dataRows.size() * 4
+              >= configuredPageSizeInBytes) {
+            if (LOGGER.isDebugEnabled()) {
+              LOGGER.debug("cutting the page. Rows count in this page: " + dataRows.size());
+            }
+            // re-init for next page
+            noDictColumnPageSize = new int[totalNoDictPageCount];
+            return true;
+          }
+          bucketCounter++;
+        } else if (columnType.isComplexType()) {
+          // this is for depth of each complex column, model is having only total depth.
+          GenericDataType genericDataType = complexIndexMapCopy
+              .get(i - model.getNoDictionaryCount() + model.getPrimitiveDimLens().length);
+          int depth = genericDataType.getDepth();
+          List<ArrayList<byte[]>> flatComplexColumnList = (List<ArrayList<byte[]>>) nonDictArray[i];
+          for (int k = 0; k < depth; k++) {
+            ArrayList<byte[]> children = flatComplexColumnList.get(k);
+            // Add child element from inner list.
+            int complexElementSize = 0;
+            for (byte[] child : children) {
+              complexElementSize += child.length;
+            }
+            noDictColumnPageSize[bucketCounter] += complexElementSize;
+            canSnappyHandleThisRow(noDictColumnPageSize[bucketCounter]);
+            // If current page size is more than configured page size, cut the page here.
+            if (noDictColumnPageSize[bucketCounter] + dataRows.size() * 4
+                >= configuredPageSizeInBytes) {
+              LOGGER.info("cutting the page. Rows count: " + dataRows.size());
+              // re-init for next page
+              noDictColumnPageSize = new int[totalNoDictPageCount];
+              return true;
+            }
+            bucketCounter++;
+          }
         }
       }
     }
     return false;
   }
 
+  private int setFlatCarbonRowForComplex(CarbonRow row) {
+    int noDictTotalComplexChildDepth = 0;
+    Object[] noDictAndComplexDimension = WriteStepRowUtil.getNoDictAndComplexDimension(row);
+    for (int i = 0; i < noDictAndComplexDimension.length; i++) {
+      // complex types starts after no dictionary dimensions
+      if (i >= model.getNoDictionaryCount() && (model.getTableSpec().getNoDictionaryDimensionSpec()
+          .get(i).getSchemaDataType().isComplexType())) {
+        // this is for depth of each complex column, model is having only total depth.
+        GenericDataType genericDataType = complexIndexMapCopy
+            .get(i - model.getNoDictionaryCount() + model.getPrimitiveDimLens().length);
+        int depth = genericDataType.getDepth();
+        // initialize flatComplexColumnList
+        List<ArrayList<byte[]>> flatComplexColumnList = new ArrayList<>(depth);
+        for (int k = 0; k < depth; k++) {
+          flatComplexColumnList.add(new ArrayList<byte[]>());
+        }
+        // flatten the complex byteArray as per depth
+        try {
+          ByteBuffer byteArrayInput = ByteBuffer.wrap((byte[])noDictAndComplexDimension[i]);
+          ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
+          DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutput);
+          genericDataType.parseComplexValue(byteArrayInput, dataOutputStream,
+              model.getComplexDimensionKeyGenerator());
+          genericDataType.getColumnarDataForComplexType(flatComplexColumnList,
+              ByteBuffer.wrap(byteArrayOutput.toByteArray()));
+          byteArrayOutput.close();
+        } catch (IOException | KeyGenException e) {
+          throw new CarbonDataWriterException("Problem in splitting and writing complex data", e);
+        }
+        noDictTotalComplexChildDepth += flatComplexColumnList.size();
+        // update the complex column data with the flat data
+        noDictAndComplexDimension[i] = flatComplexColumnList;
+      }
+    }
+    return noDictTotalComplexChildDepth;
+  }
+
+  private void canSnappyHandleThisRow(int currentRowSize) {
+    if (currentRowSize > SnappyCompressor.MAX_BYTE_TO_COMPRESS) {
+      throw new RuntimeException(" page size: " + currentRowSize + " exceed snappy size: "
+          + SnappyCompressor.MAX_BYTE_TO_COMPRESS + " Bytes. Snappy cannot compress it ");
+    }
+  }
+
   /**
    * generate the EncodedTablePage from the input rows (one page in case of V3 format)
    */
@@ -419,11 +530,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       LOGGER.debug("Number of rows per column page is configured as pageSize = " + pageSize);
     }
     dataRows = new ArrayList<>(this.pageSize);
-
-    if (model.getVarcharDimIdxInNoDict().size() > 0) {
-      varcharColumnSizeInByte = new int[model.getVarcharDimIdxInNoDict().size()];
-    }
-
     int dimSet =
         Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE);
     // if at least one dimension is present then initialize column splitter otherwise null
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 9d8202e..e66e233 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -33,7 +33,6 @@ import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGener
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -99,7 +98,7 @@ public class CarbonFactDataHandlerModel {
   private int[] dimLens;
 
   /**
-   * total number of no dictionary dimension in the table
+   * total number of no dictionary dimension in the table (without complex type)
    */
   private int noDictionaryCount;
   /**
@@ -183,10 +182,14 @@ public class CarbonFactDataHandlerModel {
 
   private int numberOfCores;
 
-  private List<Integer> varcharDimIdxInNoDict;
-
   private String columnCompressor;
 
+  private List<DataType> noDictDataTypesList;
+
+  // For each complex column, we will have multiple children. So, this will have the count of all
+  // children; it helps in knowing how many new pages the complex byte array will be divided into.
+  private int noDictAllComplexColumnDepth;
+
   /**
    * Create the model using @{@link CarbonDataLoadConfiguration}
    */
@@ -230,25 +233,32 @@ public class CarbonFactDataHandlerModel {
     for (int i = 0; i < simpleDimsCount; i++) {
       simpleDimsLen[i] = dimLens[i];
     }
-
-    int noDictionayDimensionIndex = 0;
-    // for dynamic page size in write step if varchar columns exist
-    List<Integer> varcharDimIdxInNoDict = new ArrayList<>();
-    for (DataField dataField : configuration.getDataFields()) {
-      CarbonColumn column = dataField.getColumn();
-      if (!dataField.hasDictionaryEncoding()) {
-        if (!column.isComplex() && column.getDataType() == DataTypes.VARCHAR) {
-          varcharDimIdxInNoDict.add(noDictionayDimensionIndex);
+    //To Set MDKey Index of each primitive type in complex type
+    int surrIndex = simpleDimsCount;
+    Iterator<Map.Entry<String, GenericDataType>> complexMap = CarbonDataProcessorUtil
+        .getComplexTypesMap(configuration.getDataFields(),
+            configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+                .toString()).entrySet().iterator();
+    Map<Integer, GenericDataType> complexIndexMap = new HashMap<>(complexDimensionCount);
+    while (complexMap.hasNext()) {
+      Map.Entry<String, GenericDataType> complexDataType = complexMap.next();
+      complexDataType.getValue().setOutputArrayIndex(0);
+      complexIndexMap.put(simpleDimsCount, complexDataType.getValue());
+      simpleDimsCount++;
+      List<GenericDataType> primitiveTypes = new ArrayList<GenericDataType>();
+      complexDataType.getValue().getAllPrimitiveChildren(primitiveTypes);
+      for (GenericDataType eachPrimitive : primitiveTypes) {
+        if (eachPrimitive.getIsColumnDictionary()) {
+          eachPrimitive.setSurrogateIndex(surrIndex++);
         }
-        noDictionayDimensionIndex++;
       }
     }
-
-    //To Set MDKey Index of each primitive type in complex type
-    Map<Integer, GenericDataType> complexIndexMap = getComplexMap(
-        configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
-            .toString(), simpleDimsCount, configuration.getDataFields());
-
+    List<DataType> noDictDataTypesList = new ArrayList<>();
+    for (DataField dataField : configuration.getDataFields()) {
+      if (!dataField.hasDictionaryEncoding() && dataField.getColumn().isDimension()) {
+        noDictDataTypesList.add(dataField.getColumn().getDataType());
+      }
+    }
     CarbonDataFileAttributes carbonDataFileAttributes =
         new CarbonDataFileAttributes(Long.parseLong(configuration.getTaskNo()),
             (Long) configuration.getDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP));
@@ -264,6 +274,7 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setNoDictionaryCount(noDictionaryCount);
     carbonFactDataHandlerModel.setDimensionCount(
         configuration.getDimensionCount() - noDictionaryCount);
+    carbonFactDataHandlerModel.setNoDictDataTypesList(noDictDataTypesList);
     carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);
     carbonFactDataHandlerModel.setSegmentProperties(segmentProperties);
     carbonFactDataHandlerModel.setColCardinality(colCardinality);
@@ -301,7 +312,6 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
     carbonFactDataHandlerModel.initNumberOfCores();
-    carbonFactDataHandlerModel.setVarcharDimIdxInNoDict(varcharDimIdxInNoDict);
     return carbonFactDataHandlerModel;
   }
 
@@ -316,31 +326,19 @@ public class CarbonFactDataHandlerModel {
       String[] tempStoreLocation, String carbonDataDirectoryPath) {
 
     // for dynamic page size in write step if varchar columns exist
-    List<Integer> varcharDimIdxInNoDict = new ArrayList<>();
-    List<CarbonDimension> allDimensions = carbonTable.getAllDimensions();
-    int dictDimCount = allDimensions.size() - segmentProperties.getNumberOfNoDictionaryDimension()
-            - segmentProperties.getComplexDimensions().size();
+    List<CarbonDimension> allDimensions = carbonTable.getDimensions();
     CarbonColumn[] noDicAndComplexColumns =
         new CarbonColumn[segmentProperties.getNumberOfNoDictionaryDimension() + segmentProperties
             .getComplexDimensions().size()];
     int noDicAndComp = 0;
-    int invisibleCount = 0;
+    List<DataType> noDictDataTypesList = new ArrayList<>();
     for (CarbonDimension dim : allDimensions) {
-      if (dim.isInvisible()) {
-        invisibleCount++;
-        continue;
-      }
-      if (!dim.isComplex() && !dim.hasEncoding(Encoding.DICTIONARY) &&
-          dim.getDataType() == DataTypes.VARCHAR) {
-        // ordinal is set in CarbonTable.fillDimensionsAndMeasuresForTables()
-        varcharDimIdxInNoDict.add(dim.getOrdinal() - dictDimCount - invisibleCount);
-      }
       if (!dim.hasEncoding(Encoding.DICTIONARY)) {
         noDicAndComplexColumns[noDicAndComp++] =
             new CarbonColumn(dim.getColumnSchema(), dim.getOrdinal(), dim.getSchemaOrdinal());
+        noDictDataTypesList.add(dim.getDataType());
       }
     }
-
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel();
     carbonFactDataHandlerModel.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
     carbonFactDataHandlerModel.setDatabaseName(loadModel.getDatabaseName());
@@ -385,6 +383,7 @@ public class CarbonFactDataHandlerModel {
     }
     carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
     carbonFactDataHandlerModel.setNoDictAndComplexColumns(noDicAndComplexColumns);
+    carbonFactDataHandlerModel.setNoDictDataTypesList(noDictDataTypesList);
     CarbonUtil.checkAndCreateFolderWithPermission(carbonDataDirectoryPath);
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
@@ -410,7 +409,6 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.initNumberOfCores();
     carbonFactDataHandlerModel
         .setColumnLocalDictGenMap(CarbonUtil.getLocalDictionaryModel(carbonTable));
-    carbonFactDataHandlerModel.setVarcharDimIdxInNoDict(varcharDimIdxInNoDict);
     carbonFactDataHandlerModel.sortScope = carbonTable.getSortScope();
     return carbonFactDataHandlerModel;
   }
@@ -419,11 +417,11 @@ public class CarbonFactDataHandlerModel {
   * This routine takes the complex dimensions and converts them into generic DataTypes.
    *
    * @param segmentProperties
-   * @param isNullFormat
+   * @param nullFormat
    * @return
    */
   private static Map<Integer, GenericDataType> convertComplexDimensionToComplexIndexMap(
-      SegmentProperties segmentProperties, String isNullFormat) {
+      SegmentProperties segmentProperties, String nullFormat) {
     List<CarbonDimension> complexDimensions = segmentProperties.getComplexDimensions();
     int simpleDimsCount = segmentProperties.getDimensions().size() - segmentProperties
         .getNumberOfNoDictionaryDimension();
@@ -432,14 +430,14 @@ public class CarbonFactDataHandlerModel {
     for (CarbonColumn complexDimension : complexDimensions) {
       dataFields[i++] = new DataField(complexDimension);
     }
-    return getComplexMap(isNullFormat, simpleDimsCount, dataFields);
+    return getComplexMap(nullFormat, simpleDimsCount, dataFields);
   }
 
-  private static Map<Integer, GenericDataType> getComplexMap(String isNullFormat,
+  private static Map<Integer, GenericDataType> getComplexMap(String nullFormat,
       int simpleDimsCount, DataField[] dataFields) {
     int surrIndex = 0;
     Iterator<Map.Entry<String, GenericDataType>> complexMap =
-        CarbonDataProcessorUtil.getComplexTypesMap(dataFields, isNullFormat).entrySet().iterator();
+        CarbonDataProcessorUtil.getComplexTypesMap(dataFields, nullFormat).entrySet().iterator();
     Map<Integer, GenericDataType> complexIndexMap = new HashMap<>(dataFields.length);
     while (complexMap.hasNext()) {
       Map.Entry<String, GenericDataType> complexDataType = complexMap.next();
@@ -756,14 +754,6 @@ public class CarbonFactDataHandlerModel {
     return numberOfCores;
   }
 
-  public void setVarcharDimIdxInNoDict(List<Integer> varcharDimIdxInNoDict) {
-    this.varcharDimIdxInNoDict = varcharDimIdxInNoDict;
-  }
-
-  public List<Integer> getVarcharDimIdxInNoDict() {
-    return varcharDimIdxInNoDict;
-  }
-
   public String getColumnCompressor() {
     return columnCompressor;
   }
@@ -779,5 +769,21 @@ public class CarbonFactDataHandlerModel {
   public void setNoDictAndComplexColumns(CarbonColumn[] noDictAndComplexColumns) {
     this.noDictAndComplexColumns = noDictAndComplexColumns;
   }
+
+  public List<DataType> getNoDictDataTypesList() {
+    return this.noDictDataTypesList;
+  }
+
+  public void setNoDictDataTypesList(List<DataType> noDictDataTypesList) {
+    this.noDictDataTypesList = noDictDataTypesList;
+  }
+
+  public int getNoDictAllComplexColumnDepth() {
+    return noDictAllComplexColumnDepth;
+  }
+
+  public void setNoDictAllComplexColumnDepth(int noDictAllComplexColumnDepth) {
+    this.noDictAllComplexColumnDepth = noDictAllComplexColumnDepth;
+  }
 }
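
Note on the model change above: the noDictDataTypesList now carried by CarbonFactDataHandlerModel replaces the old varcharDimIdxInNoDict bookkeeping. Once a page can be cut by the configured table_page_size_inmb before 32000 rows, the writer needs the data type of every no-dictionary dimension, not only the VARCHAR positions, to estimate how much each row adds to the current page. The real accounting lives in CarbonFactDataHandlerColumnar (changed earlier in this patch); the fragment below is only a rough sketch of that idea, and every name other than DataType/DataTypes is invented for illustration.

    import java.util.List;

    import org.apache.carbondata.core.metadata.datatype.DataType;
    import org.apache.carbondata.core.metadata.datatype.DataTypes;

    final class PageSizeEstimateSketch {
      // Adds one row's no-dictionary contribution to a running page-size counter so the
      // writer can cut the page when the configured size limit is hit before 32000 rows.
      static long addRowSize(long currentPageBytes, byte[][] noDictValues,
          List<DataType> noDictDataTypesList) {
        for (int i = 0; i < noDictDataTypesList.size(); i++) {
          DataType type = noDictDataTypesList.get(i);
          if (type == DataTypes.STRING || type == DataTypes.VARCHAR) {
            currentPageBytes += noDictValues[i].length;   // variable-length payload
          } else {
            currentPageBytes += 8;                        // crude fixed-width upper bound
          }
        }
        return currentPageBytes;
      }
    }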
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 7cc8932..a201679 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.processing.store;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -30,7 +28,6 @@ import java.util.Map;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
 import org.apache.carbondata.core.datastore.page.EncodedTablePage;
@@ -252,11 +249,11 @@ public class TablePage {
           }
         } else {
           // complex columns
-          addComplexColumn(i - noDictionaryCount, rowId, (byte[]) noDictAndComplex[i]);
+          addComplexColumn(i - noDictionaryCount, rowId,
+              (List<ArrayList<byte[]>>) noDictAndComplex[i]);
         }
       }
     }
-
     // 3. convert measure columns
     Object[] measureColumns = WriteStepRowUtil.getMeasure(row);
     for (int i = 0; i < measurePages.length; i++) {
@@ -278,14 +275,14 @@ public class TablePage {
    *
    * @param index          index of the complexDimensionPage
    * @param rowId          Id of the input row
-   * @param complexColumns byte array the complex columm to be added, extracted of input row
+   * @param encodedComplexColumnar flattened columnar data of the complex column
    */
   // TODO: this function should be refactored, ColumnPage should support complex type encoding
   // directly instead of doing it here
-  private void addComplexColumn(int index, int rowId, byte[] complexColumns) {
+  private void addComplexColumn(int index, int rowId,
+      List<ArrayList<byte[]>> encodedComplexColumnar) {
     GenericDataType complexDataType = complexIndexMap.get(
         index + model.getPrimitiveDimLens().length);
-
     // initialize the page if first row
     if (rowId == 0) {
       List<ComplexColumnInfo> complexColumnInfoList = new ArrayList<>();
@@ -298,30 +295,7 @@ public class TablePage {
         throw new RuntimeException(e);
       }
     }
-
     int depthInComplexColumn = complexDimensionPages[index].getComplexColumnIndex();
-    // this is the result columnar data which will be added to page,
-    // size of this list is the depth of complex column, we will fill it by input data
-    List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>(depthInComplexColumn);
-    for (int k = 0; k < depthInComplexColumn; k++) {
-      encodedComplexColumnar.add(new ArrayList<byte[]>());
-    }
-
-    // apply the complex type data and fill columnsArray
-    try {
-      ByteBuffer byteArrayInput = ByteBuffer.wrap(complexColumns);
-      ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
-      DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutput);
-      complexDataType.parseComplexValue(byteArrayInput, dataOutputStream,
-          model.getComplexDimensionKeyGenerator());
-      complexDataType.getColumnarDataForComplexType(encodedComplexColumnar,
-          ByteBuffer.wrap(byteArrayOutput.toByteArray()));
-      byteArrayOutput.close();
-    } catch (IOException | KeyGenException e) {
-      throw new CarbonDataWriterException("Problem while bit packing and writing complex datatype",
-          e);
-    }
-
     for (int depth = 0; depth < depthInComplexColumn; depth++) {
       complexDimensionPages[index].putComplexData(depth, encodedComplexColumnar.get(depth));
     }
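
Note on the TablePage change above: addComplexColumn now receives the complex value already flattened into per-depth columnar lists (produced earlier in the load pipeline) instead of parsing a serialized byte[] here. The snippet below only illustrates that shape for one row of an array<string> column holding ["a", "bc"]; the byte[] payloads are placeholders, not the real on-page encoding.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ComplexColumnShapeSketch {
      public static void main(String[] args) {
        // outer index = depth in the flattened complex tree
        List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>();
        // depth 0: the array node, typically carrying this row's element count
        encodedComplexColumnar.add(new ArrayList<>(
            Arrays.asList(new byte[] {0, 0, 0, 2})));
        // depth 1: the leaf values of this row
        encodedComplexColumnar.add(new ArrayList<>(
            Arrays.asList("a".getBytes(), "bc".getBytes())));
        System.out.println("depth count = " + encodedComplexColumnar.size());
      }
    }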
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index dc2268c..cac0e8b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -331,6 +331,13 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
         new BlockletInfo3(encodedBlocklet.getBlockletSize(), currentDataChunksOffset,
             currentDataChunksLength, dimensionOffset, measureOffset,
             encodedBlocklet.getNumberOfPages());
+    // Row counts are kept compact inside EncodedBlocklet, but are written to the thrift
+    // footer as int so that larger per-page row counts can be supported in future.
+    List<Integer> rowList = new ArrayList<>(encodedBlocklet.getRowCountInPage().size());
+    for (int rows : encodedBlocklet.getRowCountInPage()) {
+      rowList.add(rows);
+    }
+    blockletInfo3.setRow_count_in_page(rowList);
     blockletMetadata.add(blockletInfo3);
   }
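
Note on the writer change above: once pages can be cut by size, a page no longer always holds 32000 rows, so readers must resolve a row offset inside a blocklet against the persisted per-page counts instead of dividing by a constant. A hypothetical lookup, not code from this patch:

    import java.util.List;

    final class RowCountInPageSketch {
      // Returns the page index that holds the given row offset inside a blocklet.
      static int pageOfRow(int rowInBlocklet, List<Integer> rowCountInPage) {
        int remaining = rowInBlocklet;
        for (int page = 0; page < rowCountInPage.size(); page++) {
          if (remaining < rowCountInPage.get(page)) {
            return page;
          }
          remaining -= rowCountInPage.get(page);
        }
        throw new IllegalArgumentException("row " + rowInBlocklet + " is beyond this blocklet");
      }
    }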
 
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index cdb610d..cfae2ae 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -62,6 +62,7 @@ public class CarbonWriterBuilder {
   //initialize with empty array , as no columns should be selected for sorting in NO_SORT
   private String[] sortColumns = new String[0];
   private int blockletSize;
+  private int pageSizeInMb;
   private int blockSize;
   private long timestamp;
   private Map<String, String> options;
@@ -269,6 +270,7 @@ public class CarbonWriterBuilder {
    *                           default value is null.
    * l. inverted_index -- comma separated string columns for which inverted index needs to be
    *                      generated
+   * m. table_page_size_inmb -- page size in MB, valid range is 1 MB to 1755 MB.
    *
    * @return updated CarbonWriterBuilder
    */
@@ -277,7 +279,7 @@ public class CarbonWriterBuilder {
     Set<String> supportedOptions = new HashSet<>(Arrays
         .asList("table_blocksize", "table_blocklet_size", "local_dictionary_threshold",
             "local_dictionary_enable", "sort_columns", "sort_scope", "long_string_columns",
-            "inverted_index"));
+            "inverted_index","table_page_size_inmb"));
 
     for (String key : options.keySet()) {
       if (!supportedOptions.contains(key.toLowerCase())) {
@@ -317,6 +319,8 @@ public class CarbonWriterBuilder {
           invertedIndexColumns = entry.getValue().split(",");
         }
         this.invertedIndexFor(invertedIndexColumns);
+      } else if (entry.getKey().equalsIgnoreCase("table_page_size_inmb")) {
+        this.withPageSizeInMb(Integer.parseInt(entry.getValue()));
       }
     }
     return this;
@@ -446,6 +450,21 @@ public class CarbonWriterBuilder {
   }
 
   /**
+   * To set the page size (in MB) of the CarbonData file
+   *
+   * @param pageSizeInMb page size in MB, valid range is 1 to 1755
+   *
+   * @return updated CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder withPageSizeInMb(int pageSizeInMb) {
+    if (pageSizeInMb < 1 || pageSizeInMb > 1755) {
+      throw new IllegalArgumentException("pageSizeInMb must be between 1 MB and 1755 MB");
+    }
+    this.pageSizeInMb = pageSizeInMb;
+    return this;
+  }
+
+  /**
    * to build a {@link CarbonWriter}, which accepts row in CSV format
    *
    * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
@@ -626,6 +645,11 @@ public class CarbonWriterBuilder {
     if (blockletSize > 0) {
       tableSchemaBuilder = tableSchemaBuilder.blockletSize(blockletSize);
     }
+
+    if (pageSizeInMb > 0) {
+      tableSchemaBuilder = tableSchemaBuilder.pageSizeInMb(pageSizeInMb);
+    }
+
     tableSchemaBuilder.enableLocalDictionary(isLocalDictionaryEnabled);
     tableSchemaBuilder.localDictionaryThreshold(localDictionaryThreshold);
     List<String> sortColumnsList = new ArrayList<>();
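
Note on SDK usage: the new property can be exercised end to end as sketched below. Only withPageSizeInMb and the table_page_size_inmb option key come from this patch; the remaining builder calls are the pre-existing SDK surface, and depending on the release additional mandatory calls (for example writtenBy) may apply.

    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.sdk.file.CarbonWriter;
    import org.apache.carbondata.sdk.file.Field;
    import org.apache.carbondata.sdk.file.Schema;

    public class PageSizeWriterSketch {
      public static void main(String[] args) throws Exception {
        Schema schema = new Schema(new Field[] {
            new Field("name", DataTypes.STRING),
            new Field("age", DataTypes.INT)});
        CarbonWriter writer = CarbonWriter.builder()
            .outputPath("/tmp/carbon_page_size_demo")   // hypothetical output location
            .withPageSizeInMb(2)                        // cut each page once it grows past ~2 MB
            .withCsvInput(schema)
            .build();
        writer.write(new String[] {"bob", "25"});       // one CSV-style row
        writer.close();
        // Alternatively, "table_page_size_inmb" can be passed through the builder's
        // table-properties options map, as added in the hunk above.
      }
    }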