Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/06/26 09:14:35 UTC

carbondata git commit: [CARBONDATA-2608] SDK Support JSON data loading directly (without AVRO conversion)

Repository: carbondata
Updated Branches:
  refs/heads/master afcaecf20 -> 5804d7570


[CARBONDATA-2608] SDK Support JSON data loading directly (without AVRO conversion)

What changes were proposed in this pull request?
Currently, the SDK supports JSON data loading only with AVRO.
So, converting JSON to an Avro record and then Avro to a Carbon object is a two-step process. Hence there is a need for a new CarbonWriter that works with JSON without AVRO.
This PR implements that.

Highlights:
#Works with just the JSON data and a Carbon schema (see the usage sketch below).
#Implements Hadoop's FileInputFormat to create JsonInputFormat.
#Supports reading multiple JSON files in a folder.
#Supports reading multi-line JSON data with a record identifier.
#Supports single-row JSON read and write.
#Handles bad records when loading JSON data.
#Supports n-level nesting of complex types as JSON data.

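For illustration, a minimal sketch of loading one JSON row through the new writer, following the SDK calls exercised by the test added in this commit (the output path, field names, and JSON row are placeholders; treat the exact builder signatures as assumptions for your SDK version):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.sdk.file.CarbonWriter;
    import org.apache.carbondata.sdk.file.Field;
    import org.apache.carbondata.sdk.file.Schema;

    public class JsonWriteExample {
      public static void main(String[] args) throws Exception {
        // The Carbon schema is supplied directly; no AVRO schema is involved.
        Field[] fields = new Field[2];
        fields[0] = new Field("name", DataTypes.STRING);
        fields[1] = new Field("age", DataTypes.INT);

        Map<String, String> options = new HashMap<>();
        options.put("bad_records_action", "FAIL"); // fail the load on bad records

        CarbonWriter writer = CarbonWriter.builder()
            .outputPath("/tmp/jsonWriterOutput")   // placeholder output path
            .isTransactionalTable(false)
            .withLoadOptions(options)
            .buildWriterForJsonInput(new Schema(fields));

        writer.write("{\"name\": \"bob\", \"age\": 10}"); // one JSON row per write call
        writer.close();
      }
    }
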
This closes #2384


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5804d757
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5804d757
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5804d757

Branch: refs/heads/master
Commit: 5804d757045ebcef749282c3983bf436f6b88c99
Parents: afcaecf
Author: ajantha-bhat <aj...@gmail.com>
Authored: Fri Jun 15 15:51:16 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Jun 26 14:44:24 2018 +0530

----------------------------------------------------------------------
 .../apache/carbondata/core/util/CarbonUtil.java |   1 -
 .../jsonFiles/data/StructOfAllTypes.json        |  15 +
 .../jsonFiles/data/allPrimitiveType.json        |  11 +
 .../data/allPrimitiveTypeBadRecord.json         |  11 +
 .../jsonFiles/data/arrayOfStructOfStruct.json   |  21 ++
 .../data/arrayOfarrayOfarrayOfStruct.json       |  30 ++
 .../MultipleRowSingleLineJson.json              |   4 +
 .../JsonReaderTest/SingleRowSingleLineJson.json |   1 +
 ...RowMultipleLineJsonWithRecordIdentifier.json |  54 ++++
 ...RowMultipleLineJsonWithRecordIdentifier.json |  12 +
 ...leRowSingleLineJsonWithRecordIdentifier.json |   1 +
 .../allPrimitiveTypeMultipleRows.json           |  46 +++
 .../allPrimitiveTypeSingleArray.json            |  13 +
 .../jsonFiles/schema/StructOfAllTypes.avsc      |  85 ++++++
 .../jsonFiles/schema/arrayOfStructOfStruct.avsc |  51 ++++
 .../schema/arrayOfarrayOfarrayOfStruct.avsc     |  42 +++
 ...tNonTransactionalCarbonTableJsonWriter.scala | 286 +++++++++++++++++++
 .../loading/DataLoadProcessBuilder.java         |  37 ++-
 .../loading/jsoninput/JsonInputFormat.java      | 277 ++++++++++++++++++
 .../loading/jsoninput/JsonStreamReader.java     | 105 +++++++
 .../loading/model/CarbonLoadModel.java          |  13 +
 .../loading/parser/impl/JsonRowParser.java      | 157 ++++++++++
 .../loading/steps/InputProcessorStepImpl.java   |  32 +--
 .../InputProcessorStepWithNoConverterImpl.java  |  18 +-
 .../steps/JsonInputProcessorStepImpl.java       |  91 ++++++
 .../util/CarbonDataProcessorUtil.java           |  29 ++
 .../sdk/file/CarbonReaderBuilder.java           |   2 +
 .../sdk/file/CarbonWriterBuilder.java           |  18 +-
 .../carbondata/sdk/file/JsonCarbonWriter.java   |  92 ++++++
 29 files changed, 1515 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 4e2c16f..9b4962b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3117,4 +3117,3 @@ public final class CarbonUtil {
     }
   }
 }
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/integration/spark-common-test/src/test/resources/jsonFiles/data/StructOfAllTypes.json
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/jsonFiles/data/StructOfAllTypes.json b/integration/spark-common-test/src/test/resources/jsonFiles/data/StructOfAllTypes.json
new file mode 100644
index 0000000..9806325
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/jsonFiles/data/StructOfAllTypes.json
@@ -0,0 +1,15 @@
+{
+	"StructColumn":{
+		"stringField": "bob",
+		"intField": 10,
+		"longField": 12345678,
+		"doubleField": 123400.78,
+		"boolField": true,
+		"FloorNum": [ 1, 2],
+		"FloorString": [ "abc", "def"],
+		"FloorLong": [ 1234567, 2345678],
+		"FloorDouble": [ 1.0, 2.0, 33.33],
+		"FloorBool": [ true, false, false, true]
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/integration/spark-common-test/src/test/resources/jsonFiles/data/allPrimitiveType.json
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/jsonFiles/data/allPrimitiveType.json b/integration/spark-common-test/src/test/resources/jsonFiles/data/allPrimitiveType.json
new file mode 100644
index 0000000..86648c3
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/jsonFiles/data/allPrimitiveType.json
@@ -0,0 +1,11 @@
+{
+	"stringField": "ajantha",
+	"intField": 26,
+	"shortField": 26,
+	"longField": 1234567,
+	"doubleField": 23.3333,
+	"boolField": false,
+	"dateField": "2019-03-02",
+	"timeField": "2019-02-12 03:03:34",
+	"decimalField" : 55.35
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/integration/spark-common-test/src/test/resources/jsonFiles/data/allPrimitiveTypeBadRecord.json
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/jsonFiles/data/allPrimitiveTypeBadRecord.json b/integration/spark-common-test/src/test/resources/jsonFiles/data/allPrimitiveTypeBadRecord.json
new file mode 100644
index 0000000..ae15047
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/jsonFiles/data/allPrimitiveTypeBadRecord.json
@@ -0,0 +1,11 @@
+{
+	"stringField": 123,
+	"intField": "string",
+	"shortField": 1234567,
+	"longField": 23.5,
+	"doubleField": "string",
+	"boolField": 10,
+	"dateField": 12345,
+	"timeField": 12345,
+	"decimalField" : "String"
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/integration/spark-common-test/src/test/resources/jsonFiles/data/arrayOfStructOfStruct.json
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/jsonFiles/data/arrayOfStructOfStruct.json b/integration/spark-common-test/src/test/resources/jsonFiles/data/arrayOfStructOfStruct.json
new file mode 100644
index 0000000..eeedc7f
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/jsonFiles/data/arrayOfStructOfStruct.json
@@ -0,0 +1,21 @@
+{
+	"name": "bob",
+	"age": 10,
+	"doorNum": [
+		{
+			"street": "abc",
+			"city": "city1",
+			"FloorNum": {"wing" : "a", "number" : 1}
+		},
+		{
+			"street": "def",
+			"city": "city2",
+			"FloorNum": {"wing" : "b", "number" : 0}
+		},
+		{
+			"street": "ghi",
+			"city": "city3",
+			"FloorNum": {"wing" : "a", "number" : 2}
+		}
+	]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/integration/spark-common-test/src/test/resources/jsonFiles/data/arrayOfarrayOfarrayOfStruct.json
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/jsonFiles/data/arrayOfarrayOfarrayOfStruct.json b/integration/spark-common-test/src/test/resources/jsonFiles/data/arrayOfarrayOfarrayOfStruct.json
new file mode 100644
index 0000000..ce45c02
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/jsonFiles/data/arrayOfarrayOfarrayOfStruct.json
@@ -0,0 +1,30 @@
+{
+	"name": "ajantha",
+	"age": 26,
+	"BuildNum": [
+		[
+			[
+				{"street":"abc", "city":"city1"},
+				{"street":"def", "city":"city2"},
+				{"street":"cfg", "city":"city3"}
+			],
+			[
+				 {"street":"abc1", "city":"city3"},
+				 {"street":"def1", "city":"city4"},
+				 {"street":"cfg1", "city":"city5"}
+			]
+		],
+		[
+			[
+				 {"street":"abc2", "city":"cityx"},
+				 {"street":"abc3", "city":"cityy"},
+				 {"street":"abc4", "city":"cityz"}
+			],
+			[
+				 {"street":"a1bc", "city":"cityA"},
+				 {"street":"a1bc", "city":"cityB"},
+				 {"street":"a1bc", "city":"cityc"}
+			]
+		]
+	]
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/MultipleRowSingleLineJson.json
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/MultipleRowSingleLineJson.json b/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/MultipleRowSingleLineJson.json
new file mode 100644
index 0000000..555bd6d
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/MultipleRowSingleLineJson.json
@@ -0,0 +1,4 @@
+{"stringField": "kkkk","intField": 26,"shortField": 26,"longField": 1234567,"doubleField": 23.3333,"boolField": false,"dateField": "2019-03-02","timeField": "2019-02-12 03:03:34","decimalField" : 55.35}
+{"stringField": "bbbb","intField": 26,"shortField": 26,"longField": 1234567,"doubleField": 23.3333,"boolField": false,"dateField": "2019-03-02","timeField": "2019-02-12 03:03:34","decimalField" : 55.35}
+{"stringField": "cccc","intField": 26,"shortField": 26,"longField": 1234567,"doubleField": 23.3333,"boolField": false,"dateField": "2019-03-02","timeField": "2019-02-12 03:03:34","decimalField" : 55.35}
+{"stringField": "dddd","intField": 26,"shortField": 26,"longField": 1234567,"doubleField": 23.3333,"boolField": false,"dateField": "2019-03-02","timeField": "2019-02-12 03:03:34","decimalField" : 55.35}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/SingleRowSingleLineJson.json
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/SingleRowSingleLineJson.json b/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/SingleRowSingleLineJson.json
new file mode 100644
index 0000000..7f652db
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/SingleRowSingleLineJson.json
@@ -0,0 +1 @@
+{"stringField": "kkkk","intField": 26,"shortField": 26,"longField": 1234567,"doubleField": 23.3333,"boolField": false,"dateField": "2019-03-02","timeField": "2019-02-12 03:03:34","decimalField" : 55.35}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/withRecordIdentifier/MultipleRowMultipleLineJsonWithRecordIdentifier.json
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/withRecordIdentifier/MultipleRowMultipleLineJsonWithRecordIdentifier.json b/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/withRecordIdentifier/MultipleRowMultipleLineJsonWithRecordIdentifier.json
new file mode 100644
index 0000000..468fad7
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/withRecordIdentifier/MultipleRowMultipleLineJsonWithRecordIdentifier.json
@@ -0,0 +1,54 @@
+[
+	{
+		"jsonData": {
+			"stringField": "ajantha",
+			"intField": 26,
+			"shortField": 26,
+			"longField": 1234567,
+			"doubleField": 23.3333,
+			"boolField": false,
+			"dateField": "2019-03-02",
+			"timeField": "2019-02-12 03:03:34",
+			"decimalField": 55.35
+		}
+	},
+	{
+		"jsonData": {
+			"stringField": "ab",
+			"intField": 25,
+			"shortField": 25,
+			"longField": 1234567,
+			"doubleField": 23.3333,
+			"boolField": false,
+			"dateField": "2018-03-02",
+			"timeField": "2018-02-12 03:03:34",
+			"decimalField": 55.35
+		}
+	},
+	{
+		"jsonData": {
+			"stringField": "cd",
+			"intField": 24,
+			"shortField": 24,
+			"longField": 1234567,
+			"doubleField": 23.3333,
+			"boolField": false,
+			"dateField": "2017-03-02",
+			"timeField": "2017-02-12 03:03:34",
+			"decimalField": 55.35
+		}
+	},
+	{
+		"jsonData": {
+			"stringField": "ef",
+			"intField": 23,
+			"shortField": 23,
+			"longField": 1234567,
+			"doubleField": 23.3333,
+			"boolField": false,
+			"dateField": "2016-03-02",
+			"timeField": "2016-02-12 03:03:34",
+			"decimalField": 55.35
+		}
+	}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/withRecordIdentifier/SingleRowMultipleLineJsonWithRecordIdentifier.json
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/withRecordIdentifier/SingleRowMultipleLineJsonWithRecordIdentifier.json b/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/withRecordIdentifier/SingleRowMultipleLineJsonWithRecordIdentifier.json
new file mode 100644
index 0000000..8e6adc4
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/withRecordIdentifier/SingleRowMultipleLineJsonWithRecordIdentifier.json
@@ -0,0 +1,12 @@
+{"jsonData":{
+		"stringField": "ajantha",
+		"intField": 26,
+		"shortField": 26,
+		"longField": 1234567,
+		"doubleField": 23.3333,
+		"boolField": false,
+		"dateField": "2019-03-02",
+		"timeField": "2019-02-12 03:03:34",
+		"decimalField": 55.35
+	     }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/withRecordIdentifier/SingleRowSingleLineJsonWithRecordIdentifier.json
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/withRecordIdentifier/SingleRowSingleLineJsonWithRecordIdentifier.json b/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/withRecordIdentifier/SingleRowSingleLineJsonWithRecordIdentifier.json
new file mode 100644
index 0000000..02c8b80
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/JsonReaderTest/withRecordIdentifier/SingleRowSingleLineJsonWithRecordIdentifier.json
@@ -0,0 +1 @@
+{"jsonField":{"stringField": "kkkk","intField": 26,"shortField": 26,"longField": 1234567,"doubleField": 23.3333,"boolField": false,"dateField": "2019-03-02","timeField": "2019-02-12 03:03:34","decimalField" : 55.35}}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/allPrimitiveTypeMultipleRows.json
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/allPrimitiveTypeMultipleRows.json b/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/allPrimitiveTypeMultipleRows.json
new file mode 100644
index 0000000..89e362d
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/allPrimitiveTypeMultipleRows.json
@@ -0,0 +1,46 @@
+[
+	{
+		"stringField": "ajantha",
+		"intField": 26,
+		"shortField": 26,
+		"longField": 1234567,
+		"doubleField": 23.3333,
+		"boolField": false,
+		"dateField": "2019-03-02",
+		"timeField": "2019-02-12 03:03:34",
+		"decimalField": 55.35
+	},
+	{
+		"stringField": "ab",
+		"intField": 25,
+		"shortField": 25,
+		"longField": 1234567,
+		"doubleField": 23.3333,
+		"boolField": false,
+		"dateField": "2018-03-02",
+		"timeField": "2018-02-12 03:03:34",
+		"decimalField": 55.35
+	},
+	{
+		"stringField": "cd",
+		"intField": 24,
+		"shortField": 24,
+		"longField": 1234567,
+		"doubleField": 23.3333,
+		"boolField": false,
+		"dateField": "2017-03-02",
+		"timeField": "2017-02-12 03:03:34",
+		"decimalField": 55.35
+	},
+	{
+		"stringField": "ef",
+		"intField": 23,
+		"shortField": 23,
+		"longField": 1234567,
+		"doubleField": 23.3333,
+		"boolField": false,
+		"dateField": "2016-03-02",
+		"timeField": "2016-02-12 03:03:34",
+		"decimalField": 55.35
+	}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/allPrimitiveTypeSingleArray.json
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/allPrimitiveTypeSingleArray.json b/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/allPrimitiveTypeSingleArray.json
new file mode 100644
index 0000000..554ebb9
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/jsonFiles/data/similarSchemaFiles/allPrimitiveTypeSingleArray.json
@@ -0,0 +1,13 @@
+[
+	{
+		"stringField": "ZZ",
+		"intField": 100,
+		"shortField": 100,
+		"longField": 1234567,
+		"doubleField": 23.3333,
+		"boolField": false,
+		"dateField": "2020-03-02",
+		"timeField": "2020-02-12 03:03:34",
+		"decimalField": 55.35
+	}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/integration/spark-common-test/src/test/resources/jsonFiles/schema/StructOfAllTypes.avsc
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/jsonFiles/schema/StructOfAllTypes.avsc b/integration/spark-common-test/src/test/resources/jsonFiles/schema/StructOfAllTypes.avsc
new file mode 100644
index 0000000..259d734
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/jsonFiles/schema/StructOfAllTypes.avsc
@@ -0,0 +1,85 @@
+{
+	"name": "address",
+	"type": "record",
+	"fields": [
+		{
+    	"name": "StructColumn",
+    	"type": {
+    		"type": "record",
+    		"name": "my_struct",
+    		"fields": [
+    			{
+    				"name": "stringField",
+    				"type": "string"
+    			},
+    			{
+    				"name": "intField",
+    				"type": "int"
+    			},
+    			{
+    				"name": "longField",
+    				"type": "long"
+    			},
+    			{
+    				"name": "doubleField",
+    				"type": "double"
+    			},
+    			{
+    				"name": "boolField",
+    				"type": "boolean"
+    			},
+    			{
+    				"name": "FloorNum",
+    				"type": {
+    					"type": "array",
+    					"items": {
+    						"name": "IntegerE",
+    						"type": "int"
+    					}
+    				}
+    			},
+    			{
+    				"name": "FloorString",
+    				"type": {
+    					"type": "array",
+    					"items": {
+    						"name": "StringE",
+    						"type": "string"
+    					}
+    				}
+    			},
+    			{
+    				"name": "FloorLong",
+    				"type": {
+    					"type": "array",
+    					"items": {
+    						"name": "longFieldE",
+    						"type": "long"
+    					}
+    				}
+    			},
+    			{
+    				"name": "FloorDouble",
+    				"type": {
+    					"type": "array",
+    					"items": {
+    						"name": "doubleFieldE",
+    						"type": "double"
+    					}
+    				}
+    			},
+    			{
+    				"name": "FloorBool",
+    				"type": {
+    					"type": "array",
+    					"items": {
+    						"name": "boolFieldE",
+    						"type": "boolean"
+    					}
+    				}
+    			}
+    		]
+    	}
+    }
+	]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/integration/spark-common-test/src/test/resources/jsonFiles/schema/arrayOfStructOfStruct.avsc
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/jsonFiles/schema/arrayOfStructOfStruct.avsc b/integration/spark-common-test/src/test/resources/jsonFiles/schema/arrayOfStructOfStruct.avsc
new file mode 100644
index 0000000..7cf8f5d
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/jsonFiles/schema/arrayOfStructOfStruct.avsc
@@ -0,0 +1,51 @@
+{
+                     	"name": "address",
+                     	"type": "record",
+                     	"fields": [
+                     		{
+                     			"name": "name",
+                     			"type": "string"
+                     		},
+                     		{
+                     			"name": "age",
+                     			"type": "int"
+                     		},
+                     		{
+                     			"name": "doorNum",
+                     			"type": {
+                     				"type": "array",
+                     				"items": {
+                     					"type": "record",
+                     					"name": "my_address",
+                     					"fields": [
+                     						{
+                     							"name": "street",
+                     							"type": "string"
+                     						},
+                     						{
+                     							"name": "city",
+                     							"type": "string"
+                     						},
+                     						{
+                     							"name": "FloorNum",
+                                     			"type": {
+                                     				"type": "record",
+                                     				"name": "Floor",
+                                     				"fields": [
+                                     					{
+                                     						"name": "wing",
+                                     						"type": "string"
+                                     					},
+                                     					{
+                                     						"name": "number",
+                                     						"type": "int"
+                                     					}
+                                     				]
+                                     			}
+                     						}
+                     					]
+                     				}
+                     			}
+                     		}
+                     	]
+                     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/integration/spark-common-test/src/test/resources/jsonFiles/schema/arrayOfarrayOfarrayOfStruct.avsc
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/jsonFiles/schema/arrayOfarrayOfarrayOfStruct.avsc b/integration/spark-common-test/src/test/resources/jsonFiles/schema/arrayOfarrayOfarrayOfStruct.avsc
new file mode 100644
index 0000000..b6ac8b0
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/jsonFiles/schema/arrayOfarrayOfarrayOfStruct.avsc
@@ -0,0 +1,42 @@
+{
+"name": "address",
+"type": "record",
+"fields": [
+	{
+		"name": "name",
+		"type": "string"
+	},
+	{
+		"name": "age",
+		"type": "int"
+	},
+	{
+		"name": "BuildNum",
+		"type": {
+			"type": "array",
+			"items": {
+				"name": "FloorNum",
+				"type": "array",
+				"items": {
+					"name": "doorNum",
+					"type": "array",
+					"items": {
+						"name": "my_address",
+						"type": "record",
+						"fields": [
+							{
+								"name": "street",
+								"type": "string"
+							},
+							{
+								"name": "city",
+								"type": "string"
+							}
+						]
+					}
+				}
+			}
+		}
+	}
+]
+} 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
new file mode 100644
index 0000000..299c966
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.{File, IOException}
+import java.sql.Timestamp
+import java.util
+
+import org.apache.avro
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.junit.Assert
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.sdk.file._
+
+import scala.collection.JavaConverters._
+
+class TestNonTransactionalCarbonTableJsonWriter extends QueryTest with BeforeAndAfterAll {
+
+  var writerPath = new File(this.getClass.getResource("/").getPath
+                            + "../."
+                            + "./target/SparkCarbonFileFormat/WriterOutput/").getCanonicalPath
+  //getCanonicalPath gives path with \, but the code expects /.
+  writerPath = writerPath.replace("\\", "/")
+
+  var backupdateFormat = CarbonProperties.getInstance().getProperty(
+    CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+
+  var backupTimeFormat = CarbonProperties.getInstance().getProperty(
+    CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+    CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+
+  override def beforeAll(): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        backupTimeFormat)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+        backupdateFormat)
+  }
+
+  /**
+   * Utility function to read a whole file as a string.
+   * Must not be used if the file is very large, as it may result in memory exhaustion.
+   *
+   * @param filePath
+   * @return
+   */
+  def readFromFile(filePath: String): String = {
+    val file = new File(filePath)
+    val uri = file.toURI
+    try {
+      val bytes = java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(uri))
+      new String(bytes, "UTF-8")
+    } catch {
+      case e: IOException =>
+        e.printStackTrace()
+        return "ERROR loading file " + filePath
+    }
+  }
+
+  private def writeCarbonFileFromJsonRowInput(jsonRow: String,
+      carbonSchema: Schema) = {
+    try {
+      var options: util.Map[String, String] = Map("bAd_RECords_action" -> "FAIL").asJava
+      val writer = CarbonWriter.builder
+        .outputPath(writerPath).isTransactionalTable(false)
+        .uniqueIdentifier(System.currentTimeMillis())
+        .withLoadOptions(options)
+        .buildWriterForJsonInput(carbonSchema)
+      writer.write(jsonRow)
+      writer.close()
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        Assert.fail(e.getMessage)
+      }
+    }
+  }
+
+  // test all primitive type
+  test("Read sdk writer Json output of all primitive type") {
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    var dataPath: String = null
+    dataPath = resourcesPath + "/jsonFiles/data/allPrimitiveType.json"
+
+    val fields = new Array[Field](9)
+    fields(0) = new Field("stringField", DataTypes.STRING)
+    fields(1) = new Field("intField", DataTypes.INT)
+    fields(2) = new Field("shortField", DataTypes.SHORT)
+    fields(3) = new Field("longField", DataTypes.LONG)
+    fields(4) = new Field("doubleField", DataTypes.DOUBLE)
+    fields(5) = new Field("boolField", DataTypes.BOOLEAN)
+    fields(6) = new Field("dateField", DataTypes.DATE)
+    fields(7) = new Field("timeField", DataTypes.TIMESTAMP)
+    fields(8) = new Field("decimalField", DataTypes.createDecimalType(8, 2))
+
+    val jsonRow = readFromFile(dataPath)
+    writeCarbonFileFromJsonRowInput(jsonRow, new Schema(fields))
+    assert(new File(writerPath).exists())
+
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"),
+      Seq(Row("ajantha",
+        26,
+        26,
+        1234567,
+        23.3333,
+        false,
+        java.sql.Date.valueOf("2019-03-02"),
+        Timestamp.valueOf("2019-02-12 03:03:34"),
+        55.35)))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).listFiles().length > 0)
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  // test all primitive type with bad record
+  test("Read sdk writer Json output of all primitive type with Bad record") {
+    FileUtils.deleteDirectory(new File(writerPath))
+    var dataPath: String = null
+    dataPath = resourcesPath + "/jsonFiles/data/allPrimitiveTypeBadRecord.json"
+
+    val fields = new Array[Field](9)
+    fields(0) = new Field("stringField", DataTypes.STRING)
+    fields(1) = new Field("intField", DataTypes.INT)
+    fields(2) = new Field("shortField", DataTypes.SHORT)
+    fields(3) = new Field("longField", DataTypes.LONG)
+    fields(4) = new Field("doubleField", DataTypes.DOUBLE)
+    fields(5) = new Field("boolField", DataTypes.BOOLEAN)
+    fields(6) = new Field("dateField", DataTypes.DATE)
+    fields(7) = new Field("timeField", DataTypes.TIMESTAMP)
+    fields(8) = new Field("decimalField", DataTypes.createDecimalType(8, 2))
+
+    val jsonRow = readFromFile(dataPath)
+    var exception = intercept[java.lang.AssertionError] {
+      writeCarbonFileFromJsonRowInput(jsonRow, new Schema(fields))
+    }
+    assert(exception.getMessage()
+      .contains("Data load failed due to bad record"))
+
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  // test array Of array Of array Of Struct
+  test("Read sdk writer Json output of array Of array Of array Of Struct") {
+    FileUtils.deleteDirectory(new File(writerPath))
+    var dataPath = resourcesPath + "/jsonFiles/data/arrayOfarrayOfarrayOfStruct.json"
+    // for testing purposes, get the carbonSchema from an Avro schema.
+    // in real scenarios, the Carbon schema will be passed without AVRO
+    var schemaPath = resourcesPath + "/jsonFiles/schema/arrayOfarrayOfarrayOfStruct.avsc"
+    val avroSchema = new avro.Schema.Parser().parse(readFromFile(schemaPath))
+    val carbonSchema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema)
+
+    val jsonRow = readFromFile(dataPath)
+    writeCarbonFileFromJsonRowInput(jsonRow, carbonSchema)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("select * from sdkOutputTable").show(false)
+    /*
+    * +-------+---+-----------------------------------------+
+      |name   |age|BuildNum                                 |
+      +-------+---+-----------------------------------------+
+      |ajantha|26 |[WrappedArray(WrappedArray([abc,city1]))]|
+      +-------+---+-----------------------------------------+
+    *
+    */
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).listFiles().length > 0)
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  // test array Of Struct Of Struct
+  test("Read sdk writer Json output of array Of Struct Of Struct") {
+    FileUtils.deleteDirectory(new File(writerPath))
+    var dataPath = resourcesPath + "/jsonFiles/data/arrayOfStructOfStruct.json"
+    // for testing purposes, get the carbonSchema from an Avro schema.
+    // in real scenarios, the Carbon schema will be passed without AVRO
+    var schemaPath = resourcesPath + "/jsonFiles/schema/arrayOfStructOfStruct.avsc"
+    val avroSchema = new avro.Schema.Parser().parse(readFromFile(schemaPath))
+    val carbonSchema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema)
+
+    val jsonRow = readFromFile(dataPath)
+    writeCarbonFileFromJsonRowInput(jsonRow, carbonSchema)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("select * from sdkOutputTable").show(false)
+
+    /*
+    *  +----+---+-------------------+
+    *  |name|age|doorNum            |
+    *  +----+---+-------------------+
+    *  |bob |10 |[[abc,city1,[a,1]]]|
+    *  +----+---+-------------------+
+    * */
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).listFiles().length > 0)
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  // test struct of all types
+  test("Read sdk writer Json output of Struct of all types") {
+    FileUtils.deleteDirectory(new File(writerPath))
+    var dataPath = resourcesPath + "/jsonFiles/data/StructOfAllTypes.json"
+    // for testing purposes, get the carbonSchema from an Avro schema.
+    // in real scenarios, the Carbon schema will be passed without AVRO
+    var schemaPath = resourcesPath + "/jsonFiles/schema/StructOfAllTypes.avsc"
+    val avroSchema = new avro.Schema.Parser().parse(readFromFile(schemaPath))
+    val carbonSchema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema)
+
+    val jsonRow = readFromFile(dataPath)
+    writeCarbonFileFromJsonRowInput(jsonRow, carbonSchema)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("select * from sdkOutputTable").show(false)
+    /*
+    * +--------------------------------------------------------------------+
+      |StructColumn                                                        |
+      +--------------------------------------------------------------------+
+      |[bob,10,12345678,123400.78,true,WrappedArray(1),WrappedArray(abc),  |
+      | WrappedArray(1234567),WrappedArray(1.0),WrappedArray(true)]        |
+      +--------------------------------------------------------------------+
+    * */
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).listFiles().length > 0)
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index b7b725c..2a9ab6d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -46,6 +46,7 @@ import org.apache.carbondata.processing.loading.steps.DataWriterBatchProcessorSt
 import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl;
 import org.apache.carbondata.processing.loading.steps.InputProcessorStepImpl;
 import org.apache.carbondata.processing.loading.steps.InputProcessorStepWithNoConverterImpl;
+import org.apache.carbondata.processing.loading.steps.JsonInputProcessorStepImpl;
 import org.apache.carbondata.processing.loading.steps.SortProcessorStepImpl;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
@@ -64,8 +65,10 @@ public final class DataLoadProcessBuilder {
     SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
     if (loadModel.isLoadWithoutConverterStep()) {
       return buildInternalWithNoConverter(inputIterators, configuration, sortScope);
-    } else if (!configuration.isSortTable() ||
-        sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
+    } else if (loadModel.isJsonFileLoad()) {
+      return buildInternalWithJsonInputProcessor(inputIterators, configuration, sortScope);
+    } else if (!configuration.isSortTable() || sortScope.equals(
+        SortScopeOptions.SortScope.NO_SORT)) {
       return buildInternalForNoSort(inputIterators, configuration);
     } else if (configuration.getBucketingInfo() != null) {
       return buildInternalForBucketing(inputIterators, configuration);
@@ -131,6 +134,36 @@ public final class DataLoadProcessBuilder {
     }
   }
 
+  /**
+   * Build pipeline for load with JSON input processor.
+   */
+  private AbstractDataLoadProcessorStep buildInternalWithJsonInputProcessor(
+      CarbonIterator[] inputIterators, CarbonDataLoadConfiguration configuration,
+      SortScopeOptions.SortScope sortScope) {
+    // currently, row-by-row conversion of a JSON string to a carbon row is supported.
+    AbstractDataLoadProcessorStep inputProcessorStep =
+        new JsonInputProcessorStepImpl(configuration, inputIterators);
+    // 2. Converts the data to dictionary, non-dictionary, or complex objects, depending on
+    // the data types and configurations.
+    AbstractDataLoadProcessorStep converterProcessorStep =
+        new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
+    if (sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT)) {
+      AbstractDataLoadProcessorStep sortProcessorStep =
+          new SortProcessorStepImpl(configuration, converterProcessorStep);
+      //  Writes the sorted data in carbondata format.
+      return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
+    } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
+      // Sorts the data by sort columns, if specified
+      AbstractDataLoadProcessorStep sortProcessorStep =
+          new SortProcessorStepImpl(configuration, converterProcessorStep);
+      // Writes the sorted data in carbondata format.
+      return new DataWriterBatchProcessorStepImpl(configuration, sortProcessorStep);
+    } else {
+      // All other cases, like global sort and no sort, use this step
+      return new CarbonRowDataWriterProcessorStepImpl(configuration, converterProcessorStep);
+    }
+  }
+
   private AbstractDataLoadProcessorStep buildInternalForBatchSort(CarbonIterator[] inputIterators,
       CarbonDataLoadConfiguration configuration) {
     // 1. Reads the data input iterators and parses the data.

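For quick reference, the step chains that buildInternalWithJsonInputProcessor produces, summarized from the method above by sort scope:

    LOCAL_SORT : JsonInputProcessorStepImpl -> DataConverterProcessorStepImpl
                 -> SortProcessorStepImpl -> DataWriterProcessorStepImpl
    BATCH_SORT : JsonInputProcessorStepImpl -> DataConverterProcessorStepImpl
                 -> SortProcessorStepImpl -> DataWriterBatchProcessorStepImpl
    otherwise  : JsonInputProcessorStepImpl -> DataConverterProcessorStepImpl
                 -> CarbonRowDataWriterProcessorStepImpl (global sort and no sort)
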
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java
new file mode 100644
index 0000000..3eb36c9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonInputFormat.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.jsoninput;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.security.InvalidParameterException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Code ported from the Hydra-Spark {com.pluralsight.hydra.hadoop.io} package.
+ * The JsonInputFormat reads two types of JSON-formatted data. The default
+ * expectation is that each JSON record is newline-delimited. This method is
+ * generally faster and is backed by the {@link LineRecordReader} you are likely
+ * familiar with. The other method is 'pretty print' of JSON records, where
+ * records span multiple lines and often have some type of root identifier. This
+ * method is likely slower, but respects record boundaries much like the
+ * LineRecordReader.<br>
+ * <br>
+ * Use of the 'pretty print' reader requires a record identifier.
+ */
+public class JsonInputFormat extends FileInputFormat<LongWritable, Text> {
+
+  private static JsonFactory factory = new JsonFactory();
+
+  private static ObjectMapper mapper = new ObjectMapper(factory);
+
+  public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line";
+
+  public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier";
+
+  @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    RecordReader<LongWritable, Text> rdr;
+
+    if (JsonInputFormat.getOneRecordPerLine(context.getConfiguration())) {
+      rdr = new SimpleJsonRecordReader();
+    } else {
+      return new JsonRecordReader();
+    }
+    rdr.initialize(split, context);
+    return rdr;
+  }
+
+  /**
+   * This class uses the {@link LineRecordReader} to read a line of JSON and
+   * return it as a Text object.
+   */
+  public static class SimpleJsonRecordReader extends RecordReader<LongWritable, Text> {
+
+    private LineRecordReader reader = null;
+
+    private LongWritable outKey = new LongWritable(0L);
+
+    private Text outValue = new Text();
+
+    @Override public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException {
+
+      reader = new LineRecordReader();
+      reader.initialize(split, context);
+    }
+
+    @Override public boolean nextKeyValue() throws IOException {
+      if (reader.nextKeyValue()) {
+        outValue.set(reader.getCurrentValue());
+        outKey.set(reader.getCurrentKey().get());
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    @Override public void close() throws IOException {
+      reader.close();
+    }
+
+    @Override public float getProgress() throws IOException {
+      return reader.getProgress();
+    }
+
+    @Override public LongWritable getCurrentKey() {
+      return outKey;
+    }
+
+    @Override public Text getCurrentValue() {
+      return outValue;
+    }
+  }
+
+  /**
+   * This class uses the {@link JsonStreamReader} to read JSON records from a
+   * file. It respects split boundaries to complete full JSON records, as
+   * specified by the root identifier. This class will discard any records
+   * that it was unable to decode using
+   * {@link JsonInputFormat#decodeLineToJsonNode(String)}
+   */
+  public static class JsonRecordReader extends RecordReader<LongWritable, Text> {
+
+    private Logger LOG = Logger.getLogger(JsonRecordReader.class);
+
+    private JsonStreamReader rdr = null;
+
+    private long start = 0, end = 0;
+
+    private float toRead = 0;
+
+    private String identifier = null;
+
+    private Logger log = Logger.getLogger(JsonRecordReader.class);
+
+    private Text outJson = new Text();
+
+    private LongWritable outKey = new LongWritable();
+
+    @Override public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+
+      this.identifier = JsonInputFormat.getRecordIdentifier(context.getConfiguration());
+
+      if (this.identifier == null || identifier.isEmpty()) {
+        throw new InvalidParameterException(JsonInputFormat.RECORD_IDENTIFIER + " is not set.");
+      } else {
+        LOG.info("Initializing JsonRecordReader with identifier " + identifier);
+      }
+
+      FileSplit fSplit = (FileSplit) split;
+
+      // get relevant data
+      Path file = fSplit.getPath();
+
+      log.info("File is " + file);
+
+      start = fSplit.getStart();
+      end = start + split.getLength();
+      toRead = end - start;
+
+      FSDataInputStream strm = FileSystem.get(context.getConfiguration()).open(file);
+
+      if (start != 0) {
+        strm.seek(start);
+      }
+
+      rdr = new JsonStreamReader(identifier, new BufferedInputStream(strm));
+    }
+
+    @Override public boolean nextKeyValue() throws IOException {
+      boolean retVal = false;
+      boolean keepGoing;
+      do {
+        keepGoing = false;
+        String record = rdr.getJsonRecord();
+        if (record != null) {
+          if (JsonInputFormat.decodeLineToJsonNode(record) == null) {
+            keepGoing = true;
+          } else {
+            outJson.set(record);
+            outKey.set(rdr.getBytesRead());
+            retVal = true;
+          }
+        }
+      } while (keepGoing);
+
+      return retVal;
+    }
+
+    @Override public void close() throws IOException {
+      rdr.close();
+    }
+
+    @Override public float getProgress() {
+      return (float) rdr.getBytesRead() / toRead;
+    }
+
+    @Override public LongWritable getCurrentKey() {
+      return outKey;
+    }
+
+    @Override public Text getCurrentValue() {
+      return outJson;
+    }
+  }
+
+  /**
+   * Decodes a given string of text to a {@link JsonNode}.
+   *
+   * @param line The line of text
+   * @return The JsonNode or null if a JsonParseException,
+   * JsonMappingException, or IOException error occurs
+   */
+  public static synchronized JsonNode decodeLineToJsonNode(String line) {
+
+    try {
+      return mapper.readTree(line);
+    } catch (IOException e) {
+      return null;
+    }
+  }
+
+  /**
+   * Sets the input format to use the {@link SimpleJsonRecordReader} if true,
+   * otherwise {@link JsonRecordReader}.<br>
+   * <br>
+   * Default is false.
+   *
+   * @param job                The job to configure
+   * @param isOneRecordPerLine True if JSON records are new line delimited, false otherwise.
+   */
+  public static void setOneRecordPerLine(Job job, boolean isOneRecordPerLine) {
+    job.getConfiguration().setBoolean(ONE_RECORD_PER_LINE, isOneRecordPerLine);
+  }
+
+  /**
+   * Gets if this is configured as one JSON record per line.
+   *
+   * @param conf the Job configuration
+   * @return True if one JSON record per line, false otherwise.
+   */
+  public static boolean getOneRecordPerLine(Configuration conf) {
+    return conf.getBoolean(ONE_RECORD_PER_LINE, false);
+  }
+
+  /**
+   * Specifies a record identifier to be used with the
+   * {@link JsonRecordReader}<br>
+   * <br>
+   * Must be set if {@link JsonInputFormat#setOneRecordPerLine} is false.
+   *
+   * @param job    The job to configure
+   * @param record The record identifier
+   */
+  public static void setRecordIdentifier(Job job, String record) {
+    job.getConfiguration().set(RECORD_IDENTIFIER, record);
+  }
+
+  /**
+   * Gets the record identifier
+   *
+   * @param conf the Job configuration
+   * @return The record identifier or null if not set
+   */
+  public static String getRecordIdentifier(Configuration conf) {
+    return conf.get(RECORD_IDENTIFIER);
+  }
+}
+

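A brief sketch of wiring this input format into a Hadoop job using the static helpers above (the job name and input path are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.carbondata.processing.loading.jsoninput.JsonInputFormat;

    public class JsonJobSetup {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "json-load");
        job.setInputFormatClass(JsonInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/data/jsonFiles")); // placeholder

        // Newline-delimited records: uses SimpleJsonRecordReader.
        JsonInputFormat.setOneRecordPerLine(job, true);

        // Or, for multi-line 'pretty print' records, a record identifier is
        // mandatory (uses JsonRecordReader):
        // JsonInputFormat.setOneRecordPerLine(job, false);
        // JsonInputFormat.setRecordIdentifier(job, "jsonData");
      }
    }
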
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonStreamReader.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonStreamReader.java b/processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonStreamReader.java
new file mode 100644
index 0000000..8547c9a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/jsoninput/JsonStreamReader.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.jsoninput;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+
+/**
+ * Code ported from the Hydra-Spark {com.pluralsight.hydra.hadoop.io} package.
+ * The JsonStreamReader handles byte-by-byte reading of a JSON stream, creating
+ * records based on a base 'identifier'. This identifier is given at object
+ * creation.
+ */
+public class JsonStreamReader extends BufferedReader {
+
+  private StringBuilder bldr = new StringBuilder();
+
+  private String identifier = null;
+
+  private long bytesRead = 0;
+
+  public JsonStreamReader(String identifier, InputStream strm) {
+    super(new InputStreamReader(strm, Charset.defaultCharset()));
+    this.identifier = identifier;
+  }
+
+  /**
+   * Advances the input stream to the next JSON record, returning it as a String
+   * object.
+   *
+   * @return A string of JSON or null
+   * @throws IOException If an error occurs reading from the stream
+   */
+  public String getJsonRecord() throws IOException {
+    bldr.delete(0, bldr.length());
+
+    boolean foundRecord = false;
+
+    int c = 0, numBraces = 1;
+    while ((c = super.read()) != -1) {
+      ++bytesRead;
+      if (!foundRecord) {
+        bldr.append((char) c);
+
+        if (bldr.toString().contains(identifier)) {
+          forwardToBrace();
+          foundRecord = true;
+
+          bldr.delete(0, bldr.length());
+          bldr.append('{');
+        }
+      } else {
+        bldr.append((char) c);
+
+        if (c == '{') {
+          ++numBraces;
+        } else if (c == '}') {
+          --numBraces;
+        }
+
+        if (numBraces == 0) {
+          break;
+        }
+      }
+    }
+
+    if (foundRecord) {
+      return bldr.toString();
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Gets the number of bytes read by the stream reader
+   *
+   * @return The number of bytes read
+   */
+  public long getBytesRead() {
+    return bytesRead;
+  }
+
+  private void forwardToBrace() throws IOException {
+    while (super.read() != '{') {
+    }
+  }
+}
+

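A small, self-contained sketch of pulling records out of a stream with this reader (the identifier and input are illustrative; note that each returned record is the balanced object that follows the identifier):

    import java.io.ByteArrayInputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.carbondata.processing.loading.jsoninput.JsonStreamReader;

    public class JsonStreamReaderExample {
      public static void main(String[] args) throws Exception {
        // Two 'pretty print' records under the root identifier "jsonData".
        String input = "[{\"jsonData\": {\"name\": \"bob\"}},"
            + " {\"jsonData\": {\"name\": \"ajantha\"}}]";
        try (JsonStreamReader reader = new JsonStreamReader("jsonData",
            new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)))) {
          String record;
          while ((record = reader.getJsonRecord()) != null) {
            // Prints {"name": "bob"} and then {"name": "ajantha"}
            System.out.println(record);
          }
        }
      }
    }
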
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 4d3f3fc..b9b42b2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -208,6 +208,11 @@ public class CarbonLoadModel implements Serializable {
   private boolean isLoadWithoutConverterStep;
 
   /**
+   * To identify the suitable input processor step for JSON file loading.
+   */
+  private boolean isJsonFileLoad;
+
+  /**
   * Folder path to where data should be written for this load.
    */
   private String dataWritePath;
@@ -850,6 +855,14 @@ public class CarbonLoadModel implements Serializable {
     isLoadWithoutConverterStep = loadWithoutConverterStep;
   }
 
+  public boolean isJsonFileLoad() {
+    return isJsonFileLoad;
+  }
+
+  public void setJsonFileLoad(boolean isJsonFileLoad) {
+    this.isJsonFileLoad = isJsonFileLoad;
+  }
+
   public String getDataWritePath() {
     return dataWritePath;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/JsonRowParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/JsonRowParser.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/JsonRowParser.java
new file mode 100644
index 0000000..6b06d89
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/JsonRowParser.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.parser.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
+import org.apache.carbondata.processing.loading.complexobjects.StructObject;
+import org.apache.carbondata.processing.loading.parser.RowParser;
+
+import org.apache.htrace.fasterxml.jackson.core.type.TypeReference;
+import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
+
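+/**
+ * Parses one JSON row (passed as a single string) into a carbon row. Complex
+ * columns (array/struct) are converted recursively into ArrayObject and
+ * StructObject wrappers; primitive values are emitted as strings.
+ */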
+public class JsonRowParser implements RowParser {
+
+  private DataField[] dataFields;
+
+  public JsonRowParser(DataField[] dataFields) {
+    this.dataFields = dataFields;
+  }
+
+  @Override public Object[] parseRow(Object[] row) {
+    try {
+      return convertJsonToNoDictionaryToBytes((String) row[0]);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Object[] convertJsonToNoDictionaryToBytes(String jsonString)
+      throws IOException {
+    ObjectMapper objectMapper = new ObjectMapper();
+    try {
+      Map<String, Object> jsonNodeMap =
+          objectMapper.readValue(jsonString, new TypeReference<Map<String, Object>>() {
+          });
+      return jsonToCarbonRecord(jsonNodeMap, dataFields);
+    } catch (IOException e) {
+      throw new IOException("Failed to parse Json String: " + e.getMessage());
+    }
+  }
+
+  private Object[] jsonToCarbonRecord(Map<String, Object> jsonNodeMap, DataField[] dataFields) {
+    List<Object> fields = new ArrayList<>();
+    for (DataField dataField : dataFields) {
+      Object field = jsonToCarbonObject(jsonNodeMap, dataField.getColumn());
+      if (field != null) {
+        fields.add(field);
+      }
+    }
+    // use this array object to form carbonRow
+    return fields.toArray();
+  }
+
+  private Object jsonToCarbonObject(Map<String, Object> jsonNodeMap, CarbonColumn column) {
+    DataType type = column.getDataType();
+    if (DataTypes.isArrayType(type)) {
+      CarbonDimension carbonDimension = (CarbonDimension) column;
+      int size = carbonDimension.getNumberOfChild();
+      ArrayList array = (ArrayList) jsonNodeMap.get(extractChildColumnName(column));
+      // stored as array in carbonObject
+      Object[] arrayChildObjects = new Object[size];
+      for (int i = 0; i < size; i++) {
+        CarbonDimension childCol = carbonDimension.getListOfChildDimensions().get(i);
+        arrayChildObjects[i] = jsonChildElementToCarbonChildElement(array.get(i), childCol);
+      }
+      return new ArrayObject(arrayChildObjects);
+    } else if (DataTypes.isStructType(type)) {
+      CarbonDimension carbonDimension = (CarbonDimension) column;
+      int size = carbonDimension.getNumberOfChild();
+      Map<String, Object> jsonMap =
+          (Map<String, Object>) jsonNodeMap.get(extractChildColumnName(column));
+      Object[] structChildObjects = new Object[size];
+      for (int i = 0; i < size; i++) {
+        CarbonDimension childCol = carbonDimension.getListOfChildDimensions().get(i);
+        Object childObject =
+            jsonChildElementToCarbonChildElement(jsonMap.get(extractChildColumnName(childCol)),
+                childCol);
+        if (childObject != null) {
+          structChildObjects[i] = childObject;
+        }
+      }
+      return new StructObject(structChildObjects);
+    } else {
+      // primitive type; a missing key yields null instead of a NullPointerException
+      Object value = jsonNodeMap.get(extractChildColumnName(column));
+      return (value == null) ? null : value.toString();
+    }
+  }
+
+  private Object jsonChildElementToCarbonChildElement(Object childObject,
+      CarbonDimension column) {
+    DataType type = column.getDataType();
+    if (DataTypes.isArrayType(type)) {
+      int size = column.getNumberOfChild();
+      ArrayList array = (ArrayList) childObject;
+      // stored as array in carbonObject
+      Object[] arrayChildObjects = new Object[size];
+      for (int i = 0; i < size; i++) {
+        CarbonDimension childCol = column.getListOfChildDimensions().get(i);
+        arrayChildObjects[i] = jsonChildElementToCarbonChildElement(array.get(i), childCol);
+      }
+      return new ArrayObject(arrayChildObjects);
+    } else if (DataTypes.isStructType(type)) {
+      Map<String, Object> childFieldsMap = (Map<String, Object>) childObject;
+      int size = column.getNumberOfChild();
+      Object[] structChildObjects = new Object[size];
+      for (int i = 0; i < size; i++) {
+        CarbonDimension childCol = column.getListOfChildDimensions().get(i);
+        Object child = jsonChildElementToCarbonChildElement(
+            childFieldsMap.get(extractChildColumnName(childCol)), childCol);
+        if (child != null) {
+          structChildObjects[i] = child;
+        }
+      }
+      return new StructObject(structChildObjects);
+    } else {
+      // primitive type; a missing child yields null instead of a NullPointerException
+      return (childObject == null) ? null : childObject.toString();
+    }
+  }
+
+  private static String extractChildColumnName(CarbonColumn column) {
+    String columnName = column.getColName();
+    if (columnName.contains(".")) {
+      // complex type child column names can take the following forms:
+      // a) struct type --> parent.child
+      // b) array type --> parent.val.val...child [create table flow]
+      // c) array type --> parent.val0.val1...child [SDK flow]
+      // but the JSON data's key is only the child column name, so extract it below
+      String[] splits = columnName.split("\\.");
+      columnName = splits[splits.length - 1];
+    }
+    return columnName;
+  }
+
+}
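
To make the mapping concrete, here is a small self-contained sketch of the
first step JsonRowParser performs: one JSON row becomes a Map keyed by the
child column names. It assumes the standard com.fasterxml Jackson artifact
rather than the htrace-shaded copy imported above; column names are
illustrative only.

    import java.util.Map;

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JsonRowMapSketch {
      public static void main(String[] args) throws Exception {
        String jsonRow =
            "{\"name\":\"bob\",\"salary\":10.5,\"address\":{\"city\":\"pune\",\"pin\":411014}}";
        Map<String, Object> jsonNodeMap = new ObjectMapper()
            .readValue(jsonRow, new TypeReference<Map<String, Object>>() {
            });
        // primitives are read back with toString(); a struct column such as
        // "address" arrives as a nested Map and is walked child by child
        System.out.println(jsonNodeMap.get("name"));     // bob
        System.out.println(jsonNodeMap.get("address"));  // {city=pune, pin=411014}
      }
    }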

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
index d0e78fc..9521db4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
@@ -17,7 +17,6 @@
 package org.apache.carbondata.processing.loading.steps;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -75,7 +74,8 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
   @Override public Iterator<CarbonRowBatch>[] execute() {
     int batchSize = CarbonProperties.getInstance().getBatchSize();
-    List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
+    List<CarbonIterator<Object[]>>[] readerIterators =
+        CarbonDataProcessorUtil.partitionInputReaderIterators(inputIterators);
     Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
     for (int i = 0; i < outIterators.length; i++) {
       outIterators[i] =
@@ -85,32 +85,6 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
     return outIterators;
   }
 
-  /**
-   * Partition input iterators equally as per the number of threads.
-   * @return
-   */
-  private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
-    // Get the number of cores configured in property.
-    int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
-    // Get the minimum of number of cores and iterators size to get the number of parallel threads
-    // to be launched.
-    int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
-
-    if (parallelThreadNumber <= 0) {
-      parallelThreadNumber = 1;
-    }
-
-    List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
-    for (int i = 0; i < parallelThreadNumber; i++) {
-      iterators[i] = new ArrayList<>();
-    }
-    // Equally partition the iterators as per number of threads
-    for (int i = 0; i < inputIterators.length; i++) {
-      iterators[i % parallelThreadNumber].add(inputIterators[i]);
-    }
-    return iterators;
-  }
-
   @Override protected CarbonRow processRow(CarbonRow row) {
     return null;
   }
@@ -133,7 +107,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
    * This iterator wraps the list of iterators and it starts iterating the each
    * iterator of the list one by one. It also parse the data while iterating it.
    */
-  private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
+  public static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
 
     private List<CarbonIterator<Object[]>> inputIterators;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index 5f7a94c..7c4f161 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -50,6 +50,7 @@ import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingExcep
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 /**
  * It reads data from record reader and sends data to next step.
  */
@@ -113,8 +114,8 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
         // create a ComplexDataType
         dataFieldsWithComplexDataType.put(srcDataField[i].getColumn().getOrdinal(),
             fieldConverterFactory
-                .createComplexDataType(srcDataField[i], configuration.getTableIdentifier(),
-                    null, false, null, i, nullFormat, isEmptyBadRecord));
+                .createComplexDataType(srcDataField[i], configuration.getTableIdentifier(), null,
+                    false, null, i, nullFormat, isEmptyBadRecord));
       }
     }
   }
@@ -139,8 +140,8 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
     for (int i = 0; i < outIterators.length; i++) {
       outIterators[i] =
           new InputProcessorIterator(readerIterators[i], batchSize, configuration.isPreFetch(),
-              rowCounter, orderOfData, noDictionaryMapping, dataTypes,
-              configuration, dataFieldsWithComplexDataType);
+              rowCounter, orderOfData, noDictionaryMapping, dataTypes, configuration,
+              dataFieldsWithComplexDataType);
     }
     return outIterators;
   }
@@ -274,13 +275,17 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
       // Create batch and fill it.
       CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
       int count = 0;
+
       while (internalHasNext() && count < batchSize) {
         carbonRowBatch.addRow(
             new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next(), dataFields)));
         count++;
       }
       rowCounter.getAndAdd(carbonRowBatch.getSize());
+      return carbonRowBatch;
     }
 
     private Object[] convertToNoDictionaryToBytes(Object[] data, DataField[] dataFields) {
@@ -328,9 +333,8 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
               newData[i] = dateDictionaryGenerator.generateKey((long) data[orderOfData[i]]);
             } else if (dataType == DataTypes.TIMESTAMP && data[orderOfData[i]] instanceof Long) {
               if (timestampDictionaryGenerator == null) {
-                timestampDictionaryGenerator =
-                    DirectDictionaryKeyGeneratorFactory
-                        .getDirectDictionaryGenerator(dataType, dataFields[i].getTimestampFormat());
+                timestampDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+                    .getDirectDictionaryGenerator(dataType, dataFields[i].getTimestampFormat());
               }
               newData[i] = timestampDictionaryGenerator.generateKey((long) data[orderOfData[i]]);
             } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java
new file mode 100644
index 0000000..892be93
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.parser.RowParser;
+import org.apache.carbondata.processing.loading.parser.impl.JsonRowParser;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads JSON data from the record reader, parses each row with
+ * JsonRowParser and sends it to the next step.
+ */
+public class JsonInputProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+  private RowParser rowParser;
+
+  private CarbonIterator<Object[]>[] inputIterators;
+
+  private boolean isRawDataRequired = false;
+
+  public JsonInputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+      CarbonIterator<Object[]>[] inputIterators) {
+    super(configuration, null);
+    this.inputIterators = inputIterators;
+  }
+
+  @Override public DataField[] getOutput() {
+    return configuration.getDataFields();
+  }
+
+  @Override public void initialize() throws IOException {
+    super.initialize();
+    rowParser = new JsonRowParser(getOutput());
+    // if logger is enabled then raw data will be required.
+    this.isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(configuration);
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] execute() {
+    int batchSize = CarbonProperties.getInstance().getBatchSize();
+    List<CarbonIterator<Object[]>>[] readerIterators =
+        CarbonDataProcessorUtil.partitionInputReaderIterators(inputIterators);
+    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
+    for (int i = 0; i < outIterators.length; i++) {
+      outIterators[i] =
+          new InputProcessorStepImpl.InputProcessorIterator(readerIterators[i], rowParser,
+              batchSize, false, null, rowCounter, isRawDataRequired);
+    }
+    return outIterators;
+  }
+
+  @Override protected CarbonRow processRow(CarbonRow row) {
+    return null;
+  }
+
+  @Override public void close() {
+    if (!closed) {
+      super.close();
+      for (CarbonIterator inputIterator : inputIterators) {
+        inputIterator.close();
+      }
+    }
+  }
+
+  @Override protected String getStepName() {
+    return "Json Input Processor";
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 12c95a9..c1d5d90 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -675,4 +676,32 @@ public final class CarbonDataProcessorUtil {
     return isRawDataRequired;
   }
 
+  /**
+   * Partition the input iterators equally among the number of threads.
+   *
+   * @param inputIterators input reader iterators to partition
+   * @return an array of iterator lists, one list per parallel thread
+   */
+  public static List<CarbonIterator<Object[]>>[] partitionInputReaderIterators(
+      CarbonIterator<Object[]>[] inputIterators) {
+    // Get the number of cores configured in property.
+    int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+    // Get the minimum of number of cores and iterators size to get the number of parallel threads
+    // to be launched.
+    int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
+
+    if (parallelThreadNumber <= 0) {
+      parallelThreadNumber = 1;
+    }
+
+    List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
+    for (int i = 0; i < parallelThreadNumber; i++) {
+      iterators[i] = new ArrayList<>();
+    }
+    // Equally partition the iterators as per number of threads
+    for (int i = 0; i < inputIterators.length; i++) {
+      iterators[i % parallelThreadNumber].add(inputIterators[i]);
+    }
+    return iterators;
+  }
+
 }
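
As a standalone illustration of the round-robin split above (names and counts
illustrative): five input iterators over two threads end up as [0, 2, 4] and
[1, 3].

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class PartitionSketch {
      public static void main(String[] args) {
        int inputCount = 5;
        // pretend two cores are configured; never fewer than one thread
        int parallelThreadNumber = Math.max(1, Math.min(inputCount, 2));
        List<Integer>[] buckets = new List[parallelThreadNumber];
        for (int i = 0; i < parallelThreadNumber; i++) {
          buckets[i] = new ArrayList<>();
        }
        for (int i = 0; i < inputCount; i++) {
          buckets[i % parallelThreadNumber].add(i);  // iterator i -> thread i % n
        }
        System.out.println(Arrays.toString(buckets)); // [[0, 2, 4], [1, 3]]
      }
    }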

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index ebee41a..cf90515 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -61,6 +61,7 @@ public class CarbonReaderBuilder {
     this.tableName = tableName;
   }
 
+
   /**
    * Configure the projection column names of carbon reader
    *
@@ -226,4 +227,5 @@ public class CarbonReaderBuilder {
       throw ex;
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 0f670fe..0ea5808 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -327,7 +327,6 @@ public class CarbonWriterBuilder {
     Objects.requireNonNull(schema, "schema should not be null");
     Objects.requireNonNull(path, "path should not be null");
     CarbonLoadModel loadModel = createLoadModel();
-
     // AVRO records are pushed to Carbon as Object not as Strings. This was done in order to
     // handle multi level complex type support. As there are no conversion converter step is
     // removed from the load. LoadWithoutConverter flag is going to point to the Loader Builder
@@ -336,6 +335,23 @@ public class CarbonWriterBuilder {
     return new AvroCarbonWriter(loadModel);
   }
 
+  /**
+   * Build a {@link CarbonWriter} which accepts JSON strings as input rows.
+   *
+   * @param carbonSchema carbon schema object describing the target table
+   * @return JsonCarbonWriter
+   * @throws IOException if the writer cannot be created
+   * @throws InvalidLoadOptionException if the load options are invalid
+   */
+  public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema)
+      throws IOException, InvalidLoadOptionException {
+    Objects.requireNonNull(carbonSchema, "schema should not be null");
+    Objects.requireNonNull(path, "path should not be null");
+    this.schema = carbonSchema;
+    CarbonLoadModel loadModel = createLoadModel();
+    loadModel.setJsonFileLoad(true);
+    return new JsonCarbonWriter(loadModel);
+  }
+
   private void setCsvHeader(CarbonLoadModel model) {
     Field[] fields = schema.getFields();
     StringBuilder builder = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5804d757/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
new file mode 100644
index 0000000..73742b0
--- /dev/null
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+/**
+ * Writer implementation to write JSON records to a carbondata file.
+ * The JSON writer requires the target output path and the carbon schema.
+ */
+@InterfaceAudience.User public class JsonCarbonWriter extends CarbonWriter {
+  private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
+  private TaskAttemptContext context;
+  private ObjectArrayWritable writable;
+
+  JsonCarbonWriter(CarbonLoadModel loadModel) throws IOException {
+    Configuration outputHadoopConf = new Configuration();
+    CarbonTableOutputFormat.setLoadModel(outputHadoopConf, loadModel);
+    CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat();
+    JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
+    Random random = new Random();
+    TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
+    TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
+    TaskAttemptContextImpl context = new TaskAttemptContextImpl(outputHadoopConf, attemptID);
+    this.recordWriter = outputFormat.getRecordWriter(context);
+    this.context = context;
+    this.writable = new ObjectArrayWritable();
+  }
+
+  /**
+   * Write a single row of data, accepted as a JSON string.
+   *
+   * @param object one JSON row as a string
+   * @throws IOException if writing the row fails
+   */
+  @Override public void write(Object object) throws IOException {
+    Objects.requireNonNull(object, "Input cannot be null");
+    try {
+      String[] jsonString = new String[1];
+      jsonString[0] = (String) object;
+      writable.set(jsonString);
+      recordWriter.write(NullWritable.get(), writable);
+    } catch (Exception e) {
+      close();
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Flush and close the writer
+   */
+  @Override public void close() throws IOException {
+    try {
+      recordWriter.close(context);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+}
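
Putting the builder and writer together, an end-to-end usage sketch. The
output path is illustrative, and other builder options (for example
transactional settings) are left at their defaults here.

    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.sdk.file.CarbonWriter;
    import org.apache.carbondata.sdk.file.Field;
    import org.apache.carbondata.sdk.file.Schema;

    public class JsonWriterUsageSketch {
      public static void main(String[] args) throws Exception {
        Field[] fields = new Field[2];
        fields[0] = new Field("name", DataTypes.STRING);
        fields[1] = new Field("age", DataTypes.INT);
        CarbonWriter writer = CarbonWriter.builder()
            .outputPath("/tmp/json_carbon")               // illustrative output path
            .buildWriterForJsonInput(new Schema(fields));
        writer.write("{\"name\":\"bob\", \"age\":24}");   // one JSON row per call
        writer.close();
      }
    }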