You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/24 13:26:36 UTC

[40/50] [abbrv] carbondata git commit: [CARBONDATA-1920] [PrestoIntegration] Sparksql query result is not same as presto on same sql

[CARBONDATA-1920] [PrestoIntegration] Sparksql query result is not same as presto on same sql

In the stream readers, dictionary values were only being decoded when the Java type was Slice and the column was not of decimal type; this change decodes dictionary values for the other primitive types (int, short, long, double, decimal) as well.

This closes #1695


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6add19ad
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6add19ad
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6add19ad

Branch: refs/heads/fgdatamap
Commit: 6add19adee6f061d1399525b113eea11d1085713
Parents: dfe3097
Author: anubhav100 <an...@knoldus.in>
Authored: Tue Dec 5 12:25:58 2017 +0530
Committer: chenliang613 <ch...@huawei.com>
Committed: Fri Dec 22 14:11:52 2017 +0800

----------------------------------------------------------------------
 .../carbondata/presto/CarbondataPageSource.java |  20 +--
 .../carbondata/presto/PrestoFilterUtil.java     |  51 +++-----
 .../readers/DecimalSliceStreamReader.java       |  87 +++++++++----
 .../presto/readers/DoubleStreamReader.java      |  39 ++++--
 .../presto/readers/IntegerStreamReader.java     |  54 +++++---
 .../presto/readers/LongStreamReader.java        |  38 ++++--
 .../presto/readers/ShortStreamReader.java       |  51 +++++---
 .../presto/readers/SliceStreamReader.java       |  31 ++---
 .../presto/readers/StreamReaders.java           |  74 +++++++----
 .../CarbonDictionaryDecodeReadSupport.scala     |  40 +++---
 .../presto/util/CarbonDataStoreCreator.scala    | 126 ++++++++++---------
 11 files changed, 366 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6add19ad/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
index e8ecba3..1679f29 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -65,7 +65,8 @@ class CarbondataPageSource implements ConnectorPageSource {
   private long nanoEnd;
 
   CarbondataPageSource(RecordSet recordSet) {
-    this(requireNonNull(recordSet, "recordSet is null").getColumnTypes(), recordSet.cursor());
+    this(requireNonNull(recordSet, "recordSet is null").getColumnTypes(),
+        recordSet.cursor());
   }
 
   private CarbondataPageSource(List<Type> types, RecordCursor cursor) {
@@ -109,7 +110,6 @@ class CarbondataPageSource implements ConnectorPageSource {
             return null;
           }
         }
-
       } else {
         close();
         return null;
@@ -134,17 +134,10 @@ class CarbondataPageSource implements ConnectorPageSource {
       closeWithSuppression(e);
       throw e;
     }
-    catch ( RuntimeException e) {
-      closeWithSuppression(e);
-      throw new CarbonDataLoadingException("Exception when creating the Carbon data Block", e);
-    } catch (InterruptedException e) {
-      closeWithSuppression(e);
-      throw new CarbonDataLoadingException("Exception when creating the Carbon data Block", e);
-    } catch (IOException e) {
+    catch ( RuntimeException | InterruptedException | IOException e) {
       closeWithSuppression(e);
       throw new CarbonDataLoadingException("Exception when creating the Carbon data Block", e);
     }
-
   }
 
   @Override public long getSystemMemoryUsage() {
@@ -205,9 +198,7 @@ class CarbondataPageSource implements ConnectorPageSource {
       if (loaded) {
         return;
       }
-
       checkState(batchId == expectedBatchId);
-
       try {
         Block block = readers[columnIndex].readBlock(type);
         lazyBlock.setBlock(block);
@@ -215,7 +206,6 @@ class CarbondataPageSource implements ConnectorPageSource {
       catch (IOException e) {
         throw new CarbonDataLoadingException("Error in Reading Data from Carbondata ", e);
       }
-
       loaded = true;
     }
   }
@@ -231,8 +221,8 @@ class CarbondataPageSource implements ConnectorPageSource {
     requireNonNull(types);
     StreamReader[] readers = new StreamReader[types.size()];
     for (int i = 0; i < types.size(); i++) {
-      readers[i] =
-          StreamReaders.createStreamReader(types.get(i), readSupport.getSliceArrayBlock(i));
+      readers[i] = StreamReaders.createStreamReader(types.get(i), readSupport
+          .getSliceArrayBlock(i),readSupport.getDictionaries()[i]);
     }
     return readers;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6add19ad/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
index 31d5ba6..a69013f 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
@@ -125,20 +125,14 @@ public class PrestoFilterUtil {
                 } else {
                   GreaterThanExpression greater = new GreaterThanExpression(colExpression,
                       new LiteralExpression(value, coltype));
-                  if(valueExpressionMap.get(value) == null) {
-                    valueExpressionMap.put(value, new ArrayList<>());
-                  }
-                  valueExpressionMap.get(value).add(greater);
+                  valueExpressionMap.computeIfAbsent(value, key -> new ArrayList<>()).add(greater);
                 }
                 break;
               case EXACTLY:
                 GreaterThanEqualToExpression greater =
                     new GreaterThanEqualToExpression(colExpression,
                         new LiteralExpression(value, coltype));
-                if(valueExpressionMap.get(value) == null) {
-                  valueExpressionMap.put(value, new ArrayList<>());
-                }
-                valueExpressionMap.get(value).add(greater);
+                valueExpressionMap.computeIfAbsent(value, key -> new ArrayList<>()).add(greater);
                 break;
               case BELOW:
                 throw new IllegalArgumentException("Low marker should never use BELOW bound");
@@ -154,18 +148,12 @@ public class PrestoFilterUtil {
               case EXACTLY:
                 LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
                     new LiteralExpression(value, coltype));
-                if(valueExpressionMap.get(value) == null) {
-                  valueExpressionMap.put(value, new ArrayList<>());
-                }
-                valueExpressionMap.get(value).add(less);
+                valueExpressionMap.computeIfAbsent(value, key -> new ArrayList<>()).add(less);
                 break;
               case BELOW:
                 LessThanExpression less2 =
                     new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
-                if(valueExpressionMap.get(value) == null) {
-                  valueExpressionMap.put(value, new ArrayList<>());
-                }
-                valueExpressionMap.get(value).add(less2);
+                valueExpressionMap.computeIfAbsent(value, key -> new ArrayList<>()).add(less2);
                 break;
               default:
                 throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
@@ -208,14 +196,14 @@ public class PrestoFilterUtil {
           valuefilters.add(finalFilters);
         }
 
-        if(valuefilters.size() == 1){
+        if (valuefilters.size() == 1) {
           finalFilters = valuefilters.get(0);
         } else if (valuefilters.size() >= 2) {
-         finalFilters = new AndExpression(valuefilters.get(0), valuefilters.get(1));
-         for (int i = 2; i < valuefilters.size() ; i++) {
-           finalFilters = new AndExpression(finalFilters, valuefilters.get(i));
-         }
-       }
+          finalFilters = new AndExpression(valuefilters.get(0), valuefilters.get(1));
+          for (int i = 2; i < valuefilters.size(); i++) {
+            finalFilters = new AndExpression(finalFilters, valuefilters.get(i));
+          }
+        }
 
         filters.add(finalFilters);
       }
@@ -236,8 +224,9 @@ public class PrestoFilterUtil {
   }
 
   private static Object ConvertDataByType(Object rawdata, Type type) {
-    if (type.equals(IntegerType.INTEGER)) return Integer.valueOf(rawdata.toString());
-    // new Integer((rawdata.toString()));
+    if (type.equals(IntegerType.INTEGER) || type.equals(SmallintType.SMALLINT))
+      return Integer.valueOf(rawdata.toString());
+      // new Integer((rawdata.toString()));
     else if (type.equals(BigintType.BIGINT)) return rawdata;
     else if (type.equals(VarcharType.VARCHAR)) {
       if (rawdata instanceof Slice) {
@@ -253,18 +242,18 @@ public class PrestoFilterUtil {
       c.add(Calendar.DAY_OF_YEAR, ((Long) rawdata).intValue());
       Date date = c.getTime();
       return date.getTime() * 1000;
-    }
-    else if (type instanceof DecimalType) {
-      if(rawdata instanceof  Double) {
+    } else if (type instanceof DecimalType) {
+      if (rawdata instanceof Double) {
         return new BigDecimal((Double) rawdata);
-      } else if (rawdata instanceof  Long) {
+      } else if (rawdata instanceof Long) {
         return new BigDecimal(new BigInteger(String.valueOf(rawdata)),
             ((DecimalType) type).getScale());
-      } else if(rawdata instanceof Slice) {
-        return new BigDecimal(Decimals.decodeUnscaledValue((Slice) rawdata), ((DecimalType) type).getScale());
+      } else if (rawdata instanceof Slice) {
+        return new BigDecimal(Decimals.decodeUnscaledValue((Slice) rawdata),
+            ((DecimalType) type).getScale());
       }
     } else if (type.equals(TimestampType.TIMESTAMP)) {
-      return (Long)rawdata * 1000;
+      return (Long) rawdata * 1000;
     }
 
     return rawdata;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6add19ad/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
index ad798d4..e6ac386 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
@@ -20,6 +20,11 @@ package org.apache.carbondata.presto.readers;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.util.Objects;
+
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
@@ -42,6 +47,9 @@ import static java.math.RoundingMode.HALF_UP;
  */
 public class DecimalSliceStreamReader  extends AbstractStreamReader {
 
+  private Dictionary dictionary;
+  private boolean isDictionary;
+
 
   private final char[] buffer = new char[100];
 
@@ -49,6 +57,11 @@ public class DecimalSliceStreamReader  extends AbstractStreamReader {
 
   }
 
+  public DecimalSliceStreamReader(boolean isDictionary, Dictionary dictionary) {
+    this.dictionary = dictionary;
+    this.isDictionary = isDictionary;
+  }
+
   /**
    * Create Block for DecimalType
    * @param type
@@ -63,20 +76,18 @@ public class DecimalSliceStreamReader  extends AbstractStreamReader {
     if(isVectorReader) {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
-      int scale = ((DecimalType)type).getScale();
-      int precision = ((DecimalType)type).getPrecision();
       if (columnVector != null) {
         if(columnVector.anyNullsSet())
         {
-          handleNullInVector(type, numberOfRows, builder, scale, precision);
+          handleNullInVector(type, numberOfRows, builder);
         } else {
           if(isShortDecimal(type)) {
-            populateShortDecimalVector(type, numberOfRows, builder, scale, precision);
+            populateShortDecimalVector(type, numberOfRows, builder);
           } else {
-            populateLongDecimalVector(type, numberOfRows, builder, scale, precision);
+            populateLongDecimalVector(type, numberOfRows, builder);
           }
         }
-   }
+      }
 
     } else {
       if (streamData != null) {
@@ -180,8 +191,7 @@ public class DecimalSliceStreamReader  extends AbstractStreamReader {
 
   }
 
-  private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder, int scale,
-      int precision) {
+  private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
     for (int i = 0; i < numberOfRows; i++) {
       if (columnVector.isNullAt(i)) {
         builder.appendNull();
@@ -189,32 +199,63 @@ public class DecimalSliceStreamReader  extends AbstractStreamReader {
         if (isShortDecimal(type)) {
           BigDecimal decimalValue = (BigDecimal)columnVector.getData(i);
           long rescaledDecimal = Decimals.rescale(decimalValue.unscaledValue().longValue(),
-              decimalValue.scale(), scale);
+              decimalValue.scale(),((DecimalType) type).getScale());
           type.writeLong(builder, rescaledDecimal);
         } else {
-          Slice slice =
-              getSlice(columnVector.getData(i), type);
+          Slice slice = getSlice(columnVector.getData(i), type);
           type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
         }
       }
     }
   }
 
-  private void populateShortDecimalVector(Type type, int numberOfRows, BlockBuilder builder,
-      int scale, int precision) {
-    for (int i = 0; i < numberOfRows; i++) {
-      BigDecimal decimalValue = (BigDecimal)columnVector.getData(i);
-      long rescaledDecimal = Decimals.rescale(decimalValue.unscaledValue().longValue(),
-          decimalValue.scale(), scale);
-      type.writeLong(builder, rescaledDecimal);
+  private void populateShortDecimalVector(Type type, int numberOfRows, BlockBuilder builder) {
+    DecimalType decimalType = (DecimalType) type;
+
+    if (isDictionary) {
+      for (int i = 0; i < numberOfRows; i++) {
+        int value = (int)columnVector.getData(i);
+        Object data = DataTypeUtil
+            .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale()));
+        if(Objects.isNull(data)) {
+          builder.appendNull();
+        } else {
+          BigDecimal decimalValue = (BigDecimal) data;
+          long rescaledDecimal =
+              Decimals.rescale(decimalValue.unscaledValue().longValue(), decimalValue.scale(),decimalType.getScale());
+          type.writeLong(builder, rescaledDecimal);
+        }
+      }
+    } else {
+      for (int i = 0; i < numberOfRows; i++) {
+        BigDecimal decimalValue = (BigDecimal) columnVector.getData(i);
+        long rescaledDecimal =
+            Decimals.rescale(decimalValue.unscaledValue().longValue(), decimalValue.scale(),decimalType.getScale());
+        type.writeLong(builder, rescaledDecimal);
+      }
     }
   }
 
-  private void populateLongDecimalVector(Type type, int numberOfRows, BlockBuilder builder,
-      int scale, int precision) {
-    for (int i = 0; i < numberOfRows; i++) {
-      Slice slice = getSlice((BigDecimal)columnVector.getData(i), type);
-      type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
+  private void populateLongDecimalVector(Type type, int numberOfRows, BlockBuilder builder) {
+    if (isDictionary) {
+      for (int i = 0; i < numberOfRows; i++) {
+        int value = (int) columnVector.getData(i);
+        DecimalType decimalType = (DecimalType) type;
+        Object data = DataTypeUtil
+            .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale()));
+        if(Objects.isNull(data)) {
+          builder.appendNull();
+        } else {
+          BigDecimal decimalValue = (BigDecimal) data;
+          Slice slice = getSlice(decimalValue, type);
+          type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
+        }
+      }
+    } else {
+      for (int i = 0; i < numberOfRows; i++) {
+        Slice slice = getSlice((columnVector.getData(i)), type);
+        type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6add19ad/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
index a1910b7..7968ae6 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
@@ -19,6 +19,10 @@ package org.apache.carbondata.presto.readers;
 
 import java.io.IOException;
 
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
 import com.facebook.presto.spi.block.BlockBuilderStatus;
@@ -29,10 +33,18 @@ import com.facebook.presto.spi.type.Type;
  */
 public class DoubleStreamReader extends AbstractStreamReader {
 
+  private boolean isDictionary;
+  private Dictionary dictionary;
+
   public DoubleStreamReader() {
 
   }
 
+  public DoubleStreamReader(boolean isDictionary, Dictionary dictionary) {
+    this.isDictionary = isDictionary;
+    this.dictionary = dictionary;
+  }
+
   /**
    * Create the DoubleType Block
    *
@@ -47,10 +59,9 @@ public class DoubleStreamReader extends AbstractStreamReader {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
-        if(columnVector.anyNullsSet()) {
+        if (columnVector.anyNullsSet()) {
           handleNullInVector(type, numberOfRows, builder);
-        }
-        else {
+        } else {
           populateVector(type, numberOfRows, builder);
         }
       }
@@ -72,15 +83,29 @@ public class DoubleStreamReader extends AbstractStreamReader {
       if (columnVector.isNullAt(i)) {
         builder.appendNull();
       } else {
-        type.writeDouble(builder, (Double)columnVector.getData(i));
+        type.writeDouble(builder, (Double) columnVector.getData(i));
       }
     }
   }
 
   private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-      type.writeDouble(builder, (Double)columnVector.getData(i));
+    if (isDictionary) {
+      for (int i = 0; i < numberOfRows; i++) {
+        int value = (int) columnVector.getData(i);
+        Object data = DataTypeUtil
+            .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.DOUBLE);
+        if (data != null) {
+          type.writeDouble(builder, (Double) data);
+        } else {
+          builder.appendNull();
+        }
+
+      }
+    } else {
+      for (int i = 0; i < numberOfRows; i++) {
+        type.writeDouble(builder, (Double) columnVector.getData(i));
+      }
     }
-  }
 
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6add19ad/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
index 33fc529..5b20925 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
@@ -19,6 +19,10 @@ package org.apache.carbondata.presto.readers;
 
 import java.io.IOException;
 
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
 import com.facebook.presto.spi.block.BlockBuilderStatus;
@@ -26,38 +30,40 @@ import com.facebook.presto.spi.type.Type;
 
 public class IntegerStreamReader extends AbstractStreamReader {
 
+  private Dictionary dictionaryValues;
+  private boolean isDictionary;
+
+  public IntegerStreamReader() {
 
-  public IntegerStreamReader( ) {
+  }
 
+  public IntegerStreamReader(boolean isDictionary, Dictionary dictionary) {
+    this.dictionaryValues = dictionary;
+    this.isDictionary = isDictionary;
   }
 
-  public Block readBlock(Type type)
-      throws IOException
-  {
-    int numberOfRows = 0;
-    BlockBuilder builder = null;
-    if(isVectorReader) {
+  public Block readBlock(Type type) throws IOException {
+    int numberOfRows;
+    BlockBuilder builder;
+    if (isVectorReader) {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
-        if(columnVector.anyNullsSet()) {
+        if (columnVector.anyNullsSet()) {
           handleNullInVector(type, numberOfRows, builder);
-        }
-        else {
+        } else {
           populateVector(type, numberOfRows, builder);
         }
       }
-
     } else {
       numberOfRows = streamData.length;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (streamData != null) {
-        for(int i = 0; i < numberOfRows ; i++ ){
-          type.writeLong(builder, ((Integer)streamData[i]).longValue());
+        for (int i = 0; i < numberOfRows; i++) {
+          type.writeLong(builder, ((Integer) streamData[i]).longValue());
         }
       }
     }
-
     return builder.build();
   }
 
@@ -72,9 +78,23 @@ public class IntegerStreamReader extends AbstractStreamReader {
   }
 
   private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-        type.writeLong(builder,  (Integer) columnVector.getData(i));
+    if (isDictionary) {
+      for (int i = 0; i < numberOfRows; i++) {
+        int value = (int) columnVector.getData(i);
+        Object data = DataTypeUtil
+            .getDataBasedOnDataType(dictionaryValues.getDictionaryValueForKey(value),
+                DataTypes.INT);
+        if (data != null) {
+          type.writeLong(builder, ((Integer) data).longValue());
+        } else {
+          builder.appendNull();
+        }
+      }
+    } else {
+      for (int i = 0; i < numberOfRows; i++) {
+        Integer value = (Integer) columnVector.getData(i);
+        type.writeLong(builder, value.longValue());
       }
+    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6add19ad/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
index d7ccda0..ab68889 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
@@ -19,6 +19,10 @@ package org.apache.carbondata.presto.readers;
 
 import java.io.IOException;
 
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
 import com.facebook.presto.spi.block.BlockBuilderStatus;
@@ -26,25 +30,31 @@ import com.facebook.presto.spi.type.Type;
 
 public class LongStreamReader extends AbstractStreamReader {
 
+  private boolean isDictionary;
+  private Dictionary dictionary;
+
   public LongStreamReader() {
 
   }
 
+  public LongStreamReader(boolean isDictionary, Dictionary dictionary) {
+    this.isDictionary = isDictionary;
+    this.dictionary = dictionary;
+  }
+
   public Block readBlock(Type type) throws IOException {
-    int numberOfRows = 0;
-    BlockBuilder builder = null;
+    int numberOfRows;
+    BlockBuilder builder;
     if (isVectorReader) {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
-        if(columnVector.anyNullsSet()) {
+        if (columnVector.anyNullsSet()) {
           handleNullInVector(type, numberOfRows, builder);
-        }
-        else {
+        } else {
           populateVector(type, numberOfRows, builder);
         }
       }
-
     } else {
       numberOfRows = streamData.length;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
@@ -54,7 +64,6 @@ public class LongStreamReader extends AbstractStreamReader {
         }
       }
     }
-
     return builder.build();
   }
 
@@ -63,14 +72,25 @@ public class LongStreamReader extends AbstractStreamReader {
       if (columnVector.isNullAt(i)) {
         builder.appendNull();
       } else {
-        type.writeLong(builder, (Long)columnVector.getData(i));
+        type.writeLong(builder, (Long) columnVector.getData(i));
       }
     }
   }
 
   private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
     for (int i = 0; i < numberOfRows; i++) {
-      type.writeLong(builder, (Long)columnVector.getData(i));
+      if (isDictionary) {
+        int value = (int) columnVector.getData(i);
+        Object data = DataTypeUtil
+            .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.LONG);
+        if (data != null) {
+          type.writeLong(builder, (Long) data);
+        } else {
+          builder.appendNull();
+        }
+      } else {
+        type.writeLong(builder, (long) columnVector.getData(i));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6add19ad/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
index 87ebb12..4a28761 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
@@ -19,6 +19,10 @@ package org.apache.carbondata.presto.readers;
 
 import java.io.IOException;
 
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
 import com.facebook.presto.spi.block.BlockBuilderStatus;
@@ -26,24 +30,28 @@ import com.facebook.presto.spi.type.Type;
 
 public class ShortStreamReader extends AbstractStreamReader {
 
+  private boolean isDictionary;
+  private Dictionary dictionary;
+
+  public ShortStreamReader() {
 
-  public ShortStreamReader( ) {
+  }
 
+  public ShortStreamReader(boolean isDictionary, Dictionary dictionary) {
+    this.isDictionary = isDictionary;
+    this.dictionary = dictionary;
   }
 
-  public Block readBlock(Type type)
-      throws IOException
-  {
-    int numberOfRows = 0;
-    BlockBuilder builder = null;
-    if(isVectorReader) {
+  public Block readBlock(Type type) throws IOException {
+    int numberOfRows;
+    BlockBuilder builder;
+    if (isVectorReader) {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
-        if(columnVector.anyNullsSet()) {
+        if (columnVector.anyNullsSet()) {
           handleNullInVector(type, numberOfRows, builder);
-        }
-        else {
+        } else {
           populateVector(type, numberOfRows, builder);
         }
       }
@@ -52,8 +60,8 @@ public class ShortStreamReader extends AbstractStreamReader {
       numberOfRows = streamData.length;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (streamData != null) {
-        for(int i = 0; i < numberOfRows ; i++ ){
-          type.writeLong(builder, (short) streamData[i]);
+        for (int i = 0; i < numberOfRows; i++) {
+          type.writeLong(builder, (Short) streamData[i]);
         }
       }
     }
@@ -66,15 +74,28 @@ public class ShortStreamReader extends AbstractStreamReader {
       if (columnVector.isNullAt(i)) {
         builder.appendNull();
       } else {
-        type.writeLong(builder, ((short) columnVector.getData(i)));
+        type.writeLong(builder, ((Short) columnVector.getData(i)));
       }
     }
   }
 
   private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
-    for (int i = 0; i < numberOfRows; i++) {
-       type.writeLong(builder, ((short) columnVector.getData(i)));
+    if (isDictionary) {
+      for (int i = 0; i < numberOfRows; i++) {
+        int value = (int) columnVector.getData(i);
+        Object data = DataTypeUtil
+            .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.SHORT);
+        if (data != null) {
+          type.writeLong(builder, (Short) data);
+        } else {
+          builder.appendNull();
+        }
+      }
+    } else {
+      for (int i = 0; i < numberOfRows; i++) {
+        type.writeLong(builder, (Short) columnVector.getData(i));
       }
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6add19ad/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
index f307331..00b4bce 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -34,43 +34,40 @@ import static io.airlift.slice.Slices.wrappedBuffer;
  */
 public class SliceStreamReader extends AbstractStreamReader {
 
-
   private boolean isDictionary;
 
-  private SliceArrayBlock dictionaryBlock;
+  private SliceArrayBlock dictionarySliceArrayBlock;
 
-  public SliceStreamReader() {}
+  public SliceStreamReader() {
+  }
 
-  public SliceStreamReader(boolean isDictionary, SliceArrayBlock dictionaryBlock) {
+  public SliceStreamReader(boolean isDictionary, SliceArrayBlock dictionarySliceArrayBlock) {
     this.isDictionary = isDictionary;
-    this.dictionaryBlock = dictionaryBlock;
+    this.dictionarySliceArrayBlock = dictionarySliceArrayBlock;
   }
 
   /**
    * Function to create the Slice Block
+   *
    * @param type
    * @return
    * @throws IOException
    */
-  public Block readBlock(Type type)
-      throws IOException
-  {
-    int numberOfRows = 0;
-    BlockBuilder builder = null;
-    if(isVectorReader) {
+  public Block readBlock(Type type) throws IOException {
+    int numberOfRows;
+    BlockBuilder builder;
+    if (isVectorReader) {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
-        if(isDictionary) {
+        if (isDictionary) {
           int[] values = new int[numberOfRows];
           for (int i = 0; i < numberOfRows; i++) {
             if (!columnVector.isNullAt(i)) {
               values[i] = (Integer) columnVector.getData(i);
             }
           }
-          Block block = new DictionaryBlock(batchSize, dictionaryBlock, values);
-
-          return block;
+          return new DictionaryBlock(batchSize, dictionarySliceArrayBlock, values);
         } else {
           for (int i = 0; i < numberOfRows; i++) {
             if (columnVector.isNullAt(i)) {
@@ -85,15 +82,13 @@ public class SliceStreamReader extends AbstractStreamReader {
       numberOfRows = streamData.length;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (streamData != null) {
-        for(int i = 0; i < numberOfRows ; i++ ){
+        for (int i = 0; i < numberOfRows; i++) {
           type.writeSlice(builder, utf8Slice(streamData[i].toString()));
         }
       }
     }
 
     return builder.build();
-
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6add19ad/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
index df1f8d6..b9b8b7e 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
@@ -16,6 +16,8 @@
  */
 package org.apache.carbondata.presto.readers;
 
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+
 import com.facebook.presto.spi.block.SliceArrayBlock;
 import com.facebook.presto.spi.type.DateType;
 import com.facebook.presto.spi.type.DecimalType;
@@ -32,39 +34,61 @@ import io.airlift.slice.Slice;
 public final class StreamReaders {
   /**
    * This function select Stream readers based on Type and use it.
+   *
    * @param type
-   * @param dictionary
+   * @param dictionarySliceArrayBlock
    * @return StreamReader
    */
-  public static StreamReader createStreamReader(Type type, SliceArrayBlock dictionary) {
+  public static StreamReader createStreamReader(Type type,
+      SliceArrayBlock dictionarySliceArrayBlock, Dictionary dictionary) {
     Class<?> javaType = type.getJavaType();
-    if (javaType == long.class) {
-      if(type instanceof IntegerType || type instanceof DateType) {
-        return new IntegerStreamReader();
-      } else if (type instanceof DecimalType) {
-        return new DecimalSliceStreamReader();
-      } else if (type instanceof SmallintType) {
-        return new ShortStreamReader();
-      } else if (type instanceof TimestampType) {
-        return new TimestampStreamReader();
-      }
-      return new LongStreamReader();
-    } else if (javaType == double.class) {
-      return new DoubleStreamReader();
-    } else if (javaType == Slice.class) {
-      if (type instanceof DecimalType) {
-       return new DecimalSliceStreamReader();
-      } else {
-        if(dictionary != null) {
-          return new SliceStreamReader(true, dictionary);
+    if (dictionary != null) {
+      if (javaType == long.class) {
+        if (type instanceof IntegerType || type instanceof DateType) {
+          return new IntegerStreamReader(true, dictionary);
+        } else if (type instanceof DecimalType) {
+          return new DecimalSliceStreamReader(true, dictionary);
+        } else if (type instanceof SmallintType) {
+          return new ShortStreamReader(true, dictionary);
+        }
+        return new LongStreamReader(true, dictionary);
+
+      } else if (javaType == double.class) {
+        return new DoubleStreamReader(true, dictionary);
+      } else if (javaType == Slice.class) {
+        if (type instanceof DecimalType) {
+          return new DecimalSliceStreamReader(true, dictionary);
         } else {
-        return new SliceStreamReader();
+          return new SliceStreamReader(true, dictionarySliceArrayBlock);
+        }
+      } else {
+        return new ObjectStreamReader();
       }
+    } else {
+      if (javaType == long.class) {
+        if (type instanceof IntegerType || type instanceof DateType) {
+          return new IntegerStreamReader();
+        } else if (type instanceof DecimalType) {
+          return new DecimalSliceStreamReader();
+        } else if (type instanceof SmallintType) {
+          return new ShortStreamReader();
+        } else if (type instanceof TimestampType) {
+          return new TimestampStreamReader();
+        }
+        return new LongStreamReader();
 
+      } else if (javaType == double.class) {
+        return new DoubleStreamReader();
+      } else if (javaType == Slice.class) {
+        if (type instanceof DecimalType) {
+          return new DecimalSliceStreamReader();
+        } else {
+          return new SliceStreamReader();
+        }
+      } else {
+        return new ObjectStreamReader();
       }
-    } else {
-      return new ObjectStreamReader();
+
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6add19ad/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
index 8e58f9a..9c05177 100644
--- a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
+++ b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
-import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
 
 /**
@@ -42,8 +42,7 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
    * This initialization is done inside executor task
    * for column dictionary involved in decoding.
    *
-   * @param carbonColumns           column list
-   * @param absoluteTableIdentifier table identifier
+   * @param carbonColumns column list
    */
 
   override def initialize(carbonColumns: Array[CarbonColumn], carbonTable: CarbonTable) {
@@ -66,9 +65,12 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
         dictionaries(index) = forwardDictionaryCache
           .get(new DictionaryColumnUniqueIdentifier(carbonTable.getAbsoluteTableIdentifier,
             carbonColumn.getColumnIdentifier, dataTypes(index), dictionaryPath))
-        dictionarySliceArray(index) = createSliceArrayBlock(dictionaries(index))
-
+        // in case of string data type create dictionarySliceArray same as that of presto code
+        if (dataTypes(index).equals(DataTypes.STRING)) {
+          dictionarySliceArray(index) = createSliceArrayBlock(dictionaries(index))
+        }
       }
+
       else {
         dataTypes(index) = carbonColumn.getDataType
       }
@@ -86,17 +88,12 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
     val chunks: DictionaryChunksWrapper = dictionaryData.getDictionaryChunks
     val sliceArray = new Array[Slice](chunks.getSize + 1)
     // Initialize Slice Array with Empty Slice as per Presto's code
-    sliceArray(0) = (Slices.EMPTY_SLICE)
+    sliceArray(0) = Slices.EMPTY_SLICE
     var count = 1
     while (chunks.hasNext) {
       {
         val value: Array[Byte] = chunks.next
-        if (count == 1) {
-          sliceArray(count + 1) = null
-        }
-        else {
-          sliceArray(count) = wrappedBuffer(value, 0, value.length)
-        }
+        sliceArray(count) = wrappedBuffer(value, 0, value.length)
         count += 1
       }
     }
@@ -104,20 +101,7 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
   }
 
   override def readRow(data: Array[AnyRef]): T = {
-    throw new RuntimeException("UnSupported Method Call Convert Column Instead")
-  }
-
-  def convertColumn(data: Array[AnyRef], columnNo: Int): T = {
-    val convertedData = if (Option(dictionaries(columnNo)).isDefined) {
-      data.map { value =>
-        DataTypeUtil
-          .getDataBasedOnDataType(dictionaries(columnNo)
-            .getDictionaryValueForKey(value.asInstanceOf[Int]), DataTypes.STRING)
-      }
-    } else {
-      data
-    }
-    convertedData.asInstanceOf[T]
+    throw new RuntimeException("UnSupported Method")
   }
 
   /**
@@ -130,6 +114,10 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
     dictionarySliceArray(columnNo)
   }
 
+  def getDictionaries: Array[Dictionary] = {
+    dictionaries
+  }
+
   /**
    * to book keep the dictionary cache or update access count for each
    * column involved during decode, to facilitate LRU cache policy if memory

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6add19ad/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index ed89be0..5a0adf5 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -34,28 +34,36 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.hadoop.mapreduce.{RecordReader, TaskType}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier, ReverseDictionary}
+import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier,
+ReverseDictionary}
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.fileoperations.{AtomicFileOperations, AtomicFileOperationsImpl, FileWriteOperation}
-import org.apache.carbondata.core.metadata.converter.{SchemaConverter, ThriftWrapperSchemaConverterImpl}
+import org.apache.carbondata.core.fileoperations.{AtomicFileOperations, AtomicFileOperationsImpl,
+FileWriteOperation}
+import org.apache.carbondata.core.metadata.converter.{SchemaConverter,
+ThriftWrapperSchemaConverterImpl}
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, CarbonMeasure, ColumnSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension,
+CarbonMeasure, ColumnSchema}
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
 import org.apache.carbondata.core.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata,
+CarbonTableIdentifier, ColumnIdentifier}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter, CarbonDictionarySortIndexWriterImpl, CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator}
-import org.apache.carbondata.core.writer.{CarbonDictionaryWriter, CarbonDictionaryWriterImpl, ThriftWriter}
-import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator, StringArrayWritable}
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter,
+CarbonDictionarySortIndexWriterImpl, CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator}
+import org.apache.carbondata.core.writer.{CarbonDictionaryWriter, CarbonDictionaryWriterImpl,
+ThriftWriter}
 import org.apache.carbondata.processing.loading.DataLoadExecutor
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat,
+CSVRecordReaderIterator, StringArrayWritable}
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.TableOptionConstant
 
 object CarbonDataStoreCreator {
@@ -71,21 +79,20 @@ object CarbonDataStoreCreator {
       val dbName: String = "testdb"
       val tableName: String = "testtable"
       val absoluteTableIdentifier = AbsoluteTableIdentifier.from(
-        storePath + "/"+ dbName + "/" + tableName,
+        storePath + "/" + dbName + "/" + tableName,
         new CarbonTableIdentifier(dbName,
           tableName,
           UUID.randomUUID().toString))
-      val factFilePath: String = new File(dataFilePath).getCanonicalPath
+      //   val factFilePath: String = new File(dataFilePath).getCanonicalPath
       val storeDir: File = new File(absoluteTableIdentifier.getTablePath)
       CarbonUtil.deleteFoldersAndFiles(storeDir)
       CarbonProperties.getInstance.addProperty(
         CarbonCommonConstants.STORE_LOCATION_HDFS,
         absoluteTableIdentifier.getTablePath)
       val table: CarbonTable = createTable(absoluteTableIdentifier)
-      writeDictionary(factFilePath, table, absoluteTableIdentifier)
+      writeDictionary(dataFilePath, table, absoluteTableIdentifier)
       val schema: CarbonDataLoadSchema = new CarbonDataLoadSchema(table)
       val loadModel: CarbonLoadModel = new CarbonLoadModel()
-      val partitionId: String = "0"
       loadModel.setCarbonDataLoadSchema(schema)
       loadModel.setDatabaseName(
         absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName)
@@ -93,7 +100,7 @@ object CarbonDataStoreCreator {
         absoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
       loadModel.setTableName(
         absoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
-      loadModel.setFactFilePath(factFilePath)
+      loadModel.setFactFilePath(dataFilePath)
       loadModel.setLoadMetadataDetails(new ArrayList[LoadMetadataDetails]())
       loadModel.setTablePath(absoluteTableIdentifier.getTablePath)
       CarbonProperties.getInstance
@@ -146,45 +153,52 @@ object CarbonDataStoreCreator {
     val tableSchema: TableSchema = new TableSchema()
     tableSchema.setTableName(
       absoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
-    val columnSchemas: List[ColumnSchema] = new ArrayList[ColumnSchema]()
-    val encodings: ArrayList[Encoding] = new ArrayList[Encoding]()
-    encodings.add(Encoding.INVERTED_INDEX)
+    val columnSchemas = new ArrayList[ColumnSchema]()
+    val dictionaryEncoding: ArrayList[Encoding] = new ArrayList[Encoding]()
+    dictionaryEncoding.add(Encoding.DICTIONARY)
+
+    val invertedIndexEncoding: ArrayList[Encoding] = new ArrayList[Encoding]()
+    invertedIndexEncoding.add(Encoding.INVERTED_INDEX)
+
     val id: ColumnSchema = new ColumnSchema()
     id.setColumnName("ID")
     id.setColumnar(true)
     id.setDataType(DataTypes.INT)
-    id.setEncodingList(encodings)
+    id.setEncodingList(dictionaryEncoding)
     id.setColumnUniqueId(UUID.randomUUID().toString)
     id.setColumnReferenceId(id.getColumnUniqueId)
     id.setDimensionColumn(true)
     id.setColumnGroup(1)
+    id.setSchemaOrdinal(0)
     columnSchemas.add(id)
 
-    val dictEncoding: util.ArrayList[Encoding] = new util.ArrayList[Encoding]()
-    dictEncoding.add(Encoding.DIRECT_DICTIONARY)
-    dictEncoding.add(Encoding.DICTIONARY)
-    dictEncoding.add(Encoding.INVERTED_INDEX)
+    val directDictionaryEncoding: util.ArrayList[Encoding] = new util.ArrayList[Encoding]()
+    directDictionaryEncoding.add(Encoding.DIRECT_DICTIONARY)
+    directDictionaryEncoding.add(Encoding.DICTIONARY)
+    directDictionaryEncoding.add(Encoding.INVERTED_INDEX)
 
     val date: ColumnSchema = new ColumnSchema()
     date.setColumnName("date")
     date.setColumnar(true)
     date.setDataType(DataTypes.DATE)
-    date.setEncodingList(dictEncoding)
+    date.setEncodingList(directDictionaryEncoding)
     date.setColumnUniqueId(UUID.randomUUID().toString)
     date.setDimensionColumn(true)
     date.setColumnGroup(2)
     date.setColumnReferenceId(date.getColumnUniqueId)
+    date.setSchemaOrdinal(1)
     columnSchemas.add(date)
 
     val country: ColumnSchema = new ColumnSchema()
     country.setColumnName("country")
     country.setColumnar(true)
     country.setDataType(DataTypes.STRING)
-    country.setEncodingList(encodings)
+    country.setEncodingList(dictionaryEncoding)
     country.setColumnUniqueId(UUID.randomUUID().toString)
     country.setColumnReferenceId(country.getColumnUniqueId)
     country.setDimensionColumn(true)
     country.setColumnGroup(3)
+    country.setSchemaOrdinal(2)
     country.setColumnReferenceId(country.getColumnUniqueId)
     columnSchemas.add(country)
 
@@ -192,10 +206,11 @@ object CarbonDataStoreCreator {
     name.setColumnName("name")
     name.setColumnar(true)
     name.setDataType(DataTypes.STRING)
-    name.setEncodingList(encodings)
+    name.setEncodingList(dictionaryEncoding)
     name.setColumnUniqueId(UUID.randomUUID().toString)
     name.setDimensionColumn(true)
     name.setColumnGroup(4)
+    name.setSchemaOrdinal(3)
     name.setColumnReferenceId(name.getColumnUniqueId)
     columnSchemas.add(name)
 
@@ -203,10 +218,11 @@ object CarbonDataStoreCreator {
     phonetype.setColumnName("phonetype")
     phonetype.setColumnar(true)
     phonetype.setDataType(DataTypes.STRING)
-    phonetype.setEncodingList(encodings)
+    phonetype.setEncodingList(dictionaryEncoding)
     phonetype.setColumnUniqueId(UUID.randomUUID().toString)
     phonetype.setDimensionColumn(true)
     phonetype.setColumnGroup(5)
+    phonetype.setSchemaOrdinal(4)
     phonetype.setColumnReferenceId(phonetype.getColumnUniqueId)
     columnSchemas.add(phonetype)
 
@@ -214,10 +230,11 @@ object CarbonDataStoreCreator {
     serialname.setColumnName("serialname")
     serialname.setColumnar(true)
     serialname.setDataType(DataTypes.STRING)
-    serialname.setEncodingList(encodings)
+    serialname.setEncodingList(dictionaryEncoding)
     serialname.setColumnUniqueId(UUID.randomUUID().toString)
     serialname.setDimensionColumn(true)
     serialname.setColumnGroup(6)
+    serialname.setSchemaOrdinal(5)
     serialname.setColumnReferenceId(serialname.getColumnUniqueId)
     columnSchemas.add(serialname)
 
@@ -225,10 +242,11 @@ object CarbonDataStoreCreator {
     salary.setColumnName("salary")
     salary.setColumnar(true)
     salary.setDataType(DataTypes.DOUBLE)
-    salary.setEncodingList(encodings)
+    salary.setEncodingList(dictionaryEncoding)
     salary.setColumnUniqueId(UUID.randomUUID().toString)
-    salary.setDimensionColumn(false)
+    salary.setDimensionColumn(true)
     salary.setColumnGroup(7)
+    salary.setSchemaOrdinal(6)
     salary.setColumnReferenceId(salary.getColumnUniqueId)
     columnSchemas.add(salary)
 
@@ -238,23 +256,25 @@ object CarbonDataStoreCreator {
     bonus.setDataType(DataTypes.createDecimalType(10, 4))
     bonus.setPrecision(10)
     bonus.setScale(4)
-    bonus.setEncodingList(encodings)
+    bonus.setEncodingList(invertedIndexEncoding)
     bonus.setColumnUniqueId(UUID.randomUUID().toString)
     bonus.setDimensionColumn(false)
     bonus.setColumnGroup(8)
+    bonus.setSchemaOrdinal(7)
     bonus.setColumnReferenceId(bonus.getColumnUniqueId)
     columnSchemas.add(bonus)
 
     val monthlyBonus: ColumnSchema = new ColumnSchema()
     monthlyBonus.setColumnName("monthlyBonus")
     monthlyBonus.setColumnar(true)
-    monthlyBonus.setDataType(DataTypes.createDecimalType(18, 4))
-    monthlyBonus.setPrecision(18)
+    monthlyBonus.setDataType(DataTypes.createDecimalType(10, 4))
+    monthlyBonus.setPrecision(10)
     monthlyBonus.setScale(4)
-    monthlyBonus.setEncodingList(encodings)
+    monthlyBonus.setSchemaOrdinal(8)
+    monthlyBonus.setEncodingList(invertedIndexEncoding)
     monthlyBonus.setColumnUniqueId(UUID.randomUUID().toString)
     monthlyBonus.setDimensionColumn(false)
-    monthlyBonus.setColumnGroup(8)
+    monthlyBonus.setColumnGroup(9)
     monthlyBonus.setColumnReferenceId(monthlyBonus.getColumnUniqueId)
     columnSchemas.add(monthlyBonus)
 
@@ -262,10 +282,11 @@ object CarbonDataStoreCreator {
     dob.setColumnName("dob")
     dob.setColumnar(true)
     dob.setDataType(DataTypes.TIMESTAMP)
-    dob.setEncodingList(dictEncoding)
+    dob.setEncodingList(directDictionaryEncoding)
     dob.setColumnUniqueId(UUID.randomUUID().toString)
     dob.setDimensionColumn(true)
     dob.setColumnGroup(9)
+    dob.setSchemaOrdinal(9)
     dob.setColumnReferenceId(dob.getColumnUniqueId)
     columnSchemas.add(dob)
 
@@ -273,10 +294,11 @@ object CarbonDataStoreCreator {
     shortField.setColumnName("shortField")
     shortField.setColumnar(true)
     shortField.setDataType(DataTypes.SHORT)
-    shortField.setEncodingList(encodings)
+    shortField.setEncodingList(dictionaryEncoding)
     shortField.setColumnUniqueId(UUID.randomUUID().toString)
-    shortField.setDimensionColumn(false)
+    shortField.setDimensionColumn(true)
     shortField.setColumnGroup(10)
+    shortField.setSchemaOrdinal(10)
     shortField.setColumnReferenceId(shortField.getColumnUniqueId)
     columnSchemas.add(shortField)
 
@@ -337,6 +359,7 @@ object CarbonDataStoreCreator {
       table.getMeasureByTableName(table.getTableName)
     allCols.addAll(msrs)
     val set: Array[util.Set[String]] = Array.ofDim[util.Set[String]](dims.size)
+    val dimsIndex = dims.map(dim => dim.getColumnSchema.getSchemaOrdinal)
     for (i <- set.indices) {
       set(i) = new util.HashSet[String]()
     }
@@ -344,15 +367,18 @@ object CarbonDataStoreCreator {
     while (line != null) {
       val data: Array[String] = line.split(",")
       for (i <- set.indices) {
-        set(i).add(data(i))
+        set(i).add(data(dimsIndex(i)))
       }
       line = reader.readLine()
     }
     val dictCache: Cache[DictionaryColumnUniqueIdentifier, ReverseDictionary] = CacheProvider
       .getInstance.createCache(CacheType.REVERSE_DICTIONARY)
+
     for (i <- set.indices) {
+      //      val dim = getDimension(dims, i).get
       val columnIdentifier: ColumnIdentifier =
         new ColumnIdentifier(dims.get(i).getColumnId, null, null)
+
       val dictionaryColumnUniqueIdentifier: DictionaryColumnUniqueIdentifier =
         new DictionaryColumnUniqueIdentifier(
           table.getAbsoluteTableIdentifier,
@@ -371,7 +397,7 @@ object CarbonDataStoreCreator {
             absoluteTableIdentifier,
             columnIdentifier,
             dims.get(i).getDataType)
-          )
+        )
         .asInstanceOf[Dictionary]
       val preparator: CarbonDictionarySortInfoPreparator =
         new CarbonDictionarySortInfoPreparator()
@@ -399,10 +425,11 @@ object CarbonDataStoreCreator {
     reader.close()
   }
 
+
   /**
    * Execute graph which will further load data
    *
-   * @param loadModel Carbon load model
+   * @param loadModel     Carbon load model
    * @param storeLocation store location directory
    * @throws Exception
    */
@@ -481,25 +508,6 @@ object CarbonDataStoreCreator {
       loadModel.getTableName,
       loadModel.getTableName,
       new ArrayList[LoadMetadataDetails]())
-    val segLocation: String = storeLocation + "/" + databaseName + "/" + tableName +
-                              "/Fact/Part0/Segment_0"
-    val file: File = new File(segLocation)
-    var factFile: File = null
-    val folderList: Array[File] = file.listFiles()
-    var folder: File = null
-    for (i <- folderList.indices if folderList(i).isDirectory) {
-      folder = folderList(i)
-    }
-    if (folder.isDirectory) {
-      val files: Array[File] = folder.listFiles()
-      for (i <- files.indices
-           if !files(i).isDirectory && files(i).getName.startsWith("part")) {
-        factFile = files(i)
-        //break
-      }
-      factFile.renameTo(new File(segLocation + "/" + factFile.getName))
-      CarbonUtil.deleteFoldersAndFiles(folder)
-    }
   }
 
   private def writeLoadMetadata(