Posted to commits@carbondata.apache.org by ak...@apache.org on 2021/07/29 11:21:03 UTC

[carbondata] branch master updated: [CARBONDATA-4247][CARBONDATA-4241] Fix wrong timestamp query results for data before the year 1900 with Spark 3.1

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new feb0521  [CARBONDATA-4247][CARBONDATA-4241] Fix wrong timestamp query results for data before the year 1900 with Spark 3.1
feb0521 is described below

commit feb052173b5c834dfa3632b01df275ed061aea1d
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Tue Jul 20 13:42:57 2021 +0530

    [CARBONDATA-4247][CARBONDATA-4241] Fix wrong timestamp query results for data
    before the year 1900 with Spark 3.1
    
    Why is this PR needed?
    1. Spark 3.1 stores timestamp values as Julian micros and rebases them from
    Julian to Gregorian micros during query.
    -> Since Carbon parses and formats timestamp values with SimpleDateFormat, queries
    give incorrect results when Spark rebases them with JulianToGregorianMicros (see
    the sketch below).
    2. CARBONDATA-4241 -> Global sort load and compaction fail on a table having a
    timestamp column
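
    For illustration only, a minimal Scala sketch (hypothetical values) of the
    calendar mismatch, using Spark 3.1's RebaseDateTime utility:

        import org.apache.spark.sql.catalyst.util.RebaseDateTime

        // java.sql.Timestamp (like SimpleDateFormat) works on the hybrid Julian
        // calendar, so micros derived from it are Julian-calendar micros
        val julianMicros =
          java.sql.Timestamp.valueOf("1800-02-17 00:00:00").getTime * 1000L
        // Spark 3.1 interprets stored micros on the proleptic Gregorian calendar;
        // for dates this old the two calendars disagree, so the values differ
        val gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros)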
    
    What changes were proposed in this PR?
    1. Use java.time.Instant to parse new timestamp values. For old stores queried
    with Spark 3.1, rebase the timestamp value from Julian to Gregorian micros (see
    the sketch below).
    2. If the timestamp value is of type Instant, convert it to a java.sql.Timestamp.
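
    A simplified Scala sketch of the new parse path (it skips the zero-padding and
    format normalization that createTimeInstant in DataTypeUtil performs), plus the
    Instant-to-java-timestamp conversion mirroring checkInstant:

        import java.time.{Instant, LocalDateTime, ZoneId, ZonedDateTime}
        import java.time.format.DateTimeFormatter
        import org.apache.spark.sql.catalyst.util.DateTimeUtils

        // parse on the proleptic Gregorian calendar via java.time
        val ldt = LocalDateTime.parse("1899-12-31 23:59:59",
          DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
        val instant = Instant.from(ZonedDateTime.of(ldt, ZoneId.systemDefault()))
        // Carbon keeps millisecond precision: whole seconds plus the millis fraction
        val millis = Math.multiplyExact(instant.getEpochSecond, 1000L) +
          instant.getNano / 1000000
        // an Instant coming from Spark 3.1 converts back to a java.sql.Timestamp
        val javaTs = DateTimeUtils.toJavaTimestamp(DateTimeUtils.instantToMicros(instant))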
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No (existing test cases are sufficient)
    
    This closes #4177
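
    Note: the rebase applies only to old stores. A sketch of the gate (mirroring
    needRebaseTimeValue; the data file written version has the form x.x.x-SNAPSHOT):

        // files written before Carbon 2.2.0 predate the java.time parse path, so
        // only their timestamp values need the Julian-to-Gregorian rebase on read
        val writtenVersion = "2.1.0-SNAPSHOT".split("-").head
        val needsRebase = writtenVersion.compareTo("2.2.0") < 0 // true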
---
 .../core/constants/CarbonCommonConstants.java      |  22 ++++
 .../core/datastore/block/AbstractIndex.java        |   7 ++
 .../core/datastore/block/TableBlockInfo.java       |  15 +++
 .../dimension/v3/DimensionChunkReaderV3.java       |   1 +
 .../reader/measure/v3/MeasureChunkReaderV3.java    |   1 +
 .../indexstore/blockletindex/IndexWrapper.java     |   2 -
 .../core/metadata/blocklet/DataFileFooter.java     |  13 +++
 .../impl/DictionaryBasedVectorResultCollector.java |   2 +
 .../scan/executor/impl/AbstractQueryExecutor.java  |   1 +
 .../core/scan/processor/DataBlockIterator.java     |   3 +
 .../carbondata/core/scan/result/RowBatch.java      |  13 +++
 .../AbstractDetailQueryResultIterator.java         |   4 +
 .../scan/result/iterator/ChunkRowIterator.java     |   4 +
 .../result/iterator/DetailQueryResultIterator.java |   1 +
 .../scan/result/vector/CarbonColumnVector.java     |   2 +
 .../scan/result/vector/CarbonColumnarBatch.java    |  13 +++
 .../core/scan/result/vector/ColumnVectorInfo.java  |   1 +
 .../result/vector/impl/CarbonColumnVectorImpl.java |   5 +
 ...ColumnarVectorWrapperDirectWithDeleteDelta.java |   5 +
 ...lumnarVectorWrapperDirectWithInvertedIndex.java |   5 +
 .../core/util/DataFileFooterConverterV3.java       |   5 +
 .../apache/carbondata/core/util/DataTypeUtil.java  | 113 +++++++++++++++++++++
 .../carbondata/hadoop/CarbonRecordReader.java      |   5 +
 .../presto/ColumnarVectorWrapperDirect.java        |   5 +
 .../org/apache/spark/sql/SparkVersionAdapter.scala |   6 ++
 .../spark/vectorreader/ColumnarVectorWrapper.java  |  11 +-
 .../vectorreader/ColumnarVectorWrapperDirect.java  |  11 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala       |  54 +++++++++-
 .../scala/org/apache/spark/sql/CarbonEnv.scala     |   4 +
 .../org/apache/spark/sql/SparkVersionAdapter.scala |  21 +++-
 .../TimestampNoDictionaryColumnTestCase.scala      |  56 ++++------
 .../streaming/parser/FieldConverter.scala          |   3 +-
 .../SparkStreamingUtil.scala                       |   6 ++
 .../carbondata/util/SparkStreamingUtil.scala       |  13 +++
 34 files changed, 392 insertions(+), 41 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 597cfcc..97fb70a 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -123,6 +123,16 @@ public final class CarbonCommonConstants {
   public static final String CARBON_TIMESTAMP_MILLIS = "dd-MM-yyyy HH:mm:ss:SSS";
 
   /**
+   * CARBON Default format - time segment
+   */
+  public static final String CARBON_TIME_SEGMENT_DEFAULT_FORMAT = " HH:mm:ss";
+
+  /**
+   * CARBON Default data - time segment
+   */
+  public static final String CARBON_TIME_SEGMENT_DATA_DEFAULT_FORMAT = " 00:00:00";
+
+  /**
    * Property for specifying the format of DATE data type column.
    * e.g. yyyy/MM/dd , or using default value
    */
@@ -2648,4 +2658,16 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_SDK_EMPTY_METADATA_PATH = "emptyMetadataFolder";
 
+  /**
+   * Property to identify if the spark version is above 3.x version
+   */
+  public static final String CARBON_SPARK_VERSION_SPARK3 = "carbon.spark.version.spark3";
+
+  public static final String CARBON_SPARK_VERSION_SPARK3_DEFAULT = "false";
+
+  /**
+   * Carbon Spark 3.x supported data file written version
+   */
+  public static final String CARBON_SPARK3_VERSION = "2.2.0";
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
index 3f9d310..d3b0a92 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.datastore.block;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -51,6 +52,12 @@ public abstract class AbstractIndex implements Cacheable {
    */
   private long deleteDeltaTimestamp;
 
+  public List<TableBlockInfo> getBlockInfos() {
+    return blockInfos;
+  }
+
+  protected List<TableBlockInfo> blockInfos;
+
   /**
    * map of blockletIdAndPageId to deleted rows
    */
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 168bbf0..af9bc4a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -88,6 +88,11 @@ public class TableBlockInfo implements Distributable, Serializable {
   private transient DataFileFooter dataFileFooter;
 
   /**
+   * Carbon Data file written version
+   */
+  private String carbonDataFileWrittenVersion = null;
+
+  /**
    * comparator to sort by block size in descending order.
    * Since each line is not exactly the same, the size of a InputSplit may differs,
    * so we allow some deviation for these splits.
@@ -132,6 +137,7 @@ public class TableBlockInfo implements Distributable, Serializable {
     info.deletedDeltaFilePath = deletedDeltaFilePath;
     info.detailInfo = detailInfo.copy();
     info.indexWriterPath = indexWriterPath;
+    info.carbonDataFileWrittenVersion = carbonDataFileWrittenVersion;
     return info;
   }
 
@@ -353,4 +359,13 @@ public class TableBlockInfo implements Distributable, Serializable {
     sb.append('}');
     return sb.toString();
   }
+
+  public String getCarbonDataFileWrittenVersion() {
+    return carbonDataFileWrittenVersion;
+  }
+
+  public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
+    this.carbonDataFileWrittenVersion = carbonDataFileWrittenVersion;
+  }
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/DimensionChunkReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/DimensionChunkReaderV3.java
index a5463e1..b42e0b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/DimensionChunkReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/DimensionChunkReaderV3.java
@@ -256,6 +256,7 @@ public class DimensionChunkReaderV3 extends AbstractDimensionChunkReader {
     if (vectorInfo != null) {
       // set encodings of current page in the vectorInfo, used for decoding the complex child page
       vectorInfo.encodings = encodings;
+      vectorInfo.vector.setCarbonDataFileWrittenVersion(vectorInfo.carbonDataFileWrittenVersion);
       decoder
           .decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
               nullBitSet, isLocalDictEncodedPage, pageMetadata.numberOfRowsInpage,
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/MeasureChunkReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/MeasureChunkReaderV3.java
index 7ad92e5..81e5cda 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/MeasureChunkReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/MeasureChunkReaderV3.java
@@ -245,6 +245,7 @@ public class MeasureChunkReaderV3 extends AbstractMeasureChunkReader {
     ColumnPageDecoder codec =
         encodingFactory.createDecoder(encodings, encoderMetas, compressorName, vectorInfo != null);
     if (vectorInfo != null) {
+      vectorInfo.vector.setCarbonDataFileWrittenVersion(vectorInfo.carbonDataFileWrittenVersion);
       codec.decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
           nullBitSet, false, pageMetadata.numberOfRowsInpage, reusableDataBuffer);
       return null;
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
index 09c0148..31e2898 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
@@ -31,8 +31,6 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
  */
 public class IndexWrapper extends AbstractIndex {
 
-  private List<TableBlockInfo> blockInfos;
-
   public IndexWrapper(List<TableBlockInfo> blockInfos, SegmentProperties segmentProperties) {
     this.blockInfos = blockInfos;
     this.segmentProperties = segmentProperties;
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java
index 8bff65f..3ddd5e6 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java
@@ -76,6 +76,11 @@ public class DataFileFooter implements Serializable {
   private Boolean isSorted = true;
 
   /**
+   * carbon data file written version
+   */
+  private String carbonDataFileWrittenVersion = null;
+
+  /**
    * @return the versionId
    */
   public ColumnarFormatVersion getVersionId() {
@@ -174,4 +179,12 @@ public class DataFileFooter implements Serializable {
   public Boolean isSorted() {
     return isSorted;
   }
+
+  public String getCarbonDataFileWrittenVersion() {
+    return carbonDataFileWrittenVersion;
+  }
+
+  public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
+    this.carbonDataFileWrittenVersion = carbonDataFileWrittenVersion;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 418a0f8..ef133e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -259,6 +259,8 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
       BitSet deltaBitSet) {
     for (int i = 0; i < allColumnInfo.length; i++) {
       allColumnInfo[i].vectorOffset = columnarBatch.getRowCounter();
+      allColumnInfo[i].carbonDataFileWrittenVersion =
+          columnarBatch.getCarbonDataFileWrittenVersion();
       allColumnInfo[i].vector = columnarBatch.columnVectors[i];
       allColumnInfo[i].deletedRows = deltaBitSet;
       if (null != allColumnInfo[i].dimension) {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 72e3b31..459b2c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -224,6 +224,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
           if (null == blockletDetailInfo) {
             blockletDetailInfo = QueryUtil.getBlockletDetailInfo(fileFooter, blockInfo);
           }
+          blockInfo.setCarbonDataFileWrittenVersion(fileFooter.getCarbonDataFileWrittenVersion());
           blockInfo.setDetailInfo(blockletDetailInfo);
         }
         if (null == segmentProperties) {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
index f570c65..08ce17e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
@@ -244,6 +244,9 @@ public class DataBlockIterator extends CarbonIterator<List<Object[]>> {
   }
 
   public void processNextBatch(CarbonColumnarBatch columnarBatch) {
+    columnarBatch.setCarbonDataFileWrittenVersion(
+        this.blockExecutionInfo.getDataBlock().getDataRefNode().getTableBlockInfo()
+            .getCarbonDataFileWrittenVersion());
     if (updateScanner()) {
       this.scannerResultAggregator.collectResultInColumnarBatch(scannedResult, columnarBatch);
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java
index e22a5c8..ef61b65 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java
@@ -38,6 +38,11 @@ public class RowBatch extends CarbonIterator<Object[]> {
    */
   protected int counter;
 
+  /**
+   * Carbon data file written version
+   */
+  private String carbonDataFileWrittenVersion = null;
+
   public RowBatch() {
     this.rows = new ArrayList<>();
   }
@@ -115,4 +120,12 @@ public class RowBatch extends CarbonIterator<Object[]> {
     counter = counter + rows.size();
     return rows;
   }
+
+  public String getCarbonDataFileWrittenVersion() {
+    return carbonDataFileWrittenVersion;
+  }
+
+  public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
+    this.carbonDataFileWrittenVersion = carbonDataFileWrittenVersion;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 9ce5b0a..843e382 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -87,6 +87,8 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
    */
   private QueryStatisticsModel queryStatisticsModel;
 
+  String carbonDataFileWrittenVersion;
+
   AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
       ExecutorService execService) {
     batchSize = CarbonProperties.getQueryBatchSize();
@@ -222,6 +224,8 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
     }
     if (blockExecutionInfos.size() > 0) {
       BlockExecutionInfo executionInfo = blockExecutionInfos.get(0);
+      carbonDataFileWrittenVersion = executionInfo.getDataBlock().getDataRefNode()
+          .getTableBlockInfo().getCarbonDataFileWrittenVersion();
       blockExecutionInfos.remove(executionInfo);
       return new DataBlockIterator(executionInfo, fileReader, batchSize, queryStatisticsModel,
           execService);
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
index 1adb7e5..9760622 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
@@ -69,6 +69,10 @@ public class ChunkRowIterator extends CarbonIterator<Object[]> {
     return currentChunk.next();
   }
 
+  public String getCarbonDataFileWrittenVersion() {
+    return currentChunk.getCarbonDataFileWrittenVersion();
+  }
+
   /**
    * read next batch
    *
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
index 34ec7b9..aeeee2b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
@@ -56,6 +56,7 @@ public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator
       updateDataBlockIterator();
       if (dataBlockIterator != null) {
         rowBatch.setRows(dataBlockIterator.next());
+        rowBatch.setCarbonDataFileWrittenVersion(carbonDataFileWrittenVersion);
       }
     }
     return rowBatch;
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
index a3b9827..d5cbff5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
@@ -116,6 +116,8 @@ public interface CarbonColumnVector {
 
   void setLazyPage(LazyPageLoader lazyPage);
 
+  void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion);
+
   // Added default implementation for interface,
   // to avoid implementing presto required functions for spark or core module.
   default List<CarbonColumnVector> getChildrenVector() {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
index 1019c35..e64c320 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
@@ -33,6 +33,11 @@ public class CarbonColumnarBatch {
 
   private int rowsFiltered;
 
+  /**
+   * Carbon Data file written version
+   */
+  private String carbonDataFileWrittenVersion = null;
+
   public CarbonColumnarBatch(CarbonColumnVector[] columnVectors, int batchSize,
       boolean[] filteredRows) {
     this.columnVectors = columnVectors;
@@ -92,4 +97,12 @@ public class CarbonColumnarBatch {
       }
     }
   }
+
+  public String getCarbonDataFileWrittenVersion() {
+    return carbonDataFileWrittenVersion;
+  }
+
+  public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
+    this.carbonDataFileWrittenVersion = carbonDataFileWrittenVersion;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
index afccd3c..01365f5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
@@ -35,6 +35,7 @@ public class ColumnVectorInfo implements Comparable<ColumnVectorInfo> {
   public int size;
   public CarbonColumnVector vector;
   public int vectorOffset;
+  public String carbonDataFileWrittenVersion;
   public ProjectionDimension dimension;
   public ProjectionMeasure measure;
   public int ordinal;
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
index cf5c3c7..715bd9a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
@@ -501,6 +501,11 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
   }
 
   @Override
+  public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
+    // do nothing here
+  }
+
+  @Override
   public void putAllByteArray(byte[] data, int offset, int length) {
     byteArr = data;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java
index 23aa19d..d49a3b5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java
@@ -246,4 +246,9 @@ class ColumnarVectorWrapperDirectWithDeleteDelta extends AbstractCarbonColumnarV
   public CarbonColumnVector getColumnVector() {
     return this.columnVector;
   }
+
+  @Override
+  public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
+    this.columnVector.setCarbonDataFileWrittenVersion(carbonDataFileWrittenVersion);
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java
index 971d219..d66055f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java
@@ -105,6 +105,11 @@ public class ColumnarVectorWrapperDirectWithInvertedIndex extends AbstractCarbon
   }
 
   @Override
+  public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
+    this.columnVector.setCarbonDataFileWrittenVersion(carbonDataFileWrittenVersion);
+  }
+
+  @Override
   public void putFloats(int rowId, int count, float[] src, int srcIndex) {
     for (int i = srcIndex; i < count; i++) {
       columnVector.putFloat(invertedIndex[rowId++], src[i]);
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
index aea722b..ab4812a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -73,6 +74,10 @@ public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
     DataFileFooter dataFileFooter = new DataFileFooter();
     dataFileFooter.setVersionId(ColumnarFormatVersion.valueOf((short) fileHeader.getVersion()));
     dataFileFooter.setNumberOfRows(footer.getNum_rows());
+    if (null != footer.getExtra_info()) {
+      dataFileFooter.setCarbonDataFileWrittenVersion(
+          footer.getExtra_info().get(CarbonCommonConstants.CARBON_WRITTEN_VERSION));
+    }
     dataFileFooter.setSchemaUpdatedTimeStamp(fileHeader.getTime_stamp());
     if (footer.isSetIs_sort()) {
       dataFileFooter.setSorted(footer.isIs_sort());
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 32381e7..c2d380d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -25,8 +25,16 @@ import java.nio.charset.Charset;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.List;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -439,11 +447,116 @@ public final class DataTypeUtil {
     }
   }
 
+  private static long createTimeInstant(String dimensionValue, String dateFormat) {
+    // dateFormat is null, use default carbon timestamp format
+    if (null == dateFormat || dateFormat.trim().isEmpty()) {
+      dateFormat = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT;
+    } else if (!dateFormat.trim().contains(" ")) {
+      // if format doesn't contain time segment for eg., dateFormat = yyyy/MM/dd, then append the
+      // default time segment with format (yyyy/MM/dd HH:mm:ss), else LocalDateTime.parse will fail
+      dateFormat += CarbonCommonConstants.CARBON_TIME_SEGMENT_DEFAULT_FORMAT;
+    }
+    String updatedDim = dimensionValue;
+    // if value doesn't contain time segment data for eg., value = 2018/08/01, then append the
+    // default time segment with dimValue(2018/08/01 00:00:00), else LocalDateTime.parse will fail
+    if (!dimensionValue.trim().contains(" ")) {
+      updatedDim += CarbonCommonConstants.CARBON_TIME_SEGMENT_DATA_DEFAULT_FORMAT;
+    }
+    // If format is yyyy-MM-dd HH:mm:ss and data is 2017-9-02 1:01:01, then parsing will fail,
+    // because the month segment and hour segment data has single digit. Hence, add 0's to data,
+    // if data doesn't matches the format length
+    List<String> dateFormatPattern = new ArrayList<>();
+    List<String> dimensionData = new ArrayList<>();
+    StringBuilder format = new StringBuilder();
+    // convert input data to proper format
+    // separate year, month, day,... format segment's to a list
+    for (int i = 0; i < dateFormat.length(); i++) {
+      char c = dateFormat.charAt(i);
+      if ((c >= 'a' && c <= 'z' || c >= 'A' && c <= 'Z')) {
+        format.append(c);
+      } else {
+        String value = format.toString();
+        if (value.equals("hh")) {
+          value = "HH";
+        }
+        dateFormatPattern.add(value);
+        dateFormatPattern.add(Character.toString(c));
+        format = new StringBuilder();
+      }
+      if (i + 1 == dateFormat.length()) {
+        dateFormatPattern.add(format.toString());
+      }
+    }
+    format = new StringBuilder();
+    // separate data year, month, day,.. to a list
+    for (int i = 0; i < updatedDim.length(); i++) {
+      char c = updatedDim.charAt(i);
+      if (c >= 'a' && c <= 'z' || c >= 'A' && c <= 'Z') {
+        // bad record
+        break;
+      }
+      if (c >= '0' && c <= '9') {
+        format.append(c);
+      } else {
+        dimensionData.add(format.toString());
+        dimensionData.add(Character.toString(c));
+        format = new StringBuilder();
+      }
+      if (i + 1 == updatedDim.length()) {
+        dimensionData.add(format.toString());
+      }
+    }
+    // add 0's to year/month/day.. if the data format size doesn't match format size
+    if (!dimensionData.isEmpty() && !(dimensionData.size() < dateFormatPattern.size())) {
+      int i;
+      for (i = 0; i < dateFormatPattern.size(); i++) {
+        String currentTimestampFormat = dateFormatPattern.get(i);
+        String currentDimData = dimensionData.get(i);
+        if (currentTimestampFormat.length() != currentDimData.length()) {
+          if (currentDimData.length() < currentTimestampFormat.length()) {
+            dimensionData.set(i, "0" + currentDimData);
+          }
+        }
+      }
+      // if format is yyyy/MM/dd HH:mm:ss, and data is 2018/01/01 01:01:01.001, then parsing will
+      // fail. In that case, remove the unnecessary data segment from the list
+      if (dimensionData.size() > dateFormatPattern.size()) {
+        dimensionData.subList(i, dimensionData.size()).clear();
+      }
+      // prepare the final format and data
+      updatedDim = String.join("", dimensionData);
+      dateFormat = String.join("", dateFormatPattern);
+    }
+    // create java instant
+    Instant instant = Instant.from(ZonedDateTime
+        .of(LocalDateTime.parse(updatedDim, DateTimeFormatter.ofPattern(dateFormat)),
+            ZoneId.systemDefault()));
+    validateTimeStampRange(instant.getEpochSecond());
+    long us = Math.multiplyExact(instant.getEpochSecond(), 1000L);
+    // get nanoseconds from instant
+    int nano = instant.getNano();
+    if (nano != 0) {
+      while (nano % 10 == 0) {
+        nano /= 10;
+      }
+    }
+    return Math.addExact(us, nano);
+  }
+
   private static Object parseTimestamp(String dimensionValue, String dateFormat) {
     Date dateToStr;
     DateFormat dateFormatter = null;
     long timeValue;
     try {
+      if (Boolean.parseBoolean(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_SPARK_VERSION_SPARK3,
+              CarbonCommonConstants.CARBON_SPARK_VERSION_SPARK3_DEFAULT))) {
+        try {
+          return createTimeInstant(dimensionValue, dateFormat.trim());
+        } catch (DateTimeParseException e) {
+          throw new NumberFormatException(e.getMessage());
+        }
+      }
       if (null != dateFormat && !dateFormat.trim().isEmpty()) {
         dateFormatter = new SimpleDateFormat(dateFormat);
         dateFormatter.setLenient(false);
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 3a3e599..3779a3e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -141,6 +141,11 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
     return readSupport.readRow(carbonIterator.next());
   }
 
+  public String getCarbonDataFileWrittenVersion() {
+    ChunkRowIterator iterator = (ChunkRowIterator) carbonIterator;
+    return iterator.getCarbonDataFileWrittenVersion();
+  }
+
   /**
    * get batch result
    *
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapperDirect.java b/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapperDirect.java
index 099b631..caf3820 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapperDirect.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapperDirect.java
@@ -220,6 +220,11 @@ public class ColumnarVectorWrapperDirect implements CarbonColumnVector, Sequenti
   }
 
   @Override
+  public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
+    // do nothing here
+  }
+
+  @Override
   public Object getData(int rowId) {
     throw new UnsupportedOperationException(
         "Not supported this opeartion from " + this.getClass().getName());
diff --git a/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala b/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala
index 0318701..36e60ba 100644
--- a/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala
+++ b/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/SparkVersionAdapter.scala
@@ -83,6 +83,12 @@ trait SparkVersionAdapter {
     DateTimeUtils.timestampToString(timeStamp)
   }
 
+  def rebaseTime(timestamp: Long, carbonWrittenVersion: String = null): Long = {
+    // From spark 3.1, spark will store gregorian micros value for timestamp, hence
+    // rebase is required. For 2.x versions, no need rebase
+    timestamp
+  }
+
   def stringToTime(value: String): java.util.Date = {
     DateTimeUtils.stringToTime(value)
   }
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 4f6f33a..e7b74c7 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 import org.apache.carbondata.core.scan.scanner.LazyPageLoader;
 
+import org.apache.spark.sql.CarbonToSparkAdapter;
 import org.apache.spark.sql.CarbonVectorProxy;
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil;
 import org.apache.spark.sql.types.Decimal;
@@ -48,6 +49,8 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
 
   private CarbonColumnVector dictionaryVector;
 
+  private String carbonDataFileWrittenVersion;
+
   ColumnarVectorWrapper(CarbonVectorProxy writableColumnVector, boolean[] filteredRows,
       int ordinal) {
     this.sparkColumnVectorProxy = writableColumnVector.getColumnVector(ordinal);
@@ -119,7 +122,8 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
   @Override
   public void putLong(int rowId, long value) {
     if (!filteredRows[rowId]) {
-      sparkColumnVectorProxy.putLong(counter++, value);
+      sparkColumnVectorProxy
+          .putLong(counter++, CarbonToSparkAdapter.rebaseTime(value, carbonDataFileWrittenVersion));
     }
   }
 
@@ -297,6 +301,11 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
   }
 
   @Override
+  public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
+    this.carbonDataFileWrittenVersion = carbonDataFileWrittenVersion;
+  }
+
+  @Override
   public void setDictionary(CarbonDictionary dictionary) {
     sparkColumnVectorProxy.setDictionary(dictionary);
   }
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java b/integration/spark/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
index 2b5b69e..15766d5 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 import org.apache.carbondata.core.scan.scanner.LazyPageLoader;
 
+import org.apache.spark.sql.CarbonToSparkAdapter;
 import org.apache.spark.sql.CarbonVectorProxy;
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil;
 import org.apache.spark.sql.types.Decimal;
@@ -51,6 +52,8 @@ class ColumnarVectorWrapperDirect implements CarbonColumnVector {
 
   private CarbonColumnVector dictionaryVector;
 
+  private String carbonDataFileWrittenVersion;
+
   ColumnarVectorWrapperDirect(CarbonVectorProxy writableColumnVector, int ordinal) {
     this.sparkColumnVectorProxy = writableColumnVector.getColumnVector(ordinal);
     this.carbonVectorProxy = writableColumnVector;
@@ -93,7 +96,8 @@ class ColumnarVectorWrapperDirect implements CarbonColumnVector {
 
   @Override
   public void putLong(int rowId, long value) {
-    sparkColumnVectorProxy.putLong(rowId, value);
+    sparkColumnVectorProxy
+        .putLong(rowId, CarbonToSparkAdapter.rebaseTime(value, carbonDataFileWrittenVersion));
   }
 
   @Override
@@ -235,6 +239,11 @@ class ColumnarVectorWrapperDirect implements CarbonColumnVector {
   }
 
   @Override
+  public void setCarbonDataFileWrittenVersion(String carbonDataFileWrittenVersion) {
+    this.carbonDataFileWrittenVersion = carbonDataFileWrittenVersion;
+  }
+
+  @Override
   public void putFloats(int rowId, int count, float[] src, int srcIndex) {
     sparkColumnVectorProxy.putFloats(rowId, count, src, srcIndex);
   }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 6852968..4d36fdf 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -31,8 +31,9 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark._
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{CarbonToSparkAdapter, SparkSession}
 import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.profiler.{GetPartition, Profiler}
@@ -47,7 +48,9 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.index.{IndexFilter, Segment}
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.conditional.ImplicitExpression
@@ -475,6 +478,8 @@ class CarbonScanRDD[T: ClassTag](
       val model = format.createQueryModel(inputSplit, attemptContext, indexFilter)
       // one query id per table
       model.setQueryId(queryId)
+
+      val timeStampProjectionColumns = getTimeStampProjectionColumns(model.getProjectionColumns)
       // get RecordReader by FileFormat
 
       var reader: RecordReader[Void, Object] =
@@ -566,6 +571,25 @@ class CarbonScanRDD[T: ClassTag](
           }
           havePair = false
           val value = reader.getCurrentValue
+          if (CarbonProperties.getInstance()
+                .getProperty(CarbonCommonConstants.CARBON_SPARK_VERSION_SPARK3,
+                  CarbonCommonConstants.CARBON_SPARK_VERSION_SPARK3_DEFAULT).toBoolean &&
+              timeStampProjectionColumns.nonEmpty) {
+            value match {
+              case row: GenericInternalRow if needRebaseTimeValue(reader) =>
+                // rebase timestamp data by converting julian to Gregorian time
+                timeStampProjectionColumns.foreach {
+                  projectionColumnWithIndex =>
+                    val timeStampData = row.get(projectionColumnWithIndex._2,
+                      org.apache.spark.sql.types.DataTypes.TimestampType)
+                    if (null != timeStampData) {
+                      row.update(projectionColumnWithIndex._2,
+                        CarbonToSparkAdapter.rebaseTime(timeStampData.asInstanceOf[Long]))
+                    }
+                }
+              case _ =>
+            }
+          }
           value
         }
 
@@ -581,6 +605,34 @@ class CarbonScanRDD[T: ClassTag](
     iterator.asInstanceOf[Iterator[T]]
   }
 
+  private def getTimeStampProjectionColumns(projectionColumns: Array[CarbonColumn]):
+  Array[(CarbonColumn, Int)] = {
+    // filter timestamp projection columns with index, this will be used to iterate and rebase
+    // timestamp value while reading
+    projectionColumns.zipWithIndex.filter {
+      column =>
+        var isComplexDimension = false
+        // ignore Timestamp complex dimensions
+        column._1 match {
+          case dimension: CarbonDimension =>
+            isComplexDimension = dimension.getComplexParentDimension != null
+          case _ =>
+        }
+        !isComplexDimension && column._1.getDataType == DataTypes.TIMESTAMP
+    }
+  }
+
+  def needRebaseTimeValue(reader: RecordReader[Void, Object]): Boolean = {
+    // carbonDataFileWrittenVersion will be in the format x.x.x-SNAPSHOT
+    // (eg., 2.1.0-SNAPSHOT), get the version name and check if the data file is
+    // written before 2.2.0 version, then rebase timestamp value
+    reader.isInstanceOf[CarbonRecordReader[T]] &&
+    null != reader.asInstanceOf[CarbonRecordReader[T]].getCarbonDataFileWrittenVersion &&
+    reader.asInstanceOf[CarbonRecordReader[T]].getCarbonDataFileWrittenVersion
+      .split(CarbonCommonConstants.HYPHEN).head
+      .compareTo(CarbonCommonConstants.CARBON_SPARK3_VERSION) < 0
+  }
+
   private def addTaskCompletionListener(split: Partition,
       context: TaskContext,
       queryStartTime: Long,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 98c18ac..b52bc20 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -138,6 +138,10 @@ class CarbonEnv {
       .addNonSerializableProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
     Profiler.initialize(sparkSession.sparkContext)
     CarbonToSparkAdapter.addSparkSessionListener(sparkSession)
+    if(sparkSession.sparkContext.version.startsWith("3.1")) {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants
+        .CARBON_SPARK_VERSION_SPARK3, "true")
+    }
     initialized = true
     LOGGER.info("Initialize CarbonEnv completed...")
   }
diff --git a/integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala b/integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala
index ff9d2de..5cf5c15 100644
--- a/integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala
+++ b/integration/spark/src/main/spark3.1/org/apache/spark/sql/SparkVersionAdapter.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{BucketSpecContext, Co
 import org.apache.spark.sql.catalyst.plans.{JoinType, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan, OneRowRelation, QualifiedColType}
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
-import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, RebaseDateTime, TimestampFormatter}
 import org.apache.spark.sql.execution.{ExplainMode, QueryExecution, ShuffledRowRDD, SimpleMode, SparkPlan, SQLExecution, UnaryExecNode}
 import org.apache.spark.sql.execution.command.{ExplainCommand, Field, PartitionerField, RefreshTableCommand, TableModel, TableNewProcessor}
 import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
@@ -119,6 +119,25 @@ trait SparkVersionAdapter {
     DateTimeUtils.daysToLocalDate(date).toString
   }
 
+  /**
+   * Rebase the timestamp value from julian to gregorian time micros
+   */
+  def rebaseTime(timestamp: Long): Long = {
+    RebaseDateTime.rebaseJulianToGregorianMicros(timestamp)
+  }
+
+  def rebaseTime(timestamp: Long, carbonDataFileWrittenVersion: String): Long = {
+    // carbonDataFileWrittenVersion will be in the format x.x.x-SNAPSHOT(eg., 2.1.0-SNAPSHOT),
+    // get the version name and check if the data file is written before 2.2.0 version
+    if (null != carbonDataFileWrittenVersion &&
+        carbonDataFileWrittenVersion.split(CarbonCommonConstants.HYPHEN).head
+          .compareTo(CarbonCommonConstants.CARBON_SPARK3_VERSION) < 0) {
+      RebaseDateTime.rebaseJulianToGregorianMicros(timestamp)
+    } else {
+      timestamp
+    }
+  }
+
   // Note that due to this scala bug: https://github.com/scala/bug/issues/11016, we need to make
   // this function polymorphic for every scala version >= 2.12, otherwise an overloaded method
   // resolution error occurs at compile time.
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampNoDictionaryColumnTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampNoDictionaryColumnTestCase.scala
index c9ad33a..e75de7f 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampNoDictionaryColumnTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/directdictionary/TimestampNoDictionaryColumnTestCase.scala
@@ -51,47 +51,35 @@ class TimestampNoDictionaryColumnTestCase extends QueryTest with BeforeAndAfterA
   }
 
   test("select projectjoindate, projectenddate from timestamp_nodictionary") {
-    if (!sqlContext.sparkContext.version.startsWith("3.1")) {
-      checkAnswer(
-        sql("select projectjoindate, projectenddate from timestamp_nodictionary"),
-        Seq(Row(Timestamp.valueOf("2000-01-29 00:00:00.0"),
-          Timestamp.valueOf("2016-06-29 00:00:00.0")),
-          Row(Timestamp.valueOf("1800-02-17 00:00:00.0"),
-            Timestamp.valueOf("1900-11-29 00:00:00.0")),
-          Row(null, Timestamp.valueOf("2016-05-29 00:00:00.0")),
-          Row(null, Timestamp.valueOf("2016-11-30 00:00:00.0")),
-          Row(Timestamp.valueOf("3000-10-22 00:00:00.0"),
-            Timestamp.valueOf("3002-11-15 00:00:00.0")),
-          Row(Timestamp.valueOf("1802-06-29 00:00:00.0"),
-            Timestamp.valueOf("1902-12-30 00:00:00.0")),
-          Row(null, Timestamp.valueOf("2016-12-30 00:00:00.0")),
-          Row(Timestamp.valueOf("2038-11-14 00:00:00.0"),
-            Timestamp.valueOf("2041-12-29 00:00:00.0")),
-          Row(null, null),
-          Row(Timestamp.valueOf("2014-09-15 00:00:00.0"),
-            Timestamp.valueOf("2016-05-29 00:00:00.0"))
-        )
+    checkAnswer(
+      sql("select projectjoindate, projectenddate from timestamp_nodictionary"),
+      Seq(Row(Timestamp.valueOf("2000-01-29 00:00:00.0"),
+        Timestamp.valueOf("2016-06-29 00:00:00.0")),
+        Row(Timestamp.valueOf("1800-02-17 00:00:00.0"), Timestamp.valueOf("1900-11-29 00:00:00.0")),
+        Row(null, Timestamp.valueOf("2016-05-29 00:00:00.0")),
+        Row(null, Timestamp.valueOf("2016-11-30 00:00:00.0")),
+        Row(Timestamp.valueOf("3000-10-22 00:00:00.0"), Timestamp.valueOf("3002-11-15 00:00:00.0")),
+        Row(Timestamp.valueOf("1802-06-29 00:00:00.0"), Timestamp.valueOf("1902-12-30 00:00:00.0")),
+        Row(null, Timestamp.valueOf("2016-12-30 00:00:00.0")),
+        Row(Timestamp.valueOf("2038-11-14 00:00:00.0"), Timestamp.valueOf("2041-12-29 00:00:00.0")),
+        Row(null, null),
+        Row(Timestamp.valueOf("2014-09-15 00:00:00.0"), Timestamp.valueOf("2016-05-29 00:00:00.0"))
       )
-    }
+    )
   }
 
-
   test("select projectjoindate, projectenddate from timestamp_nodictionary where in filter") {
-    if (!sqlContext.sparkContext.version.startsWith("3.1")) {
-      checkAnswer(
-        sql("select projectjoindate, projectenddate from timestamp_nodictionary " +
+    checkAnswer(
+      sql("select projectjoindate, projectenddate from timestamp_nodictionary " +
           "where projectjoindate in ('1800-02-17 00:00:00','3000-10-22 00:00:00') or " +
           "projectenddate in ('1900-11-29 00:00:00','3002-11-15 00:00:00','2041-12-29 00:00:00')"),
-        Seq(Row(Timestamp.valueOf("1800-02-17 00:00:00.0"),
-          Timestamp.valueOf("1900-11-29 00:00:00.0")),
-          Row(Timestamp.valueOf("3000-10-22 00:00:00.0"),
-            Timestamp.valueOf("3002-11-15 00:00:00.0")),
-          Row(Timestamp.valueOf("2038-11-14 00:00:00.0"),
-            Timestamp.valueOf("2041-12-29 00:00:00.0")))
-      )
-    }
-  }
+      Seq(Row(Timestamp.valueOf("1800-02-17 00:00:00.0"),
+        Timestamp.valueOf("1900-11-29 00:00:00.0")),
+        Row(Timestamp.valueOf("3000-10-22 00:00:00.0"), Timestamp.valueOf("3002-11-15 00:00:00.0")),
+        Row(Timestamp.valueOf("2038-11-14 00:00:00.0"), Timestamp.valueOf("2041-12-29 00:00:00.0")))
+    )
 
+  }
 
   override def afterAll {
     CarbonProperties.getInstance()
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
index 5977fe7..92a2102 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
@@ -24,6 +24,7 @@ import java.util.Base64
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.util.SparkStreamingUtil
 
 object FieldConverter {
 
@@ -125,7 +126,7 @@ object FieldConverter {
             i += 1
           }
           builder.substring(0, builder.length - delimiter.length())
-        case other => other.toString
+        case other => SparkStreamingUtil.checkInstant(other, timeStampFormat)
       }
     }
   }
diff --git a/streaming/src/main/spark2.x/org.apache.carbondata.util/SparkStreamingUtil.scala b/streaming/src/main/spark2.x/org.apache.carbondata.util/SparkStreamingUtil.scala
index 71e143c..399d2f4 100644
--- a/streaming/src/main/spark2.x/org.apache.carbondata.util/SparkStreamingUtil.scala
+++ b/streaming/src/main/spark2.x/org.apache.carbondata.util/SparkStreamingUtil.scala
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.util
 
+import java.text.SimpleDateFormat
+
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -26,4 +28,8 @@ object SparkStreamingUtil {
   def convertInternalRowToRow(expressionEncoder: ExpressionEncoder[Row]): InternalRow => Row = {
     expressionEncoder.fromRow
   }
+
+  def checkInstant(value: Any, timeStampFormat: SimpleDateFormat): String = {
+    value.toString
+  }
 }
diff --git a/streaming/src/main/spark3.1/org/apache/carbondata/util/SparkStreamingUtil.scala b/streaming/src/main/spark3.1/org/apache/carbondata/util/SparkStreamingUtil.scala
index 3c8929f..69499ee 100644
--- a/streaming/src/main/spark3.1/org/apache/carbondata/util/SparkStreamingUtil.scala
+++ b/streaming/src/main/spark3.1/org/apache/carbondata/util/SparkStreamingUtil.scala
@@ -17,13 +17,26 @@
 
 package org.apache.carbondata.util
 
+import java.text.SimpleDateFormat
+import java.time.Instant
+
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 
 object SparkStreamingUtil {
 
   def convertInternalRowToRow(expressionEncoder: ExpressionEncoder[Row]): InternalRow => Row = {
     expressionEncoder.createDeserializer().apply
   }
+
+  def checkInstant(value: Any, timeStampFormat: SimpleDateFormat): String = {
+    value match {
+      case instant: Instant =>
+        // if value is instant, convert instant time to java timestamp
+        timeStampFormat format DateTimeUtils.toJavaTimestamp(DateTimeUtils.instantToMicros(instant))
+      case other => other.toString
+    }
+  }
 }