Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/28 14:32:19 UTC

[1/3] incubator-carbondata git commit: Added partitioner

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 65b922126 -> dc7c86ef3


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
new file mode 100644
index 0000000..c480d30
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.carbondata.bucketing
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.execution.command.LoadTable
+import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+
+    // clean data folder
+    clean
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
+    spark.sql("DROP TABLE IF EXISTS t3")
+    spark.sql("DROP TABLE IF EXISTS t4")
+    spark.sql("DROP TABLE IF EXISTS t5")
+    spark.sql("DROP TABLE IF EXISTS t6")
+    spark.sql("DROP TABLE IF EXISTS t7")
+    spark.sql("DROP TABLE IF EXISTS t8")
+  }
+
+  test("test create table with buckets") {
+    spark.sql(
+      """
+           CREATE TABLE t4
+           (ID Int, date Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           USING org.apache.spark.sql.CarbonSource
+           OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t4")
+      """)
+    LoadTable(Some("default"), "t4", "./src/test/resources/dataDiff.csv", Nil,
+      Map(("use_kettle", "false"))).run(spark)
+    val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_t4")
+    if (table != null && table.getBucketingInfo("t4") != null) {
+      assert(true)
+    } else {
+      assert(false, "Bucketing info does not exist")
+    }
+  }
+
+  test("test create table with no bucket join of carbon tables") {
+    spark.sql(
+      """
+           CREATE TABLE t5
+           (ID Int, date Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           USING org.apache.spark.sql.CarbonSource
+           OPTIONS("tableName"="t5")
+      """)
+    LoadTable(Some("default"), "t5", "./src/test/resources/dataDiff.csv", Nil,
+      Map(("use_kettle", "false"))).run(spark)
+
+    val plan = spark.sql(
+      """
+        |select t1.*, t2.*
+        |from t5 t1, t5 t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    var shuffleExists = false
+    plan.collect {
+      case s: ShuffleExchange => shuffleExists = true
+    }
+    assert(shuffleExists, "shuffle should exist on non bucket tables")
+  }
+
+  test("test create table with bucket join of carbon tables") {
+    spark.sql(
+      """
+           CREATE TABLE t6
+           (ID Int, date Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           USING org.apache.spark.sql.CarbonSource
+           OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t6")
+      """)
+    LoadTable(Some("default"), "t6", "./src/test/resources/dataDiff.csv", Nil,
+      Map(("use_kettle", "false"))).run(spark)
+
+    val plan = spark.sql(
+      """
+        |select t1.*, t2.*
+        |from t6 t1, t6 t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    var shuffleExists = false
+    plan.collect {
+      case s: ShuffleExchange => shuffleExists = true
+    }
+    assert(!shuffleExists, "shuffle should not exist on bucket tables")
+  }
+
+  test("test create table with bucket join of carbon table and parquet table") {
+    spark.sql(
+      """
+           CREATE TABLE t7
+           (ID Int, date Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           USING org.apache.spark.sql.CarbonSource
+           OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t7")
+      """)
+    LoadTable(Some("default"), "t7", "./src/test/resources/dataDiff.csv", Nil,
+      Map(("use_kettle", "false"))).run(spark)
+
+    spark.sql("DROP TABLE IF EXISTS bucketed_parquet_table")
+    spark.sql("select * from t7").write
+      .format("parquet")
+      .bucketBy(4, "name")
+      .saveAsTable("bucketed_parquet_table")
+
+    val plan = spark.sql(
+      """
+        |select t1.*, t2.*
+        |from t7 t1, bucketed_parquet_table t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    var shuffleExists = false
+    plan.collect {
+      case s: ShuffleExchange => shuffleExists = true
+    }
+    assert(!shuffleExists, "shuffle should not exist on bucket tables")
+  }
+
+  test("test create table with bucket join of carbon table and non bucket parquet table") {
+    spark.sql(
+      """
+           CREATE TABLE t8
+           (ID Int, date Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           USING org.apache.spark.sql.CarbonSource
+           OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t8")
+      """)
+    LoadTable(Some("default"), "t8", "./src/test/resources/dataDiff.csv", Nil,
+      Map(("use_kettle", "false"))).run(spark)
+
+    spark.sql("DROP TABLE IF EXISTS parquet_table")
+    spark.sql("select * from t8").write
+      .format("parquet")
+      .saveAsTable("parquet_table")
+
+    val plan = spark.sql(
+      """
+        |select t1.*, t2.*
+        |from t8 t1, parquet_table t2
+        |where t1.name = t2.name
+      """.stripMargin).queryExecution.executedPlan
+    var shuffleExists = false
+    plan.collect {
+      case s: ShuffleExchange => shuffleExists = true
+    }
+    assert(shuffleExists, "shuffle should exist on non bucket tables")
+  }
+
+  override def afterAll {
+    spark.sql("DROP TABLE IF EXISTS t3")
+    spark.sql("DROP TABLE IF EXISTS t4")
+    spark.sql("DROP TABLE IF EXISTS t5")
+    spark.sql("DROP TABLE IF EXISTS t6")
+    spark.sql("DROP TABLE IF EXISTS t7")
+    spark.sql("DROP TABLE IF EXISTS t8")
+    // clean data folder
+    clean
+  }
+}
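
The join tests above rely on one property of hash bucketing: equal join keys
always hash to the same bucket id on both sides, so two tables bucketed on the
join column are co-partitioned and the planner can drop the ShuffleExchange.
A minimal, self-contained Java illustration of that property (the hash function
shown here is an assumption for illustration, not necessarily the one
HashPartitionerImpl uses):

public class CoPartitionDemo {
  // Equal keys map to equal bucket ids, which is what makes the
  // shuffle-free join verified in the tests above possible.
  static int bucketOf(String key, int numberOfBuckets) {
    return (key.hashCode() & Integer.MAX_VALUE) % numberOfBuckets;
  }

  public static void main(String[] args) {
    System.out.println(bucketOf("aaa1", 4)); // bucket for t1.name
    System.out.println(bucketOf("aaa1", 4)); // same bucket for t2.name
  }
}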

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
index 20013ce..26300d6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
 
 public class CarbonDataLoadConfiguration {
 
@@ -36,6 +37,8 @@ public class CarbonDataLoadConfiguration {
 
   private String taskNo;
 
+  private BucketingInfo bucketingInfo;
+
   private Map<String, Object> dataLoadProperties = new HashMap<>();
 
   public int getDimensionCount() {
@@ -141,4 +144,12 @@ public class CarbonDataLoadConfiguration {
   public Object getDataLoadProperty(String key) {
     return dataLoadProperties.get(key);
   }
+
+  public BucketingInfo getBucketingInfo() {
+    return bucketingInfo;
+  }
+
+  public void setBucketingInfo(BucketingInfo bucketingInfo) {
+    this.bucketingInfo = bucketingInfo;
+  }
 }
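
BucketingInfo itself is introduced elsewhere in this patch
(core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/BucketingInfo.java)
and is not reproduced in this mail. Judging only from the getters used in this
patch, getListOfColumns() and getNumberOfBuckets(), a minimal sketch of its
shape could look like the following; the field names and constructor are
assumptions:

import java.io.Serializable;
import java.util.List;

import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;

// Illustrative sketch only; the real class in this patch may differ,
// e.g. it likely also participates in thrift schema conversion.
public class BucketingInfoSketch implements Serializable {

  // columns the table is bucketed on (assumed field name)
  private final List<ColumnSchema> listOfColumns;

  // configured number of buckets (assumed field name)
  private final int numberOfBuckets;

  public BucketingInfoSketch(List<ColumnSchema> listOfColumns, int numberOfBuckets) {
    this.listOfColumns = listOfColumns;
    this.numberOfBuckets = numberOfBuckets;
  }

  public List<ColumnSchema> getListOfColumns() {
    return listOfColumns;
  }

  public int getNumberOfBuckets() {
    return numberOfBuckets;
  }
}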

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index a5388d9..63147c9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.processing.model.CarbonLoadModel;
 import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl;
+import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorWithBucketingStepImpl;
 import org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl;
@@ -54,6 +55,15 @@ public final class DataLoadProcessBuilder {
       CarbonIterator[] inputIterators) throws Exception {
     CarbonDataLoadConfiguration configuration =
         createConfiguration(loadModel, storeLocation);
+    if (configuration.getBucketingInfo() != null) {
+      return buildInternalForBucketing(inputIterators, configuration);
+    } else {
+      return buildInternal(inputIterators, configuration);
+    }
+  }
+
+  private AbstractDataLoadProcessorStep buildInternal(CarbonIterator[] inputIterators,
+      CarbonDataLoadConfiguration configuration) {
     // 1. Reads the data input iterators and parses the data.
     AbstractDataLoadProcessorStep inputProcessorStep =
         new InputProcessorStepImpl(configuration, inputIterators);
@@ -70,6 +80,24 @@ public final class DataLoadProcessBuilder {
     return writerProcessorStep;
   }
 
+  private AbstractDataLoadProcessorStep buildInternalForBucketing(CarbonIterator[] inputIterators,
+      CarbonDataLoadConfiguration configuration) throws Exception {
+    // 1. Reads the data input iterators and parses the data.
+    AbstractDataLoadProcessorStep inputProcessorStep =
+        new InputProcessorStepImpl(configuration, inputIterators);
+    // 2. Converts the data to dictionary, non-dictionary or complex representations,
+    // depending on data types and configuration, and assigns a bucket number to each row.
+    AbstractDataLoadProcessorStep converterProcessorStep =
+        new DataConverterProcessorWithBucketingStepImpl(configuration, inputProcessorStep);
+    // 3. Sorts the data which are part of key (all dimensions except complex types)
+    AbstractDataLoadProcessorStep sortProcessorStep =
+        new SortProcessorStepImpl(configuration, converterProcessorStep);
+    // 4. Writes the sorted data in carbondata format.
+    AbstractDataLoadProcessorStep writerProcessorStep =
+        new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
+    return writerProcessorStep;
+  }
+
   private CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel,
       String storeLocation) throws Exception {
     if (!new File(storeLocation).mkdirs()) {
@@ -165,6 +193,7 @@ public final class DataLoadProcessBuilder {
       }
     }
     configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()]));
+    configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getFactTableName()));
     return configuration;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
index daf37fb..0689310 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
@@ -28,6 +28,8 @@ public class CarbonRow {
 
   private Object[] data;
 
+  public short bucketNumber;
+
   public CarbonRow(Object[] data) {
     this.data = data;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
new file mode 100644
index 0000000..3ca1497
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort.impl;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * Reads data from an array of iterators in parallel and performs a merge sort.
+ * It first sorts the data and writes it to temp files; these temp files are then
+ * merge-sorted to produce the final sorted result.
+ * This variant is specific to bucketing: it sorts each bucket's data separately
+ * and writes it to its own set of temp files.
+ */
+public class ParallelReadMergeSorterWithBucketingImpl implements Sorter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ParallelReadMergeSorterWithBucketingImpl.class.getName());
+
+  private SortParameters sortParameters;
+
+  private SortIntermediateFileMerger intermediateFileMerger;
+
+  private ExecutorService executorService;
+
+  private BucketingInfo bucketingInfo;
+
+  private DataField[] inputDataFields;
+
+  private int sortBufferSize;
+
+  public ParallelReadMergeSorterWithBucketingImpl(DataField[] inputDataFields,
+      BucketingInfo bucketingInfo) {
+    this.inputDataFields = inputDataFields;
+    this.bucketingInfo = bucketingInfo;
+  }
+
+  @Override public void initialize(SortParameters sortParameters) {
+    this.sortParameters = sortParameters;
+    intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
+    int buffer = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
+    // spread the configured sort buffer across the buckets, with a floor below
+    sortBufferSize = buffer / bucketingInfo.getNumberOfBuckets();
+    if (sortBufferSize < 100) {
+      sortBufferSize = 100;
+    }
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
+      throws CarbonDataLoadingException {
+    SortDataRows[] sortDataRows = new SortDataRows[bucketingInfo.getNumberOfBuckets()];
+    try {
+      for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
+        SortParameters parameters = sortParameters.getCopy();
+        parameters.setPartitionID(i + "");
+        setTempLocation(parameters);
+        parameters.setBufferSize(sortBufferSize);
+        sortDataRows[i] = new SortDataRows(parameters, intermediateFileMerger);
+        sortDataRows[i].initialize();
+      }
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+    this.executorService = Executors.newFixedThreadPool(iterators.length);
+    final int batchSize = CarbonProperties.getInstance().getBatchSize();
+    try {
+      for (int i = 0; i < iterators.length; i++) {
+        executorService.submit(new SortIteratorThread(iterators[i], sortDataRows));
+      }
+      executorService.shutdown();
+      executorService.awaitTermination(2, TimeUnit.DAYS);
+      processRowToNextStep(sortDataRows, sortParameters);
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+    }
+    try {
+      intermediateFileMerger.finish();
+    } catch (CarbonDataWriterException e) {
+      throw new CarbonDataLoadingException(e);
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+
+    Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
+    for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
+      batchIterator[i] = new MergedDataIterator(String.valueOf(i), batchSize);
+    }
+
+    return batchIterator;
+  }
+
+  private SingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
+    String storeLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
+            String.valueOf(sortParameters.getTaskNo()), bucketId,
+            sortParameters.getSegmentId() + "", false);
+    // Set the data file location
+    String dataFolderLocation =
+        storeLocation + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
+    SingleThreadFinalSortFilesMerger finalMerger =
+        new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
+            sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(),
+            sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(),
+            sortParameters.getAggType(), sortParameters.getNoDictionaryDimnesionColumn(),
+            sortParameters.isUseKettle());
+    return finalMerger;
+  }
+
+  @Override public void close() {
+    intermediateFileMerger.close();
+  }
+
+  /**
+   * Starts sorting on each per-bucket SortDataRows instance and records timing statistics.
+   */
+  private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters)
+      throws CarbonDataLoadingException {
+    if (null == sortDataRows || sortDataRows.length == 0) {
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      LOGGER.info("Number of Records was Zero");
+      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
+      LOGGER.info(logMessage);
+      return false;
+    }
+
+    try {
+      for (int i = 0; i < sortDataRows.length; i++) {
+        // start sorting
+        sortDataRows[i].startSorting();
+      }
+      // check any more rows are present
+      LOGGER.info("Record Processed For table: " + parameters.getTableName());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
+      return false;
+    } catch (CarbonSortKeyAndGroupByException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  private void setTempLocation(SortParameters parameters) {
+    String carbonDataDirectoryPath = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(parameters.getDatabaseName(),
+            parameters.getTableName(), parameters.getTaskNo(),
+            parameters.getPartitionID(), parameters.getSegmentId(), false);
+    parameters.setTempFileLocation(
+        carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+  }
+
+  /**
+   * This thread drains its input iterator and routes each row to the
+   * {@link SortDataRows} instance of that row's bucket.
+   */
+  private static class SortIteratorThread implements Callable<Void> {
+
+    private Iterator<CarbonRowBatch> iterator;
+
+    private SortDataRows[] sortDataRows;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows[] sortDataRows) {
+      this.iterator = iterator;
+      this.sortDataRows = sortDataRows;
+    }
+
+    @Override public Void call() throws CarbonDataLoadingException {
+      try {
+        while (iterator.hasNext()) {
+          CarbonRowBatch batch = iterator.next();
+          Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
+          int i = 0;
+          while (batchIterator.hasNext()) {
+            CarbonRow row = batchIterator.next();
+            if (row != null) {
+              SortDataRows sortDataRow = sortDataRows[row.bucketNumber];
+              synchronized (sortDataRow) {
+                sortDataRow.addRow(row.getData());
+              }
+            }
+          }
+        }
+      } catch (Exception e) {
+        LOGGER.error(e);
+        throw new CarbonDataLoadingException(e);
+      }
+      return null;
+    }
+
+  }
+
+  private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
+
+    private String partitionId;
+
+    private int batchSize;
+
+    private boolean firstRow = true;
+
+    public MergedDataIterator(String partitionId, int batchSize) {
+      this.partitionId = partitionId;
+      this.batchSize = batchSize;
+    }
+
+    private SingleThreadFinalSortFilesMerger finalMerger;
+
+    @Override public boolean hasNext() {
+      if (firstRow) {
+        firstRow = false;
+        finalMerger = getFinalMerger(partitionId);
+        finalMerger.startFinalMerge();
+      }
+      return finalMerger.hasNext();
+    }
+
+    @Override public CarbonRowBatch next() {
+      int counter = 0;
+      CarbonRowBatch rowBatch = new CarbonRowBatch();
+      while (finalMerger.hasNext() && counter < batchSize) {
+        rowBatch.addRow(new CarbonRow(finalMerger.next()));
+        counter++;
+      }
+      return rowBatch;
+    }
+  }
+}
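
In essence, the sorter above does three things: fan rows out to one sort buffer
per bucket using the bucket number stamped on each CarbonRow, sort every bucket
independently, and expose one merged iterator per bucket. Stripped of the
temp-file spilling and threading, the routing logic reduces to the in-memory
sketch below, where plain collections stand in for SortDataRows and the
file-based merger (an assumption made purely for illustration):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BucketedSortSketch {

  public static List<List<String>> sortByBucket(List<String> rows, int numberOfBuckets) {
    // one buffer per bucket, mirroring the SortDataRows[] array above
    List<List<String>> buckets = new ArrayList<>(numberOfBuckets);
    for (int i = 0; i < numberOfBuckets; i++) {
      buckets.add(new ArrayList<String>());
    }
    for (String row : rows) {
      // stand-in for CarbonRow.bucketNumber, which the converter step assigns
      int bucketNumber = (row.hashCode() & Integer.MAX_VALUE) % numberOfBuckets;
      buckets.get(bucketNumber).add(row);
    }
    for (List<String> bucket : buckets) {
      Collections.sort(bucket); // each bucket is sorted on its own, never globally
    }
    return buckets;
  }
}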

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
new file mode 100644
index 0000000..16b203a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.steps;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.partition.Partitioner;
+import org.apache.carbondata.core.partition.impl.HashPartitionerImpl;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.constants.LoggerAction;
+import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.newflow.converter.RowConverter;
+import org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
+
+/**
+ * Replaces row data fields with dictionary values for columns configured as
+ * dictionary encoded; non-dictionary and complex columns are converted to byte[].
+ * It also computes and assigns a bucket number to every converted row.
+ */
+public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoadProcessorStep {
+
+  private RowConverter converter;
+
+  private Partitioner<Object[]> partitioner;
+
+  public DataConverterProcessorWithBucketingStepImpl(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child) {
+    super(configuration, child);
+  }
+
+  @Override
+  public DataField[] getOutput() {
+    return child.getOutput();
+  }
+
+  @Override
+  public void initialize() throws CarbonDataLoadingException {
+    child.initialize();
+    BadRecordsLogger badRecordLogger = createBadRecordLogger();
+    converter = new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+    converter.initialize();
+    List<Integer> indexes = new ArrayList<>();
+    List<ColumnSchema> columnSchemas = new ArrayList<>();
+    DataField[] inputDataFields = getOutput();
+    BucketingInfo bucketingInfo = configuration.getBucketingInfo();
+    for (int i = 0; i < inputDataFields.length; i++) {
+      for (int j = 0; j < bucketingInfo.getListOfColumns().size(); j++) {
+        if (inputDataFields[i].getColumn().getColName()
+            .equals(bucketingInfo.getListOfColumns().get(j).getColumnName())) {
+          indexes.add(i);
+          columnSchemas.add(inputDataFields[i].getColumn().getColumnSchema());
+          break;
+        }
+      }
+    }
+    partitioner =
+        new HashPartitionerImpl(indexes, columnSchemas, bucketingInfo.getNumberOfBuckets());
+  }
+
+  /**
+   * Creates this step's iterator by wrapping the child iterator.
+   *
+   * @param childIter
+   * @return new iterator with step specific processing.
+   */
+  @Override
+  protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
+    return new CarbonIterator<CarbonRowBatch>() {
+      RowConverter localConverter = converter.createCopyForNewThread();
+      @Override public boolean hasNext() {
+        return childIter.hasNext();
+      }
+
+      @Override public CarbonRowBatch next() {
+        return processRowBatch(childIter.next(), localConverter);
+      }
+    };
+  }
+
+  /**
+   * Converts each row in the batch and stamps it with its bucket number.
+   *
+   * @param rowBatch
+   * @return processed row batch.
+   */
+  protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
+    CarbonRowBatch newBatch = new CarbonRowBatch();
+    Iterator<CarbonRow> batchIterator = rowBatch.getBatchIterator();
+    while (batchIterator.hasNext()) {
+      CarbonRow next = batchIterator.next();
+      CarbonRow convertRow = localConverter.convert(next);
+      convertRow.bucketNumber = (short) partitioner.getPartition(next.getData());
+      newBatch.addRow(convertRow);
+    }
+    return newBatch;
+  }
+
+  @Override
+  protected CarbonRow processRow(CarbonRow row) {
+    throw new UnsupportedOperationException();
+  }
+
+  private BadRecordsLogger createBadRecordLogger() {
+    boolean badRecordsLogRedirect = false;
+    boolean badRecordConvertNullDisable = false;
+    boolean badRecordsLoggerEnable = Boolean.parseBoolean(
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE)
+            .toString());
+    Object bad_records_action =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION)
+            .toString();
+    if (null != bad_records_action) {
+      LoggerAction loggerAction = null;
+      try {
+        loggerAction = LoggerAction.valueOf(bad_records_action.toString().toUpperCase());
+      } catch (IllegalArgumentException e) {
+        loggerAction = LoggerAction.FORCE;
+      }
+      switch (loggerAction) {
+        case FORCE:
+          badRecordConvertNullDisable = false;
+          break;
+        case REDIRECT:
+          badRecordsLogRedirect = true;
+          badRecordConvertNullDisable = true;
+          break;
+        case IGNORE:
+          badRecordsLogRedirect = false;
+          badRecordConvertNullDisable = true;
+          break;
+      }
+    }
+    CarbonTableIdentifier identifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    BadRecordsLogger badRecordsLogger = new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
+        identifier.getTableName() + '_' + System.currentTimeMillis(), getBadLogStoreLocation(
+        identifier.getDatabaseName() + File.separator + identifier.getTableName() + File.separator
+            + configuration.getTaskNo()), badRecordsLogRedirect, badRecordsLoggerEnable,
+        badRecordConvertNullDisable);
+    return badRecordsLogger;
+  }
+
+  private String getBadLogStoreLocation(String storeLocation) {
+    String badLogStoreLocation =
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
+
+    return badLogStoreLocation;
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    if (converter != null) {
+      converter.finish();
+    }
+  }
+}
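
The HashPartitionerImpl used in initialize() above is added under
core/src/main/java/org/apache/carbondata/core/partition/impl/ in this patch and
does not appear in this mail. Its essential contract, hashing the values of the
configured bucket columns and mapping the result onto a bucket id, can be
sketched as follows; hashing via Object.hashCode() is an assumption here, since
the real implementation also receives the ColumnSchemas and can hash per data
type:

import java.util.List;

// Illustrative sketch of a hash partitioner over selected row fields.
// `indexes` are the positions of the bucket columns within the row,
// mirroring how DataConverterProcessorWithBucketingStepImpl builds them.
public class HashPartitionerSketch {

  private final List<Integer> indexes;
  private final int numberOfBuckets;

  public HashPartitionerSketch(List<Integer> indexes, int numberOfBuckets) {
    this.indexes = indexes;
    this.numberOfBuckets = numberOfBuckets;
  }

  public int getPartition(Object[] row) {
    int hash = 0;
    for (int index : indexes) {
      Object value = row[index];
      hash = 31 * hash + (value == null ? 0 : value.hashCode());
    }
    // mask the sign bit so the modulo result is always a valid bucket id
    return (hash & Integer.MAX_VALUE) % numberOfBuckets;
  }
}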

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
index 48492d3..8318530 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
@@ -53,8 +53,6 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
   private KeyGenerator keyGenerator;
 
-  private CarbonFactHandler dataHandler;
-
   private int noDictionaryCount;
 
   private int complexDimensionCount;
@@ -71,57 +69,66 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
   private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
 
-  private String storeLocation;
-
   public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
     super(configuration, child);
   }
 
-  @Override
-  public DataField[] getOutput() {
+  @Override public DataField[] getOutput() {
     return child.getOutput();
   }
 
-  @Override
-  public void initialize() throws CarbonDataLoadingException {
+  @Override public void initialize() throws CarbonDataLoadingException {
     child.initialize();
-    CarbonTableIdentifier tableIdentifier =
-        configuration.getTableIdentifier().getCarbonTableIdentifier();
+  }
 
-    storeLocation = CarbonDataProcessorUtil
+  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
+    String storeLocation = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
-            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()),
-            configuration.getPartitionId(), configuration.getSegmentId() + "", false);
-
-    if (!(new File(storeLocation).mkdirs())) {
-      LOGGER.error("Local data load folder location does not exist: " + storeLocation);
-      return;
-    }
+            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
+            configuration.getSegmentId() + "", false);
+    new File(storeLocation).mkdirs();
+    return storeLocation;
   }
 
-  @Override
-  public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
     Iterator<CarbonRowBatch>[] iterators = child.execute();
-    String tableName = configuration.getTableIdentifier().getCarbonTableIdentifier().getTableName();
+    CarbonTableIdentifier tableIdentifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    String tableName = tableIdentifier.getTableName();
     try {
-      CarbonFactDataHandlerModel dataHandlerModel =
-          CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(configuration, storeLocation);
+      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
+          .createCarbonFactDataHandlerModel(configuration,
+              getStoreLocation(tableIdentifier, String.valueOf(0)), 0);
       noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
       complexDimensionCount = configuration.getComplexDimensionCount();
       measureCount = dataHandlerModel.getMeasureCount();
       segmentProperties = dataHandlerModel.getSegmentProperties();
       keyGenerator = segmentProperties.getDimensionKeyGenerator();
-      dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(dataHandlerModel,
-          CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
-      dataHandler.initialise();
+
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
           .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
               System.currentTimeMillis());
+      int i = 0;
       for (Iterator<CarbonRowBatch> iterator : iterators) {
+        String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
+        CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+            .createCarbonFactDataHandlerModel(configuration, storeLocation, i);
+        CarbonFactHandler dataHandler = null;
+        boolean rowsNotExist = true;
         while (iterator.hasNext()) {
-          processBatch(iterator.next());
+          if (rowsNotExist) {
+            rowsNotExist = false;
+            dataHandler = CarbonFactHandlerFactory
+                .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+            dataHandler.initialise();
+          }
+          processBatch(iterator.next(), dataHandler);
+        }
+        if (!rowsNotExist) {
+          finish(tableName, dataHandler);
         }
+        i++;
       }
 
     } catch (CarbonDataWriterException e) {
@@ -135,9 +142,11 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     return null;
   }
 
-  @Override
-  public void close() {
-    String tableName = configuration.getTableIdentifier().getCarbonTableIdentifier().getTableName();
+  @Override public void close() {
+
+  }
+
+  private void finish(String tableName, CarbonFactHandler dataHandler) {
     try {
       dataHandler.finish();
     } catch (Exception e) {
@@ -149,7 +158,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
             + writeCounter;
     LOGGER.info(logMessage);
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(writeCounter);
-    processingComplete();
+    processingComplete(dataHandler);
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
         .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
             System.currentTimeMillis());
@@ -157,7 +166,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
         .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
   }
 
-  private void processingComplete() throws CarbonDataLoadingException {
+  private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
     if (null != dataHandler) {
       try {
         dataHandler.closeHandler();
@@ -171,7 +180,8 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     }
   }
 
-  private void processBatch(CarbonRowBatch batch) throws CarbonDataLoadingException {
+  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler)
+      throws CarbonDataLoadingException {
     Iterator<CarbonRow> iterator = batch.getBatchIterator();
     try {
       while (iterator.hasNext()) {
@@ -208,8 +218,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     }
   }
 
-  @Override
-  protected CarbonRow processRow(CarbonRow row) {
+  @Override protected CarbonRow processRow(CarbonRow row) {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
index 99b7894..ef43751 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.processing.newflow.row.CarbonRow;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
 import org.apache.carbondata.processing.newflow.sort.Sorter;
 import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterImpl;
+import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterWithBucketingImpl;
 import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
 
@@ -60,7 +61,15 @@ public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep {
             CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
     if (offheapsort) {
       sorter = new UnsafeParallelReadMergeSorterImpl(child.getOutput());
-    } else sorter = new ParallelReadMergeSorterImpl(child.getOutput());
+    } else {
+      sorter = new ParallelReadMergeSorterImpl(child.getOutput());
+    }
+    // bucketing takes precedence over the sorter selected above
+    if (configuration.getBucketingInfo() != null) {
+      sorter = new ParallelReadMergeSorterWithBucketingImpl(child.getOutput(),
+          configuration.getBucketingInfo());
+    }
     sorter.initialize(sortParameters);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index 6254cce..c24472b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -121,6 +121,34 @@ public class SortParameters {
    */
   private boolean useKettle = true;
 
+  public SortParameters getCopy() {
+    SortParameters parameters = new SortParameters();
+    parameters.tempFileLocation = tempFileLocation;
+    parameters.sortBufferSize = sortBufferSize;
+    parameters.measureColCount = measureColCount;
+    parameters.dimColCount = dimColCount;
+    parameters.complexDimColCount = complexDimColCount;
+    parameters.fileBufferSize = fileBufferSize;
+    parameters.numberOfIntermediateFileToBeMerged = numberOfIntermediateFileToBeMerged;
+    parameters.fileWriteBufferSize = fileWriteBufferSize;
+    parameters.observer = observer;
+    parameters.sortTempFileNoOFRecordsInCompression = sortTempFileNoOFRecordsInCompression;
+    parameters.isSortFileCompressionEnabled = isSortFileCompressionEnabled;
+    parameters.prefetch = prefetch;
+    parameters.bufferSize = bufferSize;
+    parameters.databaseName = databaseName;
+    parameters.tableName = tableName;
+    parameters.aggType = aggType;
+    parameters.noDictionaryCount = noDictionaryCount;
+    parameters.partitionID = partitionID;
+    parameters.segmentId = segmentId;
+    parameters.taskNo = taskNo;
+    parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
+    parameters.numberOfCores = numberOfCores;
+    parameters.useKettle = useKettle;
+    return parameters;
+  }
+
   public String getTempFileLocation() {
     return tempFileLocation;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 2d468ac..f6a729d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -274,6 +274,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
   private boolean useKettle;
 
+  private int bucketNumber;
+
   /**
    * CarbonFactDataHandler constructor
    */
@@ -293,6 +295,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
     this.aggKeyBlock = new boolean[columnStoreCount];
     this.isNoDictionary = new boolean[columnStoreCount];
+    this.bucketNumber = carbonFactDataHandlerModel.getBucketId();
     this.isUseInvertedIndex = new boolean[columnStoreCount];
     if (null != carbonFactDataHandlerModel.getIsUseInvertedIndex()) {
       for (int i = 0; i < isUseInvertedIndex.length; i++) {
@@ -1495,6 +1498,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     carbonDataWriterVo.setColCardinality(colCardinality);
     carbonDataWriterVo.setSegmentProperties(segmentProperties);
     carbonDataWriterVo.setTableBlocksize(tableBlockSize);
+    carbonDataWriterVo.setBucketNumber(bucketNumber);
     return carbonDataWriterVo;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index c7d9d29..51663fc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -186,13 +186,15 @@ public class CarbonFactDataHandlerModel {
    */
   private boolean useKettle = true;
 
+  private int bucketId = 0;
+
   /**
    * Create the model using @{@link CarbonDataLoadConfiguration}
    * @param configuration
    * @return CarbonFactDataHandlerModel
    */
   public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel(
-      CarbonDataLoadConfiguration configuration, String storeLocation) {
+      CarbonDataLoadConfiguration configuration, String storeLocation, int bucketId) {
 
     CarbonTableIdentifier identifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
@@ -291,6 +293,7 @@ public class CarbonFactDataHandlerModel {
     } else {
       carbonFactDataHandlerModel.setMdKeyIndex(measureCount);
     }
+    carbonFactDataHandlerModel.bucketId = bucketId;
     return carbonFactDataHandlerModel;
   }
 
@@ -558,5 +561,9 @@ public class CarbonFactDataHandlerModel {
   public void setUseKettle(boolean useKettle) {
     this.useKettle = useKettle;
   }
+
+  public int getBucketId() {
+    return bucketId;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
index 2c82672..5697cb6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
@@ -150,6 +150,10 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
    */
   private void startSorting(File[] files) throws CarbonDataWriterException {
     this.fileCounter = files.length;
+    if (fileCounter == 0) {
+      LOGGER.info("No files to merge sort");
+      return;
+    }
     this.fileBufferSize = CarbonDataProcessorUtil
         .getFileBufferSize(this.fileCounter, CarbonProperties.getInstance(),
             CarbonCommonConstants.CONSTANT_SIZE_TEN);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index a1e7311..0195392 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -271,6 +271,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
     initFileCount();
     String carbonDataFileName = carbonTablePath
         .getCarbonDataFileName(fileCount, dataWriterVo.getCarbonDataFileAttributes().getTaskId(),
+            dataWriterVo.getBucketNumber(),
             dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp());
     String actualFileNameVal = carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
     FileData fileData = new FileData(actualFileNameVal, dataWriterVo.getStoreLocation());
@@ -423,12 +424,13 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    */
   private void writeIndexFile() throws IOException, CarbonDataWriterException {
     // get the header
-    IndexHeader indexHeader =
-        CarbonMetadataUtil.getIndexHeader(localCardinality, thriftColumnSchemaList);
+    IndexHeader indexHeader = CarbonMetadataUtil
+        .getIndexHeader(localCardinality, thriftColumnSchemaList, dataWriterVo.getBucketNumber());
     // get the block index info thrift
     List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
     String fileName = dataWriterVo.getStoreLocation() + File.separator + carbonTablePath
         .getCarbonIndexFileName(dataWriterVo.getCarbonDataFileAttributes().getTaskId(),
+            dataWriterVo.getBucketNumber(),
             dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp());
     CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
     // open file
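
The bucket number passed into getCarbonDataFileName and getCarbonIndexFileName
above ends up encoded in the data and index file names, which is how the query
side of this patch (CarbonInputFormat, CarbonScanRDD) can assign whole files to
bucket-aware splits. The exact pattern is defined by CarbonTablePath, which is
not shown in this mail; the snippet below is a purely hypothetical illustration
of the idea:

// Hypothetical name builder, for illustration only; the real layout is
// whatever CarbonTablePath.getCarbonDataFileName(...) produces in this patch.
public class DataFileNameSketch {
  static String dataFileName(int fileCount, String taskId, int bucketNumber,
      String factTimeStamp) {
    // assumed layout: bucket number placed between task id and timestamp
    return "part-" + fileCount + "-" + taskId + "-" + bucketNumber + "-"
        + factTimeStamp + ".carbondata";
  }
}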

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
index 6e0287d..e46430b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -66,6 +66,8 @@ public class CarbonDataWriterVo {
 
   private int tableBlocksize;
 
+  private int bucketNumber;
+
   /**
    * @return the storeLocation
    */
@@ -318,4 +320,11 @@ public class CarbonDataWriterVo {
     this.tableBlocksize = tableBlocksize;
   }
 
+  public int getBucketNumber() {
+    return bucketNumber;
+  }
+
+  public void setBucketNumber(int bucketNumber) {
+    this.bucketNumber = bucketNumber;
+  }
 }


[3/3] incubator-carbondata git commit: [CARBONDATA-467] Adding bucketing to carbon table loading This closes #358

Posted by ja...@apache.org.
[CARBONDATA-467] Adding bucketing to carbon table loading  This closes #358


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/dc7c86ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/dc7c86ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/dc7c86ef

Branch: refs/heads/master
Commit: dc7c86ef388af86392667289eec68e46bc257128
Parents: 65b9221 cbf8797
Author: jackylk <ja...@huawei.com>
Authored: Wed Dec 28 22:31:55 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Wed Dec 28 22:31:55 2016 +0800

----------------------------------------------------------------------
 .../carbon/datastore/SegmentTaskIndexStore.java |  89 +++++--
 .../ThriftWrapperSchemaConverterImpl.java       |  30 +++
 .../carbon/metadata/schema/BucketingInfo.java   |  49 ++++
 .../metadata/schema/table/CarbonTable.java      |  14 +
 .../metadata/schema/table/TableSchema.java      |  14 +
 .../core/carbon/path/CarbonTablePath.java       |  37 ++-
 .../carbondata/core/partition/Partitioner.java  |  26 ++
 .../partition/impl/HashPartitionerImpl.java     | 105 ++++++++
 .../core/util/CarbonMetadataUtil.java           |   4 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   5 +-
 .../datastore/SegmentTaskIndexStoreTest.java    |   8 +-
 .../CarbonFormatDirectoryStructureTest.java     |   4 +-
 .../core/util/CarbonMetadataUtilTest.java       |   3 +-
 format/src/main/thrift/carbondata_index.thrift  |   1 +
 format/src/main/thrift/schema.thrift            |   9 +
 .../carbondata/hadoop/CarbonInputFormat.java    |  22 +-
 .../carbondata/hadoop/CarbonInputSplit.java     |  18 ++
 .../hadoop/CarbonMultiBlockSplit.java           |  23 +-
 .../internal/index/impl/InMemoryBTreeIndex.java |   9 +-
 .../apache/carbondata/spark/CarbonOption.scala  |   7 +
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   2 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 146 ++++++----
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   7 +-
 .../execution/command/carbonTableSchema.scala   |  29 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  |  11 +-
 .../apache/carbondata/spark/CarbonOption.scala  |   7 +
 .../org/apache/spark/sql/CarbonSource.scala     |  14 +-
 .../org/apache/spark/sql/TableCreator.scala     |   6 +-
 .../execution/CarbonLateDecodeStrategy.scala    |  42 ++-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  14 +-
 .../bucketing/TableBucketingTestCase.scala      | 193 ++++++++++++++
 .../newflow/CarbonDataLoadConfiguration.java    |  11 +
 .../newflow/DataLoadProcessBuilder.java         |  29 ++
 .../processing/newflow/row/CarbonRow.java       |   2 +
 ...arallelReadMergeSorterWithBucketingImpl.java | 265 +++++++++++++++++++
 ...ConverterProcessorWithBucketingStepImpl.java | 189 +++++++++++++
 .../steps/DataWriterProcessorStepImpl.java      |  79 +++---
 .../newflow/steps/SortProcessorStepImpl.java    |  11 +-
 .../sortandgroupby/sortdata/SortParameters.java |  28 ++
 .../store/CarbonFactDataHandlerColumnar.java    |   4 +
 .../store/CarbonFactDataHandlerModel.java       |   9 +-
 .../store/SingleThreadFinalSortFilesMerger.java |   4 +
 .../store/writer/AbstractFactDataWriter.java    |   6 +-
 .../store/writer/CarbonDataWriterVo.java        |   9 +
 44 files changed, 1417 insertions(+), 177 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-carbondata git commit: Added partitioner

Posted by ja...@apache.org.
Added partitioner

Added bucketing in load

Added headers

Bucketing is handled in load and query flow

Fixed test case

Rebased with master

Rebased

Added bucketing in spark layer

Rebased and fixed scala style

Added test cases for bucketing in all scenarios and fixed review comments

Rebased and fixed issues

Rebased and fixed comments

Rebased and fixed testcases

Rebased and fixed testcases

Fixed comments

Rebased

Fixed compilation issue


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/cbf87977
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/cbf87977
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/cbf87977

Branch: refs/heads/master
Commit: cbf8797776c2f3be48efe029d858b37a37d29848
Parents: 65b9221
Author: ravipesala <ra...@gmail.com>
Authored: Sun Nov 27 16:58:55 2016 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Wed Dec 28 22:14:11 2016 +0800

----------------------------------------------------------------------
 .../carbon/datastore/SegmentTaskIndexStore.java |  89 +++++--
 .../ThriftWrapperSchemaConverterImpl.java       |  30 +++
 .../carbon/metadata/schema/BucketingInfo.java   |  49 ++++
 .../metadata/schema/table/CarbonTable.java      |  14 +
 .../metadata/schema/table/TableSchema.java      |  14 +
 .../core/carbon/path/CarbonTablePath.java       |  37 ++-
 .../carbondata/core/partition/Partitioner.java  |  26 ++
 .../partition/impl/HashPartitionerImpl.java     | 105 ++++++++
 .../core/util/CarbonMetadataUtil.java           |   4 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   5 +-
 .../datastore/SegmentTaskIndexStoreTest.java    |   8 +-
 .../CarbonFormatDirectoryStructureTest.java     |   4 +-
 .../core/util/CarbonMetadataUtilTest.java       |   3 +-
 format/src/main/thrift/carbondata_index.thrift  |   1 +
 format/src/main/thrift/schema.thrift            |   9 +
 .../carbondata/hadoop/CarbonInputFormat.java    |  22 +-
 .../carbondata/hadoop/CarbonInputSplit.java     |  18 ++
 .../hadoop/CarbonMultiBlockSplit.java           |  23 +-
 .../internal/index/impl/InMemoryBTreeIndex.java |   9 +-
 .../apache/carbondata/spark/CarbonOption.scala  |   7 +
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   2 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 146 ++++++----
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   7 +-
 .../execution/command/carbonTableSchema.scala   |  29 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  |  11 +-
 .../apache/carbondata/spark/CarbonOption.scala  |   7 +
 .../org/apache/spark/sql/CarbonSource.scala     |  14 +-
 .../org/apache/spark/sql/TableCreator.scala     |   6 +-
 .../execution/CarbonLateDecodeStrategy.scala    |  42 ++-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  14 +-
 .../bucketing/TableBucketingTestCase.scala      | 193 ++++++++++++++
 .../newflow/CarbonDataLoadConfiguration.java    |  11 +
 .../newflow/DataLoadProcessBuilder.java         |  29 ++
 .../processing/newflow/row/CarbonRow.java       |   2 +
 ...arallelReadMergeSorterWithBucketingImpl.java | 265 +++++++++++++++++++
 ...ConverterProcessorWithBucketingStepImpl.java | 189 +++++++++++++
 .../steps/DataWriterProcessorStepImpl.java      |  79 +++---
 .../newflow/steps/SortProcessorStepImpl.java    |  11 +-
 .../sortandgroupby/sortdata/SortParameters.java |  28 ++
 .../store/CarbonFactDataHandlerColumnar.java    |   4 +
 .../store/CarbonFactDataHandlerModel.java       |   9 +-
 .../store/SingleThreadFinalSortFilesMerger.java |   4 +
 .../store/writer/AbstractFactDataWriter.java    |   6 +-
 .../store/writer/CarbonDataWriterVo.java        |   9 +
 44 files changed, 1417 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
index e2218a8..6ab18bb 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
@@ -18,6 +18,7 @@
  */
 package org.apache.carbondata.core.carbon.datastore;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -57,7 +58,8 @@ public class SegmentTaskIndexStore {
   * reason for so many maps: each segment can have multiple data files and
    * each file will have its own btree
    */
-  private Map<AbsoluteTableIdentifier, Map<String, Map<String, AbstractIndex>>> tableSegmentMap;
+  private Map<AbsoluteTableIdentifier,
+      Map<String, Map<TaskBucketHolder, AbstractIndex>>> tableSegmentMap;
 
   /**
    * map of block info to lock object map, while loading the btree this will be filled
@@ -76,7 +78,7 @@ public class SegmentTaskIndexStore {
 
   private SegmentTaskIndexStore() {
     tableSegmentMap =
-        new ConcurrentHashMap<AbsoluteTableIdentifier, Map<String, Map<String, AbstractIndex>>>(
+        new ConcurrentHashMap<>(
             CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     tableLockMap = new ConcurrentHashMap<AbsoluteTableIdentifier, Object>(
         CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -103,26 +105,26 @@ public class SegmentTaskIndexStore {
   * @return map of task id to segment mapping
    * @throws IndexBuilderException
    */
-  public Map<String, AbstractIndex> loadAndGetTaskIdToSegmentsMap(
+  public Map<TaskBucketHolder, AbstractIndex> loadAndGetTaskIdToSegmentsMap(
       Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos,
       AbsoluteTableIdentifier absoluteTableIdentifier) throws IndexBuilderException {
     // task id to segment map
-    Map<String, AbstractIndex> taskIdToTableSegmentMap =
-        new HashMap<String, AbstractIndex>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    Map<TaskBucketHolder, AbstractIndex> taskIdToTableSegmentMap =
+        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     addLockObject(absoluteTableIdentifier);
     Iterator<Entry<String, List<TableBlockInfo>>> iteratorOverSegmentBlocksInfos =
         segmentToTableBlocksInfos.entrySet().iterator();
-    Map<String, Map<String, AbstractIndex>> tableSegmentMapTemp =
+    Map<String, Map<TaskBucketHolder, AbstractIndex>> tableSegmentMapTemp =
         addTableSegmentMap(absoluteTableIdentifier);
-    Map<String, AbstractIndex> taskIdToSegmentIndexMap = null;
+    Map<TaskBucketHolder, AbstractIndex> taskIdToSegmentIndexMap = null;
     String segmentId = null;
-    String taskId = null;
+    TaskBucketHolder taskId = null;
     try {
       while (iteratorOverSegmentBlocksInfos.hasNext()) {
         // segment id to table block mapping
         Entry<String, List<TableBlockInfo>> next = iteratorOverSegmentBlocksInfos.next();
         // group task id to table block info mapping for the segment
-        Map<String, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
+        Map<TaskBucketHolder, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
             mappedAndGetTaskIdToTableBlockInfo(segmentToTableBlocksInfos);
         // get the existing map of task id to table segment map
         segmentId = next.getKey();
@@ -142,11 +144,11 @@ public class SegmentTaskIndexStore {
             taskIdToSegmentIndexMap = tableSegmentMapTemp.get(segmentId);
             if (null == taskIdToSegmentIndexMap) {
               // creating a map of task id to table segment
-              taskIdToSegmentIndexMap = new ConcurrentHashMap<String, AbstractIndex>();
-              Iterator<Entry<String, List<TableBlockInfo>>> iterator =
+              taskIdToSegmentIndexMap = new ConcurrentHashMap<TaskBucketHolder, AbstractIndex>();
+              Iterator<Entry<TaskBucketHolder, List<TableBlockInfo>>> iterator =
                   taskIdToTableBlockInfoMap.entrySet().iterator();
               while (iterator.hasNext()) {
-                Entry<String, List<TableBlockInfo>> taskToBlockInfoList = iterator.next();
+                Entry<TaskBucketHolder, List<TableBlockInfo>> taskToBlockInfoList = iterator.next();
                 taskId = taskToBlockInfoList.getKey();
                 taskIdToSegmentIndexMap.put(taskId,
                     loadBlocks(taskId, taskToBlockInfoList.getValue(), absoluteTableIdentifier));
@@ -207,18 +209,18 @@ public class SegmentTaskIndexStore {
    * @param absoluteTableIdentifier
    * @return table segment map
    */
-  private Map<String, Map<String, AbstractIndex>> addTableSegmentMap(
+  private Map<String, Map<TaskBucketHolder, AbstractIndex>> addTableSegmentMap(
       AbsoluteTableIdentifier absoluteTableIdentifier) {
     // get the instance of lock object
     Object lockObject = tableLockMap.get(absoluteTableIdentifier);
-    Map<String, Map<String, AbstractIndex>> tableSegmentMapTemp =
+    Map<String, Map<TaskBucketHolder, AbstractIndex>> tableSegmentMapTemp =
         tableSegmentMap.get(absoluteTableIdentifier);
     if (null == tableSegmentMapTemp) {
       synchronized (lockObject) {
         // segment id to task id to table segment map
         tableSegmentMapTemp = tableSegmentMap.get(absoluteTableIdentifier);
         if (null == tableSegmentMapTemp) {
-          tableSegmentMapTemp = new ConcurrentHashMap<String, Map<String, AbstractIndex>>();
+          tableSegmentMapTemp = new ConcurrentHashMap<>();
           tableSegmentMap.put(absoluteTableIdentifier, tableSegmentMapTemp);
         }
       }
@@ -233,12 +235,13 @@ public class SegmentTaskIndexStore {
    * @return loaded segment
    * @throws CarbonUtilException
    */
-  private AbstractIndex loadBlocks(String taskId, List<TableBlockInfo> tableBlockInfoList,
+  private AbstractIndex loadBlocks(TaskBucketHolder holder, List<TableBlockInfo> tableBlockInfoList,
       AbsoluteTableIdentifier tableIdentifier) throws CarbonUtilException {
     // all the blocks of one task id will be loaded together
     // so creating a list which will have all the data file metadata of one task
-    List<DataFileFooter> footerList =
-        CarbonUtil.readCarbonIndexFile(taskId, tableBlockInfoList, tableIdentifier);
+    List<DataFileFooter> footerList = CarbonUtil
+        .readCarbonIndexFile(holder.taskNo, holder.bucketNumber, tableBlockInfoList,
+            tableIdentifier);
     AbstractIndex segment = new SegmentTaskIndex();
     // file path of only the first block is passed, as all table block info paths of
     // the same task id will be the same
@@ -253,10 +256,10 @@ public class SegmentTaskIndexStore {
   * @param segmentToTableBlocksInfos segment id to table blocks info map
    * @return task id to table block info mapping
    */
-  private Map<String, List<TableBlockInfo>> mappedAndGetTaskIdToTableBlockInfo(
+  private Map<TaskBucketHolder, List<TableBlockInfo>> mappedAndGetTaskIdToTableBlockInfo(
       Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos) {
-    Map<String, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
-        new ConcurrentHashMap<String, List<TableBlockInfo>>();
+    Map<TaskBucketHolder, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
+        new ConcurrentHashMap<>();
     Iterator<Entry<String, List<TableBlockInfo>>> iterator =
         segmentToTableBlocksInfos.entrySet().iterator();
     while (iterator.hasNext()) {
@@ -264,10 +267,12 @@ public class SegmentTaskIndexStore {
       List<TableBlockInfo> value = next.getValue();
       for (TableBlockInfo blockInfo : value) {
         String taskNo = DataFileUtil.getTaskNo(blockInfo.getFilePath());
-        List<TableBlockInfo> list = taskIdToTableBlockInfoMap.get(taskNo);
+        String bucketNo = DataFileUtil.getBucketNo(blockInfo.getFilePath());
+        TaskBucketHolder bucketHolder = new TaskBucketHolder(taskNo, bucketNo);
+        List<TableBlockInfo> list = taskIdToTableBlockInfoMap.get(bucketHolder);
         if (null == list) {
           list = new ArrayList<TableBlockInfo>();
-          taskIdToTableBlockInfoMap.put(taskNo, list);
+          taskIdToTableBlockInfoMap.put(bucketHolder, list);
         }
         list.add(blockInfo);
       }
@@ -304,7 +309,8 @@ public class SegmentTaskIndexStore {
       return;
     }
     // Acquire the lock and remove only those instance which was loaded
-    Map<String, Map<String, AbstractIndex>> map = tableSegmentMap.get(absoluteTableIdentifier);
+    Map<String, Map<TaskBucketHolder, AbstractIndex>> map =
+        tableSegmentMap.get(absoluteTableIdentifier);
     // if there is no loaded blocks then return
     if (null == map) {
       return;
@@ -322,13 +328,44 @@ public class SegmentTaskIndexStore {
    * @param segmentId
   * @return the loaded blocks if already loaded, otherwise null
    */
-  public Map<String, AbstractIndex> getSegmentBTreeIfExists(
+  public Map<TaskBucketHolder, AbstractIndex> getSegmentBTreeIfExists(
       AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) {
-    Map<String, Map<String, AbstractIndex>> tableSegment =
+    Map<String, Map<TaskBucketHolder, AbstractIndex>> tableSegment =
         tableSegmentMap.get(absoluteTableIdentifier);
     if (null == tableSegment) {
       return null;
     }
     return tableSegment.get(segmentId);
   }
+
+  public static class TaskBucketHolder implements Serializable {
+
+    public String taskNo;
+
+    public String bucketNumber;
+
+    public TaskBucketHolder(String taskNo, String bucketNumber) {
+      this.taskNo = taskNo;
+      this.bucketNumber = bucketNumber;
+    }
+
+    @Override public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      TaskBucketHolder that = (TaskBucketHolder) o;
+
+      if (taskNo != null ? !taskNo.equals(that.taskNo) : that.taskNo != null) return false;
+      return bucketNumber != null ?
+          bucketNumber.equals(that.bucketNumber) :
+          that.bucketNumber == null;
+
+    }
+
+    @Override public int hashCode() {
+      int result = taskNo != null ? taskNo.hashCode() : 0;
+      result = 31 * result + (bucketNumber != null ? bucketNumber.hashCode() : 0);
+      return result;
+    }
+  }
 }
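
A minimal sketch of how the new composite key behaves (illustrative only; it
assumes carbondata-core on the classpath). Because TaskBucketHolder overrides
equals() and hashCode(), two holders built from the same taskNo/bucketNumber
strings address the same cached index:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore;

    public class TaskBucketHolderDemo {
      public static void main(String[] args) {
        Map<SegmentTaskIndexStore.TaskBucketHolder, String> cache = new HashMap<>();
        // cache an index under task "100", bucket "0"
        cache.put(new SegmentTaskIndexStore.TaskBucketHolder("100", "0"), "index-100-0");
        // a freshly constructed key with equal fields finds the same entry
        System.out.println(cache.get(new SegmentTaskIndexStore.TaskBucketHolder("100", "0")));
      }
    }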

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 7d5386e..4b2be5f 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.carbon.metadata.schema.SchemaEvolution;
 import org.apache.carbondata.core.carbon.metadata.schema.SchemaEvolutionEntry;
 import org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo;
@@ -190,9 +191,24 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         new org.apache.carbondata.format.TableSchema(
             wrapperTableSchema.getTableId(), thriftColumnSchema, schemaEvolution);
     externalTableSchema.setTableProperties(wrapperTableSchema.getTableProperties());
+    if (wrapperTableSchema.getBucketingInfo() != null) {
+      externalTableSchema.setBucketingInfo(
+          fromWrapperToExternalBucketingInfo(wrapperTableSchema.getBucketingInfo()));
+    }
     return externalTableSchema;
   }
 
+  private org.apache.carbondata.format.BucketingInfo fromWrapperToExternalBucketingInfo(
+      BucketingInfo bucketingInfo) {
+    List<org.apache.carbondata.format.ColumnSchema> thriftColumnSchema =
+        new ArrayList<org.apache.carbondata.format.ColumnSchema>();
+    for (ColumnSchema wrapperColumnSchema : bucketingInfo.getListOfColumns()) {
+      thriftColumnSchema.add(fromWrapperToExternalColumnSchema(wrapperColumnSchema));
+    }
+    return new org.apache.carbondata.format.BucketingInfo(thriftColumnSchema,
+        bucketingInfo.getNumberOfBuckets());
+  }
+
   /* (non-Javadoc)
    * convert from wrapper to external tableinfo
    */
@@ -365,9 +381,23 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
     wrapperTableSchema.setListOfColumns(listOfColumns);
     wrapperTableSchema.setSchemaEvalution(
         fromExternalToWrapperSchemaEvolution(externalTableSchema.getSchema_evolution()));
+    if (externalTableSchema.isSetBucketingInfo()) {
+      wrapperTableSchema.setBucketingInfo(
+          fromExternalToWrapperBucketingInfo(externalTableSchema.bucketingInfo));
+    }
     return wrapperTableSchema;
   }
 
+  private BucketingInfo fromExternalToWrapperBucketingInfo(
+      org.apache.carbondata.format.BucketingInfo externalBucketInfo) {
+    List<ColumnSchema> listOfColumns = new ArrayList<ColumnSchema>();
+    for (org.apache.carbondata.format.ColumnSchema externalColumnSchema :
+          externalBucketInfo.table_columns) {
+      listOfColumns.add(fromExternalToWrapperColumnSchema(externalColumnSchema));
+    }
+    return new BucketingInfo(listOfColumns, externalBucketInfo.number_of_buckets);
+  }
+
   /* (non-Javadoc)
    * convert from external to wrapper tableinfo
    */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/BucketingInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/BucketingInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/BucketingInfo.java
new file mode 100644
index 0000000..75c888d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/BucketingInfo.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.carbon.metadata.schema;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+
+/**
+ * Bucketing information
+ */
+public class BucketingInfo implements Serializable {
+
+  private List<ColumnSchema> listOfColumns;
+
+  private int numberOfBuckets;
+
+  public BucketingInfo(List<ColumnSchema> listOfColumns, int numberOfBuckets) {
+    this.listOfColumns = listOfColumns;
+    this.numberOfBuckets = numberOfBuckets;
+  }
+
+  public List<ColumnSchema> getListOfColumns() {
+    return listOfColumns;
+  }
+
+  public int getNumberOfBuckets() {
+    return numberOfBuckets;
+  }
+
+}
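
A minimal construction sketch for the holder above (the ColumnSchema setup is
schematic and assumes its bean-style setters; real schemas carry many more
fields):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
    import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;

    public class BucketingInfoDemo {
      public static void main(String[] args) {
        ColumnSchema nameColumn = new ColumnSchema();
        nameColumn.setColumnName("name");
        List<ColumnSchema> bucketColumns = Arrays.asList(nameColumn);
        // bucket the table on "name" into 4 buckets
        BucketingInfo info = new BucketingInfo(bucketColumns, 4);
        System.out.println(info.getNumberOfBuckets()); // 4
      }
    }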

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java
index d3e2e62..7766616 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
@@ -72,6 +73,11 @@ public class CarbonTable implements Serializable {
   private Map<String, List<CarbonMeasure>> tableMeasuresMap;
 
   /**
+   * table bucket map.
+   */
+  private Map<String, BucketingInfo> tableBucketMap;
+
+  /**
    * tableUniqueName
    */
   private String tableUniqueName;
@@ -99,6 +105,7 @@ public class CarbonTable implements Serializable {
   public CarbonTable() {
     this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
     this.tableMeasuresMap = new HashMap<String, List<CarbonMeasure>>();
+    this.tableBucketMap = new HashMap<>();
     this.aggregateTablesName = new ArrayList<String>();
     this.createOrderColumn = new HashMap<String, List<CarbonColumn>>();
   }
@@ -124,7 +131,10 @@ public class CarbonTable implements Serializable {
     for (TableSchema aggTable : aggregateTableList) {
       this.aggregateTablesName.add(aggTable.getTableName());
       fillDimensionsAndMeasuresForTables(aggTable);
+      tableBucketMap.put(aggTable.getTableName(), aggTable.getBucketingInfo());
     }
+    tableBucketMap.put(tableInfo.getFactTable().getTableName(),
+        tableInfo.getFactTable().getBucketingInfo());
   }
 
   /**
@@ -474,6 +484,10 @@ public class CarbonTable implements Serializable {
     return null;
   }
 
+  public BucketingInfo getBucketingInfo(String tableName) {
+    return tableBucketMap.get(tableName);
+  }
+
   /**
    * @return absolute table identifier
    */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/TableSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/TableSchema.java
index 348f235..9beeff2 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/TableSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/TableSchema.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.carbon.metadata.schema.SchemaEvolution;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -62,6 +63,11 @@ public class TableSchema implements Serializable {
    */
   private Map<String, String> tableProperties;
 
+  /**
+   * Information about bucketing of fields and number of buckets
+   */
+  private BucketingInfo bucketingInfo;
+
   public TableSchema() {
     this.listOfColumns = new ArrayList<ColumnSchema>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
   }
@@ -202,4 +208,12 @@ public class TableSchema implements Serializable {
   public void setTableProperties(Map<String, String> tableProperties) {
     this.tableProperties = tableProperties;
   }
+
+  public BucketingInfo getBucketingInfo() {
+    return bucketingInfo;
+  }
+
+  public void setBucketingInfo(BucketingInfo bucketingInfo) {
+    this.bucketingInfo = bucketingInfo;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
index 54e7266..cda971a 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
@@ -215,9 +215,9 @@ public class CarbonTablePath extends Path {
    * @return absolute path of data file stored in carbon data format
    */
   public String getCarbonDataFilePath(String partitionId, String segmentId, Integer filePartNo,
-      Integer taskNo, String factUpdateTimeStamp) {
+      Integer taskNo, int bucketNumber, String factUpdateTimeStamp) {
     return getSegmentDir(partitionId, segmentId) + File.separator + getCarbonDataFileName(
-        filePartNo, taskNo, factUpdateTimeStamp);
+        filePartNo, taskNo, bucketNumber, factUpdateTimeStamp);
   }
 
   /**
@@ -230,14 +230,15 @@ public class CarbonTablePath extends Path {
    * @return full qualified carbon index path
    */
   public String getCarbonIndexFilePath(final String taskId, final String partitionId,
-      final String segmentId) {
+      final String segmentId, final String bucketNumber) {
     String segmentDir = getSegmentDir(partitionId, segmentId);
     CarbonFile carbonFile =
         FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir));
 
     CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
       @Override public boolean accept(CarbonFile file) {
-        return file.getName().startsWith(taskId) && file.getName().endsWith(INDEX_FILE_EXT);
+        return file.getName().startsWith(taskId + "-" + bucketNumber) && file.getName()
+            .endsWith(INDEX_FILE_EXT);
       }
     });
     return files[0].getAbsolutePath();
@@ -262,10 +263,10 @@ public class CarbonTablePath extends Path {
    * @param factUpdateTimeStamp unique identifier to identify an update
    * @return gets data file name only with out path
    */
-  public String getCarbonDataFileName(Integer filePartNo, Integer taskNo,
+  public String getCarbonDataFileName(Integer filePartNo, Integer taskNo, int bucketNumber,
       String factUpdateTimeStamp) {
-    return DATA_PART_PREFIX + "-" + filePartNo + "-" + taskNo + "-" + factUpdateTimeStamp
-        + CARBON_DATA_EXT;
+    return DATA_PART_PREFIX + "-" + filePartNo + "-" + taskNo + "-" + bucketNumber + "-"
+        + factUpdateTimeStamp + CARBON_DATA_EXT;
   }
 
   /**
@@ -275,8 +276,8 @@ public class CarbonTablePath extends Path {
    * @param factUpdatedTimeStamp time stamp
    * @return filename
    */
-  public String getCarbonIndexFileName(int taskNo, String factUpdatedTimeStamp) {
-    return taskNo + "-" + factUpdatedTimeStamp + INDEX_FILE_EXT;
+  public String getCarbonIndexFileName(int taskNo, int bucketNumber, String factUpdatedTimeStamp) {
+    return taskNo + "-" + bucketNumber + "-" + factUpdatedTimeStamp + INDEX_FILE_EXT;
   }
 
   private String getSegmentDir(String partitionId, String segmentId) {
@@ -352,6 +353,24 @@ public class CarbonTablePath extends Path {
     }
 
     /**
+     * gets the bucket number from the given carbon data file name
+     */
+    public static String getBucketNo(String carbonFilePath) {
+      // Get the file name from path
+      String fileName = getFileName(carbonFilePath);
+      // the "+ 1" offsets below skip past each "-" separator
+      int firstDashPos = fileName.indexOf("-");
+      int secondDash = fileName.indexOf("-", firstDashPos + 1);
+      int startIndex = fileName.indexOf("-", secondDash + 1) + 1;
+      int endIndex = fileName.indexOf("-", startIndex);
+      // to support backward compatibility
+      if (startIndex == -1 || endIndex == -1) {
+        return "0";
+      }
+      return fileName.substring(startIndex, endIndex);
+    }
+
+    /**
      * Gets the file name from file path
      */
     private static String getFileName(String carbonDataFileName) {
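
With the extra bucket segment, data files are named
part-<filePartNo>-<taskNo>-<bucketNumber>-<factUpdateTimeStamp>.carbondata, so
the bucket number is the fourth dash-separated token. A self-contained sketch
of the same parsing rule (illustrative, not the shipped DataFileUtil code):

    public class BucketNoDemo {
      static String bucketNo(String fileName) {
        String[] tokens = fileName.split("-");
        // old-layout names (part-<filePartNo>-<taskNo>-<timestamp>) have only
        // four tokens and fall back to bucket "0" for backward compatibility
        return tokens.length >= 5 ? tokens[3] : "0";
      }

      public static void main(String[] args) {
        System.out.println(bucketNo("part-3-4-2-999.carbondata")); // 2
        System.out.println(bucketNo("part-3-4-999.carbondata"));   // 0 (old layout)
      }
    }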

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/partition/Partitioner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/partition/Partitioner.java b/core/src/main/java/org/apache/carbondata/core/partition/Partitioner.java
new file mode 100644
index 0000000..1907687
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/partition/Partitioner.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.partition;
+
+/**
+ * Partitions the data by key
+ */
+public interface Partitioner<Key> {
+
+  int getPartition(Key key);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java b/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java
new file mode 100644
index 0000000..a702a6b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.partition.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.partition.Partitioner;
+
+/**
+ * Hash partitioner implementation
+ */
+public class HashPartitionerImpl implements Partitioner<Object[]> {
+
+  private int numberOfBuckets;
+
+  private Hash[] hashes;
+
+  public HashPartitionerImpl(List<Integer> indexes, List<ColumnSchema> columnSchemas,
+      int numberOfBuckets) {
+    this.numberOfBuckets = numberOfBuckets;
+    hashes = new Hash[indexes.size()];
+    for (int i = 0; i < indexes.size(); i++) {
+      switch(columnSchemas.get(i).getDataType()) {
+        case SHORT:
+        case INT:
+        case LONG:
+          hashes[i] = new IntegralHash(indexes.get(i));
+          break;
+        case DOUBLE:
+        case FLOAT:
+        case DECIMAL:
+          hashes[i] = new DecimalHash(indexes.get(i));
+          break;
+        default:
+          hashes[i] = new StringHash(indexes.get(i));
+      }
+    }
+  }
+
+  @Override public int getPartition(Object[] objects) {
+    int hashCode = 0;
+    for (Hash hash : hashes) {
+      hashCode += hash.getHash(objects);
+    }
+    return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
+  }
+
+  private interface Hash {
+    int getHash(Object[] value);
+  }
+
+  private static class IntegralHash implements Hash {
+
+    private int index;
+
+    private IntegralHash(int index) {
+      this.index = index;
+    }
+
+    public int getHash(Object[] value) {
+      return value[index] != null ? Long.valueOf(value[index].toString()).hashCode() : 0;
+    }
+  }
+
+  private static class DecimalHash implements Hash {
+
+    private int index;
+
+    private DecimalHash(int index) {
+      this.index = index;
+    }
+
+    public int getHash(Object[] value) {
+      return value[index] != null ? Double.valueOf(value[index].toString()).hashCode() : 0;
+    }
+  }
+
+  private static class StringHash implements Hash {
+
+    private int index;
+
+    private StringHash(int index) {
+      this.index = index;
+    }
+
+    @Override public int getHash(Object[] value) {
+      return value[index] != null ? value[index].hashCode() : 0;
+    }
+  }
+}
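
A usage sketch for the partitioner above (illustrative; the ColumnSchema setup
assumes its bean-style setters and the DataType enum imported elsewhere in
this commit):

    import java.util.Arrays;

    import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
    import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
    import org.apache.carbondata.core.partition.Partitioner;
    import org.apache.carbondata.core.partition.impl.HashPartitionerImpl;

    public class HashPartitionerDemo {
      public static void main(String[] args) {
        ColumnSchema nameColumn = new ColumnSchema();
        nameColumn.setColumnName("name");
        nameColumn.setDataType(DataType.STRING); // routed to StringHash
        // hash the value at row index 1 into one of 4 buckets
        Partitioner<Object[]> partitioner =
            new HashPartitionerImpl(Arrays.asList(1), Arrays.asList(nameColumn), 4);
        Object[] row = new Object[] { 10, "alice", 3000 };
        System.out.println(partitioner.getPartition(row)); // always in [0, 4)
      }
    }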

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 747862f..3415d92 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -467,7 +467,7 @@ public class CarbonMetadataUtil {
    * @return Index header object
    */
   public static IndexHeader getIndexHeader(int[] columnCardinality,
-      List<ColumnSchema> columnSchemaList) {
+      List<ColumnSchema> columnSchemaList, int bucketNumber) {
     // create segment info object
     SegmentInfo segmentInfo = new SegmentInfo();
     // set the number of columns
@@ -482,6 +482,8 @@ public class CarbonMetadataUtil {
     indexHeader.setSegment_info(segmentInfo);
     // set the column names
     indexHeader.setTable_columns(columnSchemaList);
+    // set the bucket number
+    indexHeader.setBucket_id(bucketNumber);
     return indexHeader;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 3980cb3..fbbed76 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1093,7 +1093,7 @@ public final class CarbonUtil {
    * @return list of block info
    * @throws CarbonUtilException if any problem while reading
    */
-  public static List<DataFileFooter> readCarbonIndexFile(String taskId,
+  public static List<DataFileFooter> readCarbonIndexFile(String taskId, String bucketNumber,
       List<TableBlockInfo> tableBlockInfoList, AbsoluteTableIdentifier absoluteTableIdentifier)
       throws CarbonUtilException {
     // need to sort the  block info list based for task in ascending  order so
@@ -1105,7 +1105,8 @@ public final class CarbonUtil {
     // getting the index file path
     //TODO need to pass proper partition number when partition will be supported
     String carbonIndexFilePath = carbonTablePath
-        .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId());
+        .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId(),
+            bucketNumber);
     DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
     try {
       // read the index info and return

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
index 7cb213c..722d030 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
@@ -90,6 +90,7 @@ public class SegmentTaskIndexStoreTest {
 
     new MockUp<CarbonUtil>() {
       @Mock List<DataFileFooter> readCarbonIndexFile(String taskId,
+          String bucketNumber,
           List<TableBlockInfo> tableBlockInfoList,
           AbsoluteTableIdentifier absoluteTableIdentifier) {
         return getDataFileFooters();
@@ -101,18 +102,17 @@ public class SegmentTaskIndexStoreTest {
       }
     };
 
-    Map<String, AbstractIndex> result =
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> result =
         taskIndexStore.loadAndGetTaskIdToSegmentsMap(new HashMap<String, List<TableBlockInfo>>() {{
           put("SG100", Arrays.asList(tableBlockInfo));
         }}, absoluteTableIdentifier);
 
     assertEquals(result.size(), 1);
-    assertTrue(result.containsKey(new String("100")));
-
+    assertTrue(result.containsKey(new SegmentTaskIndexStore.TaskBucketHolder("100", "0")));
   }
 
   @Test public void checkExistenceOfSegmentBTree() {
-    Map<String, AbstractIndex> result =
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> result =
         taskIndexStore.getSegmentBTreeIfExists(absoluteTableIdentifier, "SG100");
     assertNull(result);
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/test/java/org/apache/carbondata/core/path/CarbonFormatDirectoryStructureTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/path/CarbonFormatDirectoryStructureTest.java b/core/src/test/java/org/apache/carbondata/core/path/CarbonFormatDirectoryStructureTest.java
index 437a13f..95dd5d7 100644
--- a/core/src/test/java/org/apache/carbondata/core/path/CarbonFormatDirectoryStructureTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/path/CarbonFormatDirectoryStructureTest.java
@@ -53,8 +53,8 @@ public class CarbonFormatDirectoryStructureTest {
         .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.dictmeta"));
     assertTrue(carbonTablePath.getSortIndexFilePath("t1_c1").replace("\\", "/")
         .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.sortindex"));
-    assertTrue(carbonTablePath.getCarbonDataFilePath("1", "2", 3, 4, "999").replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/t1/Fact/Part1/Segment_2/part-3-4-999.carbondata"));
+    assertTrue(carbonTablePath.getCarbonDataFilePath("1", "2", 3, 4, 0, "999").replace("\\", "/")
+        .equals(CARBON_STORE + "/d1/t1/Fact/Part1/Segment_2/part-3-4-0-999.carbondata"));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index b3647a8..b7d3a01 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -160,7 +160,8 @@ public class CarbonMetadataUtilTest {
     indexHeader.setVersion(2);
     indexHeader.setSegment_info(segmentInfo);
     indexHeader.setTable_columns(columnSchemaList);
-    IndexHeader indexheaderResult = getIndexHeader(columnCardinality, columnSchemaList);
+    indexHeader.setBucket_id(0);
+    IndexHeader indexheaderResult = getIndexHeader(columnCardinality, columnSchemaList, 0);
     assertEquals(indexHeader, indexheaderResult);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/format/src/main/thrift/carbondata_index.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata_index.thrift b/format/src/main/thrift/carbondata_index.thrift
index e5fda5d..364a7e5 100644
--- a/format/src/main/thrift/carbondata_index.thrift
+++ b/format/src/main/thrift/carbondata_index.thrift
@@ -32,6 +32,7 @@ struct IndexHeader{
   1: required i32 version; // version used for data compatibility
   2: required list<schema.ColumnSchema> table_columns;	// Description of columns in this file
   3: required carbondata.SegmentInfo segment_info;	// Segment info (will be same/repeated for all files in this segment)
+  4: optional i32 bucket_id; // bucket number to which this file belongs
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/format/src/main/thrift/schema.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index 377c372..775573c 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -122,6 +122,14 @@ struct SchemaEvolution{
 }
 
 /**
+* Bucketing information of the table's fields
+**/
+struct BucketingInfo{
+  1: required list<ColumnSchema> table_columns;
+  2: required i32 number_of_buckets;
+}
+
+/**
 * The description of table schema
 */
 struct TableSchema{
@@ -129,6 +137,7 @@ struct TableSchema{
 	2: required list<ColumnSchema> table_columns; // Columns in the table
 	3: required SchemaEvolution schema_evolution; // History of schema evolution of this table
  4: optional map<string,string> tableProperties; // table properties configured by the user
+  5: optional BucketingInfo bucketingInfo; // bucketing information
 }
 
 struct TableInfo{

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index aa995fa..b69df86 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -42,7 +42,9 @@ import org.apache.carbondata.core.carbon.datastore.impl.btree.BlockBTreeLeafNode
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.carbon.path.CarbonStorePath;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.carbon.querystatistics.*;
+import org.apache.carbondata.core.carbon.querystatistics.QueryStatistic;
+import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsConstants;
+import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsRecorder;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
@@ -319,7 +321,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       String segmentId) throws IndexBuilderException, IOException {
     QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
     QueryStatistic statistic = new QueryStatistic();
-    Map<String, AbstractIndex> segmentIndexMap =
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
         getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId);
 
     List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
@@ -379,10 +381,20 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return tableBlockInfoList;
   }
 
-  private Map<String, AbstractIndex> getSegmentAbstractIndexs(JobContext job,
-      AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId)
+  /**
+   * Returns the index for each task file.
+   * @param job
+   * @param absoluteTableIdentifier
+   * @param segmentId
+   * @return
+   * @throws IOException
+   * @throws IndexBuilderException
+   */
+  private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
+      JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId)
       throws IOException, IndexBuilderException {
-    Map<String, AbstractIndex> segmentIndexMap = SegmentTaskIndexStore.getInstance()
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
+        SegmentTaskIndexStore.getInstance()
         .getSegmentBTreeIfExists(absoluteTableIdentifier, segmentId);
 
     // if segment tree is not loaded, load the segment tree

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 8b87cad..a4acd9c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -45,7 +45,10 @@ public class CarbonInputSplit extends FileSplit
 
   private static final long serialVersionUID = 3520344046772190207L;
   public String taskId;
+
   private String segmentId;
+
+  private String bucketId;
   /*
    * Invalid segments that need to be removed in task side index
    */
@@ -61,6 +64,7 @@ public class CarbonInputSplit extends FileSplit
   public CarbonInputSplit() {
     segmentId = null;
     taskId = "0";
+    bucketId = "0";
     numberOfBlocklets = 0;
     invalidSegments = new ArrayList<>();
     version = CarbonProperties.getInstance().getFormatVersion();
@@ -71,6 +75,7 @@ public class CarbonInputSplit extends FileSplit
     super(path, start, length, locations);
     this.segmentId = segmentId;
     this.taskId = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
+    this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(path.getName());
     this.invalidSegments = new ArrayList<>();
     this.version = version;
   }
@@ -124,6 +129,7 @@ public class CarbonInputSplit extends FileSplit
     super.readFields(in);
     this.segmentId = in.readUTF();
     this.version = ColumnarFormatVersion.valueOf(in.readShort());
+    this.bucketId = in.readUTF();
     int numInvalidSegment = in.readInt();
     invalidSegments = new ArrayList<>(numInvalidSegment);
     for (int i = 0; i < numInvalidSegment; i++) {
@@ -135,6 +141,7 @@ public class CarbonInputSplit extends FileSplit
     super.write(out);
     out.writeUTF(segmentId);
     out.writeShort(version.number());
+    out.writeUTF(bucketId);
     out.writeInt(invalidSegments.size());
     for (String invalidSegment : invalidSegments) {
       out.writeUTF(invalidSegment);
@@ -166,6 +173,10 @@ public class CarbonInputSplit extends FileSplit
     this.version = version;
   }
 
+  public String getBucketId() {
+    return bucketId;
+  }
+
   @Override public int compareTo(Distributable o) {
     CarbonInputSplit other = (CarbonInputSplit) o;
     int compareResult = 0;
@@ -193,6 +204,13 @@ public class CarbonInputSplit extends FileSplit
       if (firstTaskId != otherTaskId) {
         return firstTaskId - otherTaskId;
       }
+
+      int firstBucketNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getBucketNo(filePath1));
+      int otherBucketNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getBucketNo(filePath2));
+      if (firstBucketNo != otherBucketNo) {
+        return firstBucketNo - otherBucketNo;
+      }
+
       // compare the part no of both block info
       int firstPartNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getPartNo(filePath1));
       int SecondPartNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getPartNo(filePath2));
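
The net ordering after this change: splits compare by task number, then bucket
number, then part number, all parsed from the file name. A schematic
restatement of that composite ordering (class and field names here are
illustrative):

    import java.util.Comparator;

    class SplitKey {
      final int taskNo, bucketNo, partNo;

      SplitKey(int taskNo, int bucketNo, int partNo) {
        this.taskNo = taskNo;
        this.bucketNo = bucketNo;
        this.partNo = partNo;
      }

      // same precedence as CarbonInputSplit.compareTo above
      static final Comparator<SplitKey> ORDER =
          Comparator.<SplitKey>comparingInt(k -> k.taskNo)
              .thenComparingInt(k -> k.bucketNo)
              .thenComparingInt(k -> k.partNo);
    }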

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index a13d6ba..26b5252 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -42,19 +42,19 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
   private List<CarbonInputSplit> splitList;
 
   /*
-   * The location of all wrapped splits belong to the same node
+   * The locations of all wrapped splits
    */
-  private String location;
+  private String[] locations;
 
   public CarbonMultiBlockSplit() {
     splitList = null;
-    location = null;
+    locations = null;
   }
 
   public CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, List<CarbonInputSplit> splitList,
-      String location) throws IOException {
+      String[] locations) throws IOException {
     this.splitList = splitList;
-    this.location = location;
+    this.locations = locations;
   }
 
   /**
@@ -76,7 +76,7 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
 
   @Override
   public String[] getLocations() throws IOException, InterruptedException {
-    return new String[]{location};
+    return locations;
   }
 
   @Override
@@ -86,7 +86,10 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
     for (CarbonInputSplit split: splitList) {
       split.write(out);
     }
-    out.writeUTF(location);
+    out.writeInt(locations.length);
+    for (int i = 0; i < locations.length; i++) {
+      out.writeUTF(locations[i]);
+    }
   }
 
   @Override
@@ -99,7 +102,11 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
       split.readFields(in);
       splitList.add(split);
     }
-    location = in.readUTF();
+    int len = in.readInt();
+    locations = new String[len];
+    for (int i = 0; i < len; i++) {
+      locations[i] = in.readUTF();
+    }
   }
 
 }
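
The locations array is serialized with a simple length-prefixed encoding: an
int count followed by one writeUTF per entry. A self-contained round-trip
sketch of that encoding:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class LocationsCodecDemo {
      public static void main(String[] args) throws IOException {
        String[] locations = { "node1", "node2" };
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeInt(locations.length);          // length prefix
        for (String location : locations) {
          out.writeUTF(location);                // one UTF string per entry
        }
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        String[] decoded = new String[in.readInt()];
        for (int i = 0; i < decoded.length; i++) {
          decoded[i] = in.readUTF();
        }
        System.out.println(decoded[0] + ", " + decoded[1]); // node1, node2
      }
    }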

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
index c238e10..82bcf1c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
@@ -100,10 +100,10 @@ class InMemoryBTreeIndex implements Index {
     return result;
   }
 
-  private Map<String, AbstractIndex> getSegmentAbstractIndexs(JobContext job,
-      AbsoluteTableIdentifier identifier)
+  private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
+      JobContext job, AbsoluteTableIdentifier identifier)
       throws IOException, IndexBuilderException {
-    Map<String, AbstractIndex> segmentIndexMap =
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
         SegmentTaskIndexStore.getInstance().getSegmentBTreeIfExists(identifier, segment.getId());
 
     // if segment tree is not loaded, load the segment tree
@@ -153,7 +153,8 @@ class InMemoryBTreeIndex implements Index {
 
     QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
     QueryStatistic statistic = new QueryStatistic();
-    Map<String, AbstractIndex> segmentIndexMap = getSegmentAbstractIndexs(job, identifier);
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
+        getSegmentAbstractIndexs(job, identifier);
 
     List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 213712e..c63b43d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -46,5 +46,12 @@ class CarbonOption(options: Map[String, String]) {
 
   def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean
 
+  def bucketNumber: Int = options.getOrElse("bucketnumber", "0").toInt
+
+  def bucketColumns: String = options.getOrElse("bucketcolumns", "")
+
+  def isBucketingEnabled: Boolean = options.contains("bucketcolumns") &&
+                                    options.contains("bucketnumber")
+
   def toMap: Map[String, String] = options
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index ccaf9e3..99dc853 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -302,7 +302,7 @@ class CarbonMergerRDD[K, V](
       }
       if (blockletCount != 0) {
         val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier,
-          carbonInputSplits.asJava, nodeName)
+          carbonInputSplits.asJava, Array(nodeName))
         result.add(new CarbonSparkPartition(id, i, multiBlockSplit))
         i += 1
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 2705f94..5d972bf 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -65,6 +65,8 @@ class CarbonScanRDD(
 
   private val readSupport = SparkReadSupport.readSupportClass
 
+  private val bucketedTable = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
+
   @transient private val jobId = new JobID(jobTrackerId, id)
   @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
@@ -95,36 +97,54 @@ class CarbonScanRDD(
     var noOfTasks = 0
 
     if (!splits.isEmpty) {
-      // create a list of block based on split
-      val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
-
-      // get the list of executors and map blocks to executors based on locality
-      val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
-
-      // divide the blocks among the tasks of the nodes as per the data locality
-      val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
-        parallelism, activeNodes.toList.asJava)
 
       statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
       statisticRecorder.recordStatisticsForDriver(statistic, queryId)
       statistic = new QueryStatistic()
 
-      var i = 0
-      // Create Spark Partition for each task and assign blocks
-      nodeBlockMapping.asScala.foreach { case (node, blockList) =>
-        blockList.asScala.foreach { blocksPerTask =>
-          val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
-          if (blocksPerTask.size() != 0) {
-            val multiBlockSplit = new CarbonMultiBlockSplit(identifier, splits.asJava, node)
-            val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
-            result.add(partition)
-            i += 1
+      // If bucketing is enabled on the table, partitions should be grouped by bucket.
+      if (bucketedTable != null) {
+        var i = 0
+        val bucketed =
+          splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy(f => f.getBucketId)
+        (0 until bucketedTable.getNumberOfBuckets).map { bucketId =>
+          val bucketPartitions = bucketed.getOrElse(bucketId.toString, Nil)
+          val multiBlockSplit =
+            new CarbonMultiBlockSplit(identifier,
+              bucketPartitions.asJava,
+              bucketPartitions.flatMap(_.getLocations).toArray)
+          val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
+          i += 1
+          result.add(partition)
+        }
+      } else {
+        // create a list of block based on split
+        val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
+
+        // get the list of executors and map blocks to executors based on locality
+        val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
+
+        // divide the blocks among the tasks of the nodes as per the data locality
+        val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
+          parallelism, activeNodes.toList.asJava)
+        var i = 0
+        // Create Spark Partition for each task and assign blocks
+        nodeBlockMapping.asScala.foreach { case (node, blockList) =>
+          blockList.asScala.foreach { blocksPerTask =>
+            val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
+            if (blocksPerTask.size() != 0) {
+              val multiBlockSplit =
+                new CarbonMultiBlockSplit(identifier, splits.asJava, Array(node))
+              val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
+              result.add(partition)
+              i += 1
+            }
           }
         }
+        noOfNodes = nodeBlockMapping.size
       }
 
       noOfBlocks = splits.size
-      noOfNodes = nodeBlockMapping.size
       noOfTasks = result.size()
 
       statistic = new QueryStatistic()
@@ -155,58 +175,68 @@ class CarbonScanRDD(
     val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
     val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
     val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
-    val model = format.getQueryModel(inputSplit, attemptContext)
-    val reader = {
-      if (vectorReader) {
-        val carbonRecordReader = createVectorizedCarbonRecordReader(model)
-        if (carbonRecordReader == null) {
-          new CarbonRecordReader(model, format.getReadSupportClass(attemptContext.getConfiguration))
+    val iterator = if (inputSplit.getAllSplits.size() > 0) {
+      val model = format.getQueryModel(inputSplit, attemptContext)
+      val reader = {
+        if (vectorReader) {
+          val carbonRecordReader = createVectorizedCarbonRecordReader(model)
+          if (carbonRecordReader == null) {
+            new CarbonRecordReader(model,
+              format.getReadSupportClass(attemptContext.getConfiguration))
+          } else {
+            carbonRecordReader
+          }
         } else {
-          carbonRecordReader
+          new CarbonRecordReader(model, format.getReadSupportClass(attemptContext.getConfiguration))
         }
-      } else {
-        new CarbonRecordReader(model, format.getReadSupportClass(attemptContext.getConfiguration))
       }
-    }
 
-    reader.initialize(inputSplit, attemptContext)
+      reader.initialize(inputSplit, attemptContext)
+      val queryStartTime = System.currentTimeMillis
 
-    val queryStartTime = System.currentTimeMillis
+      new Iterator[Any] {
+        private var havePair = false
+        private var finished = false
+        private var count = 0
 
-    val iterator = new Iterator[Any] {
-      private var havePair = false
-      private var finished = false
-      private var count = 0
-
-      context.addTaskCompletionListener { context =>
-        logStatistics(queryStartTime, count)
-        reader.close()
-      }
+        context.addTaskCompletionListener { context =>
+          logStatistics(queryStartTime, count)
+          reader.close()
+        }
 
-      override def hasNext: Boolean = {
-        if (context.isInterrupted) {
-          throw new TaskKilledException
+        override def hasNext: Boolean = {
+          if (context.isInterrupted) {
+            throw new TaskKilledException
+          }
+          if (!finished && !havePair) {
+            finished = !reader.nextKeyValue
+            if (finished) {
+              reader.close()
+            }
+            havePair = !finished
+          }
+          !finished
         }
-        if (!finished && !havePair) {
-          finished = !reader.nextKeyValue
-          if (finished) {
-            reader.close()
+
+        override def next(): Any = {
+          if (!hasNext) {
+            throw new java.util.NoSuchElementException("End of stream")
           }
-          havePair = !finished
+          havePair = false
+          val value = reader.getCurrentValue
+          count += 1
+          value
         }
-        !finished
       }
+    } else {
+      new Iterator[Any] {
+        override def hasNext: Boolean = false
 
-      override def next(): Any = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        val value = reader.getCurrentValue
-        count += 1
-        value
+        override def next(): Any = throw new java.util.NoSuchElementException("End of stream")
       }
     }
+
     iterator.asInstanceOf[Iterator[InternalRow]]
   }
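
The bucketed branch above reduces to a small standalone sketch: one Spark partition per bucket, holding every split tagged with that bucket id (pairs stand in for CarbonInputSplit; getBucketId returns a String, hence the toString in the lookup). A bucket with no splits still yields a partition, which is why the reader path above now returns an empty iterator when a partition carries no splits.

    val numberOfBuckets = 4
    val splits = Seq(("0", "split-a"), ("1", "split-b"), ("0", "split-c"))
    val bucketed = splits.groupBy(_._1)
    val partitions = (0 until numberOfBuckets).map { bucketId =>
      bucketId -> bucketed.getOrElse(bucketId.toString, Nil).map(_._2)
    }
    // Vector((0,List(split-a, split-c)), (1,List(split-b)), (2,List()), (3,List()))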
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index a5088df..461633d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -223,8 +223,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
       , tableName: String, fields: Seq[Field],
       partitionCols: Seq[PartitionerField],
-      tableProperties: Map[String, String]): TableModel
-  = {
+      tableProperties: Map[String, String],
+      bucketFields: Option[BucketFields]): TableModel = {
 
     fields.zipWithIndex.foreach { x =>
       x._1.schemaOrdinal = x._2
@@ -268,7 +268,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       Option(noDictionaryDims),
       Option(noInvertedIdxCols),
       groupCols,
-      Some(colProps))
+      Some(colProps),
+      bucketFields)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index f646f1d..ec064ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -29,7 +29,7 @@ import org.apache.carbondata.common.factory.CarbonCommonFactory
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
-import org.apache.carbondata.core.carbon.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
+import org.apache.carbondata.core.carbon.metadata.schema.{BucketingInfo, SchemaEvolution, SchemaEvolutionEntry}
 import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -50,7 +50,9 @@ case class TableModel(
     highcardinalitydims: Option[Seq[String]],
     noInvertedIdxCols: Option[Seq[String]],
     columnGroups: Seq[String],
-    colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None)
+    colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None,
+    bucketFields: Option[BucketFields])
 
 case class Field(column: String, var dataType: Option[String], name: Option[String],
     children: Option[List[Field]], parent: String = null,
@@ -69,6 +71,8 @@ case class Partitioner(partitionClass: String, partitionColumn: Array[String], p
 case class PartitionerField(partitionColumn: String, dataType: Option[String],
     columnComment: String)
 
+case class BucketFields(bucketColumns: Seq[String], numberOfBuckets: Int)
+
 case class DataLoadTableFileMapping(table: String, loadPath: String)
 
 case class CarbonMergerMapping(storeLocation: String,
@@ -300,6 +304,27 @@ class TableNewProcessor(cm: TableModel) {
       x => tablePropertiesMap.put(x._1, x._2)
     }
     tableSchema.setTableProperties(tablePropertiesMap)
+    if (cm.bucketFields.isDefined) {
+      val bucketCols = cm.bucketFields.get.bucketColumns.map { b =>
+        val col = allColumns.find(_.getColumnName.equalsIgnoreCase(b))
+        col match {
+          case Some(colSchema: ColumnSchema) =>
+            if (colSchema.isDimensionColumn && !colSchema.isComplex) {
+              colSchema
+            } else {
+              LOGGER.error(s"Bucket field must be dimension column and " +
+                           s"should not be measure or complex column: ${colSchema.getColumnName}")
+              sys.error(s"Bucket field must be dimension column and " +
+                        s"should not be measure or complex column: ${colSchema.getColumnName}")
+            }
+          case _ =>
+            LOGGER.error(s"Bucket field is not present in table columns")
+            sys.error(s"Bucket field is not present in table columns")
+        }
+      }
+      tableSchema.setBucketingInfo(
+        new BucketingInfo(bucketCols.asJava, cm.bucketFields.get.numberOfBuckets))
+    }
     tableSchema.setTableName(cm.tableName)
     tableSchema.setListOfColumns(allColumns.asJava)
     tableSchema.setSchemaEvalution(schemaEvol)
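
Condensed, the check above enforces a single rule: every requested bucket column must resolve to a plain (non-complex) dimension in the schema. A simplified sketch, with Col standing in for ColumnSchema:

    case class Col(name: String, isDimension: Boolean, isComplex: Boolean)

    def resolveBucketColumns(requested: Seq[String], allColumns: Seq[Col]): Seq[Col] =
      requested.map { b =>
        allColumns.find(_.name.equalsIgnoreCase(b)) match {
          case Some(c) if c.isDimension && !c.isComplex => c
          case Some(c) => sys.error(s"Bucket field must be a dimension column: ${c.name}")
          case None    => sys.error("Bucket field is not present in table columns")
        }
      }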

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 16e35f4..7318e27 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -159,6 +159,7 @@ class CarbonSqlParser() extends CarbonDDLSqlParser {
         var ifNotExistPresent: Boolean = false
         var dbName: Option[String] = None
         var tableName: String = ""
+        var bucketFields: Option[BucketFields] = None
 
         try {
 
@@ -252,6 +253,13 @@ class CarbonSqlParser() extends CarbonDDLSqlParser {
 
             case Token("TOK_LIKETABLE", child :: Nil) =>
               likeTableName = child.getChild(0).getText()
+            case Token("TOK_ALTERTABLE_BUCKETS",
+                  Token("TOK_TABCOLNAME", list)::numberOfBuckets) =>
+              val cols = list.map(_.getText)
+              if (cols != null) {
+                bucketFields = Some(BucketFields(cols,
+                  numberOfBuckets.head.getText.toInt))
+              }
 
             case _ => // Unsupported features
           }
@@ -267,7 +275,8 @@ class CarbonSqlParser() extends CarbonDDLSqlParser {
             tableName,
             fields,
             partitionCols,
-            tableProperties)
+            tableProperties,
+            bucketFields)
 
           // get logical plan.
           CreateTable(tableModel)
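
For context, TOK_ALTERTABLE_BUCKETS is the token Hive's grammar reuses for the CLUSTERED BY ... INTO n BUCKETS clause of CREATE TABLE, so a statement along the following lines should reach the new case (table name and the STORED BY handler string are illustrative, and cc stands for a CarbonContext):

    cc.sql(
      """
        CREATE TABLE bucketed_example (id Int, name String)
        CLUSTERED BY (name) INTO 4 BUCKETS
        STORED BY 'carbondata'
      """)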

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index b02d467..3b87e41 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -46,5 +46,12 @@ class CarbonOption(options: Map[String, String]) {
 
   def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean
 
+  def bucketNumber: Int = options.getOrElse("bucketnumber", "0").toInt
+
+  def bucketColumns: String = options.getOrElse("bucketcolumns", "")
+
+  def isBucketingEnabled: Boolean = options.contains("bucketcolumns") &&
+                                  options.contains("bucketnumber")
+
   def toMap: Map[String, String] = options
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 8a946c0..d03c90c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -24,7 +24,7 @@ import scala.language.implicitConversions
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
-import org.apache.spark.sql.execution.command.{CreateTable, Field}
+import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field}
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DecimalType, StructType}
@@ -108,7 +108,7 @@ class CarbonSource extends CreatableRelationProvider
 
     val dbName: String = parameters.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
     val tableName: String = parameters.getOrElse("tableName", "default_table")
-
+    val options = new CarbonOption(parameters)
     try {
       CarbonEnv.get.carbonMetastore.lookupRelation(Option(dbName), tableName)(sparkSession)
       CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
@@ -132,7 +132,15 @@ class CarbonSource extends CreatableRelationProvider
         }
         val map = scala.collection.mutable.Map[String, String]()
         parameters.foreach { x => map.put(x._1, x._2) }
-        val cm = TableCreator.prepareTableModel(false, Option(dbName), tableName, fields, Nil, map)
+        val bucketFields = {
+          if (options.isBucketingEnabled) {
+            Some(BucketFields(options.bucketColumns.split(","), options.bucketNumber))
+          } else {
+            None
+          }
+        }
+        val cm = TableCreator.prepareTableModel(false, Option(dbName),
+          tableName, fields, Nil, bucketFields, map)
         CreateTable(cm, false).run(sparkSession)
         CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
       case ex: Exception =>
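
Through the datasource API the same two option keys travel in the parameters map, so a bucketed table could be created along these lines (df is a hypothetical DataFrame):

    df.write
      .format("org.apache.spark.sql.CarbonSource")
      .option("tableName", "bucketed_example")
      .option("bucketcolumns", "name")
      .option("bucketnumber", "4")
      .save()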

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
index 362c951..530e70e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
@@ -21,7 +21,7 @@ import java.util.regex.{Matcher, Pattern}
 
 import scala.collection.mutable.{LinkedHashSet, Map}
 
-import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField, TableModel}
+import org.apache.spark.sql.execution.command.{BucketFields, ColumnProperty, Field, PartitionerField, TableModel}
 
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -442,6 +442,7 @@ object TableCreator {
   def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
                         , tableName: String, fields: Seq[Field],
                         partitionCols: Seq[PartitionerField],
+                        bucketFields: Option[BucketFields],
                         tableProperties: Map[String, String]): TableModel
   = {
 
@@ -483,7 +484,8 @@ object TableCreator {
       Option(noDictionaryDims),
       Option(noInvertedIdxCols),
       groupCols,
-      Some(colProps))
+      Some(colProps),
+      bucketFields)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index fe8bbe7..de768c0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -17,24 +17,28 @@
 
 package org.apache.spark.sql.execution
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.{AtomicType, IntegerType}
 
+import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**
  * Carbon strategy for late decode (convert dictionary key to value as late as possible), which
@@ -248,20 +252,21 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       metadata: Map[String, String],
       needDecoder: ArrayBuffer[AttributeReference],
       updateRequestedColumns: Seq[AttributeReference]): DataSourceScanExec = {
+    val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
     if (supportBatchedDataSource(relation.relation.sqlContext, updateRequestedColumns) &&
         needDecoder.isEmpty) {
       BatchedDataSourceScanExec(
         output,
         scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
         relation.relation,
-        UnknownPartitioning(0),
+        getPartitioning(table.carbonTable, updateRequestedColumns),
         metadata,
         relation.metastoreTableIdentifier)
     } else {
       RowDataSourceScanExec(output,
         scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
         relation.relation,
-        UnknownPartitioning(0),
+        getPartitioning(table.carbonTable, updateRequestedColumns),
         metadata,
         relation.metastoreTableIdentifier)
     }
@@ -288,6 +293,35 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     }
   }
 
+  private def getPartitioning(carbonTable: CarbonTable,
+      output: Seq[AttributeReference]): Partitioning = {
+    val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
+    if (info != null) {
+      val cols = info.getListOfColumns.asScala
+      val sortColumn = carbonTable.
+        getDimensionByTableName(carbonTable.getFactTableName).get(0).getColName
+      val numBuckets = info.getNumberOfBuckets
+      val bucketColumns = cols.flatMap { n =>
+        val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName))
+        attrRef match {
+          case Some(attr) =>
+            Some(AttributeReference(attr.name,
+              CarbonScalaUtil.convertCarbonToSparkDataType(n.getDataType),
+              attr.nullable,
+              attr.metadata)(attr.exprId, attr.qualifier))
+          case _ => None
+        }
+      }
+      if (bucketColumns.size == cols.size) {
+        HashPartitioning(bucketColumns, numBuckets)
+      } else {
+        UnknownPartitioning(0)
+      }
+    } else {
+      UnknownPartitioning(0)
+    }
+  }
+
 
   protected[sql] def selectFilters(
       relation: BaseRelation,
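
A minimal sketch of what getPartitioning now reports for a table bucketed four ways on a string column name; advertising HashPartitioning instead of UnknownPartitioning(0) is what lets Spark drop the ShuffleExchange on the bucketed side of a join keyed on that column:

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
    import org.apache.spark.sql.types.StringType

    val nameAttr = AttributeReference("name", StringType)()
    val partitioning = HashPartitioning(Seq(nameAttr), numPartitions = 4)
    // equal partitioning on both join sides => no exchange is inserted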

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 5a91ad1..342cabc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -24,10 +24,11 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{ColTypeListContext, CreateTableContext, TablePropertyListContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{CreateTable, Field, TableModel}
+import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, TableModel}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types.DataType
 
+import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -132,13 +133,22 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
       if (!CommonUtil.validateTblProperties(properties.asJava.asScala, fields)) {
         throw new MalformedCarbonCommandException("Invalid table properties")
       }
+      val options = new CarbonOption(properties)
+      val bucketFields = {
+        if (options.isBucketingEnabled) {
+          Some(BucketFields(options.bucketColumns.split(","), options.bucketNumber))
+        } else {
+          None
+        }
+      }
       // prepare table model of the collected tokens
       val tableModel: TableModel = parser.prepareTableModel(ifNotExists,
         name.database,
         name.table,
         fields,
         Seq(),
-        properties.asJava.asScala)
+        properties.asJava.asScala,
+        bucketFields)
 
       CreateTable(tableModel)
     } else {