Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/28 14:32:20 UTC

[2/3] incubator-carbondata git commit: Added partitioner

Added partitioner

Added bucketing in load

Added headers

Bucketing is handled in load and query flow

Fixed test case

Rebased with master

Rebased

Added bucketing in spark layer

Rebased and fixed scala style

Added test cases for bucketing in all scenarios and fixed review comments

Rebased and fixed issues

Rebased and fixed comments

Rebased and fixed testcases

Rebased and fixed testcases

Fixed comments

Rebased

Fixed compilation issue


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/cbf87977
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/cbf87977
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/cbf87977

Branch: refs/heads/master
Commit: cbf8797776c2f3be48efe029d858b37a37d29848
Parents: 65b9221
Author: ravipesala <ra...@gmail.com>
Authored: Sun Nov 27 16:58:55 2016 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Wed Dec 28 22:14:11 2016 +0800

----------------------------------------------------------------------
 .../carbon/datastore/SegmentTaskIndexStore.java |  89 +++++--
 .../ThriftWrapperSchemaConverterImpl.java       |  30 +++
 .../carbon/metadata/schema/BucketingInfo.java   |  49 ++++
 .../metadata/schema/table/CarbonTable.java      |  14 +
 .../metadata/schema/table/TableSchema.java      |  14 +
 .../core/carbon/path/CarbonTablePath.java       |  37 ++-
 .../carbondata/core/partition/Partitioner.java  |  26 ++
 .../partition/impl/HashPartitionerImpl.java     | 105 ++++++++
 .../core/util/CarbonMetadataUtil.java           |   4 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   5 +-
 .../datastore/SegmentTaskIndexStoreTest.java    |   8 +-
 .../CarbonFormatDirectoryStructureTest.java     |   4 +-
 .../core/util/CarbonMetadataUtilTest.java       |   3 +-
 format/src/main/thrift/carbondata_index.thrift  |   1 +
 format/src/main/thrift/schema.thrift            |   9 +
 .../carbondata/hadoop/CarbonInputFormat.java    |  22 +-
 .../carbondata/hadoop/CarbonInputSplit.java     |  18 ++
 .../hadoop/CarbonMultiBlockSplit.java           |  23 +-
 .../internal/index/impl/InMemoryBTreeIndex.java |   9 +-
 .../apache/carbondata/spark/CarbonOption.scala  |   7 +
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   2 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 146 ++++++----
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   7 +-
 .../execution/command/carbonTableSchema.scala   |  29 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  |  11 +-
 .../apache/carbondata/spark/CarbonOption.scala  |   7 +
 .../org/apache/spark/sql/CarbonSource.scala     |  14 +-
 .../org/apache/spark/sql/TableCreator.scala     |   6 +-
 .../execution/CarbonLateDecodeStrategy.scala    |  42 ++-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  14 +-
 .../bucketing/TableBucketingTestCase.scala      | 193 ++++++++++++++
 .../newflow/CarbonDataLoadConfiguration.java    |  11 +
 .../newflow/DataLoadProcessBuilder.java         |  29 ++
 .../processing/newflow/row/CarbonRow.java       |   2 +
 ...arallelReadMergeSorterWithBucketingImpl.java | 265 +++++++++++++++++++
 ...ConverterProcessorWithBucketingStepImpl.java | 189 +++++++++++++
 .../steps/DataWriterProcessorStepImpl.java      |  79 +++---
 .../newflow/steps/SortProcessorStepImpl.java    |  11 +-
 .../sortandgroupby/sortdata/SortParameters.java |  28 ++
 .../store/CarbonFactDataHandlerColumnar.java    |   4 +
 .../store/CarbonFactDataHandlerModel.java       |   9 +-
 .../store/SingleThreadFinalSortFilesMerger.java |   4 +
 .../store/writer/AbstractFactDataWriter.java    |   6 +-
 .../store/writer/CarbonDataWriterVo.java        |   9 +
 44 files changed, 1417 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
index e2218a8..6ab18bb 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
@@ -18,6 +18,7 @@
  */
 package org.apache.carbondata.core.carbon.datastore;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -57,7 +58,8 @@ public class SegmentTaskIndexStore {
   * reason for so many maps: each segment can have multiple data files and
   * each file will have its own btree
    */
-  private Map<AbsoluteTableIdentifier, Map<String, Map<String, AbstractIndex>>> tableSegmentMap;
+  private Map<AbsoluteTableIdentifier,
+      Map<String, Map<TaskBucketHolder, AbstractIndex>>> tableSegmentMap;
 
   /**
    * map of block info to lock object map, while loading the btree this will be filled
@@ -76,7 +78,7 @@ public class SegmentTaskIndexStore {
 
   private SegmentTaskIndexStore() {
     tableSegmentMap =
-        new ConcurrentHashMap<AbsoluteTableIdentifier, Map<String, Map<String, AbstractIndex>>>(
+        new ConcurrentHashMap<>(
             CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     tableLockMap = new ConcurrentHashMap<AbsoluteTableIdentifier, Object>(
         CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -103,26 +105,26 @@ public class SegmentTaskIndexStore {
   * @return map of task id to segment mapping
    * @throws IndexBuilderException
    */
-  public Map<String, AbstractIndex> loadAndGetTaskIdToSegmentsMap(
+  public Map<TaskBucketHolder, AbstractIndex> loadAndGetTaskIdToSegmentsMap(
       Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos,
       AbsoluteTableIdentifier absoluteTableIdentifier) throws IndexBuilderException {
     // task id to segment map
-    Map<String, AbstractIndex> taskIdToTableSegmentMap =
-        new HashMap<String, AbstractIndex>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    Map<TaskBucketHolder, AbstractIndex> taskIdToTableSegmentMap =
+        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     addLockObject(absoluteTableIdentifier);
     Iterator<Entry<String, List<TableBlockInfo>>> iteratorOverSegmentBlocksInfos =
         segmentToTableBlocksInfos.entrySet().iterator();
-    Map<String, Map<String, AbstractIndex>> tableSegmentMapTemp =
+    Map<String, Map<TaskBucketHolder, AbstractIndex>> tableSegmentMapTemp =
         addTableSegmentMap(absoluteTableIdentifier);
-    Map<String, AbstractIndex> taskIdToSegmentIndexMap = null;
+    Map<TaskBucketHolder, AbstractIndex> taskIdToSegmentIndexMap = null;
     String segmentId = null;
-    String taskId = null;
+    TaskBucketHolder taskId = null;
     try {
       while (iteratorOverSegmentBlocksInfos.hasNext()) {
         // segment id to table block mapping
         Entry<String, List<TableBlockInfo>> next = iteratorOverSegmentBlocksInfos.next();
         // group task id to table block info mapping for the segment
-        Map<String, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
+        Map<TaskBucketHolder, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
             mappedAndGetTaskIdToTableBlockInfo(segmentToTableBlocksInfos);
         // get the existing map of task id to table segment map
         segmentId = next.getKey();
@@ -142,11 +144,11 @@ public class SegmentTaskIndexStore {
             taskIdToSegmentIndexMap = tableSegmentMapTemp.get(segmentId);
             if (null == taskIdToSegmentIndexMap) {
               // creating a map of task id to table segment
-              taskIdToSegmentIndexMap = new ConcurrentHashMap<String, AbstractIndex>();
-              Iterator<Entry<String, List<TableBlockInfo>>> iterator =
+              taskIdToSegmentIndexMap = new ConcurrentHashMap<TaskBucketHolder, AbstractIndex>();
+              Iterator<Entry<TaskBucketHolder, List<TableBlockInfo>>> iterator =
                   taskIdToTableBlockInfoMap.entrySet().iterator();
               while (iterator.hasNext()) {
-                Entry<String, List<TableBlockInfo>> taskToBlockInfoList = iterator.next();
+                Entry<TaskBucketHolder, List<TableBlockInfo>> taskToBlockInfoList = iterator.next();
                 taskId = taskToBlockInfoList.getKey();
                 taskIdToSegmentIndexMap.put(taskId,
                     loadBlocks(taskId, taskToBlockInfoList.getValue(), absoluteTableIdentifier));
@@ -207,18 +209,18 @@ public class SegmentTaskIndexStore {
    * @param absoluteTableIdentifier
    * @return table segment map
    */
-  private Map<String, Map<String, AbstractIndex>> addTableSegmentMap(
+  private Map<String, Map<TaskBucketHolder, AbstractIndex>> addTableSegmentMap(
       AbsoluteTableIdentifier absoluteTableIdentifier) {
     // get the instance of lock object
     Object lockObject = tableLockMap.get(absoluteTableIdentifier);
-    Map<String, Map<String, AbstractIndex>> tableSegmentMapTemp =
+    Map<String, Map<TaskBucketHolder, AbstractIndex>> tableSegmentMapTemp =
         tableSegmentMap.get(absoluteTableIdentifier);
     if (null == tableSegmentMapTemp) {
       synchronized (lockObject) {
         // segment id to task id to table segment map
         tableSegmentMapTemp = tableSegmentMap.get(absoluteTableIdentifier);
         if (null == tableSegmentMapTemp) {
-          tableSegmentMapTemp = new ConcurrentHashMap<String, Map<String, AbstractIndex>>();
+          tableSegmentMapTemp = new ConcurrentHashMap<>();
           tableSegmentMap.put(absoluteTableIdentifier, tableSegmentMapTemp);
         }
       }
@@ -233,12 +235,13 @@ public class SegmentTaskIndexStore {
    * @return loaded segment
    * @throws CarbonUtilException
    */
-  private AbstractIndex loadBlocks(String taskId, List<TableBlockInfo> tableBlockInfoList,
+  private AbstractIndex loadBlocks(TaskBucketHolder holder, List<TableBlockInfo> tableBlockInfoList,
       AbsoluteTableIdentifier tableIdentifier) throws CarbonUtilException {
    // all the blocks of one task id will be loaded together
    // so creating a list which will have all the data file metadata of one task
-    List<DataFileFooter> footerList =
-        CarbonUtil.readCarbonIndexFile(taskId, tableBlockInfoList, tableIdentifier);
+    List<DataFileFooter> footerList = CarbonUtil
+        .readCarbonIndexFile(holder.taskNo, holder.bucketNumber, tableBlockInfoList,
+            tableIdentifier);
     AbstractIndex segment = new SegmentTaskIndex();
    // file path of only the first block is passed, as all table block info paths of the
    // same task id will be the same
@@ -253,10 +256,10 @@ public class SegmentTaskIndexStore {
   * @param segmentToTableBlocksInfos segment id to table blocks info map
    * @return task id to table block info mapping
    */
-  private Map<String, List<TableBlockInfo>> mappedAndGetTaskIdToTableBlockInfo(
+  private Map<TaskBucketHolder, List<TableBlockInfo>> mappedAndGetTaskIdToTableBlockInfo(
       Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos) {
-    Map<String, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
-        new ConcurrentHashMap<String, List<TableBlockInfo>>();
+    Map<TaskBucketHolder, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
+        new ConcurrentHashMap<>();
     Iterator<Entry<String, List<TableBlockInfo>>> iterator =
         segmentToTableBlocksInfos.entrySet().iterator();
     while (iterator.hasNext()) {
@@ -264,10 +267,12 @@ public class SegmentTaskIndexStore {
       List<TableBlockInfo> value = next.getValue();
       for (TableBlockInfo blockInfo : value) {
         String taskNo = DataFileUtil.getTaskNo(blockInfo.getFilePath());
-        List<TableBlockInfo> list = taskIdToTableBlockInfoMap.get(taskNo);
+        String bucketNo = DataFileUtil.getBucketNo(blockInfo.getFilePath());
+        TaskBucketHolder bucketHolder = new TaskBucketHolder(taskNo, bucketNo);
+        List<TableBlockInfo> list = taskIdToTableBlockInfoMap.get(bucketHolder);
         if (null == list) {
           list = new ArrayList<TableBlockInfo>();
-          taskIdToTableBlockInfoMap.put(taskNo, list);
+          taskIdToTableBlockInfoMap.put(bucketHolder, list);
         }
         list.add(blockInfo);
       }
@@ -304,7 +309,8 @@ public class SegmentTaskIndexStore {
       return;
     }
    // Acquire the lock and remove only those instances which were loaded
-    Map<String, Map<String, AbstractIndex>> map = tableSegmentMap.get(absoluteTableIdentifier);
+    Map<String, Map<TaskBucketHolder, AbstractIndex>> map =
+        tableSegmentMap.get(absoluteTableIdentifier);
    // if there are no loaded blocks then return
     if (null == map) {
       return;
@@ -322,13 +328,44 @@ public class SegmentTaskIndexStore {
    * @param segmentId
   * @return the loaded blocks if loaded, otherwise null
    */
-  public Map<String, AbstractIndex> getSegmentBTreeIfExists(
+  public Map<TaskBucketHolder, AbstractIndex> getSegmentBTreeIfExists(
       AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) {
-    Map<String, Map<String, AbstractIndex>> tableSegment =
+    Map<String, Map<TaskBucketHolder, AbstractIndex>> tableSegment =
         tableSegmentMap.get(absoluteTableIdentifier);
     if (null == tableSegment) {
       return null;
     }
     return tableSegment.get(segmentId);
   }
+
+  public static class TaskBucketHolder implements Serializable {
+
+    public String taskNo;
+
+    public String bucketNumber;
+
+    public TaskBucketHolder(String taskNo, String bucketNumber) {
+      this.taskNo = taskNo;
+      this.bucketNumber = bucketNumber;
+    }
+
+    @Override public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      TaskBucketHolder that = (TaskBucketHolder) o;
+
+      if (taskNo != null ? !taskNo.equals(that.taskNo) : that.taskNo != null) return false;
+      return bucketNumber != null ?
+          bucketNumber.equals(that.bucketNumber) :
+          that.bucketNumber == null;
+
+    }
+
+    @Override public int hashCode() {
+      int result = taskNo != null ? taskNo.hashCode() : 0;
+      result = 31 * result + (bucketNumber != null ? bucketNumber.hashCode() : 0);
+      return result;
+    }
+  }
 }

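A minimal sketch (not part of the commit) of what the new composite key buys: two TaskBucketHolder instances with the same taskNo and bucketNumber are equal, so the index store can group blocks per task and per bucket in ordinary hash maps. The class, its constructor, and the data file name pattern are taken from this diff; the main method is illustrative only.

import java.util.HashMap;
import java.util.Map;

import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore.TaskBucketHolder;

public class TaskBucketHolderSketch {
  public static void main(String[] args) {
    Map<TaskBucketHolder, String> blocksByTaskAndBucket = new HashMap<>();
    // one task split across two buckets yields two distinct keys
    blocksByTaskAndBucket.put(new TaskBucketHolder("100", "0"), "part-0-100-0-999.carbondata");
    blocksByTaskAndBucket.put(new TaskBucketHolder("100", "1"), "part-0-100-1-999.carbondata");
    // equals()/hashCode() let an equal holder find the same entry
    System.out.println(blocksByTaskAndBucket.get(new TaskBucketHolder("100", "0")));
  }
}
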
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 7d5386e..4b2be5f 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.carbon.metadata.schema.SchemaEvolution;
 import org.apache.carbondata.core.carbon.metadata.schema.SchemaEvolutionEntry;
 import org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo;
@@ -190,9 +191,24 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         new org.apache.carbondata.format.TableSchema(
             wrapperTableSchema.getTableId(), thriftColumnSchema, schemaEvolution);
     externalTableSchema.setTableProperties(wrapperTableSchema.getTableProperties());
+    if (wrapperTableSchema.getBucketingInfo() != null) {
+      externalTableSchema.setBucketingInfo(
+          fromWrapperToExternalBucketingInfo(wrapperTableSchema.getBucketingInfo()));
+    }
     return externalTableSchema;
   }
 
+  private org.apache.carbondata.format.BucketingInfo fromWrapperToExternalBucketingInfo(
+      BucketingInfo bucketingInfo) {
+    List<org.apache.carbondata.format.ColumnSchema> thriftColumnSchema =
+        new ArrayList<org.apache.carbondata.format.ColumnSchema>();
+    for (ColumnSchema wrapperColumnSchema : bucketingInfo.getListOfColumns()) {
+      thriftColumnSchema.add(fromWrapperToExternalColumnSchema(wrapperColumnSchema));
+    }
+    return new org.apache.carbondata.format.BucketingInfo(thriftColumnSchema,
+        bucketingInfo.getNumberOfBuckets());
+  }
+
   /* (non-Javadoc)
    * convert from wrapper to external tableinfo
    */
@@ -365,9 +381,23 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
     wrapperTableSchema.setListOfColumns(listOfColumns);
     wrapperTableSchema.setSchemaEvalution(
         fromExternalToWrapperSchemaEvolution(externalTableSchema.getSchema_evolution()));
+    if (externalTableSchema.isSetBucketingInfo()) {
+      wrapperTableSchema.setBucketingInfo(
+          fromExternalToWrapperBucketingInfo(externalTableSchema.bucketingInfo));
+    }
     return wrapperTableSchema;
   }
 
+  private BucketingInfo fromExternalToWrapperBucketingInfo(
+      org.apache.carbondata.format.BucketingInfo externalBucketInfo) {
+    List<ColumnSchema> listOfColumns = new ArrayList<ColumnSchema>();
+    for (org.apache.carbondata.format.ColumnSchema externalColumnSchema :
+          externalBucketInfo.table_columns) {
+      listOfColumns.add(fromExternalToWrapperColumnSchema(externalColumnSchema));
+    }
+    return new BucketingInfo(listOfColumns, externalBucketInfo.number_of_buckets);
+  }
+
   /* (non-Javadoc)
    * convert from external to wrapper tableinfo
    */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/BucketingInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/BucketingInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/BucketingInfo.java
new file mode 100644
index 0000000..75c888d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/BucketingInfo.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.carbon.metadata.schema;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+
+/**
+ * Bucketing information
+ */
+public class BucketingInfo implements Serializable {
+
+  private List<ColumnSchema> listOfColumns;
+
+  private int numberOfBuckets;
+
+  public BucketingInfo(List<ColumnSchema> listOfColumns, int numberOfBuckets) {
+    this.listOfColumns = listOfColumns;
+    this.numberOfBuckets = numberOfBuckets;
+  }
+
+  public List<ColumnSchema> getListOfColumns() {
+    return listOfColumns;
+  }
+
+  public int getNumberOfBuckets() {
+    return numberOfBuckets;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java
index d3e2e62..7766616 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
@@ -72,6 +73,11 @@ public class CarbonTable implements Serializable {
   private Map<String, List<CarbonMeasure>> tableMeasuresMap;
 
   /**
+   * table bucket map.
+   */
+  private Map<String, BucketingInfo> tableBucketMap;
+
+  /**
    * tableUniqueName
    */
   private String tableUniqueName;
@@ -99,6 +105,7 @@ public class CarbonTable implements Serializable {
   public CarbonTable() {
     this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
     this.tableMeasuresMap = new HashMap<String, List<CarbonMeasure>>();
+    this.tableBucketMap = new HashMap<>();
     this.aggregateTablesName = new ArrayList<String>();
     this.createOrderColumn = new HashMap<String, List<CarbonColumn>>();
   }
@@ -124,7 +131,10 @@ public class CarbonTable implements Serializable {
     for (TableSchema aggTable : aggregateTableList) {
       this.aggregateTablesName.add(aggTable.getTableName());
       fillDimensionsAndMeasuresForTables(aggTable);
+      tableBucketMap.put(aggTable.getTableName(), aggTable.getBucketingInfo());
     }
+    tableBucketMap.put(tableInfo.getFactTable().getTableName(),
+        tableInfo.getFactTable().getBucketingInfo());
   }
 
   /**
@@ -474,6 +484,10 @@ public class CarbonTable implements Serializable {
     return null;
   }
 
+  public BucketingInfo getBucketingInfo(String tableName) {
+    return tableBucketMap.get(tableName);
+  }
+
   /**
    * @return absolute table identifier
    */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/TableSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/TableSchema.java
index 348f235..9beeff2 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/TableSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/TableSchema.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.carbon.metadata.schema.SchemaEvolution;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -62,6 +63,11 @@ public class TableSchema implements Serializable {
    */
   private Map<String, String> tableProperties;
 
+  /**
+   * Information about bucketing of fields and number of buckets
+   */
+  private BucketingInfo bucketingInfo;
+
   public TableSchema() {
     this.listOfColumns = new ArrayList<ColumnSchema>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
   }
@@ -202,4 +208,12 @@ public class TableSchema implements Serializable {
   public void setTableProperties(Map<String, String> tableProperties) {
     this.tableProperties = tableProperties;
   }
+
+  public BucketingInfo getBucketingInfo() {
+    return bucketingInfo;
+  }
+
+  public void setBucketingInfo(BucketingInfo bucketingInfo) {
+    this.bucketingInfo = bucketingInfo;
+  }
 }

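Taken together with the BucketingInfo and CarbonTable changes above, the metadata plumbing looks like this. A hedged sketch: the BucketingInfo constructor and the TableSchema getter/setter are from this diff, while ColumnSchema's no-arg constructor and setColumnName setter are assumed from the wrapper schema API.

import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
import org.apache.carbondata.core.carbon.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;

public class BucketingSchemaSketch {
  public static void main(String[] args) {
    // assumed: ColumnSchema exposes a no-arg constructor and setColumnName
    ColumnSchema city = new ColumnSchema();
    city.setColumnName("city");

    List<ColumnSchema> bucketColumns = new ArrayList<ColumnSchema>();
    bucketColumns.add(city);

    // hash-distribute rows on "city" into 4 buckets
    TableSchema tableSchema = new TableSchema();
    tableSchema.setBucketingInfo(new BucketingInfo(bucketColumns, 4));

    System.out.println(tableSchema.getBucketingInfo().getNumberOfBuckets()); // prints 4
  }
}
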
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
index 54e7266..cda971a 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
@@ -215,9 +215,9 @@ public class CarbonTablePath extends Path {
    * @return absolute path of data file stored in carbon data format
    */
   public String getCarbonDataFilePath(String partitionId, String segmentId, Integer filePartNo,
-      Integer taskNo, String factUpdateTimeStamp) {
+      Integer taskNo, int bucketNumber, String factUpdateTimeStamp) {
     return getSegmentDir(partitionId, segmentId) + File.separator + getCarbonDataFileName(
-        filePartNo, taskNo, factUpdateTimeStamp);
+        filePartNo, taskNo, bucketNumber, factUpdateTimeStamp);
   }
 
   /**
@@ -230,14 +230,15 @@ public class CarbonTablePath extends Path {
    * @return full qualified carbon index path
    */
   public String getCarbonIndexFilePath(final String taskId, final String partitionId,
-      final String segmentId) {
+      final String segmentId, final String bucketNumber) {
     String segmentDir = getSegmentDir(partitionId, segmentId);
     CarbonFile carbonFile =
         FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir));
 
     CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
       @Override public boolean accept(CarbonFile file) {
-        return file.getName().startsWith(taskId) && file.getName().endsWith(INDEX_FILE_EXT);
+        return file.getName().startsWith(taskId + "-" + bucketNumber) && file.getName()
+            .endsWith(INDEX_FILE_EXT);
       }
     });
     return files[0].getAbsolutePath();
@@ -262,10 +263,10 @@ public class CarbonTablePath extends Path {
    * @param factUpdateTimeStamp unique identifier to identify an update
   * @return gets data file name only without path
    */
-  public String getCarbonDataFileName(Integer filePartNo, Integer taskNo,
+  public String getCarbonDataFileName(Integer filePartNo, Integer taskNo, int bucketNumber,
       String factUpdateTimeStamp) {
-    return DATA_PART_PREFIX + "-" + filePartNo + "-" + taskNo + "-" + factUpdateTimeStamp
-        + CARBON_DATA_EXT;
+    return DATA_PART_PREFIX + "-" + filePartNo + "-" + taskNo + "-" + bucketNumber + "-"
+        + factUpdateTimeStamp + CARBON_DATA_EXT;
   }
 
   /**
@@ -275,8 +276,8 @@ public class CarbonTablePath extends Path {
    * @param factUpdatedTimeStamp time stamp
    * @return filename
    */
-  public String getCarbonIndexFileName(int taskNo, String factUpdatedTimeStamp) {
-    return taskNo + "-" + factUpdatedTimeStamp + INDEX_FILE_EXT;
+  public String getCarbonIndexFileName(int taskNo, int bucketNumber, String factUpdatedTimeStamp) {
+    return taskNo + "-" + bucketNumber + "-" + factUpdatedTimeStamp + INDEX_FILE_EXT;
   }
 
   private String getSegmentDir(String partitionId, String segmentId) {
@@ -352,6 +353,24 @@ public class CarbonTablePath extends Path {
     }
 
     /**
+     * gets the bucket number from the given carbon data file name
+     */
+    public static String getBucketNo(String carbonFilePath) {
+      // Get the file name from path
+      String fileName = getFileName(carbonFilePath);
+      int firstDashPos = fileName.indexOf("-");
+      int secondDash = fileName.indexOf("-", firstDashPos + 1);
+      // + 1 to step past the third "-" separator
+      int startIndex = fileName.indexOf("-", secondDash + 1) + 1;
+      int endIndex = fileName.indexOf("-", startIndex);
+      // to support backward compatibility
+      if (startIndex == -1 || endIndex == -1) {
+        return "0";
+      }
+      return fileName.substring(startIndex, endIndex);
+    }
+
+    /**
      * Gets the file name from file path
      */
     private static String getFileName(String carbonDataFileName) {

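To make the extended naming scheme concrete: data files are now named part-<filePartNo>-<taskNo>-<bucketNo>-<timestamp>.carbondata and index files <taskNo>-<bucketNo>-<timestamp>.carbonindex. The standalone sketch below repeats the same parsing that the new DataFileUtil.getBucketNo performs, including the fallback to bucket "0" for pre-bucketing file names; the harness around it is illustrative.

public class BucketNoParseSketch {

  // same extraction as DataFileUtil.getBucketNo in the diff above
  static String getBucketNo(String fileName) {
    int firstDashPos = fileName.indexOf("-");
    int secondDash = fileName.indexOf("-", firstDashPos + 1);
    // + 1 to step past the third "-" separator
    int startIndex = fileName.indexOf("-", secondDash + 1) + 1;
    int endIndex = fileName.indexOf("-", startIndex);
    // old file names have no bucket segment: default to bucket 0
    if (startIndex == -1 || endIndex == -1) {
      return "0";
    }
    return fileName.substring(startIndex, endIndex);
  }

  public static void main(String[] args) {
    System.out.println(getBucketNo("part-3-4-7-999.carbondata"));  // 7
    System.out.println(getBucketNo("part-3-4-999.carbondata"));    // 0 (backward compatible)
  }
}
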
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/partition/Partitioner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/partition/Partitioner.java b/core/src/main/java/org/apache/carbondata/core/partition/Partitioner.java
new file mode 100644
index 0000000..1907687
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/partition/Partitioner.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.partition;
+
+/**
+ * Partitions the data by key
+ */
+public interface Partitioner<Key> {
+
+  int getPartition(Key key);
+
+}

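The interface is deliberately minimal, so any keyed strategy can plug in. As a trivial illustration (not in the commit), a modulo partitioner over integer keys, using the same sign-bit masking as the hash implementation that follows:

import org.apache.carbondata.core.partition.Partitioner;

// illustrative only: map integer keys into a fixed number of buckets
public class ModuloPartitioner implements Partitioner<Integer> {

  private final int numberOfBuckets;

  public ModuloPartitioner(int numberOfBuckets) {
    this.numberOfBuckets = numberOfBuckets;
  }

  @Override public int getPartition(Integer key) {
    // mask the sign bit so negative keys still land in a valid bucket
    return (key.hashCode() & Integer.MAX_VALUE) % numberOfBuckets;
  }
}
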
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java b/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java
new file mode 100644
index 0000000..a702a6b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.partition.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.partition.Partitioner;
+
+/**
+ * Hash partitioner implementation
+ */
+public class HashPartitionerImpl implements Partitioner<Object[]> {
+
+  private int numberOfBuckets;
+
+  private Hash[] hashes;
+
+  public HashPartitionerImpl(List<Integer> indexes, List<ColumnSchema> columnSchemas,
+      int numberOfBuckets) {
+    this.numberOfBuckets = numberOfBuckets;
+    hashes = new Hash[indexes.size()];
+    for (int i = 0; i < indexes.size(); i++) {
+      switch(columnSchemas.get(i).getDataType()) {
+        case SHORT:
+        case INT:
+        case LONG:
+          hashes[i] = new IntegralHash(indexes.get(i));
+          break;
+        case DOUBLE:
+        case FLOAT:
+        case DECIMAL:
+          hashes[i] = new DecimalHash(indexes.get(i));
+          break;
+        default:
+          hashes[i] = new StringHash(indexes.get(i));
+      }
+    }
+  }
+
+  @Override public int getPartition(Object[] objects) {
+    int hashCode = 0;
+    for (Hash hash : hashes) {
+      hashCode += hash.getHash(objects);
+    }
+    return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
+  }
+
+  private interface Hash {
+    int getHash(Object[] value);
+  }
+
+  private static class IntegralHash implements Hash {
+
+    private int index;
+
+    private IntegralHash(int index) {
+      this.index = index;
+    }
+
+    public int getHash(Object[] value) {
+      return value[index] != null ? Long.valueOf(value[index].toString()).hashCode() : 0;
+    }
+  }
+
+  private static class DecimalHash implements Hash {
+
+    private int index;
+
+    private DecimalHash(int index) {
+      this.index = index;
+    }
+
+    public int getHash(Object[] value) {
+      return value[index] != null ? Double.valueOf(value[index].toString()).hashCode() : 0;
+    }
+  }
+
+  private static class StringHash implements Hash {
+
+    private int index;
+
+    private StringHash(int index) {
+      this.index = index;
+    }
+
+    @Override public int getHash(Object[] value) {
+      return value[index] != null ? value[index].hashCode() : 0;
+    }
+  }
+}

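A hedged usage sketch: the indexes list holds the positions of the bucket columns inside the row array, and the parallel columnSchemas list selects the type-specific hash. The constructor and getPartition are from this diff; ColumnSchema's no-arg constructor and setDataType setter are assumed from the wrapper schema API.

import java.util.Arrays;

import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.partition.Partitioner;
import org.apache.carbondata.core.partition.impl.HashPartitionerImpl;

public class HashPartitionerSketch {
  public static void main(String[] args) {
    // assumed: ColumnSchema no-arg constructor and setDataType setter
    ColumnSchema idColumn = new ColumnSchema();
    idColumn.setDataType(DataType.INT); // routes to IntegralHash

    // bucket on row position 1, into 8 buckets
    Partitioner<Object[]> partitioner =
        new HashPartitionerImpl(Arrays.asList(1), Arrays.asList(idColumn), 8);

    Object[] row = new Object[] { "john", 42, 99.5 };
    System.out.println(partitioner.getPartition(row)); // stable value in [0, 8)
  }
}
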
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 747862f..3415d92 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -467,7 +467,7 @@ public class CarbonMetadataUtil {
    * @return Index header object
    */
   public static IndexHeader getIndexHeader(int[] columnCardinality,
-      List<ColumnSchema> columnSchemaList) {
+      List<ColumnSchema> columnSchemaList, int bucketNumber) {
     // create segment info object
     SegmentInfo segmentInfo = new SegmentInfo();
     // set the number of columns
@@ -482,6 +482,8 @@ public class CarbonMetadataUtil {
     indexHeader.setSegment_info(segmentInfo);
     // set the column names
     indexHeader.setTable_columns(columnSchemaList);
+    // set the bucket number
+    indexHeader.setBucket_id(bucketNumber);
     return indexHeader;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 3980cb3..fbbed76 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1093,7 +1093,7 @@ public final class CarbonUtil {
    * @return list of block info
    * @throws CarbonUtilException if any problem while reading
    */
-  public static List<DataFileFooter> readCarbonIndexFile(String taskId,
+  public static List<DataFileFooter> readCarbonIndexFile(String taskId, String bucketNumber,
       List<TableBlockInfo> tableBlockInfoList, AbsoluteTableIdentifier absoluteTableIdentifier)
       throws CarbonUtilException {
     // need to sort the  block info list based for task in ascending  order so
@@ -1105,7 +1105,8 @@ public final class CarbonUtil {
    // getting the index file path
    //TODO need to pass proper partition number when partition will be supported
     String carbonIndexFilePath = carbonTablePath
-        .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId());
+        .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId(),
+            bucketNumber);
     DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
     try {
       // read the index info and return

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
index 7cb213c..722d030 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
@@ -90,6 +90,7 @@ public class SegmentTaskIndexStoreTest {
 
     new MockUp<CarbonUtil>() {
       @Mock List<DataFileFooter> readCarbonIndexFile(String taskId,
+          String bucketNumber,
           List<TableBlockInfo> tableBlockInfoList,
           AbsoluteTableIdentifier absoluteTableIdentifier) {
         return getDataFileFooters();
@@ -101,18 +102,17 @@ public class SegmentTaskIndexStoreTest {
       }
     };
 
-    Map<String, AbstractIndex> result =
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> result =
         taskIndexStore.loadAndGetTaskIdToSegmentsMap(new HashMap<String, List<TableBlockInfo>>() {{
           put("SG100", Arrays.asList(tableBlockInfo));
         }}, absoluteTableIdentifier);
 
     assertEquals(result.size(), 1);
-    assertTrue(result.containsKey(new String("100")));
-
+    assertTrue(result.containsKey(new SegmentTaskIndexStore.TaskBucketHolder("100", "0")));
   }
 
   @Test public void checkExistenceOfSegmentBTree() {
-    Map<String, AbstractIndex> result =
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> result =
         taskIndexStore.getSegmentBTreeIfExists(absoluteTableIdentifier, "SG100");
     assertNull(result);
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/test/java/org/apache/carbondata/core/path/CarbonFormatDirectoryStructureTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/path/CarbonFormatDirectoryStructureTest.java b/core/src/test/java/org/apache/carbondata/core/path/CarbonFormatDirectoryStructureTest.java
index 437a13f..95dd5d7 100644
--- a/core/src/test/java/org/apache/carbondata/core/path/CarbonFormatDirectoryStructureTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/path/CarbonFormatDirectoryStructureTest.java
@@ -53,8 +53,8 @@ public class CarbonFormatDirectoryStructureTest {
         .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.dictmeta"));
     assertTrue(carbonTablePath.getSortIndexFilePath("t1_c1").replace("\\", "/")
         .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.sortindex"));
-    assertTrue(carbonTablePath.getCarbonDataFilePath("1", "2", 3, 4, "999").replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/t1/Fact/Part1/Segment_2/part-3-4-999.carbondata"));
+    assertTrue(carbonTablePath.getCarbonDataFilePath("1", "2", 3, 4, 0, "999").replace("\\", "/")
+        .equals(CARBON_STORE + "/d1/t1/Fact/Part1/Segment_2/part-3-4-0-999.carbondata"));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index b3647a8..b7d3a01 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -160,7 +160,8 @@ public class CarbonMetadataUtilTest {
     indexHeader.setVersion(2);
     indexHeader.setSegment_info(segmentInfo);
     indexHeader.setTable_columns(columnSchemaList);
-    IndexHeader indexheaderResult = getIndexHeader(columnCardinality, columnSchemaList);
+    indexHeader.setBucket_id(0);
+    IndexHeader indexheaderResult = getIndexHeader(columnCardinality, columnSchemaList, 0);
     assertEquals(indexHeader, indexheaderResult);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/format/src/main/thrift/carbondata_index.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata_index.thrift b/format/src/main/thrift/carbondata_index.thrift
index e5fda5d..364a7e5 100644
--- a/format/src/main/thrift/carbondata_index.thrift
+++ b/format/src/main/thrift/carbondata_index.thrift
@@ -32,6 +32,7 @@ struct IndexHeader{
   1: required i32 version; // version used for data compatibility
   2: required list<schema.ColumnSchema> table_columns;	// Description of columns in this file
   3: required carbondata.SegmentInfo segment_info;	// Segment info (will be same/repeated for all files in this segment)
+  4: optional i32 bucket_id; // bucket number to which this file belongs
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/format/src/main/thrift/schema.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index 377c372..775573c 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -122,6 +122,14 @@ struct SchemaEvolution{
 }
 
+/**
+* Bucketing information for the columns of a table
+**/
+struct BucketingInfo{
+  1: required list<ColumnSchema> table_columns;
+  2: required i32 number_of_buckets;
+}
+
 /**
 * The description of table schema
 */
 struct TableSchema{
@@ -129,6 +137,7 @@ struct TableSchema{
 	2: required list<ColumnSchema> table_columns; // Columns in the table
 	3: required SchemaEvolution schema_evolution; // History of schema evolution of this table
  4: optional map<string,string> tableProperties; // table properties configured by the user
+  5: optional BucketingInfo bucketingInfo; // bucketing information
 }
 
 struct TableInfo{

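Because bucketingInfo is optional, readers have to guard with the generated isSet check, exactly as the schema converter change above does. A minimal sketch against the thrift-generated Java classes; the TableSchema and BucketingInfo constructor shapes follow the converter diff, and a no-arg SchemaEvolution constructor is assumed from thrift codegen.

import java.util.ArrayList;

import org.apache.carbondata.format.BucketingInfo;
import org.apache.carbondata.format.ColumnSchema;
import org.apache.carbondata.format.SchemaEvolution;
import org.apache.carbondata.format.TableSchema;

public class ThriftBucketingSketch {
  public static void main(String[] args) {
    TableSchema schema = new TableSchema("t1",
        new ArrayList<ColumnSchema>(), new SchemaEvolution());

    // schemas written before this change simply never set the field
    System.out.println(schema.isSetBucketingInfo()); // false

    schema.setBucketingInfo(new BucketingInfo(new ArrayList<ColumnSchema>(), 4));
    System.out.println(schema.isSetBucketingInfo()); // true
  }
}
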
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index aa995fa..b69df86 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -42,7 +42,9 @@ import org.apache.carbondata.core.carbon.datastore.impl.btree.BlockBTreeLeafNode
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.carbon.path.CarbonStorePath;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.carbon.querystatistics.*;
+import org.apache.carbondata.core.carbon.querystatistics.QueryStatistic;
+import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsConstants;
+import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsRecorder;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
@@ -319,7 +321,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       String segmentId) throws IndexBuilderException, IOException {
     QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
     QueryStatistic statistic = new QueryStatistic();
-    Map<String, AbstractIndex> segmentIndexMap =
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
         getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId);
 
     List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
@@ -379,10 +381,20 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return tableBlockInfoList;
   }
 
-  private Map<String, AbstractIndex> getSegmentAbstractIndexs(JobContext job,
-      AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId)
+  /**
+   * Returns the index for each task and bucket of the segment.
+   * @param job
+   * @param absoluteTableIdentifier
+   * @param segmentId
+   * @return
+   * @throws IOException
+   * @throws IndexBuilderException
+   */
+  private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
+      JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId)
       throws IOException, IndexBuilderException {
-    Map<String, AbstractIndex> segmentIndexMap = SegmentTaskIndexStore.getInstance()
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
+        SegmentTaskIndexStore.getInstance()
         .getSegmentBTreeIfExists(absoluteTableIdentifier, segmentId);
 
     // if segment tree is not loaded, load the segment tree

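With the segment index map now keyed by (task, bucket), a consumer that cares about a single bucket can filter on the key. A sketch, with import paths assumed from the surrounding code; the map itself would come from loadAndGetTaskIdToSegmentsMap:

import java.util.Map;

import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore.TaskBucketHolder;
import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;

public class BucketFilterSketch {

  // keep only the btrees that belong to the requested bucket
  static void printIndexesForBucket(Map<TaskBucketHolder, AbstractIndex> segmentIndexMap,
      String bucketNumber) {
    for (Map.Entry<TaskBucketHolder, AbstractIndex> entry : segmentIndexMap.entrySet()) {
      if (bucketNumber.equals(entry.getKey().bucketNumber)) {
        System.out.println("task " + entry.getKey().taskNo + " -> " + entry.getValue());
      }
    }
  }
}
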
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 8b87cad..a4acd9c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -45,7 +45,10 @@ public class CarbonInputSplit extends FileSplit
 
   private static final long serialVersionUID = 3520344046772190207L;
   public String taskId;
+
   private String segmentId;
+
+  private String bucketId;
   /*
    * Invalid segments that need to be removed in task side index
    */
@@ -61,6 +64,7 @@ public class CarbonInputSplit extends FileSplit
   public CarbonInputSplit() {
     segmentId = null;
     taskId = "0";
+    bucketId = "0";
     numberOfBlocklets = 0;
     invalidSegments = new ArrayList<>();
     version = CarbonProperties.getInstance().getFormatVersion();
@@ -71,6 +75,7 @@ public class CarbonInputSplit extends FileSplit
     super(path, start, length, locations);
     this.segmentId = segmentId;
     this.taskId = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
+    this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(path.getName());
     this.invalidSegments = new ArrayList<>();
     this.version = version;
   }
@@ -124,6 +129,7 @@ public class CarbonInputSplit extends FileSplit
     super.readFields(in);
     this.segmentId = in.readUTF();
     this.version = ColumnarFormatVersion.valueOf(in.readShort());
+    this.bucketId = in.readUTF();
     int numInvalidSegment = in.readInt();
     invalidSegments = new ArrayList<>(numInvalidSegment);
     for (int i = 0; i < numInvalidSegment; i++) {
@@ -135,6 +141,7 @@ public class CarbonInputSplit extends FileSplit
     super.write(out);
     out.writeUTF(segmentId);
     out.writeShort(version.number());
+    out.writeUTF(bucketId);
     out.writeInt(invalidSegments.size());
     for (String invalidSegment : invalidSegments) {
       out.writeUTF(invalidSegment);
@@ -166,6 +173,10 @@ public class CarbonInputSplit extends FileSplit
     this.version = version;
   }
 
+  public String getBucketId() {
+    return bucketId;
+  }
+
   @Override public int compareTo(Distributable o) {
     CarbonInputSplit other = (CarbonInputSplit) o;
     int compareResult = 0;
@@ -193,6 +204,13 @@ public class CarbonInputSplit extends FileSplit
       if (firstTaskId != otherTaskId) {
         return firstTaskId - otherTaskId;
       }
+
+      int firstBucketNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getBucketNo(filePath1));
+      int otherBucketNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getBucketNo(filePath2));
+      if (firstBucketNo != otherBucketNo) {
+        return firstBucketNo - otherBucketNo;
+      }
+
       // compare the part no of both block info
       int firstPartNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getPartNo(filePath1));
       int SecondPartNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getPartNo(filePath2));

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index a13d6ba..26b5252 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -42,19 +42,19 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
   private List<CarbonInputSplit> splitList;
 
   /*
-   * The location of all wrapped splits belong to the same node
+   * The locations of all wrapped splits
    */
-  private String location;
+  private String[] locations;
 
   public CarbonMultiBlockSplit() {
     splitList = null;
-    location = null;
+    locations = null;
   }
 
   public CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, List<CarbonInputSplit> splitList,
-      String location) throws IOException {
+      String[] locations) throws IOException {
     this.splitList = splitList;
-    this.location = location;
+    this.locations = locations;
   }
 
   /**
@@ -76,7 +76,7 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
 
   @Override
   public String[] getLocations() throws IOException, InterruptedException {
-    return new String[]{location};
+    return locations;
   }
 
   @Override
@@ -86,7 +86,10 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
     for (CarbonInputSplit split: splitList) {
       split.write(out);
     }
-    out.writeUTF(location);
+    out.writeInt(locations.length);
+    for (int i = 0; i < locations.length; i++) {
+      out.writeUTF(locations[i]);
+    }
   }
 
   @Override
@@ -99,7 +102,11 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
       split.readFields(in);
       splitList.add(split);
     }
-    location = in.readUTF();
+    int len = in.readInt();
+    locations = new String[len];
+    for (int i = 0; i < len; i++) {
+      locations[i] = in.readUTF();
+    }
   }
 
 }

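The split now carries a whole locations array instead of one string, serialized with a length prefix. A self-contained round-trip of just that pattern, in plain java.io and independent of the split class:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class LocationsRoundTripSketch {
  public static void main(String[] args) throws IOException {
    String[] locations = { "node1", "node2", "node3" };

    // write: length prefix, then each location, as in CarbonMultiBlockSplit.write
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(locations.length);
    for (String location : locations) {
      out.writeUTF(location);
    }

    // read: mirror the order exactly, as in readFields
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    String[] readBack = new String[in.readInt()];
    for (int i = 0; i < readBack.length; i++) {
      readBack[i] = in.readUTF();
    }
    System.out.println(readBack.length + " locations, first = " + readBack[0]);
  }
}
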
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
index c238e10..82bcf1c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
@@ -100,10 +100,10 @@ class InMemoryBTreeIndex implements Index {
     return result;
   }
 
-  private Map<String, AbstractIndex> getSegmentAbstractIndexs(JobContext job,
-      AbsoluteTableIdentifier identifier)
+  private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
+      JobContext job, AbsoluteTableIdentifier identifier)
       throws IOException, IndexBuilderException {
-    Map<String, AbstractIndex> segmentIndexMap =
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
         SegmentTaskIndexStore.getInstance().getSegmentBTreeIfExists(identifier, segment.getId());
 
     // if segment tree is not loaded, load the segment tree
@@ -153,7 +153,8 @@ class InMemoryBTreeIndex implements Index {
 
     QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
     QueryStatistic statistic = new QueryStatistic();
-    Map<String, AbstractIndex> segmentIndexMap = getSegmentAbstractIndexs(job, identifier);
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
+        getSegmentAbstractIndexs(job, identifier);
 
     List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 213712e..c63b43d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -46,5 +46,12 @@ class CarbonOption(options: Map[String, String]) {
 
   def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean
 
+  def bucketNumber: Int = options.getOrElse("bucketnumber", "0").toInt
+
+  def bucketColumns: String = options.getOrElse("bucketcolumns", "")
+
+  def isBucketingEnabled: Boolean = options.contains("bucketcolumns") &&
+                                    options.contains("bucketnumber")
+
   def toMap: Map[String, String] = options
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index ccaf9e3..99dc853 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -302,7 +302,7 @@ class CarbonMergerRDD[K, V](
       }
       if (blockletCount != 0) {
         val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier,
-          carbonInputSplits.asJava, nodeName)
+          carbonInputSplits.asJava, Array(nodeName))
         result.add(new CarbonSparkPartition(id, i, multiBlockSplit))
         i += 1
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 2705f94..5d972bf 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -65,6 +65,8 @@ class CarbonScanRDD(
 
   private val readSupport = SparkReadSupport.readSupportClass
 
+  private val bucketedTable = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
+
   @transient private val jobId = new JobID(jobTrackerId, id)
   @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
@@ -95,36 +97,54 @@ class CarbonScanRDD(
     var noOfTasks = 0
 
     if (!splits.isEmpty) {
-      // create a list of block based on split
-      val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
-
-      // get the list of executors and map blocks to executors based on locality
-      val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
-
-      // divide the blocks among the tasks of the nodes as per the data locality
-      val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
-        parallelism, activeNodes.toList.asJava)
 
       statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
       statisticRecorder.recordStatisticsForDriver(statistic, queryId)
       statistic = new QueryStatistic()
 
-      var i = 0
-      // Create Spark Partition for each task and assign blocks
-      nodeBlockMapping.asScala.foreach { case (node, blockList) =>
-        blockList.asScala.foreach { blocksPerTask =>
-          val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
-          if (blocksPerTask.size() != 0) {
-            val multiBlockSplit = new CarbonMultiBlockSplit(identifier, splits.asJava, node)
-            val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
-            result.add(partition)
-            i += 1
+      // If bucketing is enabled on table then partitions should be grouped based on buckets.
+      if (bucketedTable != null) {
+        var i = 0
+        val bucketed =
+          splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy(f => f.getBucketId)
+        (0 until bucketedTable.getNumberOfBuckets).map { bucketId =>
+          val bucketPartitions = bucketed.getOrElse(bucketId.toString, Nil)
+          val multiBlockSplit =
+            new CarbonMultiBlockSplit(identifier,
+              bucketPartitions.asJava,
+              bucketPartitions.flatMap(_.getLocations).toArray)
+          val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
+          i += 1
+          result.add(partition)
+        }
+      } else {
+        // create a list of block based on split
+        val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
+
+        // get the list of executors and map blocks to executors based on locality
+        val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
+
+        // divide the blocks among the tasks of the nodes as per the data locality
+        val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
+          parallelism, activeNodes.toList.asJava)
+        var i = 0
+        // Create Spark Partition for each task and assign blocks
+        nodeBlockMapping.asScala.foreach { case (node, blockList) =>
+          blockList.asScala.foreach { blocksPerTask =>
+            val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
+            if (blocksPerTask.size() != 0) {
+              val multiBlockSplit =
+                new CarbonMultiBlockSplit(identifier, splits.asJava, Array(node))
+              val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
+              result.add(partition)
+              i += 1
+            }
           }
         }
+        noOfNodes = nodeBlockMapping.size
       }
 
       noOfBlocks = splits.size
-      noOfNodes = nodeBlockMapping.size
       noOfTasks = result.size()
 
       statistic = new QueryStatistic()
@@ -155,58 +175,68 @@ class CarbonScanRDD(
     val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
     val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
     val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
-    val model = format.getQueryModel(inputSplit, attemptContext)
-    val reader = {
-      if (vectorReader) {
-        val carbonRecordReader = createVectorizedCarbonRecordReader(model)
-        if (carbonRecordReader == null) {
-          new CarbonRecordReader(model, format.getReadSupportClass(attemptContext.getConfiguration))
+    val iterator = if (inputSplit.getAllSplits.size() > 0) {
+      val model = format.getQueryModel(inputSplit, attemptContext)
+      val reader = {
+        if (vectorReader) {
+          val carbonRecordReader = createVectorizedCarbonRecordReader(model)
+          if (carbonRecordReader == null) {
+            new CarbonRecordReader(model,
+              format.getReadSupportClass(attemptContext.getConfiguration))
+          } else {
+            carbonRecordReader
+          }
         } else {
-          carbonRecordReader
+          new CarbonRecordReader(model, format.getReadSupportClass(attemptContext.getConfiguration))
         }
-      } else {
-        new CarbonRecordReader(model, format.getReadSupportClass(attemptContext.getConfiguration))
       }
-    }
 
-    reader.initialize(inputSplit, attemptContext)
+      reader.initialize(inputSplit, attemptContext)
+      val queryStartTime = System.currentTimeMillis
 
-    val queryStartTime = System.currentTimeMillis
+      new Iterator[Any] {
+        private var havePair = false
+        private var finished = false
+        private var count = 0
 
-    val iterator = new Iterator[Any] {
-      private var havePair = false
-      private var finished = false
-      private var count = 0
-
-      context.addTaskCompletionListener { context =>
-        logStatistics(queryStartTime, count)
-        reader.close()
-      }
+        context.addTaskCompletionListener { context =>
+          logStatistics(queryStartTime, count)
+          reader.close()
+        }
 
-      override def hasNext: Boolean = {
-        if (context.isInterrupted) {
-          throw new TaskKilledException
+        override def hasNext: Boolean = {
+          if (context.isInterrupted) {
+            throw new TaskKilledException
+          }
+          if (!finished && !havePair) {
+            finished = !reader.nextKeyValue
+            if (finished) {
+              reader.close()
+            }
+            havePair = !finished
+          }
+          !finished
         }
-        if (!finished && !havePair) {
-          finished = !reader.nextKeyValue
-          if (finished) {
-            reader.close()
+
+        override def next(): Any = {
+          if (!hasNext) {
+            throw new java.util.NoSuchElementException("End of stream")
           }
-          havePair = !finished
+          havePair = false
+          val value = reader.getCurrentValue
+          count += 1
+          value
         }
-        !finished
       }
+    } else {
+      new Iterator[Any] {
+        override def hasNext: Boolean = false
 
-      override def next(): Any = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        val value = reader.getCurrentValue
-        count += 1
-        value
+        override def next(): Any = throw new java.util.NoSuchElementException("End of stream")
       }
     }
+
+
     iterator.asInstanceOf[Iterator[InternalRow]]
   }
 

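The core of the bucketed branch above is a group-by on bucket id with exactly one Spark partition per bucket, empty buckets included; that is also why compute now returns an empty iterator when a partition carries no splits. A self-contained sketch of the shape, with a stand-in Split type in place of CarbonInputSplit:

    case class Split(bucketId: String, location: String)

    def planBucketedPartitions(splits: Seq[Split], numBuckets: Int): Seq[Seq[Split]] = {
      val byBucket = splits.groupBy(_.bucketId)
      // partition i always exists and holds exactly bucket i's splits, so two
      // tables bucketed the same way line up partition-for-partition
      (0 until numBuckets).map(b => byBucket.getOrElse(b.toString, Nil))
    }

    planBucketedPartitions(
      Seq(Split("0", "n1"), Split("1", "n2"), Split("0", "n3")), numBuckets = 2)
    // => Vector(List(Split(0,n1), Split(0,n3)), List(Split(1,n2)))
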
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index a5088df..461633d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -223,8 +223,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
       , tableName: String, fields: Seq[Field],
       partitionCols: Seq[PartitionerField],
-      tableProperties: Map[String, String]): TableModel
-  = {
+      tableProperties: Map[String, String],
+      bucketFields: Option[BucketFields]): TableModel = {
 
     fields.zipWithIndex.foreach { x =>
       x._1.schemaOrdinal = x._2
@@ -268,7 +268,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       Option(noDictionaryDims),
       Option(noInvertedIdxCols),
       groupCols,
-      Some(colProps))
+      Some(colProps),
+      bucketFields)
   }
 
   /**

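A call sketch for the widened signature, as a hypothetical invocation from inside the parser, assuming fields and tableProperties were already collected:

    val model: TableModel = prepareTableModel(
      ifNotExistPresent = false,
      dbName = Some("default"),
      tableName = "t1",
      fields = fields,
      partitionCols = Nil,
      tableProperties = tableProperties,
      bucketFields = Some(BucketFields(Seq("name"), 4)))
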
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index f646f1d..ec064ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -29,7 +29,7 @@ import org.apache.carbondata.common.factory.CarbonCommonFactory
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
-import org.apache.carbondata.core.carbon.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
+import org.apache.carbondata.core.carbon.metadata.schema.{BucketingInfo, SchemaEvolution, SchemaEvolutionEntry}
 import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -50,7 +50,9 @@ case class TableModel(
     highcardinalitydims: Option[Seq[String]],
     noInvertedIdxCols: Option[Seq[String]],
     columnGroups: Seq[String],
-    colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None)
+    colProps: Option[util.Map[String,
+      util.List[ColumnProperty]]] = None,
+    bucketFields: Option[BucketFields])
 
 case class Field(column: String, var dataType: Option[String], name: Option[String],
     children: Option[List[Field]], parent: String = null,
@@ -69,6 +71,8 @@ case class Partitioner(partitionClass: String, partitionColumn: Array[String], p
 case class PartitionerField(partitionColumn: String, dataType: Option[String],
     columnComment: String)
 
+case class BucketFields(bucketColumns: Seq[String], numberOfBuckets: Int)
+
 case class DataLoadTableFileMapping(table: String, loadPath: String)
 
 case class CarbonMergerMapping(storeLocation: String,
@@ -300,6 +304,27 @@ class TableNewProcessor(cm: TableModel) {
       x => tablePropertiesMap.put(x._1, x._2)
     }
     tableSchema.setTableProperties(tablePropertiesMap)
+    if (cm.bucketFields.isDefined) {
+      val bucketCols = cm.bucketFields.get.bucketColumns.map { b =>
+        val col = allColumns.find(_.getColumnName.equalsIgnoreCase(b))
+        col match {
+          case Some(colSchema: ColumnSchema) =>
+            if (colSchema.isDimensionColumn && !colSchema.isComplex) {
+              colSchema
+            } else {
+              LOGGER.error(s"Bucket field must be dimension column and " +
+                           s"should not be measure or complex column: ${colSchema.getColumnName}")
+              sys.error(s"Bucket field must be dimension column and " +
+                        s"should not be measure or complex column: ${colSchema.getColumnName}")
+            }
+          case _ =>
+            LOGGER.error(s"Bucket field is not present in table columns")
+            sys.error(s"Bucket field is not present in table columns")
+        }
+      }
+      tableSchema.setBucketingInfo(
+        new BucketingInfo(bucketCols.asJava, cm.bucketFields.get.numberOfBuckets))
+    }
     tableSchema.setTableName(cm.tableName)
     tableSchema.setListOfColumns(allColumns.asJava)
     tableSchema.setSchemaEvalution(schemaEvol)

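The validation added above boils down to one rule: every bucket column must resolve to an existing, non-complex dimension. A compact restatement with an illustrative helper name (not part of the patch):

    def resolveBucketColumn(allColumns: Seq[ColumnSchema], name: String): ColumnSchema =
      allColumns.find(_.getColumnName.equalsIgnoreCase(name)) match {
        case Some(col) if col.isDimensionColumn && !col.isComplex => col
        case Some(col) =>
          sys.error(s"Bucket field must be a dimension column and " +
                    s"must not be a measure or complex column: ${col.getColumnName}")
        case None =>
          sys.error("Bucket field is not present in table columns")
      }
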
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 16e35f4..7318e27 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -159,6 +159,7 @@ class CarbonSqlParser() extends CarbonDDLSqlParser {
         var ifNotExistPresent: Boolean = false
         var dbName: Option[String] = None
         var tableName: String = ""
+        var bucketFields: Option[BucketFields] = None
 
         try {
 
@@ -252,6 +253,13 @@ class CarbonSqlParser() extends CarbonDDLSqlParser {
 
             case Token("TOK_LIKETABLE", child :: Nil) =>
               likeTableName = child.getChild(0).getText()
+            case Token("TOK_ALTERTABLE_BUCKETS",
+                  Token("TOK_TABCOLNAME", list)::numberOfBuckets) =>
+              val cols = list.map(_.getText)
+              if (cols != null) {
+                bucketFields = Some(BucketFields(cols,
+                  numberOfBuckets.head.getText.toInt))
+              }
 
            case _ => // Unsupported features
           }
@@ -267,7 +275,8 @@ class CarbonSqlParser() extends CarbonDDLSqlParser {
             tableName,
             fields,
             partitionCols,
-            tableProperties)
+            tableProperties,
+            bucketFields)
 
           // get logical plan.
           CreateTable(tableModel)

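TOK_ALTERTABLE_BUCKETS is the token Hive's parser emits for a CLUSTERED BY clause, so DDL along these lines should exercise the new branch (a hedged sketch, not copied from the test suite; cc is assumed to be a CarbonContext):

    cc.sql(
      """CREATE TABLE t1 (name String, city String, salary Int)
         CLUSTERED BY (name) INTO 4 BUCKETS
         STORED BY 'carbondata'""")
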
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index b02d467..3b87e41 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -46,5 +46,12 @@ class CarbonOption(options: Map[String, String]) {
 
   def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean
 
+  def bucketNumber: Int = options.getOrElse("bucketnumber", "0").toInt
+
+  def bucketColumns: String = options.getOrElse("bucketcolumns", "")
+
+  def isBucketingEnabled: Boolean = options.contains("bucketcolumns") &&
+                                    options.contains("bucketnumber")
+
   def toMap: Map[String, String] = options
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 8a946c0..d03c90c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -24,7 +24,7 @@ import scala.language.implicitConversions
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
-import org.apache.spark.sql.execution.command.{CreateTable, Field}
+import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field}
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DecimalType, StructType}
@@ -108,7 +108,7 @@ class CarbonSource extends CreatableRelationProvider
 
     val dbName: String = parameters.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
     val tableName: String = parameters.getOrElse("tableName", "default_table")
-
+    val options = new CarbonOption(parameters)
     try {
       CarbonEnv.get.carbonMetastore.lookupRelation(Option(dbName), tableName)(sparkSession)
       CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
@@ -132,7 +132,15 @@ class CarbonSource extends CreatableRelationProvider
         }
         val map = scala.collection.mutable.Map[String, String]()
         parameters.foreach { x => map.put(x._1, x._2) }
-        val cm = TableCreator.prepareTableModel(false, Option(dbName), tableName, fields, Nil, map)
+        val bucketFields = {
+          if (options.isBucketingEnabled) {
+            Some(BucketFields(options.bucketColumns.split(","), options.bucketNumber))
+          } else {
+            None
+          }
+        }
+        val cm = TableCreator.prepareTableModel(false, Option(dbName),
+          tableName, fields, Nil, bucketFields, map)
         CreateTable(cm, false).run(sparkSession)
         CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
       case ex: Exception =>

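A usage sketch for this datasource path, with the option keys defined in CarbonOption (df is an assumed existing DataFrame, and "carbondata" is assumed to be the registered format name):

    df.write
      .format("carbondata")
      .option("tableName", "t1")
      .option("bucketcolumns", "name")
      .option("bucketnumber", "4")
      .save()
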
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
index 362c951..530e70e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
@@ -21,7 +21,7 @@ import java.util.regex.{Matcher, Pattern}
 
 import scala.collection.mutable.{LinkedHashSet, Map}
 
-import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField, TableModel}
+import org.apache.spark.sql.execution.command.{BucketFields, ColumnProperty, Field, PartitionerField, TableModel}
 
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -442,6 +442,7 @@ object TableCreator {
   def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
                         , tableName: String, fields: Seq[Field],
                         partitionCols: Seq[PartitionerField],
+                        bucketFields: Option[BucketFields],
                         tableProperties: Map[String, String]): TableModel
   = {
 
@@ -483,7 +484,8 @@ object TableCreator {
       Option(noDictionaryDims),
       Option(noInvertedIdxCols),
       groupCols,
-      Some(colProps))
+      Some(colProps),
+      bucketFields)
   }
 
 }

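One wrinkle worth noting: this spark2 helper takes bucketFields before tableProperties, the reverse of the spark-common CarbonDDLSqlParser.prepareTableModel above, so positional calls differ between the two modules:

    // spark2 TableCreator: bucketFields precedes tableProperties
    TableCreator.prepareTableModel(false, Option(dbName), tableName,
      fields, Nil, bucketFields, map)
    // spark-common parser: tableProperties precedes bucketFields
    // prepareTableModel(false, dbName, tableName, fields, partitionCols,
    //   tableProperties, bucketFields)
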
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index fe8bbe7..de768c0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -17,24 +17,28 @@
 
 package org.apache.spark.sql.execution
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.{AtomicType, IntegerType}
 
+import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**
  * Carbon strategy for late decode (convert dictionary key to value as late as possible), which
@@ -248,20 +252,21 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       metadata: Map[String, String],
       needDecoder: ArrayBuffer[AttributeReference],
       updateRequestedColumns: Seq[AttributeReference]): DataSourceScanExec = {
+    val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
     if (supportBatchedDataSource(relation.relation.sqlContext, updateRequestedColumns) &&
         needDecoder.isEmpty) {
       BatchedDataSourceScanExec(
         output,
         scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
         relation.relation,
-        UnknownPartitioning(0),
+        getPartitioning(table.carbonTable, updateRequestedColumns),
         metadata,
         relation.metastoreTableIdentifier)
     } else {
       RowDataSourceScanExec(output,
         scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
         relation.relation,
-        UnknownPartitioning(0),
+        getPartitioning(table.carbonTable, updateRequestedColumns),
         metadata,
         relation.metastoreTableIdentifier)
     }
@@ -288,6 +293,35 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     }
   }
 
+  private def getPartitioning(carbonTable: CarbonTable,
+      output: Seq[AttributeReference]): Partitioning = {
+    val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
+    if (info != null) {
+      val cols = info.getListOfColumns.asScala
+      val sortColumn = carbonTable.
+        getDimensionByTableName(carbonTable.getFactTableName).get(0).getColName
+      val numBuckets = info.getNumberOfBuckets
+      val bucketColumns = cols.flatMap { n =>
+        val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName))
+        attrRef match {
+          case Some(attr) =>
+            Some(AttributeReference(attr.name,
+              CarbonScalaUtil.convertCarbonToSparkDataType(n.getDataType),
+              attr.nullable,
+              attr.metadata)(attr.exprId, attr.qualifier))
+          case _ => None
+        }
+      }
+      if (bucketColumns.size == cols.size) {
+        HashPartitioning(bucketColumns, numBuckets)
+      } else {
+        UnknownPartitioning(0)
+      }
+    } else {
+      UnknownPartitioning(0)
+    }
+  }
+
 
   protected[sql] def selectFilters(
       relation: BaseRelation,

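The payoff of reporting HashPartitioning instead of UnknownPartitioning: when both sides of an equi-join are bucketed on the join key with the same bucket count, Spark's EnsureRequirements rule can satisfy the join's distribution requirement without a shuffle. A hedged check (table and column names illustrative):

    val joined = spark.sql(
      "SELECT a.name, b.city FROM t1 a JOIN t2 b ON a.name = b.name")
    // with t1 and t2 both bucketed on name into the same number of buckets,
    // the physical plan should contain no Exchange below the join operator
    joined.explain()
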
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cbf87977/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 5a91ad1..342cabc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -24,10 +24,11 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{ColTypeListContext, CreateTableContext, TablePropertyListContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{CreateTable, Field, TableModel}
+import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, TableModel}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types.DataType
 
+import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -132,13 +133,22 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
       if (!CommonUtil.validateTblProperties(properties.asJava.asScala, fields)) {
         throw new MalformedCarbonCommandException("Invalid table properties")
       }
+      val options = new CarbonOption(properties)
+      val bucketFields = {
+        if (options.isBucketingEnabled) {
+          Some(BucketFields(options.bucketColumns.split(","), options.bucketNumber))
+        } else {
+          None
+        }
+      }
       // prepare table model of the collected tokens
       val tableModel: TableModel = parser.prepareTableModel(ifNotExists,
         name.database,
         name.table,
         fields,
         Seq(),
-        properties.asJava.asScala)
+        properties.asJava.asScala,
+        bucketFields)
 
       CreateTable(tableModel)
     } else {