Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/21 12:24:23 UTC

[2/2] carbondata git commit: [CARBONDATA-2948] Float and Byte DataType support

[CARBONDATA-2948] Float and Byte DataType support

Background
Currently float is supported by internally storing the data as double and changing the data type to Double. This causes problems when SparkCarbonFileFormat is used to read float data: because the data type is changed from Float to Double, the data is retrieved as a Double page instead of a Float page. As a result, if the user creates a table through the file format and specifies float as the data type for any column, the query fails; the user is forced to declare the column as double in order to retrieve the data.

Proposed Solution
Add support for the float data type and store the data as a FloatPage. Most of the methods used for double can be reused for float.
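
The gist of the change can be sketched in plain Java. This is only an illustrative stand-in, not CarbonData's real ColumnPage hierarchy: instead of widening each float to double and handing readers a double page, the writer keeps a dedicated float page, so a reader such as SparkCarbonFileFormat gets back exactly the type that was declared.

// Illustrative sketch only; the class and method names here are hypothetical
// and far simpler than CarbonData's actual ColumnPage classes.
import java.util.Arrays;

final class FloatPageSketch {
  private float[] data;
  private int rowCount;

  FloatPageSketch(int initialCapacity) {
    data = new float[Math.max(initialCapacity, 1)];
  }

  // Mirrors the idea of ColumnPage.putFloat(rowId, value): store the value
  // as a float instead of converting it to double first.
  void putFloat(int rowId, float value) {
    if (rowId >= data.length) {
      data = Arrays.copyOf(data, Math.max(rowId + 1, data.length * 2));
    }
    data[rowId] = value;
    rowCount = Math.max(rowCount, rowId + 1);
  }

  // Readers get a float back, not a double that was silently widened.
  float getFloat(int rowId) {
    return data[rowId];
  }

  public static void main(String[] args) {
    FloatPageSketch page = new FloatPageSketch(4);
    page.putFloat(0, 1.25f);
    page.putFloat(1, 2.5f);
    System.out.println(page.getFloat(1)); // prints 2.5
  }
}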

This closes #2726


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/edfcdca0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/edfcdca0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/edfcdca0

Branch: refs/heads/master
Commit: edfcdca0ac3dcf92a22ffd82557dbff036ec7428
Parents: b04269b
Author: kunal642 <ku...@gmail.com>
Authored: Fri Sep 14 15:26:10 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Sep 21 17:54:10 2018 +0530

----------------------------------------------------------------------
 .../chunk/store/ColumnPageWrapper.java          |  16 +-
 .../core/datastore/page/ColumnPage.java         |  10 +
 .../core/datastore/page/DecimalColumnPage.java  |   5 +
 .../core/datastore/page/LazyColumnPage.java     |   7 +-
 .../datastore/page/LocalDictColumnPage.java     |   4 +
 .../datastore/page/SafeFixLengthColumnPage.java |  10 +
 .../page/UnsafeFixLengthColumnPage.java         |  14 ++
 .../datastore/page/VarLengthColumnPageBase.java |   6 +
 .../page/encoding/ColumnPageEncoderMeta.java    |   8 +
 .../page/encoding/DefaultEncodingFactory.java   |  11 +-
 .../page/encoding/adaptive/AdaptiveCodec.java   |   4 +
 .../adaptive/AdaptiveDeltaFloatingCodec.java    |   7 +-
 .../adaptive/AdaptiveFloatingCodec.java         |  14 +-
 .../statistics/ColumnPageStatsCollector.java    |   1 +
 .../page/statistics/DummyStatsCollector.java    |   4 +
 .../page/statistics/KeyPageStatsCollector.java  |   4 +
 .../page/statistics/LVStringStatsCollector.java |   4 +
 .../statistics/PrimitivePageStatsCollector.java |  57 +++++
 .../ThriftWrapperSchemaConverterImpl.java       |   8 +
 .../core/metadata/datatype/DataType.java        |   2 +-
 .../impl/AbstractScannedResultCollector.java    |   2 +
 .../carbondata/core/scan/filter/FilterUtil.java |  21 ++
 .../executer/ExcludeFilterExecuterImpl.java     |  21 +-
 .../executer/IncludeFilterExecuterImpl.java     |  25 +--
 .../scan/result/vector/CarbonColumnVector.java  |   2 +
 .../vector/MeasureDataVectorProcessor.java      | 111 ++++++++++
 .../vector/impl/CarbonColumnVectorImpl.java     |   4 +
 .../apache/carbondata/core/util/ByteUtil.java   |  12 +-
 .../carbondata/core/util/CarbonUnsafeUtil.java  |   2 +-
 .../apache/carbondata/core/util/CarbonUtil.java |  11 +-
 .../carbondata/core/util/DataTypeUtil.java      |  23 ++
 .../core/util/comparator/Comparator.java        |  19 ++
 format/src/main/thrift/schema.thrift            |   2 +
 .../presto/CarbonColumnVectorWrapper.java       |   6 +
 .../TestNonTransactionalCarbonTable.scala       |   6 +-
 .../vectorreader/ColumnarVectorWrapper.java     |   6 +
 .../VectorizedCarbonRecordReader.java           |   5 +-
 .../datasources/CarbonSparkDataSourceUtil.scala |   3 +
 .../spark/sql/util/SparkTypeConverter.scala     |   2 +
 .../datasource/SparkCarbonDataSourceTest.scala  | 215 ++++++++++++++++++-
 ...tCreateTableUsingSparkCarbonFileFormat.scala |   5 +-
 .../sql/carbondata/datasource/TestUtil.scala    |  16 +-
 .../spark/sql/CarbonDataFrameWriter.scala       |   2 +-
 .../loading/sort/SortStepRowHandler.java        |   8 +
 .../carbondata/sdk/file/AvroCarbonWriter.java   |   8 +-
 .../sdk/file/AvroCarbonWriterTest.java          |  42 ++++
 .../sdk/file/CSVCarbonWriterTest.java           | 183 +++++++++++++++-
 .../sdk/file/ConcurrentAvroSdkWriterTest.java   |   2 +-
 .../sdk/file/ConcurrentSdkWriterTest.java       |   2 +-
 49 files changed, 865 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
index 176a3e9..71cfc46 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
@@ -143,14 +143,12 @@ public class ColumnPageWrapper implements DimensionColumnPage {
         // if this row is null, return default null represent in byte array
         return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
       }
-      if (srcDataType == DataTypes.DOUBLE || srcDataType == DataTypes.FLOAT) {
+      if (srcDataType == DataTypes.FLOAT) {
+        float floatData = columnPage.getFloat(rowId);
+        return ByteUtil.toXorBytes(floatData);
+      } else if (srcDataType == DataTypes.DOUBLE) {
         double doubleData = columnPage.getDouble(rowId);
-        if (srcDataType == DataTypes.FLOAT) {
-          float out = (float) doubleData;
-          return ByteUtil.toXorBytes(out);
-        } else {
-          return ByteUtil.toXorBytes(doubleData);
-        }
+        return ByteUtil.toXorBytes(doubleData);
       } else if (DataTypes.isDecimal(srcDataType)) {
         throw new RuntimeException("unsupported type: " + srcDataType);
       } else if ((srcDataType == DataTypes.BYTE) || (srcDataType == DataTypes.BOOLEAN) || (
@@ -160,7 +158,7 @@ public class ColumnPageWrapper implements DimensionColumnPage {
         long longData = columnPage.getLong(rowId);
         if ((srcDataType == DataTypes.BYTE)) {
           byte out = (byte) longData;
-          return ByteUtil.toXorBytes(out);
+          return new byte[] { out };
         } else if (srcDataType == DataTypes.BOOLEAN) {
           byte out = (byte) longData;
           return ByteUtil.toBytes(ByteUtil.toBoolean(out));
@@ -195,6 +193,8 @@ public class ColumnPageWrapper implements DimensionColumnPage {
         return columnPage.getBytes(rowId);
       } else if (srcDataType == DataTypes.DOUBLE) {
         return ByteUtil.toXorBytes(columnPage.getDouble(rowId));
+      } else if (srcDataType == DataTypes.FLOAT) {
+        return ByteUtil.toXorBytes(columnPage.getFloat(rowId));
       } else if (srcDataType == targetDataType) {
         return columnPage.getBytes(rowId);
       } else {
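
The hunks above give FLOAT its own branch that calls ByteUtil.toXorBytes(float) when producing the byte form of a dimension value. The exact bit manipulation inside toXorBytes is not shown in this diff; the standalone sketch below shows one common way to turn an IEEE-754 float into bytes whose unsigned lexicographic order matches numeric order, which is the property such an encoding is after. It is not necessarily the exact scheme used in this commit, and the names are made up.

import java.nio.ByteBuffer;

final class SortableFloatBytes {
  static byte[] encode(float value) {
    int bits = Float.floatToIntBits(value);
    // Negative floats: flip every bit so larger magnitudes sort lower.
    // Non-negative floats: flip only the sign bit so they sort above negatives.
    bits = (bits < 0) ? ~bits : (bits ^ Integer.MIN_VALUE);
    return ByteBuffer.allocate(4).putInt(bits).array(); // big-endian bytes
  }

  static int compareUnsigned(byte[] a, byte[] b) {
    for (int i = 0; i < 4; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return 0;
  }

  public static void main(String[] args) {
    float[] values = {-3.5f, -0.25f, 0f, 0.25f, 7.75f};
    for (int i = 1; i < values.length; i++) {
      // Byte order agrees with numeric order for each adjacent pair: prints true.
      System.out.println(compareUnsigned(encode(values[i - 1]), encode(values[i])) < 0);
    }
  }
}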

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 796083d..8b9a9a5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -435,6 +435,9 @@ public abstract class ColumnPage {
         || dataType == DataTypes.VARCHAR) {
       putBytes(rowId, (byte[]) value);
       statsCollector.update((byte[]) value);
+    } else if (dataType == DataTypes.FLOAT) {
+      putFloat(rowId, (float) value);
+      statsCollector.update((float) value);
     } else {
       throw new RuntimeException("unsupported data type: " + dataType);
     }
@@ -501,6 +504,11 @@ public abstract class ColumnPage {
   public abstract void putDouble(int rowId, double value);
 
   /**
+   * Set float value at rowId
+   */
+  public abstract void putFloat(int rowId, float value);
+
+  /**
    * Set byte array value at rowId
    */
   public abstract void putBytes(int rowId, byte[] bytes);
@@ -545,6 +553,8 @@ public abstract class ColumnPage {
       putLong(rowId, 0L);
     } else if (dataType == DataTypes.DOUBLE) {
       putDouble(rowId, 0.0);
+    } else if (dataType == DataTypes.FLOAT) {
+      putFloat(rowId, 0.0f);
     } else if (DataTypes.isDecimal(dataType)) {
       putDecimal(rowId, BigDecimal.ZERO);
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
index e63614f..50261e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/DecimalColumnPage.java
@@ -110,6 +110,11 @@ public abstract class DecimalColumnPage extends VarLengthColumnPageBase {
         "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
+  @Override public void putFloat(int rowId, float value) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
   @Override
   public void setFloatPage(float[] floatData) {
     throw new UnsupportedOperationException(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index 605fe4e..772916d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -93,7 +93,7 @@ public class LazyColumnPage extends ColumnPage {
 
   @Override
   public float getFloat(int rowId) {
-    throw new UnsupportedOperationException("internal error");
+    return (float) getDouble(rowId);
   }
 
   @Override
@@ -262,6 +262,11 @@ public class LazyColumnPage extends ColumnPage {
   }
 
   @Override
+  public void putFloat(int rowId, float value) {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
   public void putBytes(int rowId, byte[] bytes) {
     throw new UnsupportedOperationException("internal error");
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
index fced016..ad19e27 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java
@@ -232,6 +232,10 @@ public class LocalDictColumnPage extends ColumnPage {
     throw new UnsupportedOperationException("Operation not supported");
   }
 
+  @Override public void putFloat(int rowId, float value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
   @Override public void putDecimal(int rowId, BigDecimal decimal) {
     throw new UnsupportedOperationException("Operation not supported");
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
index b355220..3884d9b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -101,6 +101,16 @@ public class SafeFixLengthColumnPage extends ColumnPage {
   }
 
   /**
+   * Set float value at rowId
+   */
+  @Override
+  public void putFloat(int rowId, float value) {
+    ensureArraySize(rowId, DataTypes.FLOAT);
+    floatData[rowId] = value;
+    arrayElementCount++;
+  }
+
+  /**
    * Set string value at rowId
    */
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index 8a53840..9e0eb8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -200,6 +200,20 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   }
 
   @Override
+  public void putFloat(int rowId, float value) {
+    try {
+      ensureMemory(ByteUtil.SIZEOF_FLOAT);
+    } catch (MemoryException e) {
+      throw new RuntimeException(e);
+    }
+
+    long offset = ((long) rowId) << floatBits;
+    CarbonUnsafe.getUnsafe().putFloat(baseAddress, baseOffset + offset, value);
+    totalLength += ByteUtil.SIZEOF_FLOAT;
+    updatePageSize(rowId);
+  }
+
+  @Override
   public void putBytes(int rowId, byte[] bytes) {
     try {
       ensureMemory(eachRowSize);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 7f0b2a6..39395c3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -320,6 +320,12 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
         "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
   }
 
+  @Override
+  public void putFloat(int rowId, float value) {
+    throw new UnsupportedOperationException(
+        "invalid data type: " + columnPageEncoderMeta.getStoreDataType());
+  }
+
   abstract void putBytesAtRow(int rowId, byte[] bytes);
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index 971cf24..e6aafa0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -132,6 +132,10 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
       out.writeDouble((Double) getMaxValue());
       out.writeDouble((Double) getMinValue());
       out.writeDouble(0d); // unique value is obsoleted, maintain for compatibility
+    } else if (dataType == DataTypes.FLOAT) {
+      out.writeFloat((Float) getMaxValue());
+      out.writeFloat((Float) getMinValue());
+      out.writeFloat(0f); // unique value is obsoleted, maintain for compatibility
     } else if (DataTypes.isDecimal(dataType)) {
       byte[] maxAsBytes = getMaxAsBytes(columnSpec.getSchemaDataType());
       byte[] minAsBytes = getMinAsBytes(columnSpec.getSchemaDataType());
@@ -181,6 +185,10 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
       this.setMaxValue(in.readDouble());
       this.setMinValue(in.readDouble());
       in.readDouble(); // for non exist value which is obsoleted, it is backward compatibility;
+    } else if (dataType == DataTypes.FLOAT) {
+      this.setMaxValue(in.readFloat());
+      this.setMinValue(in.readFloat());
+      in.readFloat(); // for non exist value which is obsoleted, it is backward compatibility;
     } else if (DataTypes.isDecimal(dataType)) {
       byte[] max = new byte[in.readShort()];
       in.readFully(max);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index 993b6b8..9e8d853 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -303,8 +303,15 @@ public class DefaultEncodingFactory extends EncodingFactory {
   static ColumnPageCodec selectCodecByAlgorithmForFloating(SimpleStatsResult stats,
       boolean isComplexPrimitive, TableSpec.ColumnSpec columnSpec) {
     DataType srcDataType = stats.getDataType();
-    double maxValue = (double) stats.getMax();
-    double minValue = (double) stats.getMin();
+    double maxValue;
+    double minValue;
+    if (srcDataType == DataTypes.FLOAT) {
+      maxValue = (float) stats.getMax();
+      minValue = (float) stats.getMin();
+    } else {
+      maxValue = (double) stats.getMax();
+      minValue = (double) stats.getMin();
+    }
     int decimalCount = stats.getDecimalCount();
 
     // For Complex Type primitive we should always choose adaptive path

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
index ef7a6a9..da57e8c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
@@ -145,6 +145,10 @@ public abstract class AdaptiveCodec implements ColumnPageCodec {
       for (int i = 0; i < dataPage.length; i++) {
         page.putDouble(i, (double) dataPage[i]);
       }
+    } else if (srcDataType == DataTypes.FLOAT) {
+      for (int i = 0; i < dataPage.length; i++) {
+        page.putFloat(i, (float) dataPage[i]);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index 6d0a8d1..9b0b574 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -58,7 +58,12 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
       SimpleStatsResult stats, boolean isInvertedIndex) {
     super(srcDataType, targetDataType, stats, isInvertedIndex);
     this.factor = Math.pow(10, stats.getDecimalCount());
-    this.max = (long) (Math.pow(10, stats.getDecimalCount()) * (double) stats.getMax());
+    if (srcDataType == DataTypes.FLOAT) {
+      this.max =
+          (long) ((long) Math.pow(10, stats.getDecimalCount()) * ((float) stats.getMax()));
+    } else {
+      this.max = (long) ((long) Math.pow(10, stats.getDecimalCount()) * (double) stats.getMax());
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index af1e9ec..836af26 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -44,12 +44,14 @@ import org.apache.carbondata.format.Encoding;
  */
 public class AdaptiveFloatingCodec extends AdaptiveCodec {
 
-  private Double factor;
+  private double factor;
+  private float floatFactor;
 
   public AdaptiveFloatingCodec(DataType srcDataType, DataType targetDataType,
       SimpleStatsResult stats, boolean isInvertedIndex) {
     super(srcDataType, targetDataType, stats, isInvertedIndex);
     this.factor = Math.pow(10, stats.getDecimalCount());
+    this.floatFactor = (float) factor;
   }
 
   @Override
@@ -147,15 +149,15 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
     @Override
     public void encode(int rowId, float value) {
       if (targetDataType == DataTypes.BYTE) {
-        encodedPage.putByte(rowId, (byte) (value * factor));
+        encodedPage.putByte(rowId, (byte) (value * floatFactor));
       } else if (targetDataType == DataTypes.SHORT) {
-        encodedPage.putShort(rowId, (short) (value * factor));
+        encodedPage.putShort(rowId, (short) (value * floatFactor));
       } else if (targetDataType == DataTypes.SHORT_INT) {
-        encodedPage.putShortInt(rowId, (int) (value * factor));
+        encodedPage.putShortInt(rowId, (int) (value * floatFactor));
       } else if (targetDataType == DataTypes.INT) {
-        encodedPage.putInt(rowId, (int) (value * factor));
+        encodedPage.putInt(rowId, (int) (value * floatFactor));
       } else if (targetDataType == DataTypes.LONG) {
-        encodedPage.putLong(rowId, (long) (value * factor));
+        encodedPage.putLong(rowId, (long) (value * floatFactor));
       } else {
         throw new RuntimeException("internal error: " + debugInfo());
       }
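
The adaptive floating codecs scale each value by 10^decimalCount so it fits a narrower integral target type; the encode(int, float) override above casts value * floatFactor to byte/short/int/long. Below is a rough, self-contained round-trip sketch of that idea, assuming the decoder divides by the same factor (the decode side is not part of this hunk); class and method names are illustrative only.

final class AdaptiveFloatSketch {
  private final float factor;

  AdaptiveFloatSketch(int decimalCount) {
    // Same scale factor idea as the codec: 10 raised to the decimal count.
    this.factor = (float) Math.pow(10, decimalCount);
  }

  // The commit's encoder casts the scaled value directly to the target type.
  short encodeToShort(float value) {
    return (short) (value * factor);
  }

  float decode(short stored) {
    return stored / factor;
  }

  public static void main(String[] args) {
    AdaptiveFloatSketch codec = new AdaptiveFloatSketch(2); // two decimal places
    short stored = codec.encodeToShort(1.25f);
    System.out.println(stored);               // 125
    System.out.println(codec.decode(stored)); // 1.25
  }
}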

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java
index a749587..28aec84 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java
@@ -26,6 +26,7 @@ public interface ColumnPageStatsCollector {
   void update(int value);
   void update(long value);
   void update(double value);
+  void update(float value);
   void update(BigDecimal value);
   void update(byte[] value);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java
index fc91489..86bd12f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/DummyStatsCollector.java
@@ -78,6 +78,10 @@ public class DummyStatsCollector implements ColumnPageStatsCollector {
 
   }
 
+  @Override public void update(float value) {
+
+  }
+
   @Override public void update(BigDecimal value) {
 
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java
index 30edc53..e929c41 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java
@@ -66,6 +66,10 @@ public class KeyPageStatsCollector implements ColumnPageStatsCollector {
 
   }
 
+  @Override public void update(float value) {
+
+  }
+
   @Override
   public void update(BigDecimal value) {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
index b04f2ee..c970111 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
@@ -74,6 +74,10 @@ public abstract class LVStringStatsCollector implements ColumnPageStatsCollector
 
   }
 
+  @Override public void update(float value) {
+
+  }
+
   @Override
   public void update(BigDecimal value) {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
index 4af5b14..9be5a58 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
@@ -36,6 +36,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
   private int minInt, maxInt;
   private long minLong, maxLong;
   private double minDouble, maxDouble;
+  private float minFloat, maxFloat;
   private BigDecimal minDecimal, maxDecimal;
 
   // scale of the double value, apply adaptive encoding if this is positive
@@ -74,6 +75,10 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       instance.minDouble = (double) meta.getMinValue();
       instance.maxDouble = (double) meta.getMaxValue();
       instance.decimal = meta.getDecimal();
+    } else if (dataType == DataTypes.FLOAT) {
+      instance.minFloat = (float) meta.getMinValue();
+      instance.maxFloat = (float) meta.getMaxValue();
+      instance.decimal = meta.getDecimal();
     } else if (DataTypes.isDecimal(dataType)) {
       instance.minDecimal = (BigDecimal) meta.getMinValue();
       instance.maxDecimal = (BigDecimal) meta.getMaxValue();
@@ -107,6 +112,10 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       instance.minDouble = (double) meta.getMinValue();
       instance.maxDouble = (double) meta.getMaxValue();
       instance.decimal = meta.getDecimal();
+    } else if (dataType == DataTypes.FLOAT) {
+      instance.minFloat = (float) meta.getMinValue();
+      instance.maxFloat = (float) meta.getMaxValue();
+      instance.decimal = meta.getDecimal();
     } else if (DataTypes.isDecimal(dataType)) {
       instance.minDecimal = (BigDecimal) meta.getMinValue();
       instance.maxDecimal = (BigDecimal) meta.getMaxValue();
@@ -140,6 +149,10 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       minDouble = Double.POSITIVE_INFINITY;
       maxDouble = Double.NEGATIVE_INFINITY;
       decimal = 0;
+    } else if (dataType == DataTypes.FLOAT) {
+      minFloat = Float.MAX_VALUE;
+      maxFloat = Float.MIN_VALUE;
+      decimal = 0;
     } else if (DataTypes.isDecimal(dataType)) {
       this.zeroDecimal = BigDecimal.ZERO;
       decimal = 0;
@@ -162,6 +175,8 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       update(value);
     } else if (dataType == DataTypes.DOUBLE) {
       update(0d);
+    } else if (dataType == DataTypes.FLOAT) {
+      update(0f);
     } else if (DataTypes.isDecimal(dataType)) {
       if (isFirst) {
         maxDecimal = zeroDecimal;
@@ -237,6 +252,22 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
     return decimalPlaces;
   }
 
+  private int getDecimalCount(float value) {
+    int decimalPlaces = 0;
+    try {
+      String strValue = Float.valueOf(Math.abs(value)).toString();
+      int integerPlaces = strValue.indexOf('.');
+      if (-1 != integerPlaces) {
+        decimalPlaces = strValue.length() - integerPlaces - 1;
+      }
+    } catch (NumberFormatException e) {
+      if (!Double.isInfinite(value)) {
+        throw e;
+      }
+    }
+    return decimalPlaces;
+  }
+
   @Override
   public void update(double value) {
     if (minDouble > value) {
@@ -257,6 +288,26 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       }
     }
   }
+  @Override
+  public void update(float value) {
+    if (minFloat > value) {
+      minFloat = value;
+    }
+    if (maxFloat < value) {
+      maxFloat = value;
+    }
+    if (decimal >= 0) {
+      int decimalCount = getDecimalCount(value);
+      decimalCountForComplexPrimitive = decimalCount;
+      if (decimalCount > 5) {
+        // If deciaml count is too big, we do not do adaptive encoding.
+        // So set decimal to negative value
+        decimal = -1;
+      } else if (decimalCount > decimal) {
+        this.decimal = decimalCount;
+      }
+    }
+  }
 
   public int getDecimalForComplexPrimitive() {
     decimal = decimalCountForComplexPrimitive;
@@ -299,6 +350,8 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       return String.format("min: %s, max: %s, decimal: %s ", minLong, maxLong, decimal);
     } else if (dataType == DataTypes.DOUBLE) {
       return String.format("min: %s, max: %s, decimal: %s ", minDouble, maxDouble, decimal);
+    } else if (dataType == DataTypes.FLOAT) {
+      return String.format("min: %s, max: %s, decimal: %s ", minFloat, maxFloat, decimal);
     }
     return super.toString();
   }
@@ -315,6 +368,8 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       return minLong;
     } else if (dataType == DataTypes.DOUBLE) {
       return minDouble;
+    } else if (dataType == DataTypes.FLOAT) {
+      return minFloat;
     } else if (DataTypes.isDecimal(dataType)) {
       return minDecimal;
     }
@@ -333,6 +388,8 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       return maxLong;
     } else if (dataType == DataTypes.DOUBLE) {
       return maxDouble;
+    } else if (dataType == DataTypes.FLOAT) {
+      return maxFloat;
     } else if (DataTypes.isDecimal(dataType)) {
       return maxDecimal;
     }
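
For reference, the new getDecimalCount(float) above relies on the string form of the value to count digits after the decimal point, and update(float) switches decimal to -1 (disabling adaptive encoding) once that count exceeds 5. A tiny standalone demo of the same string-based counting, with a hypothetical class name:

final class FloatDecimalCount {
  static int getDecimalCount(float value) {
    // Count the characters after the '.' in the float's string form.
    String strValue = Float.toString(Math.abs(value));
    int dot = strValue.indexOf('.');
    return (dot == -1) ? 0 : strValue.length() - dot - 1;
  }

  public static void main(String[] args) {
    System.out.println(getDecimalCount(12.5f)); // 1
    System.out.println(getDecimalCount(0.1f));  // 1
    System.out.println(getDecimalCount(2.0f));  // 1 ("2.0" still has one digit after the dot)
  }
}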

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index de5985c..13f592f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -160,6 +160,10 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
       return org.apache.carbondata.format.DataType.MAP;
     } else if (dataType.getId() == DataTypes.VARCHAR.getId()) {
       return org.apache.carbondata.format.DataType.VARCHAR;
+    } else if (dataType.getId() == DataTypes.FLOAT.getId()) {
+      return org.apache.carbondata.format.DataType.FLOAT;
+    } else if (dataType.getId() == DataTypes.BYTE.getId()) {
+      return org.apache.carbondata.format.DataType.BYTE;
     } else {
       return org.apache.carbondata.format.DataType.STRING;
     }
@@ -502,6 +506,10 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         return DataTypes.createDefaultMapType();
       case VARCHAR:
         return DataTypes.VARCHAR;
+      case FLOAT:
+        return DataTypes.FLOAT;
+      case BYTE:
+        return DataTypes.BYTE;
       default:
         return DataTypes.STRING;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
index 5a19073..8514ccb 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
@@ -85,7 +85,7 @@ public class DataType implements Serializable {
         dataType == DataTypes.INT ||
         dataType == DataTypes.LONG) {
       return BIG_INT_MEASURE_CHAR;
-    } else if (dataType == DataTypes.DOUBLE) {
+    } else if (dataType == DataTypes.DOUBLE || dataType == DataTypes.FLOAT) {
       return DOUBLE_MEASURE_CHAR;
     } else if (DataTypes.isDecimal(dataType)) {
       return BIG_DECIMAL_MEASURE_CHAR;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
index a160778..b20954a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
@@ -143,6 +143,8 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
         return (int) dataChunk.getLong(index);
       } else if (dataType == DataTypes.LONG) {
         return dataChunk.getLong(index);
+      } else if (dataType == DataTypes.FLOAT) {
+        return dataChunk.getFloat(index);
       } else if (DataTypes.isDecimal(dataType)) {
         BigDecimal bigDecimalMsrValue = dataChunk.getDecimal(index);
         if (null != bigDecimalMsrValue && carbonMeasure.getScale() > bigDecimalMsrValue.scale()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 8c0ea56..ba6a033 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -667,6 +667,27 @@ public final class FilterUtil {
     return columnFilterInfo;
   }
 
+  public static DataType getMeasureDataType(
+      MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo) {
+    if (msrColumnEvaluatorInfo.getType() == DataTypes.BOOLEAN) {
+      return DataTypes.BOOLEAN;
+    } else if (msrColumnEvaluatorInfo.getType() == DataTypes.SHORT) {
+      return DataTypes.SHORT;
+    } else if (msrColumnEvaluatorInfo.getType() == DataTypes.INT) {
+      return DataTypes.INT;
+    } else if (msrColumnEvaluatorInfo.getType() == DataTypes.LONG) {
+      return DataTypes.LONG;
+    } else if (msrColumnEvaluatorInfo.getType() == DataTypes.FLOAT) {
+      return DataTypes.FLOAT;
+    } else if (msrColumnEvaluatorInfo.getType() == DataTypes.BYTE) {
+      return DataTypes.BYTE;
+    } else if (DataTypes.isDecimal(msrColumnEvaluatorInfo.getType())) {
+      return DataTypes.createDefaultDecimalType();
+    } else {
+      return DataTypes.DOUBLE;
+    }
+  }
+
   /**
    * Method will prepare the  dimfilterinfo instance by resolving the filter
    * expression value to its respective surrogates.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index 04264f3..15a43c5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -25,7 +25,6 @@ import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
@@ -81,7 +80,7 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
               null, null, msrColumnEvaluatorInfo.getMeasure(), msrColumnExecutorInfo);
       isMeasurePresentInCurrentBlock = true;
 
-      DataType msrType = getMeasureDataType(msrColumnEvaluatorInfo);
+      DataType msrType = FilterUtil.getMeasureDataType(msrColumnEvaluatorInfo);
       comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
     }
 
@@ -127,7 +126,7 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
       ColumnPage[] ColumnPages =
           measureRawColumnChunk.decodeAllColumnPages();
       BitSetGroup bitSetGroup = new BitSetGroup(measureRawColumnChunk.getPagesCount());
-      DataType msrType = getMeasureDataType(msrColumnEvaluatorInfo);
+      DataType msrType = FilterUtil.getMeasureDataType(msrColumnEvaluatorInfo);
       for (int i = 0; i < ColumnPages.length; i++) {
         BitSet bitSet =
             getFilteredIndexesForMeasure(
@@ -173,22 +172,6 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
     return true;
   }
 
-  private DataType getMeasureDataType(MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo) {
-    if (msrColumnEvaluatorInfo.getType() == DataTypes.BOOLEAN) {
-      return DataTypes.BOOLEAN;
-    } else if (msrColumnEvaluatorInfo.getType() == DataTypes.SHORT) {
-      return DataTypes.SHORT;
-    } else if (msrColumnEvaluatorInfo.getType() == DataTypes.INT) {
-      return DataTypes.INT;
-    } else if (msrColumnEvaluatorInfo.getType() == DataTypes.LONG) {
-      return DataTypes.LONG;
-    } else if (DataTypes.isDecimal(msrColumnEvaluatorInfo.getType())) {
-      return DataTypes.createDefaultDecimalType();
-    } else {
-      return DataTypes.DOUBLE;
-    }
-  }
-
   private BitSet getFilteredIndexes(ColumnPage columnPage, int numerOfRows, DataType msrType) {
     // Here the algorithm is
     // Get the measure values from the chunk. compare sequentially with the

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index ddc9751..8070c2f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -26,7 +26,6 @@ import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
@@ -82,14 +81,12 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
       this.msrColumnEvaluatorInfo = msrColumnEvaluatorInfo;
       msrColumnExecutorInfo = new MeasureColumnExecuterFilterInfo();
       comparator =
-          Comparator.getComparatorByDataTypeForMeasure(getMeasureDataType(msrColumnEvaluatorInfo));
+          Comparator.getComparatorByDataTypeForMeasure(
+              FilterUtil.getMeasureDataType(msrColumnEvaluatorInfo));
       FilterUtil
           .prepareKeysFromSurrogates(msrColumnEvaluatorInfo.getFilterValues(), segmentProperties,
               null, null, msrColumnEvaluatorInfo.getMeasure(), msrColumnExecutorInfo);
       isMeasurePresentInCurrentBlock = true;
-
-      DataType msrType = getMeasureDataType(msrColumnEvaluatorInfo);
-      comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
     }
 
   }
@@ -157,7 +154,7 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
       MeasureRawColumnChunk measureRawColumnChunk =
           rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex];
       BitSetGroup bitSetGroup = new BitSetGroup(measureRawColumnChunk.getPagesCount());
-      DataType msrType = getMeasureDataType(msrColumnEvaluatorInfo);
+      DataType msrType = FilterUtil.getMeasureDataType(msrColumnEvaluatorInfo);
       for (int i = 0; i < measureRawColumnChunk.getPagesCount(); i++) {
         if (measureRawColumnChunk.getMaxValues() != null) {
           if (isScanRequired(measureRawColumnChunk.getMaxValues()[i],
@@ -210,22 +207,6 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
     return false;
   }
 
-  private DataType getMeasureDataType(MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo) {
-    if (msrColumnEvaluatorInfo.getType() == DataTypes.BOOLEAN) {
-      return DataTypes.BOOLEAN;
-    } else if (msrColumnEvaluatorInfo.getType() == DataTypes.SHORT) {
-      return DataTypes.SHORT;
-    } else if (msrColumnEvaluatorInfo.getType() == DataTypes.INT) {
-      return DataTypes.INT;
-    } else if (msrColumnEvaluatorInfo.getType() == DataTypes.LONG) {
-      return DataTypes.LONG;
-    } else if (DataTypes.isDecimal(msrColumnEvaluatorInfo.getType())) {
-      return DataTypes.createDefaultDecimalType();
-    } else {
-      return DataTypes.DOUBLE;
-    }
-  }
-
   private BitSet getFilteredIndexesForMeasures(ColumnPage columnPage,
       int rowsInPage, DataType msrType) {
     // Here the algorithm is

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
index 3bee136..dd0e8b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
@@ -53,6 +53,8 @@ public interface CarbonColumnVector {
 
   void putBytes(int rowId, int offset, int length, byte[] value);
 
+  void putByte(int rowId, byte value);
+
   void putNull(int rowId);
 
   void putNulls(int rowId, int count);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
index 8902dfb..808f646 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
@@ -302,6 +302,113 @@ public class MeasureDataVectorProcessor {
       }
     }
   }
+  public static class FloatMeasureVectorFiller implements MeasureVectorFiller {
+
+    @Override
+    public void fillMeasureVector(ColumnPage dataChunk, ColumnVectorInfo info) {
+      int offset = info.offset;
+      int len = offset + info.size;
+      int vectorOffset = info.vectorOffset;
+      CarbonColumnVector vector = info.vector;
+      BitSet nullBitSet = dataChunk.getNullBits();
+      if (nullBitSet.isEmpty()) {
+        for (int i = offset; i < len; i++) {
+          vector.putFloat(vectorOffset, dataChunk.getFloat(i));
+          vectorOffset++;
+        }
+      } else {
+        for (int i = offset; i < len; i++) {
+          if (nullBitSet.get(i)) {
+            vector.putNull(vectorOffset);
+          } else {
+            vector.putFloat(vectorOffset, dataChunk.getFloat(i));
+          }
+          vectorOffset++;
+        }
+      }
+    }
+
+    @Override
+    public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk,
+        ColumnVectorInfo info) {
+      int offset = info.offset;
+      int len = offset + info.size;
+      int vectorOffset = info.vectorOffset;
+      CarbonColumnVector vector = info.vector;
+      BitSet nullBitSet = dataChunk.getNullBits();
+      if (nullBitSet.isEmpty()) {
+        for (int i = offset; i < len; i++) {
+          int currentRow = filteredRowId[i];
+          vector.putFloat(vectorOffset, dataChunk.getFloat(currentRow));
+          vectorOffset++;
+        }
+      } else {
+        for (int i = offset; i < len; i++) {
+          int currentRow = filteredRowId[i];
+          if (nullBitSet.get(currentRow)) {
+            vector.putNull(vectorOffset);
+          } else {
+            vector.putFloat(vectorOffset, dataChunk.getFloat(currentRow));
+          }
+          vectorOffset++;
+        }
+      }
+    }
+  }
+
+  public static class ByteMeasureVectorFiller implements MeasureVectorFiller {
+
+    @Override
+    public void fillMeasureVector(ColumnPage dataChunk, ColumnVectorInfo info) {
+      int offset = info.offset;
+      int len = offset + info.size;
+      int vectorOffset = info.vectorOffset;
+      CarbonColumnVector vector = info.vector;
+      BitSet nullBitSet = dataChunk.getNullBits();
+      if (nullBitSet.isEmpty()) {
+        for (int i = offset; i < len; i++) {
+          vector.putByte(vectorOffset, dataChunk.getByte(i));
+          vectorOffset++;
+        }
+      } else {
+        for (int i = offset; i < len; i++) {
+          if (nullBitSet.get(i)) {
+            vector.putNull(vectorOffset);
+          } else {
+            vector.putByte(vectorOffset, dataChunk.getByte(i));
+          }
+          vectorOffset++;
+        }
+      }
+    }
+
+    @Override
+    public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk,
+        ColumnVectorInfo info) {
+      int offset = info.offset;
+      int len = offset + info.size;
+      int vectorOffset = info.vectorOffset;
+      CarbonColumnVector vector = info.vector;
+      BitSet nullBitSet = dataChunk.getNullBits();
+      if (nullBitSet.isEmpty()) {
+        for (int i = offset; i < len; i++) {
+          int currentRow = filteredRowId[i];
+          vector.putByte(vectorOffset, dataChunk.getByte(currentRow));
+          vectorOffset++;
+        }
+      } else {
+        for (int i = offset; i < len; i++) {
+          int currentRow = filteredRowId[i];
+          if (nullBitSet.get(currentRow)) {
+            vector.putNull(vectorOffset);
+          } else {
+            vector.putByte(vectorOffset, dataChunk.getByte(currentRow));
+          }
+          vectorOffset++;
+        }
+      }
+    }
+  }
 
   public static class DefaultMeasureVectorFiller implements MeasureVectorFiller {
 
@@ -370,6 +477,10 @@ public class MeasureDataVectorProcessor {
         return new LongMeasureVectorFiller();
       } else if (DataTypes.isDecimal(dataType)) {
         return new DecimalMeasureVectorFiller();
+      } else if (dataType == DataTypes.FLOAT) {
+        return new FloatMeasureVectorFiller();
+      } else if (dataType == DataTypes.BYTE) {
+        return new ByteMeasureVectorFiller();
       } else {
         return new DefaultMeasureVectorFiller();
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
index cc6ddfc..f8f663f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
@@ -150,6 +150,10 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
     bytes[rowId] = value;
   }
 
+  @Override public void putByte(int rowId, byte value) {
+    byteArr[rowId] = value;
+  }
+
   @Override public void putBytes(int rowId, int count, byte[] value) {
     for (int i = 0; i < count; ++i) {
       bytes[i + rowId] = value;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index 702aded..596d1dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -37,10 +37,10 @@ public final class ByteUtil {
 
   public static final int SIZEOF_INT = 4;
 
-  public static final int SIZEOF_LONG = 8;
-
   public static final int SIZEOF_FLOAT = 4;
 
+  public static final int SIZEOF_LONG = 8;
+
   public static final int SIZEOF_DOUBLE = 8;
 
   public static final String UTF8_CSN = StandardCharsets.UTF_8.name();
@@ -707,6 +707,10 @@ public final class ByteUtil {
     return toXorBytes(Double.doubleToLongBits(val));
   }
 
+  public static byte[] toXorBytes(float val) {
+    return toXorBytes(Float.floatToIntBits(val));
+  }
+
   /**
    * The following methods convert byte array back to the real value.
    */
@@ -725,4 +729,8 @@ public final class ByteUtil {
   public static double toXorDouble(byte[] value, int offset, int length) {
     return Double.longBitsToDouble(toXorLong(value, offset, length));
   }
+
+  public static float toXorFloat(byte[] value, int offset, int length) {
+    return Float.intBitsToFloat(toXorInt(value, offset, length));
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java
index 00e7dee..28cec5f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUnsafeUtil.java
@@ -84,7 +84,7 @@ public class CarbonUnsafeUtil {
     } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.DOUBLE) {
       data = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
     } else if (dataType == DataTypes.FLOAT) {
-      data = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
+      data = CarbonUnsafe.getUnsafe().getFloat(baseObject, address + size);
     } else if (dataType == DataTypes.BYTE_ARRAY) {
       CarbonUnsafe.getUnsafe()
           .copyMemory(baseObject, address + size, data, CarbonUnsafe.BYTE_ARRAY_OFFSET,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 9ab875c..5a85b14 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2211,6 +2211,10 @@ public final class CarbonUtil {
         return DataTypes.createDefaultMapType();
       case VARCHAR:
         return DataTypes.VARCHAR;
+      case FLOAT:
+        return DataTypes.FLOAT;
+      case BYTE:
+        return DataTypes.BYTE;
       default:
         return DataTypes.STRING;
     }
@@ -2252,7 +2256,7 @@ public final class CarbonUtil {
       LOGGER.error("CarbonData file is not present in the table location");
       throw new IOException("CarbonData file is not present in the table location");
     }
-    CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(fistFilePath);
+    CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(fistFilePath, configuration);
     List<ColumnSchema> columnSchemaList = carbonHeaderReader.readSchema();
     // only columnSchema is the valid entry, reset all dummy entries.
     TableSchema tableSchema = getDummyTableSchema(tableName,columnSchemaList);
@@ -2392,6 +2396,11 @@ public final class CarbonUtil {
       b.putDouble((double) value);
       b.flip();
       return b.array();
+    } else if (dataType == DataTypes.FLOAT) {
+      b = ByteBuffer.allocate(4);
+      b.putFloat((float) value);
+      b.flip();
+      return b.array();
     } else if (DataTypes.isDecimal(dataType)) {
       return DataTypeUtil.bigDecimalToByte((BigDecimal) value);
     } else if (dataType == DataTypes.BYTE_ARRAY) {

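For reference, the float branch in isolation behaves like the following hypothetical helper (not the CarbonUtil method itself), assuming a 4-byte buffer, which is all a float needs:

    import java.nio.ByteBuffer;

    public class FloatValueAsBytes {
      // Hypothetical stand-in for the new float branch: one float value to bytes.
      static byte[] floatToBytes(float value) {
        ByteBuffer b = ByteBuffer.allocate(Float.BYTES); // 4 bytes
        b.putFloat(value);
        b.flip();
        return b.array();
      }

      public static void main(String[] args) {
        byte[] bytes = floatToBytes(10.24f);
        System.out.println(bytes.length);                      // 4
        System.out.println(ByteBuffer.wrap(bytes).getFloat()); // 10.24
      }
    }
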
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 612e17c..4059316 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -113,6 +113,10 @@ public final class DataTypeUtil {
       return Integer.parseInt(msrValue);
     } else if (dataType == DataTypes.LONG) {
       return Long.valueOf(msrValue);
+    } else if (dataType == DataTypes.FLOAT) {
+      return Float.parseFloat(msrValue);
+    } else if (dataType == DataTypes.BYTE) {
+      return Byte.parseByte(msrValue);
     } else {
       Double parsedValue = Double.valueOf(msrValue);
       if (Double.isInfinite(parsedValue) || Double.isNaN(parsedValue)) {
@@ -135,6 +139,10 @@ public final class DataTypeUtil {
       return (int) bb.getLong();
     } else if (dataType == DataTypes.LONG) {
       return bb.getLong();
+    } else if (dataType == DataTypes.FLOAT) {
+      return bb.getFloat();
+    } else if (dataType == DataTypes.BYTE) {
+      return bb.get();
     } else if (DataTypes.isDecimal(dataType)) {
       return byteToBigDecimal(data);
     } else {
@@ -152,6 +160,10 @@ public final class DataTypeUtil {
       return (int) measurePage.getLong(index);
     } else if (dataType == DataTypes.LONG) {
       return measurePage.getLong(index);
+    } else if (dataType == DataTypes.FLOAT) {
+      return measurePage.getFloat(index);
+    } else if (dataType == DataTypes.BYTE) {
+      return measurePage.getByte(index);
     } else if (DataTypes.isDecimal(dataType)) {
       BigDecimal bigDecimalMsrValue = measurePage.getDecimal(index);
       if (null != bigDecimalMsrValue && carbonMeasure.getScale() > bigDecimalMsrValue.scale()) {
@@ -331,6 +343,10 @@ public final class DataTypeUtil {
       return ByteUtil.toXorBytes(Long.parseLong(dimensionValue));
     } else if (actualDataType == DataTypes.DOUBLE) {
       return ByteUtil.toXorBytes(Double.parseDouble(dimensionValue));
+    } else if (actualDataType == DataTypes.FLOAT) {
+      return ByteUtil.toXorBytes(Float.parseFloat(dimensionValue));
+    } else if (actualDataType == DataTypes.BYTE) {
+      return new byte[] { Byte.parseByte(dimensionValue) };
     } else if (DataTypes.isDecimal(actualDataType)) {
       return bigDecimalToByte(new BigDecimal(dimensionValue));
     } else if (actualDataType == DataTypes.TIMESTAMP) {
@@ -491,6 +507,8 @@ public final class DataTypeUtil {
     try {
       if (actualDataType == DataTypes.BOOLEAN) {
         return ByteUtil.toBoolean(dataInBytes);
+      } else if (actualDataType == DataTypes.BYTE) {
+        return dataInBytes[0];
       } else if (actualDataType == DataTypes.SHORT) {
         // for non string type no dictionary column empty byte array is empty value
         // so no need to parse
@@ -522,6 +540,11 @@ public final class DataTypeUtil {
           return null;
         }
         return ByteUtil.toXorDouble(dataInBytes, 0, dataInBytes.length);
+      } else if (actualDataType == DataTypes.FLOAT) {
+        if (isEmptyByteArray(dataInBytes)) {
+          return null;
+        }
+        return ByteUtil.toXorFloat(dataInBytes, 0, dataInBytes.length);
       } else if (DataTypes.isDecimal(actualDataType)) {
         if (isEmptyByteArray(dataInBytes)) {
           return null;

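A condensed sketch of the per-type dispatch this file extends; the enum and method are simplified stand-ins for illustration, not the DataTypeUtil API:

    public class MeasureValueParser {
      enum SimpleType { SHORT, INT, LONG, FLOAT, BYTE, DOUBLE }

      // Simplified stand-in: parse a measure value according to its data type.
      static Object parseMeasure(String value, SimpleType type) {
        switch (type) {
          case SHORT: return Short.parseShort(value);
          case INT:   return Integer.parseInt(value);
          case LONG:  return Long.parseLong(value);
          case FLOAT: return Float.parseFloat(value); // new float branch
          case BYTE:  return Byte.parseByte(value);   // new byte branch
          default:    return Double.parseDouble(value);
        }
      }

      public static void main(String[] args) {
        System.out.println(parseMeasure("10.24", SimpleType.FLOAT)); // 10.24
        System.out.println(parseMeasure("7", SimpleType.BYTE));      // 7
      }
    }
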
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java b/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java
index c162396..f4e0adb 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java
@@ -34,6 +34,8 @@ public final class Comparator {
       return new ShortSerializableComparator();
     } else if (dataType == DataTypes.DOUBLE) {
       return new DoubleSerializableComparator();
+    } else if (dataType == DataTypes.FLOAT) {
+      return new FloatSerializableComparator();
     } else if (dataType == DataTypes.LONG || dataType == DataTypes.DATE
         || dataType == DataTypes.TIMESTAMP) {
       return new LongSerializableComparator();
@@ -61,8 +63,12 @@ public final class Comparator {
       return new LongSerializableComparator();
     } else if (dataType == DataTypes.DOUBLE) {
       return new DoubleSerializableComparator();
+    } else if (dataType == DataTypes.FLOAT) {
+      return new FloatSerializableComparator();
     } else if (DataTypes.isDecimal(dataType)) {
       return new BigDecimalSerializableComparator();
+    } else if (dataType == DataTypes.BYTE) {
+      return new ByteArraySerializableComparator();
     } else {
       throw new IllegalArgumentException("Unsupported data type");
     }
@@ -146,6 +152,19 @@ class DoubleSerializableComparator implements SerializableComparator {
   }
 }
 
+class FloatSerializableComparator implements SerializableComparator {
+  @Override public int compare(Object key1, Object key2) {
+    if (key1 == null && key2 == null) {
+      return 0;
+    } else if (key1 == null) {
+      return -1;
+    } else if (key2 == null) {
+      return 1;
+    }
+    return ((Float) key1).compareTo((Float) key2);
+  }
+}
+
 class LongSerializableComparator implements SerializableComparator {
   @Override public int compare(Object key1, Object key2) {
     if (key1 == null && key2 == null) {

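The new FloatSerializableComparator follows the same null-handling contract as the surrounding comparators: two nulls compare equal and a single null sorts before any non-null value. A standalone equivalent using java.util.Comparator:

    import java.util.Arrays;
    import java.util.Comparator;

    public class NullFirstFloatComparator implements Comparator<Float> {
      @Override public int compare(Float key1, Float key2) {
        if (key1 == null && key2 == null) {
          return 0;
        } else if (key1 == null) {
          return -1;
        } else if (key2 == null) {
          return 1;
        }
        return key1.compareTo(key2);
      }

      public static void main(String[] args) {
        Float[] values = { 5.5f, null, 1.2f, 10.1f };
        Arrays.sort(values, new NullFirstFloatComparator());
        System.out.println(Arrays.toString(values)); // [null, 1.2, 5.5, 10.1]
      }
    }
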
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/format/src/main/thrift/schema.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index f161a17..d39e548 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -37,6 +37,8 @@ enum DataType {
 	STRUCT = 21,
 	VARCHAR = 22,
 	MAP = 23,
+	FLOAT = 24,
+	BYTE = 25
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
index 8ad6e62..b843709 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
@@ -144,6 +144,12 @@ public class CarbonColumnVectorWrapper implements CarbonColumnVector {
     }
   }
 
+  @Override public void putByte(int rowId, byte value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putByte(counter++, value);
+    }
+  }
+
   @Override public void putBytes(int rowId, byte[] value) {
     if (!filteredRows[rowId]) {
       columnVector.putBytes(counter++, value);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 643471c..b80a2f2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -1460,9 +1460,9 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
          |'$writerPath' """.stripMargin)
 
     checkAnswer(sql("select * from sdkOutputTable"), Seq(
-      Row("bob", 10.24, Row("abc","bang")),
-      Row("bob", 10.24, Row("abc","bang")),
-      Row("bob", 10.24, Row("abc","bang"))))
+      Row("bob", 10.24f, Row("abc","bang")),
+      Row("bob", 10.24f, Row("abc","bang")),
+      Row("bob", 10.24f, Row("abc","bang"))))
 
     sql("DROP TABLE sdkOutputTable")
     // drop table should not delete the files

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index bc5a387..a0938da 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -162,6 +162,12 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     }
   }
 
+  @Override public void putByte(int rowId, byte value) {
+    if (!filteredRows[rowId]) {
+      sparkColumnVectorProxy.putByte(counter++, value, ordinal);
+    }
+  }
+
   @Override public void putBytes(int rowId, byte[] value) {
     if (!filteredRows[rowId]) {
       sparkColumnVectorProxy.putByteArray(counter++, value, ordinal);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index fa37205..67ea497 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -260,8 +260,9 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
     for (int i = 0; i < queryMeasures.size(); i++) {
       ProjectionMeasure msr = queryMeasures.get(i);
       DataType dataType = msr.getMeasure().getDataType();
-      if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT ||
-          dataType == DataTypes.INT || dataType == DataTypes.LONG) {
+      if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT || dataType == DataTypes.INT
+          || dataType == DataTypes.LONG || dataType == DataTypes.FLOAT
+          || dataType == DataTypes.BYTE) {
         fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
             CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
             null);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
index 73c07b4..337b13b 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
@@ -58,7 +58,9 @@ object CarbonSparkDataSourceUtil {
         case CarbonDataTypes.SHORT => ShortType
         case CarbonDataTypes.INT => IntegerType
         case CarbonDataTypes.LONG => LongType
+        case CarbonDataTypes.BYTE => ByteType
         case CarbonDataTypes.DOUBLE => DoubleType
+        case CarbonDataTypes.FLOAT => FloatType
         case CarbonDataTypes.BOOLEAN => BooleanType
         case CarbonDataTypes.TIMESTAMP => TimestampType
         case CarbonDataTypes.DATE => DateType
@@ -78,6 +80,7 @@ object CarbonSparkDataSourceUtil {
       case LongType => CarbonDataTypes.LONG
       case DoubleType => CarbonDataTypes.DOUBLE
       case FloatType => CarbonDataTypes.FLOAT
+      case ByteType => CarbonDataTypes.BYTE
       case DateType => CarbonDataTypes.DATE
       case BooleanType => CarbonDataTypes.BOOLEAN
       case TimestampType => CarbonDataTypes.TIMESTAMP

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
index 1138a29..cb07e04 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
@@ -80,6 +80,8 @@ private[spark] object SparkTypeConverter {
         case CarbonDataTypes.INT => IntegerType
         case CarbonDataTypes.LONG => LongType
         case CarbonDataTypes.DOUBLE => DoubleType
+        case CarbonDataTypes.FLOAT => FloatType
+        case CarbonDataTypes.BYTE => ByteType
         case CarbonDataTypes.BOOLEAN => BooleanType
         case CarbonDataTypes.TIMESTAMP => TimestampType
         case CarbonDataTypes.DATE => DateType

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 727191c..18423f2 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -19,17 +19,19 @@ package org.apache.spark.sql.carbondata.datasource
 
 import java.io.File
 import java.util
+import java.util.Arrays
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.carbondata.datasource.TestUtil._
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
 import org.apache.carbondata.hadoop.testutil.StoreCreator
 import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
 
@@ -877,9 +879,207 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
   }
 
+  test("test Float data type by giving schema explicitly and desc formatted") {
+    spark.sql("drop table if exists sdkout")
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
+    buildTestDataOtherDataType(5, Array("age", "address"), warehouse1+"/sdk1")
+    spark.sql(s"create table sdkout(male boolean, age int, height double, name string, address " +
+              s"string," +
+              s"salary long, floatField float, bytefield byte) using carbon options " +
+              s"(path='$warehouse1/sdk1')")
+    assert(spark.sql("desc formatted sdkout").collect().take(7).reverse.head.get(1).equals("float"))
+    assert(spark.sql("desc formatted sdkout").collect().take(8).reverse.head.get(1).equals
+    ("tinyint"))
+  }
+
+  test("test select * on table with float data type") {
+    spark.sql("drop table if exists sdkout")
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
+    buildTestDataOtherDataType(11, Array("age", "address"), warehouse1 + "/sdk1")
+    spark.sql(s"create table sdkout(male boolean, age int, height double, name string, address " +
+              s"string," +
+              s"salary long, floatField float, bytefield byte) using carbon options (path='$warehouse1/sdk1')")
+    checkAnswer(spark.sql("select * from par_table"), spark.sql("select * from sdkout"))
+    checkAnswer(spark.sql("select floatfield from par_table"), spark.sql("select floatfield from sdkout"))
+  }
+
+  test("test various filters on float data") {
+    spark.sql("drop table if exists sdkout")
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
+    buildTestDataOtherDataType(11, Array("age", "address"), warehouse1 + "/sdk1")
+    spark.sql(s"create table sdkout(male boolean, age int, height double, name string, address " +
+              s"string," +
+              s"salary long, floatField float, bytefield byte) using carbon options (path='$warehouse1/sdk1')")
+    checkAnswer(spark.sql("select * from par_table where floatfield < 10"),
+      spark.sql("select * from sdkout where floatfield < 10"))
+    checkAnswer(spark.sql("select * from par_table where floatfield > 5.3"),
+      spark.sql("select * from sdkout where floatfield > 5.3"))
+    checkAnswer(spark.sql("select * from par_table where floatfield >= 4.1"),
+      spark.sql("select * from sdkout where floatfield >= 4.1"))
+    checkAnswer(spark.sql("select * from par_table where floatfield != 5.5"),
+      spark.sql("select * from sdkout where floatfield != 5.5"))
+    checkAnswer(spark.sql("select * from par_table where floatfield <= 5"),
+      spark.sql("select * from sdkout where floatfield <= 5"))
+    checkAnswer(spark.sql("select * from par_table where floatfield >= 5"),
+      spark.sql("select * from sdkout where floatfield >= 5"))
+    checkAnswer(spark.sql("select * from par_table where floatfield IN ('5.5','6.6')"),
+      spark.sql("select * from sdkout where floatfield IN ('5.5','6.6')"))
+    checkAnswer(spark.sql("select * from par_table where floatfield NOT IN ('5.5','6.6')"),
+      spark.sql("select * from sdkout where floatfield NOT IN ('5.5','6.6')"))
+    checkAnswer(spark.sql("select * from par_table where floatfield = cast('6.6' as float)"),
+      spark.sql("select * from sdkout where floatfield = cast('6.6' as float)"))
+  }
+
+  test("test select * on table with byte data type") {
+    spark.sql("drop table if exists sdkout")
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
+    buildTestDataOtherDataType(11, Array("age", "address"), warehouse1 + "/sdk1")
+    spark.sql(s"create table sdkout(male boolean, age int, height double, name string, address " +
+              s"string," +
+              s"salary long, floatField float, bytefield byte) using carbon options " +
+              s"(path='$warehouse1/sdk1')")
+    checkAnswer(spark.sql("select * from par_table"), spark.sql("select * from sdkout"))
+    checkAnswer(spark.sql("select byteField from par_table"), spark.sql("select bytefield from sdkout"))
+  }
+
+  test("test various filters on byte data") {
+    spark.sql("drop table if exists sdkout")
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
+    buildTestDataOtherDataType(11, Array("age", "address"), warehouse1 + "/sdk1")
+    spark.sql(s"create table sdkout(male boolean, age int, height double, name string, address " +
+              s"string," +
+              s"salary long, floatField float, bytefield byte) using carbon options " +
+              s"(path='$warehouse1/sdk1')")
+    checkAnswer(spark.sql("select * from par_table where bytefield < 10"),
+      spark.sql("select * from sdkout where bytefield < 10"))
+    checkAnswer(spark.sql("select * from par_table where bytefield > 5"),
+      spark.sql("select * from sdkout where bytefield > 5"))
+    checkAnswer(spark.sql("select * from par_table where bytefield >= 4"),
+      spark.sql("select * from sdkout where bytefield >= 4"))
+    checkAnswer(spark.sql("select * from par_table where bytefield != 5"),
+      spark.sql("select * from sdkout where bytefield != 5"))
+    checkAnswer(spark.sql("select * from par_table where bytefield <= 5"),
+      spark.sql("select * from sdkout where bytefield <= 5"))
+    checkAnswer(spark.sql("select * from par_table where bytefield >= 5"),
+      spark.sql("select * from sdkout where bytefield >= 5"))
+    checkAnswer(spark.sql("select * from par_table where bytefield IN ('5','6')"),
+      spark.sql("select * from sdkout where bytefield IN ('5','6')"))
+    checkAnswer(spark.sql("select * from par_table where bytefield NOT IN ('5','6')"),
+      spark.sql("select * from sdkout where bytefield NOT IN ('5','6')"))
+  }
+
+  test("test struct of float type and byte type") {
+    import scala.collection.JavaConverters._
+    val path = new File(warehouse1+"/sdk1").getAbsolutePath
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
+    spark.sql("drop table if exists complextable")
+    val fields = List(new StructField
+    ("byteField", DataTypes.BYTE), new StructField("floatField", DataTypes.FLOAT))
+    val structType = Array(new Field("stringfield", DataTypes.STRING), new Field
+    ("structField", "struct", fields.asJava))
+
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.outputPath(path)
+          .isTransactionalTable(false)
+          .uniqueIdentifier(System.nanoTime()).withBlockSize(2)
+          .buildWriterForCSVInput(new Schema(structType), spark.sparkContext
+            .hadoopConfiguration)
+
+      var i = 0
+      while (i < 11) {
+        val array = Array[String](s"name$i", s"$i" + "$" +s"$i.${i}12")
+        writer.write(array)
+        i += 1
+      }
+      writer.close()
+      spark.sql("create table complextable (stringfield string, structfield struct<bytefield: " +
+                "byte, floatfield: float>) " +
+                s"using carbon location '$path'")
+    } catch {
+      case ex: Exception => throw new RuntimeException(ex)
+      case _ => None
+    }
+    checkAnswer(spark.sql("select * from complextable limit 1"), Seq(Row("name0", Row(0
+      .asInstanceOf[Byte], 0.012.asInstanceOf[Float]))))
+    checkAnswer(spark.sql("select * from complextable where structfield.bytefield > 9"), Seq(Row
+    ("name10", Row(10.asInstanceOf[Byte], 10.1012.asInstanceOf[Float]))))
+    checkAnswer(spark.sql("select * from complextable where structfield.bytefield > 9"), Seq(Row
+    ("name10", Row(10.asInstanceOf[Byte], 10.1012.asInstanceOf[Float]))))
+    checkAnswer(spark.sql("select * from complextable where structfield.floatfield > 9.912"), Seq
+    (Row
+    ("name10", Row(10.asInstanceOf[Byte], 10.1012.asInstanceOf[Float]))))
+    checkAnswer(spark.sql("select * from complextable where structfield.floatfield > 9.912 and " +
+                          "structfield.bytefield < 11"), Seq(Row("name10", Row(10.asInstanceOf[Byte], 10.1012.asInstanceOf[Float]))))
+  }
+
+  test("test array of float type and byte type") {
+    import scala.collection.JavaConverters._
+    val path = new File(warehouse1+"/sdk1").getAbsolutePath
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
+    spark.sql("drop table if exists complextable")
+    val structType =
+      Array(new Field("stringfield", DataTypes.STRING),
+        new Field("bytearray", "array", List(new StructField("byteField", DataTypes.BYTE))
+          .asJava),
+        new Field("floatarray", "array", List(new StructField("floatfield", DataTypes.FLOAT))
+          .asJava))
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.outputPath(path)
+          .isTransactionalTable(false)
+          .uniqueIdentifier(System.nanoTime()).withBlockSize(2)
+          .buildWriterForCSVInput(new Schema(structType), spark.sparkContext
+            .hadoopConfiguration)
+
+      var i = 0
+      while (i < 10) {
+        val array = Array[String](s"name$i",s"$i" + "$" + s"${i*2}", s"${i/2}" + "$" + s"${i/3}")
+        writer.write(array)
+        i += 1
+      }
+      writer.close()
+      spark.sql(s"create table complextable (stringfield string, bytearray " +
+                s"array<byte>, floatarray array<float>) using carbon " +
+                s"location " +
+                s"'$path'")
+    } catch {
+      case ex: Exception => throw new RuntimeException(ex)
+      case _ => None
+    }
+    checkAnswer(spark.sql("select * from complextable limit 1"), Seq(Row("name0", mutable
+      .WrappedArray.make(Array[Byte](0, 0)), mutable.WrappedArray.make(Array[Float](0.0f, 0.0f)))))
+    checkAnswer(spark.sql("select * from complextable where bytearray[0] = 1"), Seq(Row("name1",
+      mutable.WrappedArray.make(Array[Byte](1, 2)), mutable.WrappedArray.make(Array[Float](0.0f,
+        0.0f)))))
+    checkAnswer(spark.sql("select * from complextable where bytearray[0] > 8"), Seq(Row("name9",
+      mutable.WrappedArray.make(Array[Byte](9, 18)), mutable.WrappedArray.make(Array[Float](4.0f,
+        3.0f)))))
+    checkAnswer(spark.sql("select * from complextable where floatarray[0] IN (4.0) and stringfield = 'name8'"), Seq(Row
+    ("name8",
+      mutable.WrappedArray.make(Array[Byte](8, 16)), mutable.WrappedArray.make(Array[Float](4.0f,
+      2.0f)))))
+  }
+
+  private def createParquetTable {
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2"))
+    spark.sql(s"create table par_table(male boolean, age int, height double, name string, address " +
+              s"string," +
+              s"salary long, floatField float, bytefield byte) using parquet location " +
+              s"'$warehouse1/../warehouse2'")
+    (0 to 10).foreach {
+      i => spark.sql(s"insert into par_table select 'true','$i', ${i.toDouble / 2}, 'name$i', " +
+                     s"'address$i', ${i*100}, $i.$i, '$i'")
+    }
+  }
+
   // prepare sdk writer output with other schema
   def buildTestDataOtherDataType(rows: Int, sortColumns: Array[String], writerPath: String, colCount: Int = -1): Any = {
-    var fields: Array[Field] = new Array[Field](6)
+    var fields: Array[Field] = new Array[Field](8)
     // same column name, but name as boolean type
     fields(0) = new Field("male", DataTypes.BOOLEAN)
     fields(1) = new Field("age", DataTypes.INT)
@@ -887,6 +1087,8 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     fields(3) = new Field("name", DataTypes.STRING)
     fields(4) = new Field("address", DataTypes.STRING)
     fields(5) = new Field("salary", DataTypes.LONG)
+    fields(6) = new Field("floatField", DataTypes.FLOAT)
+    fields(7) = new Field("bytefield", DataTypes.BYTE)
 
     if (colCount > 0) {
       val fieldsToWrite: Array[Field] = new Array[Field](colCount)
@@ -913,7 +1115,8 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
           String.valueOf(i.toDouble / 2),
           "name" + i,
           "address" + i,
-          (i * 100).toString)
+          (i * 100).toString,
+          s"$i.$i", s"$i")
         if (colCount > 0) {
           writer.write(array.slice(0, colCount))
         } else {
@@ -927,8 +1130,10 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
       case _ => None
     }
   }
+
   override protected def beforeAll(): Unit = {
     drop
+    createParquetTable
   }
 
   override def afterAll(): Unit = {
@@ -939,5 +1144,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     spark.sql("drop table if exists testformat")
     spark.sql("drop table if exists carbon_table")
     spark.sql("drop table if exists testparquet")
+    spark.sql("drop table if exists par_table")
+    spark.sql("drop table if exists sdkout")
   }
 }
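
For completeness, a Java sketch of the SDK write path the new tests exercise, using the same builder calls as the Scala test above; the output path is a placeholder and error handling is omitted:

    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.sdk.file.CarbonWriter;
    import org.apache.carbondata.sdk.file.Field;
    import org.apache.carbondata.sdk.file.Schema;
    import org.apache.hadoop.conf.Configuration;

    public class FloatByteSdkWriteExample {
      public static void main(String[] args) throws Exception {
        // Schema containing the two newly supported primitive types.
        Field[] fields = new Field[] {
            new Field("floatField", DataTypes.FLOAT),
            new Field("byteField", DataTypes.BYTE)
        };

        CarbonWriter writer = CarbonWriter.builder()
            .outputPath("/tmp/sdk_float_byte")   // placeholder location
            .isTransactionalTable(false)
            .uniqueIdentifier(System.nanoTime())
            .withBlockSize(2)
            .buildWriterForCSVInput(new Schema(fields), new Configuration());

        for (int i = 0; i < 10; i++) {
          // CSV-style rows: a float followed by a byte, both as strings.
          writer.write(new String[] { i + "." + i, String.valueOf(i) });
        }
        writer.close();
      }
    }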

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edfcdca0/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
index e6d4d48..9ccc02c 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -353,10 +353,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
         s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
            |'$writerPath' """.stripMargin)
     }
-    val result=checkAnswer(spark.sql("select count(*) from sdkOutputTable"),Seq(Row(800000)))
-    if(result.isDefined){
-      assert(false,result.get)
-    }
+    checkAnswer(spark.sql("select count(*) from sdkOutputTable"),Seq(Row(800000)))
     checkAnswer(spark
       .sql(
         "select count(*) from sdkOutputTable where from_email='Email for testing min max for " +