You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/10/25 11:02:54 UTC

carbondata git commit: [CARBONDATA-3025]add more metadata in carbon file footer

Repository: carbondata
Updated Branches:
  refs/heads/master e19c5da6d -> 9578786b2


[CARBONDATA-3025]add more metadata in carbon file footer

Changes Proposed in this PR:
Add more info to carbon file footer, like written_by (which will be spark application_name)
in case of insert into and load command. To read this info one can use CLI.
For SDK, one API will be exposed to write this info in the footer and another API will be exposed to read this info from the SDK.
The footer will have information about which version of carbon the file was written in,
which will be helpful for getting details, for compatibility, etc.

This closes #2829


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9578786b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9578786b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9578786b

Branch: refs/heads/master
Commit: 9578786b28bda3728ad5917afab2c0005688f03a
Parents: e19c5da
Author: akashrn5 <ak...@gmail.com>
Authored: Wed Oct 17 19:56:49 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Thu Oct 25 16:31:03 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 15 ++++++
 format/src/main/thrift/carbondata.thrift        |  1 +
 .../hadoop/ft/CarbonTableInputFormatTest.java   |  2 +
 .../hadoop/ft/CarbonTableOutputFormatTest.java  |  2 +
 .../presto/impl/CarbonTableReader.java          |  2 +
 .../integrationtest/PrestoAllDataTypeTest.scala |  2 +
 ...FileInputFormatWithExternalCarbonTable.scala |  2 +-
 .../TestNonTransactionalCarbonTable.scala       | 38 ++++++-------
 ...tNonTransactionalCarbonTableJsonWriter.scala |  2 +-
 ...ansactionalCarbonTableWithAvroDataType.scala | 56 ++++++++++----------
 ...ransactionalCarbonTableWithComplexType.scala |  6 +--
 .../load/DataLoadProcessBuilderOnSpark.scala    |  4 ++
 .../apache/carbondata/spark/rdd/CarbonRDD.scala |  6 +++
 .../datasources/SparkCarbonFileFormat.scala     |  6 ++-
 .../datasource/SparkCarbonDataSourceTest.scala  | 10 ++--
 ...tCreateTableUsingSparkCarbonFileFormat.scala |  6 +--
 .../sql/carbondata/datasource/TestUtil.scala    |  2 +-
 .../datasources/SparkCarbonTableFormat.scala    |  3 ++
 .../CarbonGetTableDetailComandTestCase.scala    |  7 +--
 .../writer/v3/CarbonFactDataWriterImplV3.java   |  6 +++
 .../carbondata/sdk/file/CarbonSchemaReader.java | 28 ++++++++++
 .../sdk/file/CarbonWriterBuilder.java           | 18 +++++++
 .../apache/carbondata/sdk/file/TestUtil.java    |  2 +-
 .../sdk/file/AvroCarbonWriterTest.java          | 16 +++---
 .../sdk/file/CSVCarbonWriterTest.java           | 18 +++----
 .../carbondata/sdk/file/CarbonReaderTest.java   | 23 +++++---
 .../sdk/file/ConcurrentAvroSdkWriterTest.java   |  2 +-
 .../sdk/file/ConcurrentSdkWriterTest.java       |  2 +-
 28 files changed, 194 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 5085b5f..fa5227b 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1710,6 +1710,21 @@ public final class CarbonCommonConstants {
   public static final int CARBON_MINMAX_ALLOWED_BYTE_COUNT_MIN = 10;
   public static final int CARBON_MINMAX_ALLOWED_BYTE_COUNT_MAX = 1000;
 
+  /**
+   * Written by detail to be written in carbondata footer for better maintanability
+   */
+  public static final String CARBON_WRITTEN_BY_FOOTER_INFO = "written_by";
+
+  /**
+   * carbondata project version used while writing the carbondata file
+   */
+  public static final String CARBON_WRITTEN_VERSION = "version";
+
+  /**
+   * property to set the appName of who is going to write the carbondata
+   */
+  public static final String CARBON_WRITTEN_BY_APPNAME = "carbon.writtenby.app.name";
+
   //////////////////////////////////////////////////////////////////////////////////////////
   // Unused constants and parameters start here
   //////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/format/src/main/thrift/carbondata.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift
index ec51ff7..5cad5ac 100644
--- a/format/src/main/thrift/carbondata.thrift
+++ b/format/src/main/thrift/carbondata.thrift
@@ -206,6 +206,7 @@ struct FileFooter3{
     4: optional list<BlockletInfo3> blocklet_info_list3;	// Information about blocklets of all columns in this file for V3 format
     5: optional dictionary.ColumnDictionaryChunk dictionary; // Blocklet local dictionary
     6: optional bool is_sort; // True if the data is sorted in this file, it is used for compaction to decide whether to use merge sort or not
+    7: optional map<string, string> extra_info; // map used to write extra info/metadata to file footer ,like who is writing the file and in which version the file is written etc
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
index 136d3cc..d379d33 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
@@ -62,6 +62,8 @@ public class CarbonTableInputFormatTest {
         addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
     CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, "/tmp/carbon/");
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, "CarbonTableInputFormatTest");
     try {
       creator = new StoreCreator(new File("target/store").getAbsolutePath(),
           new File("../hadoop/src/test/resources/data.csv").getCanonicalPath());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
index 379fdaf..6fb7252 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java
@@ -53,6 +53,8 @@ public class CarbonTableOutputFormatTest {
         addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
     CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, "/tmp/carbon/");
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, "CarbonTableOutputFormatTest");
     try {
       carbonLoadModel = new StoreCreator(new File("target/store").getAbsolutePath(),
           new File("../hadoop/src/test/resources/data.csv").getCanonicalPath()).createTableAndLoadModel();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 6ddee42..7baf7ea 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -534,6 +534,8 @@ public class CarbonTableReader {
         config.getEnableUnsafeColumnPage());
     addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, config.getEnableUnsafeSort());
     addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, config.getEnableQueryStatistics());
+    // TODO: Support configurable
+    addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, "Presto_Server");
   }
 
   private void setS3Properties() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
index 6995117..d6e322b 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
@@ -73,6 +73,8 @@ class PrestoAllDataTypeTest extends FunSuiteLike with BeforeAndAfterAll {
     import org.apache.carbondata.presto.util.CarbonDataStoreCreator
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION,
       systemPath)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
+      "Presto")
     CarbonDataStoreCreator
       .createCarbonStore(storePath,
         s"$rootPath/integration/presto/src/test/resources/alldatatype.csv")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
index 3ab956c..553fa25 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -56,7 +56,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
       val builder = CarbonWriter.builder()
       val writer =
         builder.outputPath(writerPath + "/Fact/Part0/Segment_null")
-          .withCsvInput(Schema.parseJson(schema)).build()
+          .withCsvInput(Schema.parseJson(schema)).writtenBy("TestCarbonFileInputFormatWithExternalCarbonTable").build()
 
       var i = 0
       while (i < 100) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 38cf201..d2e33e2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -139,13 +139,13 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
             .sortBy(sortColumns.toArray)
             .uniqueIdentifier(
               System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
-            .withCsvInput(Schema.parseJson(schema)).build()
+            .withCsvInput(Schema.parseJson(schema)).writtenBy("TestNonTransactionalCarbonTable").build()
         } else {
           builder.outputPath(writerPath)
             .sortBy(sortColumns.toArray)
             .uniqueIdentifier(
               System.currentTimeMillis).withBlockSize(2)
-            .withCsvInput(Schema.parseJson(schema)).build()
+            .withCsvInput(Schema.parseJson(schema)).writtenBy("TestNonTransactionalCarbonTable").build()
         }
       var i = 0
       while (i < rows) {
@@ -180,7 +180,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       val writer =
         builder.outputPath(writerPath)
           .uniqueIdentifier(System.currentTimeMillis()).withBlockSize(2).sortBy(sortColumns)
-          .withCsvInput(new Schema(fields)).build()
+          .withCsvInput(new Schema(fields)).writtenBy("TestNonTransactionalCarbonTable").build()
       var i = 0
       while (i < rows) {
         writer.write(Array[String]("true", String.valueOf(i), String.valueOf(i.toDouble / 2)))
@@ -211,7 +211,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
           .sortBy(sortColumns.toArray)
           .uniqueIdentifier(
             123).withBlockSize(2)
-          .withCsvInput(Schema.parseJson(schema)).build()
+          .withCsvInput(Schema.parseJson(schema)).writtenBy("TestNonTransactionalCarbonTable").build()
       var i = 0
       while (i < rows) {
         writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
@@ -985,7 +985,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val builder: CarbonWriterBuilder = CarbonWriter.builder.outputPath(writerPath)
       .withLoadOptions(options)
 
-    val writer: CarbonWriter = builder.withCsvInput(new Schema(fields)).build()
+    val writer: CarbonWriter = builder.withCsvInput(new Schema(fields)).writtenBy("TestNonTransactionalCarbonTable").build()
     writer.write(Array("babu","1","02-01-2002","02-01-2002 01:01:00"))
     writer.close()
 
@@ -1110,7 +1110,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     try {
       val writer = CarbonWriter.builder
         .outputPath(writerPath)
-        .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).build()
+        .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTable").build()
       var i = 0
       while (i < rows) {
         writer.write(record)
@@ -2084,7 +2084,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
     assert(intercept[RuntimeException] {
       val writer = CarbonWriter.builder.sortBy(Array("name", "id"))
-        .outputPath(writerPath).withAvroInput(nn).build()
+        .outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTable").build()
       writer.write(record)
       writer.close()
     }.getMessage.toLowerCase.contains("column: name specified in sort columns"))
@@ -2124,7 +2124,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).withAvroInput(nn).build()
+      .outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTable").build()
     writer.write(record)
     writer.close()
   }
@@ -2162,7 +2162,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder.sortBy(Array("id"))
-      .outputPath(writerPath).withAvroInput(nn).build()
+      .outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTable").build()
     writer.write(record)
     writer.close()
   }
@@ -2206,7 +2206,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).withAvroInput(nn).build()
+      .outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTable").build()
     writer.write(record)
     writer.close()
   }
@@ -2246,7 +2246,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).withAvroInput(nn).build()
+      .outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTable").build()
     writer.write(record)
     writer.close()
     sql(
@@ -2292,7 +2292,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = testUtil.jsonToAvro(json1, schema1)
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).withAvroInput(nn).build()
+      .outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTable").build()
     writer.write(record)
     writer.close()
     sql(
@@ -2339,7 +2339,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
 
     val writer = CarbonWriter.builder
-      .outputPath(writerPath).withAvroInput(nn).build()
+      .outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTable").build()
     writer.write(record)
     writer.close()
     sql(
@@ -2359,7 +2359,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val writer: CarbonWriter = CarbonWriter.builder
       .outputPath(writerPath)
       .withTableProperties(options)
-      .withCsvInput(new Schema(fields)).build()
+      .withCsvInput(new Schema(fields)).writtenBy("TestNonTransactionalCarbonTable").build()
     writer.write(Array("carbon", "1"))
     writer.write(Array("hydrogen", "10"))
     writer.write(Array("boron", "4"))
@@ -2377,7 +2377,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     // write local sort data
     val writer1: CarbonWriter = CarbonWriter.builder
       .outputPath(writerPath)
-      .withCsvInput(new Schema(fields)).build()
+      .withCsvInput(new Schema(fields)).writtenBy("TestNonTransactionalCarbonTable").build()
     writer1.write(Array("carbon", "1"))
     writer1.write(Array("hydrogen", "10"))
     writer1.write(Array("boron", "4"))
@@ -2393,7 +2393,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     FileUtils.deleteDirectory(new File(writerPath))
     val builder = CarbonWriter.builder
       .sortBy(Array[String]("name")).withBlockSize(12).enableLocalDictionary(true)
-      .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath)
+      .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath).writtenBy("TestNonTransactionalCarbonTable")
     generateCarbonData(builder)
     assert(FileFactory.getCarbonFile(writerPath).exists())
     assert(testUtil.checkForLocalDictionary(testUtil.getDimRawChunk(0,writerPath)))
@@ -2418,7 +2418,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         "local_dictionary_enable" -> "true").asJava
     val builder = CarbonWriter.builder
       .withTableProperties(tablePropertiesMap)
-      .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath)
+      .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath).writtenBy("TestNonTransactionalCarbonTable")
     generateCarbonData(builder)
     assert(FileFactory.getCarbonFile(writerPath).exists())
     assert(testUtil.checkForLocalDictionary(testUtil.getDimRawChunk(0,writerPath)))
@@ -2439,7 +2439,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val builder = CarbonWriter.builder
       .sortBy(Array[String]("name")).withBlockSize(12).enableLocalDictionary(true)
       .localDictionaryThreshold(5)
-      .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath)
+      .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath).writtenBy("TestNonTransactionalCarbonTable")
     generateCarbonData(builder)
     assert(FileFactory.getCarbonFile(writerPath).exists())
     assert(!testUtil.checkForLocalDictionary(testUtil.getDimRawChunk(0,writerPath)))
@@ -2460,7 +2460,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val builder = CarbonWriter.builder
       .sortBy(Array[String]("name")).withBlockSize(12).enableLocalDictionary(true)
       .localDictionaryThreshold(200)
-      .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath)
+      .uniqueIdentifier(System.currentTimeMillis).taskNo(System.nanoTime).outputPath(writerPath).writtenBy("TestNonTransactionalCarbonTable")
     generateCarbonData(builder)
     assert(FileFactory.getCarbonFile(writerPath).exists())
     assert(testUtil.checkForLocalDictionary(testUtil.getDimRawChunk(0,writerPath)))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
index 7ad698c..862c72a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
@@ -98,7 +98,7 @@ class TestNonTransactionalCarbonTableJsonWriter extends QueryTest with BeforeAnd
         .outputPath(writerPath)
         .uniqueIdentifier(System.currentTimeMillis())
         .withLoadOptions(options)
-        .withJsonInput(carbonSchema).build()
+        .withJsonInput(carbonSchema).writtenBy("TestNonTransactionalCarbonTableJsonWriter").build()
       writer.write(jsonRow)
       writer.close()
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
index d5da794..a7613c1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
@@ -92,7 +92,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -138,7 +138,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(mySchema)
     val record = testUtil.jsonToAvro(json1, mySchema)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -181,7 +181,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(mySchema)
     val record = testUtil.jsonToAvro(json, mySchema)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -211,7 +211,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -240,7 +240,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -269,7 +269,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -298,7 +298,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -327,7 +327,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -356,7 +356,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
     val exception1 = intercept[UnsupportedOperationException] {
-      val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+      val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
       writer.write(record)
       writer.close()
     }
@@ -392,7 +392,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -427,7 +427,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -464,7 +464,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -496,7 +496,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -554,7 +554,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -648,7 +648,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -700,7 +700,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     avroRec.put("union_field", bytes1)
 
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(avroRec)
     writer.close()
     sql(
@@ -769,7 +769,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     avroRec.put("record2", 10.24)
     avroRec.put("struct_field_decimal", genericByteArray)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(avroRec)
     writer.close()
     sql(
@@ -840,7 +840,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     avroRec.put("age", 10)
     avroRec.put("dec_fields", genericByteArray)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(avroRec)
     writer.close()
     sql(
@@ -886,7 +886,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -932,7 +932,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val record = testUtil.jsonToAvro(json1, schema1)
 
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -976,7 +976,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
     val avroRec = new GenericData. Record(nn)
     avroRec.put("id", bytes1)
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1020,7 +1020,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes1)
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1064,7 +1064,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val bytes1 = ByteBuffer.wrap(DatatypeConverter.parseBase64Binary(data1))
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes1)
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1104,7 +1104,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
       s"""{"dec_field":"$data"}""".stripMargin
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes)
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(avroRec)
     writer.close()
     sql(
@@ -1145,7 +1145,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val avroRec = new GenericData. Record(nn)
     avroRec.put("dec_field", bytes)
     val exception1 = intercept[Exception] {
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(avroRec)
     writer.close()
     }
@@ -1194,7 +1194,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -1229,7 +1229,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     val nn = new org.apache.avro.Schema.Parser().parse(schema1)
     val record = testUtil.jsonToAvro(json1, schema1)
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(record)
     writer.close()
     sql(
@@ -1278,7 +1278,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     avroRec.put("union_field", bytes1)
 
 
-    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).build()
+    val writer = CarbonWriter.builder.outputPath(writerPath).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithAvroDataType").build()
     writer.write(avroRec)
     writer.close()
     sql(s"create table sdkOutputTable(union_field struct<union_field0:decimal(10,2),union_field1:int>) " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
index 42bb791..36cf5f4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
@@ -68,11 +68,11 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
         CarbonWriter.builder
           .outputPath(writerPath).enableLocalDictionary(true)
           .localDictionaryThreshold(2000)
-          .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).build()
+          .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithComplexType").build()
       } else {
         CarbonWriter.builder
           .outputPath(writerPath)
-          .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).build()
+          .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithComplexType").build()
       }
       var i = 0
       while (i < rows) {
@@ -268,7 +268,7 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
       """.stripMargin
     val pschema= org.apache.avro.Schema.parse(mySchema)
     val records = testUtil.jsonToAvro(jsonvalue, mySchema)
-    val writer = CarbonWriter.builder().outputPath(writerPath).withAvroInput(pschema).build()
+    val writer = CarbonWriter.builder().outputPath(writerPath).withAvroInput(pschema).writtenBy("TestNonTransactionalCarbonTableWithComplexType").build()
     writer.write(records)
     writer.close()
     sql("DROP TABLE IF EXISTS sdkOutputTable")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 923676c..d794636 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -66,6 +66,10 @@ object DataLoadProcessBuilderOnSpark {
     val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
     val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
 
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
+        sparkSession.sparkContext.getConf.get("spark.app.name"))
+
     val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
     // 1. Input
     val inputRDD = originRDD

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 83cd59c..cdfce56 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.util.SparkSQLUtil
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util._
 
@@ -56,6 +57,11 @@ abstract class CarbonRDD[T: ClassTag](
 
   protected def internalGetPartitions: Array[Partition]
 
+
+  CarbonProperties.getInstance()
+    .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
+      ss.sparkContext.getConf.get("spark.app.name"))
+
   override def getPartitions: Array[Partition] = {
     ThreadLocalSessionInfo.setConfigurationToCurrentThread(hadoopConf)
     internalGetPartitions

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index 53b1bb1..88b7ff9 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -44,7 +44,7 @@ import org.apache.spark.util.{SerializableConfiguration, TaskCompletionListener}
 import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonVersionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnarFormatVersion}
@@ -126,6 +126,10 @@ class SparkCarbonFileFormat extends FileFormat
     model.setLoadWithoutConverterStep(true)
     CarbonTableOutputFormat.setLoadModel(conf, model)
 
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
+        sparkSession.sparkContext.getConf.get("spark.app.name"))
+
     new OutputWriterFactory {
       override def newInstance(
           path: String,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index b15ad6d..a1a5b8e 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -987,7 +987,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
       val writer =
         builder.outputPath(path)
           .uniqueIdentifier(System.nanoTime()).withBlockSize(2)
-          .withCsvInput(new Schema(structType)).build()
+          .withCsvInput(new Schema(structType)).writtenBy("SparkCarbonDataSourceTest").build()
 
       var i = 0
       while (i < 11) {
@@ -1033,7 +1033,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
       val writer =
         builder.outputPath(path)
           .uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(Array("bytefield"))
-          .withCsvInput(new Schema(fields)).build()
+          .withCsvInput(new Schema(fields)).writtenBy("SparkCarbonDataSourceTest").build()
 
       var i = 0
       while (i < 11) {
@@ -1087,7 +1087,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
       val writer =
         builder.outputPath(path)
           .uniqueIdentifier(System.nanoTime()).withBlockSize(2)
-          .withCsvInput(new Schema(structType)).build()
+          .withCsvInput(new Schema(structType)).writtenBy("SparkCarbonDataSourceTest").build()
 
       var i = 0
       while (i < 10) {
@@ -1158,7 +1158,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
       val writer =
         builder.outputPath(writerPath)
           .uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(sortColumns)
-          .withCsvInput(new Schema(fields)).build()
+          .withCsvInput(new Schema(fields)).writtenBy("SparkCarbonDataSourceTest").build()
 
       var i = 0
       while (i < rows) {
@@ -1255,7 +1255,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
       val writer =
         builder.outputPath(path)
           .uniqueIdentifier(System.nanoTime()).withBlockSize(2)
-          .withCsvInput(new Schema(fields)).build()
+          .withCsvInput(new Schema(fields)).writtenBy("SparkCarbonDataSourceTest").build()
 
       var i = 0
       while (i < 33000) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
index 250e9a6..607b7d5 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -79,7 +79,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
     try {
       val builder = CarbonWriter.builder()
       val writer =
-        builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build()
+        builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).writtenBy("TestCreateTableUsingSparkCarbonFileFormat").build()
       var i = 0
       while (i < 100) {
         writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
@@ -376,7 +376,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
       val options=Map("bad_records_action"->"FORCE","complex_delimiter_level_1"->"$").asJava
       val writer = CarbonWriter.builder().outputPath(writerPath).withBlockletSize(16)
         .sortBy(Array("myid", "ingestion_time", "event_id")).withLoadOptions(options)
-        .withCsvInput(new Schema(fields)).build()
+        .withCsvInput(new Schema(fields)).writtenBy("TestCreateTableUsingSparkCarbonFileFormat").build()
       val timeF=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
       val date_F=new SimpleDateFormat("yyyy-MM-dd")
       for(i<- 1 to recordsInBlocklet1){
@@ -434,7 +434,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
       .append("]")
       .toString()
     val builder = CarbonWriter.builder()
-    val writer = builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build()
+    val writer = builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).writtenBy("TestCreateTableUsingSparkCarbonFileFormat").build()
     val totalRecordsNum = 3
     for (i <- 0 until totalRecordsNum) {
       // write a varchar with 75,000 length

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala
index 03aaf1c..994ec43 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala
@@ -151,7 +151,7 @@ object TestUtil {
     try {
       val writer = CarbonWriter.builder
         .outputPath(writerPath)
-        .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).build()
+        .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).writtenBy("DataSource").build()
       var i = 0
       while (i < rows) {
         writer.write(record)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index 6bbdcec..47d6a71 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -121,6 +121,9 @@ with Serializable {
     model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
     CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
     model.setLoadWithoutConverterStep(true)
+    carbonProperty
+      .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
+        sparkSession.sparkContext.getConf.get("spark.app.name"))
 
     val staticPartition = options.getOrElse("staticpartition", null)
     if (staticPartition != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
index 2669417..ce57e46 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
@@ -42,10 +42,11 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA
 
     assertResult(2)(result.length)
     assertResult("table_info1")(result(0).getString(0))
-    // 2221 is the size of carbon table. Note that since 1.5.0, we add additional compressor name in metadata
-    assertResult(2221)(result(0).getLong(1))
+    // 2282 is the size of carbon table. Note that since 1.5.0, we add additional compressor name in metadata
+    // and more metadata like written_by and version details are added
+    assertResult(2282)(result(0).getLong(1))
     assertResult("table_info2")(result(1).getString(0))
-    assertResult(2221)(result(1).getLong(1))
+    assertResult(2282)(result(1).getLong(1))
   }
 
   override def afterAll: Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index d97d80c..7256c1e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonVersionConstants;
 import org.apache.carbondata.core.datastore.blocklet.BlockletEncodedColumnPage;
 import org.apache.carbondata.core.datastore.blocklet.EncodedBlocklet;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
@@ -103,6 +104,11 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
           .convertFileFooterVersion3(blockletMetadata, blockletIndex, localCardinality,
               thriftColumnSchemaList.size());
       convertFileMeta.setIs_sort(isSorted);
+      convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO,
+          CarbonProperties.getInstance()
+              .getProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME));
+      convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_VERSION,
+          CarbonVersionConstants.CARBONDATA_VERSION);
       // fill the carbon index details
       fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), carbonDataFileName, currentPosition);
       // write the footer

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
index e84a25a..53ebc53 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
@@ -19,18 +19,23 @@ package org.apache.carbondata.sdk.file;
 
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonFooterReaderV3;
 import org.apache.carbondata.core.reader.CarbonHeaderReader;
 import org.apache.carbondata.core.reader.CarbonIndexFileReader;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.FileFooter3;
 
 import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema;
 
@@ -77,6 +82,29 @@ public class CarbonSchemaReader {
   }
 
   /**
+   * This method return the version details in formatted string by reading from carbondata file
+   * @param dataFilePath
+   * @return
+   * @throws IOException
+   */
+  public static String getVersionDetails(String dataFilePath) throws IOException {
+    long fileSize =
+        FileFactory.getCarbonFile(dataFilePath, FileFactory.getFileType(dataFilePath)).getSize();
+    FileReader fileReader = FileFactory.getFileHolder(FileFactory.getFileType(dataFilePath));
+    ByteBuffer buffer =
+        fileReader.readByteBuffer(FileFactory.getUpdatedFilePath(dataFilePath), fileSize - 8, 8);
+    CarbonFooterReaderV3 footerReader = new CarbonFooterReaderV3(dataFilePath, buffer.getLong());
+    FileFooter3 footer = footerReader.readFooterVersion3();
+    if (null != footer.getExtra_info()) {
+      return footer.getExtra_info().get(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO)
+          + " in version: " + footer.getExtra_info()
+          .get(CarbonCommonConstants.CARBON_WRITTEN_VERSION);
+    } else {
+      return "Version Details are not found in carbondata file";
+    }
+  }
+
+  /**
    * Read carbonindex file and return the schema
    *
    * @param indexFilePath complete path including index file name

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index ed2c956..a47cc68 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
@@ -66,6 +67,7 @@ public class CarbonWriterBuilder {
   private boolean isLocalDictionaryEnabled;
   private short numOfThreads;
   private Configuration hadoopConf;
+  private String writtenByApp;
   private enum WRITER_TYPE {
     CSV, AVRO, JSON
   }
@@ -294,6 +296,15 @@ public class CarbonWriterBuilder {
   }
 
   /**
+   * @param appName appName which is writing the carbondata files
+   * @return
+   */
+  public CarbonWriterBuilder writtenBy(String appName) {
+    this.writtenByApp = appName;
+    return this;
+  }
+
+  /**
    * @param enableLocalDictionary enable local dictionary  , default is false
    * @return updated CarbonWriterBuilder
    */
@@ -372,8 +383,15 @@ public class CarbonWriterBuilder {
           "Writer type is not set, use withCsvInput() or withAvroInput() or withJsonInput()  "
               + "API based on input");
     }
+    if (this.writtenByApp == null || this.writtenByApp.isEmpty()) {
+      throw new RuntimeException(
+          "AppName is not set, please use writtenBy() API to set the App Name"
+              + "which is using SDK");
+    }
     CarbonLoadModel loadModel = buildLoadModel(schema);
     loadModel.setSdkWriterCores(numOfThreads);
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, writtenByApp);
     if (hadoopConf == null) {
       hadoopConf = FileFactory.getConfiguration();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/TestUtil.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/TestUtil.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/TestUtil.java
index df31f9f..d786c86 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/TestUtil.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/TestUtil.java
@@ -113,7 +113,7 @@ public class TestUtil {
         builder = builder.withBlockSize(blockSize);
       }
 
-      CarbonWriter writer = builder.withCsvInput(schema).build();
+      CarbonWriter writer = builder.withCsvInput(schema).writtenBy("TestUtil").build();
 
       for (int i = 0; i < rows; i++) {
         writer.write(new String[]{

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
index 53ed90f..f740ae2 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -85,7 +85,7 @@ public class AvroCarbonWriterTest {
     GenericData.Record record = TestUtil.jsonToAvro(json, avroSchema);
     try {
       CarbonWriter writer = CarbonWriter.builder().outputPath(path)
-          .withAvroInput(new Schema.Parser().parse(avroSchema)).build();
+          .withAvroInput(new Schema.Parser().parse(avroSchema)).writtenBy("AvroCarbonWriterTest").build();
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
@@ -150,7 +150,7 @@ public class AvroCarbonWriterTest {
       CarbonWriter writer = CarbonWriter.builder()
           .outputPath(path)
           
-          .withAvroInput(new Schema.Parser().parse(avroSchema)).build();
+          .withAvroInput(new Schema.Parser().parse(avroSchema)).writtenBy("AvroCarbonWriterTest").build();
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
@@ -236,7 +236,7 @@ public class AvroCarbonWriterTest {
     GenericData.Record record = TestUtil.jsonToAvro(json, mySchema);
 
     try {
-      CarbonWriter writer = CarbonWriter.builder().outputPath(path).withAvroInput(nn).build();
+      CarbonWriter writer = CarbonWriter.builder().outputPath(path).withAvroInput(nn).writtenBy("AvroCarbonWriterTest").build();
       for (int i = 0; i < 100; i++) {
         writer.write(record);
       }
@@ -290,7 +290,7 @@ public class AvroCarbonWriterTest {
     GenericData.Record record = TestUtil.jsonToAvro(json, mySchema);
 
     try {
-      CarbonWriter writer = CarbonWriter.builder().outputPath(path).withAvroInput(nn).build();
+      CarbonWriter writer = CarbonWriter.builder().outputPath(path).withAvroInput(nn).writtenBy("AvroCarbonWriterTest").build();
       for (int i = 0; i < 100; i++) {
         writer.write(record);
       }
@@ -320,7 +320,7 @@ public class AvroCarbonWriterTest {
     GenericData.Record record = TestUtil.jsonToAvro(json, mySchema);
     try {
       CarbonWriter writer =
-          CarbonWriter.builder().outputPath(path).sortBy(sortColumns).withAvroInput(nn).build();
+          CarbonWriter.builder().outputPath(path).sortBy(sortColumns).withAvroInput(nn).writtenBy("AvroCarbonWriterTest").build();
       for (int i = 0; i < 100; i++) {
         writer.write(record);
       }
@@ -434,7 +434,7 @@ public class AvroCarbonWriterTest {
         .uniqueIdentifier(System.currentTimeMillis()).outputPath(path);
 
     try {
-      writer.withCsvInput(new org.apache.carbondata.sdk.file.Schema(field)).build();
+      writer.withCsvInput(new org.apache.carbondata.sdk.file.Schema(field)).writtenBy("AvroCarbonWriterTest").build();
       Assert.fail();
     } catch (Exception e) {
       assert(e.getMessage().contains("Duplicate column name found in table schema"));
@@ -454,7 +454,7 @@ public class AvroCarbonWriterTest {
       Map<String, String> loadOptions = new HashMap<String, String>();
       loadOptions.put("bad_records_action", "fail");
       CarbonWriter carbonWriter =
-          writer.withLoadOptions(loadOptions).withCsvInput(new org.apache.carbondata.sdk.file.Schema(field)).build();
+          writer.withLoadOptions(loadOptions).withCsvInput(new org.apache.carbondata.sdk.file.Schema(field)).writtenBy("AvroCarbonWriterTest").build();
       carbonWriter.write(new String[] { "k", "20-02-2233" });
       carbonWriter.close();
       Assert.fail();
@@ -481,7 +481,7 @@ public class AvroCarbonWriterTest {
     GenericData.Record record = TestUtil.jsonToAvro(json, avroSchema);
     try {
       CarbonWriter writer = CarbonWriter.builder().outputPath(path)
-          .withAvroInput(new Schema.Parser().parse(avroSchema)).build();
+          .withAvroInput(new Schema.Parser().parse(avroSchema)).writtenBy("AvroCarbonWriterTest").build();
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index ba6d772..483ec88 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -123,7 +123,7 @@ public class CSVCarbonWriterTest {
 
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path);
-      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build();
+      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).writtenBy("CSVCarbonWriterTest").build();
 
       for (int i = 0; i < 100; i++) {
         String[] row = new String[]{
@@ -224,7 +224,7 @@ public class CSVCarbonWriterTest {
     fields[1] = new Field("age", DataTypes.INT);
     try {
       carbonWriter = CarbonWriter.builder().
-          outputPath(path).withCsvInput(new Schema(fields)).build();
+          outputPath(path).withCsvInput(new Schema(fields)).writtenBy("CSVCarbonWriterTest").build();
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
       Assert.assertTrue(false);
@@ -244,7 +244,7 @@ public class CSVCarbonWriterTest {
     fields[1] = new Field("age", DataTypes.INT);
     try {
       carbonWriter = CarbonWriter.builder().
-          outputPath(path).withCsvInput(new Schema(fields)).build();
+          outputPath(path).withCsvInput(new Schema(fields)).writtenBy("CSVCarbonWriterTest").build();
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
       Assert.assertTrue(false);
@@ -270,7 +270,7 @@ public class CSVCarbonWriterTest {
           .taskNo(5)
           .outputPath(path);
 
-      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build();
+      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).writtenBy("CSVCarbonWriterTest").build();
 
       for (int i = 0; i < 2; i++) {
         String[] row = new String[]{
@@ -345,7 +345,7 @@ public class CSVCarbonWriterTest {
 
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder().taskNo(5).outputPath(path);
-      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build();
+      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).writtenBy("CSVCarbonWriterTest").build();
       for (int i = 0; i < 15; i++) {
         String[] row = new String[] { "robot" + (i % 10), String.valueOf(i + "." + i),
             String.valueOf(i + "." + i) };
@@ -380,7 +380,7 @@ public class CSVCarbonWriterTest {
 
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder().taskNo(5).outputPath(path);
-      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build();
+      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).writtenBy("CSVCarbonWriterTest").build();
       for (int i = 0; i < 15; i++) {
         String[] row = new String[] { "robot" + (i % 10),  "" + i };
         writer.write(row);
@@ -414,7 +414,7 @@ public class CSVCarbonWriterTest {
 
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder().taskNo(5).outputPath(path);
-      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build();
+      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).writtenBy("CSVCarbonWriterTest").build();
       for (int i = 0; i < 15; i++) {
         String[] row = new String[] { "robot" + (i % 10), "" + i, i + "." + i };
         writer.write(row);
@@ -454,7 +454,7 @@ public class CSVCarbonWriterTest {
 
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder().taskNo(5).outputPath(path);
-      CarbonWriter writer = builder.withCsvInput(new Schema(new Field[] {structType})).build();
+      CarbonWriter writer = builder.withCsvInput(new Schema(new Field[] {structType})).writtenBy("CSVCarbonWriterTest").build();
       for (int i = 0; i < 15; i++) {
         String[] row = new String[] { "robot" + (i % 10)+"$" + i+ "$" + i + "." + i };
         writer.write(row);
@@ -493,7 +493,7 @@ public class CSVCarbonWriterTest {
 
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder().taskNo(5).outputPath(path);
-      CarbonWriter writer = builder.withCsvInput(new Schema(new Field[] {structType1, structType2})).build();
+      CarbonWriter writer = builder.withCsvInput(new Schema(new Field[] {structType1, structType2})).writtenBy("CSVCarbonWriterTest").build();
       for (int i = 0; i < 15; i++) {
         String[] row = new String[] { "1.0$2.0$3.0", "1$2$3" };
         writer.write(row);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index f37b832..c8b2c96 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -502,7 +502,7 @@ public class CarbonReaderTest extends TestCase {
     CarbonWriter carbonWriter = null;
     try {
       carbonWriter = builder.outputPath(path1).uniqueIdentifier(12345)
-  .withCsvInput(schema).build();
+  .withCsvInput(schema).writtenBy("CarbonReaderTest").build();
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
     }
@@ -516,7 +516,7 @@ public class CarbonReaderTest extends TestCase {
     CarbonWriter carbonWriter1 = null;
     try {
       carbonWriter1 = builder1.outputPath(path2).uniqueIdentifier(12345)
-   .withCsvInput(schema1).build();
+   .withCsvInput(schema1).writtenBy("CarbonReaderTest").build();
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
     }
@@ -766,7 +766,6 @@ public class CarbonReaderTest extends TestCase {
 
     FileUtils.deleteDirectory(new File(path));
   }
-
   @Test
   public void testWriteAndReadFilesNonTransactional() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
@@ -855,7 +854,7 @@ public class CarbonReaderTest extends TestCase {
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path);
 
-      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build();
+      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).writtenBy("CarbonReaderTest").build();
 
       for (int i = 0; i < 100; i++) {
         String[] row = new String[]{
@@ -973,7 +972,7 @@ public class CarbonReaderTest extends TestCase {
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path);
 
-      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build();
+      CarbonWriter writer = builder.withCsvInput(new Schema(fields)).writtenBy("CarbonReaderTest").build();
 
       for (int i = 0; i < 100; i++) {
         String[] row2 = new String[]{
@@ -1081,7 +1080,8 @@ public class CarbonReaderTest extends TestCase {
     fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
 
     try {
-      CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path);
+      CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path)
+          .writtenBy("SDK_1.0.0");
 
       CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build();
 
@@ -1105,6 +1105,14 @@ public class CarbonReaderTest extends TestCase {
       Assert.fail(e.getMessage());
     }
 
+    File[] dataFiles1 = new File(path).listFiles(new FilenameFilter() {
+      @Override public boolean accept(File dir, String name) {
+        return name.endsWith("carbondata");
+      }
+    });
+    String versionDetails = CarbonSchemaReader.getVersionDetails(dataFiles1[0].getAbsolutePath());
+    assertTrue(versionDetails.contains("SDK_1.0.0 in version: "));
+
     File[] dataFiles2 = new File(path).listFiles(new FilenameFilter() {
       @Override public boolean accept(File dir, String name) {
         return name.endsWith("carbonindex");
@@ -1112,7 +1120,6 @@ public class CarbonReaderTest extends TestCase {
     });
 
     Schema schema = CarbonSchemaReader.readSchemaInIndexFile(dataFiles2[0].getAbsolutePath()).asOriginOrder();
-
     // Transform the schema
     String[] strings = new String[schema.getFields().length];
     for (int i = 0; i < schema.getFields().length; i++) {
@@ -1252,7 +1259,7 @@ public class CarbonReaderTest extends TestCase {
     try {
       CarbonWriter writer = CarbonWriter.builder()
           .outputPath(path)
-          .withAvroInput(nn).build();
+          .withAvroInput(nn).writtenBy("CarbonReaderTest").build();
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java
index 103733c..f27358a 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java
@@ -60,7 +60,7 @@ public class ConcurrentAvroSdkWriterTest {
     try {
       CarbonWriterBuilder builder =
           CarbonWriter.builder().outputPath(path).withThreadSafe(numOfThreads);
-      CarbonWriter writer = builder.withAvroInput(avroSchema).build();
+      CarbonWriter writer = builder.withAvroInput(avroSchema).writtenBy("ConcurrentAvroSdkWriterTest").build();
       // write in multi-thread
       for (int i = 0; i < numOfThreads; i++) {
         executorService.submit(new WriteLogic(writer, record));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9578786b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java
index d207724..fded0d3 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java
@@ -51,7 +51,7 @@ public class ConcurrentSdkWriterTest {
       CarbonWriterBuilder builder = CarbonWriter.builder()
           .outputPath(path).withThreadSafe(numOfThreads);
       CarbonWriter writer =
-          builder.withCsvInput(new Schema(fields)).build();
+          builder.withCsvInput(new Schema(fields)).writtenBy("ConcurrentSdkWriterTest").build();
       // write in multi-thread
       for (int i = 0; i < numOfThreads; i++) {
         executorService.submit(new WriteLogic(writer));