Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/16 09:33:58 UTC

[1/6] carbondata git commit: [CARBONDATA-2255] Rename the streaming examples

Repository: carbondata
Updated Branches:
  refs/heads/carbonfile 6cb6f8380 -> 7a124ecd8


[CARBONDATA-2255] Rename the streaming examples

optimize streaming examples

This closes #2064


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d4f9003a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d4f9003a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d4f9003a

Branch: refs/heads/carbonfile
Commit: d4f9003af3593d30799b573ce729de1918b8c800
Parents: 6cb6f83
Author: QiangCai <qi...@qq.com>
Authored: Wed Mar 14 17:42:39 2018 +0800
Committer: chenliang613 <ch...@huawei.com>
Committed: Thu Mar 15 15:33:00 2018 +0800

----------------------------------------------------------------------
 .../CarbonBatchSparkStreamingExample.scala      | 207 ------------------
 .../CarbonStreamSparkStreamingExample.scala     | 213 ------------------
 .../CarbonStructuredStreamingExample.scala      | 200 -----------------
 ...CarbonStructuredStreamingWithRowParser.scala | 216 -------------------
 .../examples/SparkStreamingExample.scala        | 213 ++++++++++++++++++
 .../StreamingUsingBatchLoadExample.scala        | 207 ++++++++++++++++++
 .../StreamingWithRowParserExample.scala         | 216 +++++++++++++++++++
 .../examples/StructuredStreamingExample.scala   | 200 +++++++++++++++++
 8 files changed, 836 insertions(+), 836 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
deleted file mode 100644
index bcbf190..0000000
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.{File, PrintWriter}
-import java.net.ServerSocket
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{CarbonEnv, SaveMode, SparkSession}
-import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
-
-/**
- * This example shows how to use CarbonData batch load to integrate
- * with Spark Streaming (the DStream API, not Spark Structured Streaming).
- */
-// scalastyle:off println
-
-case class DStreamData(id: Int, name: String, city: String, salary: Float)
-
-object CarbonBatchSparkStreamingExample {
-
-  def main(args: Array[String]): Unit = {
-
-    // setup paths
-    val rootPath = new File(this.getClass.getResource("/").getPath
-                            + "../../../..").getCanonicalPath
-    val checkpointPath =
-      s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
-      System.currentTimeMillis().toString()
-    val streamTableName = s"dstream_batch_table"
-
-    val spark = ExampleUtils.createCarbonSession("CarbonBatchSparkStreamingExample", 4)
-
-    val requireCreateTable = true
-
-    if (requireCreateTable) {
-      // drop table if exists previously
-      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
-      // Create target carbon table and populate with initial data
-      // set AUTO_LOAD_MERGE to true to compact segment automatically
-      spark.sql(
-        s"""
-           | CREATE TABLE ${ streamTableName }(
-           | id INT,
-           | name STRING,
-           | city STRING,
-           | salary FLOAT
-           | )
-           | STORED BY 'carbondata'
-           | TBLPROPERTIES(
-           | 'sort_columns'='name',
-           | 'dictionary_include'='city',
-           | 'AUTO_LOAD_MERGE'='true',
-           | 'COMPACTION_LEVEL_THRESHOLD'='4,10')
-           | """.stripMargin)
-
-      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
-      // batch load
-      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
-      spark.sql(
-        s"""
-           | LOAD DATA LOCAL INPATH '$path'
-           | INTO TABLE $streamTableName
-           | OPTIONS('HEADER'='true')
-         """.stripMargin)
-
-      // streaming ingest
-      val serverSocket = new ServerSocket(7071)
-      val thread1 = writeSocket(serverSocket)
-      val thread2 = showTableCount(spark, streamTableName)
-      val ssc = startStreaming(spark, streamTableName, checkpointPath)
-      // wait for stop signal to stop Spark Streaming App
-      waitForStopSignal(ssc)
-      // the Spark Streaming app must be started in the main thread,
-      // otherwise it will encounter a not-serializable exception.
-      ssc.start()
-      ssc.awaitTermination()
-      thread1.interrupt()
-      thread2.interrupt()
-      serverSocket.close()
-    }
-
-    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
-
-    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
-
-    // record(id = 100000001) comes from batch segment_0
-    // record(id = 1) comes from stream segment_1
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
-
-    // filter on a range of id values
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id < 10 limit 100").show(100, truncate = false)
-
-    // show segments
-    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
-
-    // drop table
-    spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
-
-    spark.stop()
-    System.out.println("streaming finished")
-  }
-
-  def showTableCount(spark: SparkSession, tableName: String): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        for (_ <- 0 to 1000) {
-          spark.sql(s"select count(*) from $tableName").show(truncate = false)
-          spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
-          Thread.sleep(1000 * 5)
-        }
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def waitForStopSignal(ssc: StreamingContext): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
-        new ServerSocket(7072).accept()
-        // don't stop SparkContext here
-        ssc.stop(false, true)
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def startStreaming(spark: SparkSession, tableName: String,
-      checkpointPath: String): StreamingContext = {
-    var ssc: StreamingContext = null
-    try {
-      // recommended: use a larger batch interval, such as 30s or 1min
-      ssc = new StreamingContext(spark.sparkContext, Seconds(15))
-      ssc.checkpoint(checkpointPath)
-
-      val readSocketDF = ssc.socketTextStream("localhost", 7071)
-
-      val batchData = readSocketDF
-        .map(_.split(","))
-        .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
-
-      batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
-        val df = spark.createDataFrame(rdd).toDF("id", "name", "city", "salary")
-        println("at time: " + time.toString() + " the count of received data: " + df.count())
-        df.write
-          .format("carbondata")
-          .option("tableName", tableName)
-          .mode(SaveMode.Append)
-          .save()
-      }}
-    } catch {
-      case ex: Exception =>
-        ex.printStackTrace()
-        println("Done reading and writing streaming data")
-    }
-    ssc
-  }
-
-  def writeSocket(serverSocket: ServerSocket): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        // wait for a client connection request and accept it
-        val clientSocket = serverSocket.accept()
-        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
-        var index = 0
-        for (_ <- 1 to 1000) {
-          // write 1001 records per iteration
-          for (_ <- 0 to 1000) {
-            index = index + 1
-            socketWriter.println(index.toString + ",name_" + index
-                                 + ",city_" + index + "," + (index * 10000.00).toString +
-                                 ",school_" + index + ":school_" + index + index + "$" + index)
-          }
-          socketWriter.flush()
-          Thread.sleep(1000)
-        }
-        socketWriter.close()
-        System.out.println("Socket closed")
-      }
-    }
-    thread.start()
-    thread
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
deleted file mode 100644
index 856084b..0000000
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.{File, PrintWriter}
-import java.net.ServerSocket
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.CarbonSparkStreamingFactory
-import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
-
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.streaming.CarbonSparkStreamingListener
-import org.apache.carbondata.streaming.parser.CarbonStreamParser
-
-/**
- * This example shows how to use Spark Streaming to write data
- * to a CarbonData stream table.
- *
- * NOTE: Current integration with Spark Streaming is an alpha feature.
- */
-// scalastyle:off println
-object CarbonStreamSparkStreamingExample {
-
-  def main(args: Array[String]): Unit = {
-
-    // setup paths
-    val rootPath = new File(this.getClass.getResource("/").getPath
-                            + "../../../..").getCanonicalPath
-    val checkpointPath =
-      s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
-      System.currentTimeMillis().toString()
-    val streamTableName = s"dstream_stream_table"
-
-    val spark = ExampleUtils.createCarbonSession("CarbonStreamSparkStreamingExample", 4)
-
-    val requireCreateTable = true
-
-    if (requireCreateTable) {
-      // drop table if exists previously
-      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
-      // Create target carbon table and populate with initial data
-      spark.sql(
-        s"""
-           | CREATE TABLE ${ streamTableName }(
-           | id INT,
-           | name STRING,
-           | city STRING,
-           | salary FLOAT
-           | )
-           | STORED BY 'carbondata'
-           | TBLPROPERTIES(
-           | 'streaming'='true',
-           | 'sort_columns'='name',
-           | 'dictionary_include'='city')
-           | """.stripMargin)
-      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
-      // batch load
-      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
-      spark.sql(
-        s"""
-           | LOAD DATA LOCAL INPATH '$path'
-           | INTO TABLE $streamTableName
-           | OPTIONS('HEADER'='true')
-         """.stripMargin)
-
-      // streaming ingest
-      val serverSocket = new ServerSocket(7071)
-      val thread1 = writeSocket(serverSocket)
-      val thread2 = showTableCount(spark, streamTableName)
-      val ssc = startStreaming(spark, streamTableName, checkpointPath)
-      // add a Spark listener to remove all locks on stream tables when the app stops
-      ssc.sparkContext.addSparkListener(new CarbonSparkStreamingListener())
-      // wait for stop signal to stop Spark Streaming App
-      waitForStopSignal(ssc)
-      // the Spark Streaming app must be started in the main thread,
-      // otherwise it will encounter a not-serializable exception.
-      ssc.start()
-      ssc.awaitTermination()
-      thread1.interrupt()
-      thread2.interrupt()
-      serverSocket.close()
-    }
-
-    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
-
-    spark.sql(s"select * from ${ streamTableName } order by id desc").show(100, truncate = false)
-
-    // record(id = 100000001) comes from batch segment_0
-    // record(id = 1) comes from stream segment_1
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
-
-    // filter on a range of id values
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id < 10 limit 100").show(100, truncate = false)
-
-    // show segments
-    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
-
-    spark.stop()
-    System.out.println("streaming finished")
-  }
-
-  def showTableCount(spark: SparkSession, tableName: String): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        for (_ <- 0 to 1000) {
-          println(System.currentTimeMillis())
-          spark.sql(s"select count(*) from $tableName").show(truncate = false)
-          spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
-          Thread.sleep(1000 * 5)
-        }
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def waitForStopSignal(ssc: StreamingContext): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
-        new ServerSocket(7072).accept()
-        // don't stop SparkContext here
-        ssc.stop(false, true)
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def startStreaming(spark: SparkSession, tableName: String,
-      checkpointPath: String): StreamingContext = {
-    var ssc: StreamingContext = null
-    try {
-      // recommended: use a larger batch interval, such as 30s or 1min
-      ssc = new StreamingContext(spark.sparkContext, Seconds(30))
-      ssc.checkpoint(checkpointPath)
-
-      val readSocketDF = ssc.socketTextStream("localhost", 7071)
-
-      val batchData = readSocketDF
-        .map(_.split(","))
-        .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
-
-      println("init carbon table info")
-      batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
-        val df = spark.createDataFrame(rdd).toDF()
-        println(System.currentTimeMillis().toString() +
-          " at batch time: " + time.toString() +
-          " the count of received data: " + df.count())
-        CarbonSparkStreamingFactory.getStreamSparkStreamingWriter(spark, "default", tableName)
-          .option(CarbonStreamParser.CARBON_STREAM_PARSER,
-            CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
-          .mode(SaveMode.Append)
-          .writeStreamData(df, time)
-      }}
-    } catch {
-      case ex: Exception =>
-        ex.printStackTrace()
-        println("Done reading and writing streaming data")
-    }
-    ssc
-  }
-
-  def writeSocket(serverSocket: ServerSocket): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        // wait for a client connection request and accept it
-        val clientSocket = serverSocket.accept()
-        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
-        var index = 0
-        for (_ <- 1 to 1000) {
-          // write 101 records per iteration
-          for (_ <- 0 to 100) {
-            index = index + 1
-            socketWriter.println(index.toString + ",name_" + index
-                                 + ",city_" + index + "," + (index * 10000.00).toString +
-                                 ",school_" + index + ":school_" + index + index + "$" + index)
-          }
-          socketWriter.flush()
-          Thread.sleep(2000)
-        }
-        socketWriter.close()
-        System.out.println("Socket closed")
-      }
-    }
-    thread.start()
-    thread
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
deleted file mode 100644
index bc65b2f..0000000
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.{File, PrintWriter}
-import java.net.ServerSocket
-
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
-
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonTablePath
-
-// scalastyle:off println
-object CarbonStructuredStreamingExample {
-  def main(args: Array[String]) {
-
-    // setup paths
-    val rootPath = new File(this.getClass.getResource("/").getPath
-                            + "../../../..").getCanonicalPath
-
-    val spark = ExampleUtils.createCarbonSession("CarbonStructuredStreamingExample", 4)
-    val streamTableName = s"stream_table"
-
-    val requireCreateTable = true
-    val useComplexDataType = false
-
-    if (requireCreateTable) {
-      // drop table if exists previously
-      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
-      // Create target carbon table and populate with initial data
-      if (useComplexDataType) {
-        spark.sql(
-          s"""
-             | CREATE TABLE ${ streamTableName }(
-             | id INT,
-             | name STRING,
-             | city STRING,
-             | salary FLOAT,
-             | file struct<school:array<string>, age:int>
-             | )
-             | STORED BY 'carbondata'
-             | TBLPROPERTIES(
-             | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
-             | """.stripMargin)
-      } else {
-        spark.sql(
-          s"""
-             | CREATE TABLE ${ streamTableName }(
-             | id INT,
-             | name STRING,
-             | city STRING,
-             | salary FLOAT
-             | )
-             | STORED BY 'carbondata'
-             | TBLPROPERTIES(
-             | 'streaming'='true', 'sort_columns'='name')
-             | """.stripMargin)
-      }
-
-      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
-      // batch load
-      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
-      spark.sql(
-        s"""
-           | LOAD DATA LOCAL INPATH '$path'
-           | INTO TABLE $streamTableName
-           | OPTIONS('HEADER'='true')
-         """.stripMargin)
-
-      // streaming ingest
-      val serverSocket = new ServerSocket(7071)
-      val thread1 = startStreaming(spark, carbonTable)
-      val thread2 = writeSocket(serverSocket)
-      val thread3 = showTableCount(spark, streamTableName)
-
-      System.out.println("type enter to interrupt streaming")
-      System.in.read()
-      thread1.interrupt()
-      thread2.interrupt()
-      thread3.interrupt()
-      serverSocket.close()
-    }
-
-    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
-
-    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
-
-    // record(id = 100000001) comes from batch segment_0
-    // record(id = 1) comes from stream segment_1
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
-
-    // filter on a range of id values
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id < 10 limit 100").show(100, truncate = false)
-
-    if (useComplexDataType) {
-      // complex
-      spark.sql(s"select file.age, file.school " +
-                s"from ${ streamTableName } " +
-                s"where file.age = 30 ").show(100, truncate = false)
-    }
-
-    spark.stop()
-    System.out.println("streaming finished")
-  }
-
-  def showTableCount(spark: SparkSession, tableName: String): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        for (_ <- 0 to 1000) {
-          spark.sql(s"select count(*) from $tableName").show(truncate = false)
-          Thread.sleep(1000 * 3)
-        }
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def startStreaming(spark: SparkSession, carbonTable: CarbonTable): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        var qry: StreamingQuery = null
-        try {
-          val readSocketDF = spark.readStream
-            .format("socket")
-            .option("host", "localhost")
-            .option("port", 7071)
-            .load()
-
-          // Write data from socket stream to carbondata file
-          qry = readSocketDF.writeStream
-            .format("carbondata")
-            .trigger(ProcessingTime("5 seconds"))
-            .option("checkpointLocation",
-              CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
-            .option("dbName", "default")
-            .option("tableName", "stream_table")
-            .start()
-
-          qry.awaitTermination()
-        } catch {
-          case ex: Exception =>
-            ex.printStackTrace()
-            println("Done reading and writing streaming data")
-        } finally {
-          if (qry != null) qry.stop()
-        }
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def writeSocket(serverSocket: ServerSocket): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        // wait for a client connection request and accept it
-        val clientSocket = serverSocket.accept()
-        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
-        var index = 0
-        for (_ <- 1 to 1000) {
-          // write 1001 records per iteration
-          for (_ <- 0 to 1000) {
-            index = index + 1
-            socketWriter.println(index.toString + ",name_" + index
-                                 + ",city_" + index + "," + (index * 10000.00).toString +
-                                 ",school_" + index + ":school_" + index + index + "$" + index)
-          }
-          socketWriter.flush()
-          Thread.sleep(1000)
-        }
-        socketWriter.close()
-        System.out.println("Socket closed")
-      }
-    }
-    thread.start()
-    thread
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
deleted file mode 100644
index 9ca0e07..0000000
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.{File, PrintWriter}
-import java.net.ServerSocket
-
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
-
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.streaming.parser.CarbonStreamParser
-
-case class FileElement(school: Array[String], age: Int)
-case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement)
-
-// scalastyle:off println
-object CarbonStructuredStreamingWithRowParser {
-  def main(args: Array[String]) {
-
-    // setup paths
-    val rootPath = new File(this.getClass.getResource("/").getPath
-                            + "../../../..").getCanonicalPath
-
-    val spark = ExampleUtils.createCarbonSession("CarbonStructuredStreamingWithRowParser", 4)
-    val streamTableName = s"stream_table_with_row_parser"
-
-    val requireCreateTable = true
-    val useComplexDataType = false
-
-    if (requireCreateTable) {
-      // drop table if exists previously
-      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
-      // Create target carbon table and populate with initial data
-      if (useComplexDataType) {
-        spark.sql(
-          s"""
-             | CREATE TABLE ${ streamTableName }(
-             | id INT,
-             | name STRING,
-             | city STRING,
-             | salary FLOAT,
-             | file struct<school:array<string>, age:int>
-             | )
-             | STORED BY 'carbondata'
-             | TBLPROPERTIES(
-             | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
-             | """.stripMargin)
-      } else {
-        spark.sql(
-          s"""
-             | CREATE TABLE ${ streamTableName }(
-             | id INT,
-             | name STRING,
-             | city STRING,
-             | salary FLOAT
-             | )
-             | STORED BY 'carbondata'
-             | TBLPROPERTIES(
-             | 'streaming'='true', 'sort_columns'='name')
-             | """.stripMargin)
-      }
-
-      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
-      // batch load
-      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
-      spark.sql(
-        s"""
-           | LOAD DATA LOCAL INPATH '$path'
-           | INTO TABLE $streamTableName
-           | OPTIONS('HEADER'='true')
-         """.stripMargin)
-
-      // streaming ingest
-      val serverSocket = new ServerSocket(7071)
-      val thread1 = startStreaming(spark, carbonTable.getTablePath)
-      val thread2 = writeSocket(serverSocket)
-      val thread3 = showTableCount(spark, streamTableName)
-
-      System.out.println("type enter to interrupt streaming")
-      System.in.read()
-      thread1.interrupt()
-      thread2.interrupt()
-      thread3.interrupt()
-      serverSocket.close()
-    }
-
-    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
-
-    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
-
-    // record(id = 100000001) comes from batch segment_0
-    // record(id = 1) comes from stream segment_1
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
-
-    // filter on a range of id values
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id < 10 limit 100").show(100, truncate = false)
-
-    if (useComplexDataType) {
-      // complex
-      spark.sql(s"select file.age, file.school " +
-                s"from ${ streamTableName } " +
-                s"where file.age = 30 ").show(100, truncate = false)
-    }
-
-    spark.stop()
-    System.out.println("streaming finished")
-  }
-
-  def showTableCount(spark: SparkSession, tableName: String): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        for (_ <- 0 to 1000) {
-          spark.sql(s"select count(*) from $tableName").show(truncate = false)
-          Thread.sleep(1000 * 3)
-        }
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def startStreaming(spark: SparkSession, tablePath: String): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        var qry: StreamingQuery = null
-        try {
-          import spark.implicits._
-          val readSocketDF = spark.readStream
-            .format("socket")
-            .option("host", "localhost")
-            .option("port", 7071)
-            .load()
-            .as[String]
-            .map(_.split(","))
-            .map { fields => {
-              val tmp = fields(4).split("\\$")
-              val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
-              if (fields(0).toInt % 2 == 0) {
-                StreamData(fields(0).toInt, null, fields(2), fields(3).toFloat, file)
-              } else {
-                StreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat, file)
-              }
-            } }
-
-          // Write data from socket stream to carbondata file
-          qry = readSocketDF.writeStream
-            .format("carbondata")
-            .trigger(ProcessingTime("5 seconds"))
-            .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
-            .option("dbName", "default")
-            .option("tableName", "stream_table_with_row_parser")
-            .option(CarbonStreamParser.CARBON_STREAM_PARSER,
-              CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
-            .start()
-
-          qry.awaitTermination()
-        } catch {
-          case ex: Exception =>
-            ex.printStackTrace()
-            println("Done reading and writing streaming data")
-        } finally {
-          if (qry != null) qry.stop()
-        }
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def writeSocket(serverSocket: ServerSocket): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        // wait for a client connection request and accept it
-        val clientSocket = serverSocket.accept()
-        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
-        var index = 0
-        for (_ <- 1 to 1000) {
-          // write 1001 records per iteration
-          for (_ <- 0 to 1000) {
-            index = index + 1
-            socketWriter.println(index.toString + ",name_" + index
-                                 + ",city_" + index + "," + (index * 10000.00).toString +
-                                 ",school_" + index + ":school_" + index + index + "$" + index)
-          }
-          socketWriter.flush()
-          Thread.sleep(1000)
-        }
-        socketWriter.close()
-        System.out.println("Socket closed")
-      }
-    }
-    thread.start()
-    thread
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
new file mode 100644
index 0000000..d819a3f
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.CarbonSparkStreamingFactory
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.CarbonSparkStreamingListener
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+/**
+ * This example shows how to use Spark Streaming to write data
+ * to a CarbonData stream table.
+ *
+ * NOTE: Current integration with Spark Streaming is an alpha feature.
+ */
+// scalastyle:off println
+object SparkStreamingExample {
+
+  def main(args: Array[String]): Unit = {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val checkpointPath =
+      s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
+      System.currentTimeMillis().toString()
+    val streamTableName = s"dstream_stream_table"
+
+    val spark = ExampleUtils.createCarbonSession("SparkStreamingExample", 4)
+
+    val requireCreateTable = true
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      spark.sql(
+        s"""
+           | CREATE TABLE ${ streamTableName }(
+           | id INT,
+           | name STRING,
+           | city STRING,
+           | salary FLOAT
+           | )
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES(
+           | 'streaming'='true',
+           | 'sort_columns'='name',
+           | 'dictionary_include'='city')
+           | """.stripMargin)
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = writeSocket(serverSocket)
+      val thread2 = showTableCount(spark, streamTableName)
+      val ssc = startStreaming(spark, streamTableName, checkpointPath)
+      // add a Spark listener to remove all locks on stream tables when the app stops
+      ssc.sparkContext.addSparkListener(new CarbonSparkStreamingListener())
+      // wait for stop signal to stop Spark Streaming App
+      waitForStopSignal(ssc)
+      // the Spark Streaming app must be started in the main thread,
+      // otherwise it will encounter a not-serializable exception.
+      ssc.start()
+      ssc.awaitTermination()
+      thread1.interrupt()
+      thread2.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+    spark.sql(s"select * from ${ streamTableName } order by id desc").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    // filter on a range of id values
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    // show segments
+    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        for (_ <- 0 to 1000) {
+          println(System.currentTimeMillis())
+          spark.sql(s"select count(*) from $tableName").show(truncate = false)
+          spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
+          Thread.sleep(1000 * 5)
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def waitForStopSignal(ssc: StreamingContext): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
+        new ServerSocket(7072).accept()
+        // don't stop SparkContext here
+        ssc.stop(false, true)
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def startStreaming(spark: SparkSession, tableName: String,
+      checkpointPath: String): StreamingContext = {
+    var ssc: StreamingContext = null
+    try {
+      // recommended: use a larger batch interval, such as 30s or 1min
+      ssc = new StreamingContext(spark.sparkContext, Seconds(30))
+      ssc.checkpoint(checkpointPath)
+
+      val readSocketDF = ssc.socketTextStream("localhost", 7071)
+
+      val batchData = readSocketDF
+        .map(_.split(","))
+        .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
+
+      println("init carbon table info")
+      batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
+        val df = spark.createDataFrame(rdd).toDF()
+        println(System.currentTimeMillis().toString() +
+          " at batch time: " + time.toString() +
+          " the count of received data: " + df.count())
+        CarbonSparkStreamingFactory.getStreamSparkStreamingWriter(spark, "default", tableName)
+          .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+            CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
+          .mode(SaveMode.Append)
+          .writeStreamData(df, time)
+      }}
+    } catch {
+      case ex: Exception =>
+        ex.printStackTrace()
+        println("Done reading and writing streaming data")
+    }
+    ssc
+  }
+
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for a client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to 1000) {
+          // write 101 records per iteration
+          for (_ <- 0 to 100) {
+            index = index + 1
+            socketWriter.println(index.toString + ",name_" + index
+                                 + ",city_" + index + "," + (index * 10000.00).toString +
+                                 ",school_" + index + ":school_" + index + index + "$" + index)
+          }
+          socketWriter.flush()
+          Thread.sleep(2000)
+        }
+        socketWriter.close()
+        System.out.println("Socket closed")
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println
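
For readers skimming the diff: the essential per-micro-batch write path of SparkStreamingExample
above reduces to the call chain below. This is only a condensed sketch of the code in this file;
it assumes the spark session, tableName, and the batchData DStream of DStreamData defined in the
example, and is not a standalone program.

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{CarbonSparkStreamingFactory, SaveMode}
  import org.apache.spark.streaming.Time
  import org.apache.carbondata.streaming.parser.CarbonStreamParser

  batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) =>
    // turn each micro-batch into a DataFrame and append it to the Carbon stream table
    val df = spark.createDataFrame(rdd).toDF()
    CarbonSparkStreamingFactory.getStreamSparkStreamingWriter(spark, "default", tableName)
      .option(CarbonStreamParser.CARBON_STREAM_PARSER,
        CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
      .mode(SaveMode.Append)
      .writeStreamData(df, time)
  }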

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
new file mode 100644
index 0000000..89883f8
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{CarbonEnv, SaveMode, SparkSession}
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+
+/**
+ * This example shows how to use CarbonData batch load to integrate
+ * with Spark Streaming (the DStream API, not Spark Structured Streaming).
+ */
+// scalastyle:off println
+
+case class DStreamData(id: Int, name: String, city: String, salary: Float)
+
+object StreamingUsingBatchLoadExample {
+
+  def main(args: Array[String]): Unit = {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val checkpointPath =
+      s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
+      System.currentTimeMillis().toString()
+    val streamTableName = s"dstream_batch_table"
+
+    val spark = ExampleUtils.createCarbonSession("StreamingUsingBatchLoadExample", 4)
+
+    val requireCreateTable = true
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      // set AUTO_LOAD_MERGE to true to compact segment automatically
+      spark.sql(
+        s"""
+           | CREATE TABLE ${ streamTableName }(
+           | id INT,
+           | name STRING,
+           | city STRING,
+           | salary FLOAT
+           | )
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES(
+           | 'sort_columns'='name',
+           | 'dictionary_include'='city',
+           | 'AUTO_LOAD_MERGE'='true',
+           | 'COMPACTION_LEVEL_THRESHOLD'='4,10')
+           | """.stripMargin)
+
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = writeSocket(serverSocket)
+      val thread2 = showTableCount(spark, streamTableName)
+      val ssc = startStreaming(spark, streamTableName, checkpointPath)
+      // wait for stop signal to stop Spark Streaming App
+      waitForStopSignal(ssc)
+      // the Spark Streaming app must be started in the main thread,
+      // otherwise it will encounter a not-serializable exception.
+      ssc.start()
+      ssc.awaitTermination()
+      thread1.interrupt()
+      thread2.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    // filter on a range of id values
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    // show segments
+    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
+
+    // drop table
+    spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        for (_ <- 0 to 1000) {
+          spark.sql(s"select count(*) from $tableName").show(truncate = false)
+          spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
+          Thread.sleep(1000 * 5)
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def waitForStopSignal(ssc: StreamingContext): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
+        new ServerSocket(7072).accept()
+        // don't stop SparkContext here
+        ssc.stop(false, true)
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def startStreaming(spark: SparkSession, tableName: String,
+      checkpointPath: String): StreamingContext = {
+    var ssc: StreamingContext = null
+    try {
+      // recommended: use a larger batch interval, such as 30s or 1min
+      ssc = new StreamingContext(spark.sparkContext, Seconds(15))
+      ssc.checkpoint(checkpointPath)
+
+      val readSocketDF = ssc.socketTextStream("localhost", 7071)
+
+      val batchData = readSocketDF
+        .map(_.split(","))
+        .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
+
+      batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
+        val df = spark.createDataFrame(rdd).toDF("id", "name", "city", "salary")
+        println("at time: " + time.toString() + " the count of received data: " + df.count())
+        df.write
+          .format("carbondata")
+          .option("tableName", tableName)
+          .mode(SaveMode.Append)
+          .save()
+      }}
+    } catch {
+      case ex: Exception =>
+        ex.printStackTrace()
+        println("Done reading and writing streaming data")
+    }
+    ssc
+  }
+
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for a client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to 1000) {
+          // write 1001 records per iteration
+          for (_ <- 0 to 1000) {
+            index = index + 1
+            socketWriter.println(index.toString + ",name_" + index
+                                 + ",city_" + index + "," + (index * 10000.00).toString +
+                                 ",school_" + index + ":school_" + index + index + "$" + index)
+          }
+          socketWriter.flush()
+          Thread.sleep(1000)
+        }
+        socketWriter.close()
+        System.out.println("Socket closed")
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println
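
In contrast, StreamingUsingBatchLoadExample above never uses the Carbon streaming writer: each
micro-batch is written as an ordinary batch segment through the DataFrame API, and AUTO_LOAD_MERGE
compacts the resulting segments. A condensed sketch of that write path, assuming spark, tableName
and batchData as defined in the example above:

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.SaveMode
  import org.apache.spark.streaming.Time

  batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) =>
    // every micro-batch becomes a normal batch load into the (non-streaming) table
    val df = spark.createDataFrame(rdd).toDF("id", "name", "city", "salary")
    df.write
      .format("carbondata")
      .option("tableName", tableName)
      .mode(SaveMode.Append)
      .save()
  }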

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
new file mode 100644
index 0000000..a07d504
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+case class FileElement(school: Array[String], age: Int)
+case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement)
+
+// scalastyle:off println
+object StreamingWithRowParserExample {
+  def main(args: Array[String]) {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+
+    val spark = ExampleUtils.createCarbonSession("StreamingWithRowParserExample", 4)
+    val streamTableName = s"stream_table_with_row_parser"
+
+    val requireCreateTable = true
+    val useComplexDataType = false
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      if (useComplexDataType) {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT,
+             | file struct<school:array<string>, age:int>
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
+             | """.stripMargin)
+      } else {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name')
+             | """.stripMargin)
+      }
+
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = startStreaming(spark, carbonTable.getTablePath)
+      val thread2 = writeSocket(serverSocket)
+      val thread3 = showTableCount(spark, streamTableName)
+
+      System.out.println("type enter to interrupt streaming")
+      System.in.read()
+      thread1.interrupt()
+      thread2.interrupt()
+      thread3.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    // filter on a range of id values
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    if (useComplexDataType) {
+      // complex
+      spark.sql(s"select file.age, file.school " +
+                s"from ${ streamTableName } " +
+                s"where file.age = 30 ").show(100, truncate = false)
+    }
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        for (_ <- 0 to 1000) {
+          spark.sql(s"select count(*) from $tableName").show(truncate = false)
+          Thread.sleep(1000 * 3)
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def startStreaming(spark: SparkSession, tablePath: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        var qry: StreamingQuery = null
+        try {
+          import spark.implicits._
+          val readSocketDF = spark.readStream
+            .format("socket")
+            .option("host", "localhost")
+            .option("port", 7071)
+            .load()
+            .as[String]
+            .map(_.split(","))
+            .map { fields => {
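+              // fields(4) encodes the complex column as "<school>:<school>$<age>" (see writeSocket below)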
+              val tmp = fields(4).split("\\$")
+              val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
+              if (fields(0).toInt % 2 == 0) {
+                StreamData(fields(0).toInt, null, fields(2), fields(3).toFloat, file)
+              } else {
+                StreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat, file)
+              }
+            } }
+
+          // Write data from socket stream to carbondata file
+          qry = readSocketDF.writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime("5 seconds"))
+            .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
+            .option("dbName", "default")
+            .option("tableName", "stream_table_with_row_parser")
+            .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+              CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
+            .start()
+
+          qry.awaitTermination()
+        } catch {
+          case ex: Exception =>
+            ex.printStackTrace()
+            println("Done reading and writing streaming data")
+        } finally {
+          if (null != qry) qry.stop()
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for the client's connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to 1000) {
+          // write 1001 records per iteration
+          for (_ <- 0 to 1000) {
+            index = index + 1
+            socketWriter.println(index.toString + ",name_" + index
+                                 + ",city_" + index + "," + (index * 10000.00).toString +
+                                 ",school_" + index + ":school_" + index + index + "$" + index)
+          }
+          socketWriter.flush()
+          Thread.sleep(1000)
+        }
+        socketWriter.close()
+        System.out.println("Socket closed")
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println
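
For context on the row parser above: each socket line produced by writeSocket is a CSV record whose fifth field packs the complex column. A minimal, self-contained sketch of that mapping (the case-class shapes and the sample line below are illustrative assumptions that mirror the example, not part of this commit):

    object RowParserFormatSketch {
      // Shapes assumed to mirror the example's FileElement/StreamData case classes.
      case class FileElement(school: Array[String], age: Int)
      case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement)

      // A socket line looks like: "7,name_7,city_7,70000.0,school_7:school_77$7"
      def parse(line: String): StreamData = {
        val fields = line.split(",")
        val Array(schools, age) = fields(4).split("\\$")
        StreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat,
          FileElement(schools.split(":"), age.toInt))
      }

      def main(args: Array[String]): Unit = {
        val row = parse("7,name_7,city_7,70000.0,school_7:school_77$7")
        println(s"id=${row.id}, schools=${row.file.school.mkString("|")}, age=${row.file.age}")
      }
    }

The example above feeds such typed rows to the carbondata sink by setting CARBON_STREAM_PARSER to the row parser, whereas StructuredStreamingExample below writes the raw socket lines without setting that option.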

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
new file mode 100644
index 0000000..1d4bedf
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+// scalastyle:off println
+object StructuredStreamingExample {
+  def main(args: Array[String]) {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+
+    val spark = ExampleUtils.createCarbonSession("StructuredStreamingExample", 4)
+    val streamTableName = s"stream_table"
+
+    val requireCreateTable = true
+    val useComplexDataType = false
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      if (useComplexDataType) {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT,
+             | file struct<school:array<string>, age:int>
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
+             | """.stripMargin)
+      } else {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name')
+             | """.stripMargin)
+      }
+
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = startStreaming(spark, carbonTable)
+      val thread2 = writeSocket(serverSocket)
+      val thread3 = showTableCount(spark, streamTableName)
+
+      System.out.println("type enter to interrupt streaming")
+      System.in.read()
+      thread1.interrupt()
+      thread2.interrupt()
+      thread3.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    // range filter query
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    if (useComplexDataType) {
+      // complex
+      spark.sql(s"select file.age, file.school " +
+                s"from ${ streamTableName } " +
+                s"where where file.age = 30 ").show(100, truncate = false)
+    }
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        for (_ <- 0 to 1000) {
+          spark.sql(s"select count(*) from $tableName").show(truncate = false)
+          Thread.sleep(1000 * 3)
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def startStreaming(spark: SparkSession, carbonTable: CarbonTable): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        var qry: StreamingQuery = null
+        try {
+          val readSocketDF = spark.readStream
+            .format("socket")
+            .option("host", "localhost")
+            .option("port", 7071)
+            .load()
+
+          // Write data from socket stream to carbondata file
+          qry = readSocketDF.writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime("5 seconds"))
+            .option("checkpointLocation",
+              CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
+            .option("dbName", "default")
+            .option("tableName", "stream_table")
+            .start()
+
+          qry.awaitTermination()
+        } catch {
+          case ex: Exception =>
+            ex.printStackTrace()
+            println("Done reading and writing streaming data")
+        } finally {
+          if (null != qry) qry.stop()
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for the client's connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to 1000) {
+          // write 1001 records per iteration
+          for (_ <- 0 to 1000) {
+            index = index + 1
+            socketWriter.println(index.toString + ",name_" + index
+                                 + ",city_" + index + "," + (index * 10000.00).toString +
+                                 ",school_" + index + ":school_" + index + index + "$" + index)
+          }
+          socketWriter.flush()
+          Thread.sleep(1000)
+        }
+        socketWriter.close()
+        System.out.println("Socket closed")
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println


[5/6] carbondata git commit: [CARBONDATA-2224][File Level Reader Support] External File level reader support

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 0298eea..cf22569 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -150,7 +150,7 @@ case class CarbonDropTableCommand(
 
       // delete table data only if it is not external table
       if (FileFactory.isFileExist(tablePath, fileType) &&
-          !carbonTable.isExternalTable) {
+          !(carbonTable.isExternalTable || carbonTable.isFileLevelExternalTable)) {
         val file = FileFactory.getCarbonFile(tablePath, fileType)
         CarbonUtil.deleteFoldersAndFilesSilent(file)
       }
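
A minimal illustration of the widened guard above: table data is now kept on DROP TABLE for file-level external tables as well as regular external tables (the helper and parameter names below are hypothetical stand-ins, not CarbonData APIs):

    object DropTableGuardSketch {
      // Hypothetical stand-in for the condition in CarbonDropTableCommand.
      def shouldDeleteTableData(pathExists: Boolean,
          isExternalTable: Boolean,
          isFileLevelExternalTable: Boolean): Boolean =
        pathExists && !(isExternalTable || isFileLevelExternalTable)

      def main(args: Array[String]): Unit = {
        // A file-level external table keeps its files:
        println(shouldDeleteTableData(pathExists = true,
          isExternalTable = false, isFileLevelExternalTable = true)) // false
      }
    }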

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
deleted file mode 100644
index 2eed988..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ /dev/null
@@ -1,443 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources
-
-import java.io.File
-import java.util
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicLong
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.util.Random
-
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore
-import org.apache.carbondata.core.metadata.SegmentFileStore
-import org.apache.carbondata.core.metadata.datatype.DataTypes
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverterImpl, DataTypeUtil}
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
-import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
-import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
-import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
-
-class CarbonFileFormat
-  extends FileFormat
-    with DataSourceRegister
-    with Logging
-with Serializable {
-
-  override def shortName(): String = "carbondata"
-
-  override def inferSchema(sparkSession: SparkSession,
-      options: Map[String, String],
-      files: Seq[FileStatus]): Option[StructType] = {
-    None
-  }
-
-  SparkSession.getActiveSession.get.sessionState.conf.setConfString(
-    "spark.sql.sources.commitProtocolClass",
-    "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
-
-  override def prepareWrite(
-      sparkSession: SparkSession,
-      job: Job,
-      options: Map[String, String],
-      dataSchema: StructType): OutputWriterFactory = {
-    val conf = job.getConfiguration
-    conf.setClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS.key,
-      classOf[CarbonOutputCommitter],
-      classOf[CarbonOutputCommitter])
-    conf.set("carbon.commit.protocol", "carbon.commit.protocol")
-    job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
-    val table = CarbonEnv.getCarbonTable(
-      TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
-    val model = new CarbonLoadModel
-    val carbonProperty = CarbonProperties.getInstance()
-    val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
-    val tableProperties = table.getTableInfo.getFactTable.getTableProperties
-    optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-        carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
-    val partitionStr =
-      table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map(
-        _.getColumnName.toLowerCase).mkString(",")
-    optionsFinal.put(
-      "fileheader",
-      dataSchema.fields.map(_.name.toLowerCase).mkString(",") + "," + partitionStr)
-    val optionsLocal = new mutable.HashMap[String, String]()
-    optionsLocal ++= options
-    optionsLocal += (("header", "false"))
-    new CarbonLoadModelBuilder(table).build(
-      optionsLocal.toMap.asJava,
-      optionsFinal,
-      model,
-      conf)
-    model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
-    model.setDictionaryServerHost(options.getOrElse("dicthost", null))
-    model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
-    CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
-    model.setPartitionLoad(true)
-
-    val staticPartition = options.getOrElse("staticpartition", null)
-    if (staticPartition != null) {
-      conf.set("carbon.staticpartition", staticPartition)
-    }
-    // In case of update query there is chance to remove the older segments, so here we can set
-    // the to be deleted segments to mark as delete while updating tablestatus
-    val segemntsTobeDeleted = options.get("segmentsToBeDeleted")
-    if (segemntsTobeDeleted.isDefined) {
-      conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segemntsTobeDeleted.get)
-    }
-
-    val currPartition = options.getOrElse("currentpartition", null)
-    if (currPartition != null) {
-      conf.set("carbon.currentpartition", currPartition)
-    }
-    // Update with the current in progress load.
-    val currEntry = options.getOrElse("currentloadentry", null)
-    if (currEntry != null) {
-      val loadEntry =
-        ObjectSerializationUtil.convertStringToObject(currEntry).asInstanceOf[LoadMetadataDetails]
-      val details =
-        SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath))
-      model.setSegmentId(loadEntry.getLoadName)
-      model.setFactTimeStamp(loadEntry.getLoadStartTime)
-      val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava)
-      list.add(loadEntry)
-      model.setLoadMetadataDetails(list)
-    }
-    // Set the update timestamp if user sets in case of update query. It needs to be updated
-    // in load status update time
-    val updateTimeStamp = options.get("updatetimestamp")
-    if (updateTimeStamp.isDefined) {
-      conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
-    }
-    CarbonTableOutputFormat.setLoadModel(conf, model)
-
-    new OutputWriterFactory {
-
-      /**
-       * counter used for generating task numbers. This is used to generate unique partition numbers
-       * in case of partitioning
-       */
-      val counter = new AtomicLong()
-      val taskIdMap = new ConcurrentHashMap[String, java.lang.Long]()
-
-      override def newInstance(
-          path: String,
-          dataSchema: StructType,
-          context: TaskAttemptContext): OutputWriter = {
-        val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
-        val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
-        var storeLocation: Array[String] = Array[String]()
-        val isCarbonUseLocalDir = CarbonProperties.getInstance()
-          .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
-
-
-        val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
-        val tmpLocationSuffix =
-          File.separator + "carbon" + System.nanoTime() + File.separator + taskNumber
-        if (isCarbonUseLocalDir) {
-          val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-          if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
-            // use single dir
-            storeLocation = storeLocation :+
-              (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix)
-            if (storeLocation == null || storeLocation.isEmpty) {
-              storeLocation = storeLocation :+
-                (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
-            }
-          } else {
-            // use all the yarn dirs
-            storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
-          }
-        } else {
-          storeLocation =
-            storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
-        }
-        CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
-        new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber, model)
-      }
-
-      /**
-       * Generate taskid using the taskid of taskcontext and the path. It should be unique in case
-       * of partition tables.
-       */
-      private def generateTaskNumber(path: String,
-          context: TaskAttemptContext, segmentId: String): String = {
-        var partitionNumber: java.lang.Long = taskIdMap.get(path)
-        if (partitionNumber == null) {
-          partitionNumber = counter.incrementAndGet()
-          // Generate taskid using the combination of taskid and partition number to make it unique.
-          taskIdMap.put(path, partitionNumber)
-        }
-        val taskID = context.getTaskAttemptID.getTaskID.getId
-        CarbonScalaUtil.generateUniqueNumber(taskID, segmentId, partitionNumber)
-      }
-
-      override def getFileExtension(context: TaskAttemptContext): String = {
-        CarbonTablePath.CARBON_DATA_EXT
-      }
-
-    }
-  }
-}
-
-case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
-  extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {
-  override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext,
-      absoluteDir: String,
-      ext: String): String = {
-    val carbonFlow = taskContext.getConfiguration.get("carbon.commit.protocol")
-    if (carbonFlow != null) {
-      super.newTaskTempFile(taskContext, Some(absoluteDir), ext)
-    } else {
-      super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
-    }
-  }
-}
-
-/**
- * It is a just class to make compile between spark 2.1 and 2.2
- */
-private trait AbstractCarbonOutputWriter {
-  def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
-  def writeInternal(row: InternalRow): Unit = {
-    writeCarbon(row)
-  }
-  def write(row: InternalRow): Unit = {
-    writeCarbon(row)
-  }
-  def writeCarbon(row: InternalRow): Unit
-}
-
-private class CarbonOutputWriter(path: String,
-    context: TaskAttemptContext,
-    fieldTypes: Seq[DataType],
-    taskNo : String,
-    model: CarbonLoadModel)
-  extends OutputWriter with AbstractCarbonOutputWriter {
-
-  val converter = new DataTypeConverterImpl
-
-  val partitions =
-    getPartitionsFromPath(path, context, model).map(ExternalCatalogUtils.unescapePathName)
-  val staticPartition: util.HashMap[String, Boolean] = {
-    val staticPart = context.getConfiguration.get("carbon.staticpartition")
-    if (staticPart != null) {
-      ObjectSerializationUtil.convertStringToObject(
-        staticPart).asInstanceOf[util.HashMap[String, Boolean]]
-    } else {
-      null
-    }
-  }
-  lazy val currPartitions: util.List[indexstore.PartitionSpec] = {
-    val currParts = context.getConfiguration.get("carbon.currentpartition")
-    if (currParts != null) {
-      ObjectSerializationUtil.convertStringToObject(
-        currParts).asInstanceOf[util.List[indexstore.PartitionSpec]]
-    } else {
-      new util.ArrayList[indexstore.PartitionSpec]()
-    }
-  }
-  var (updatedPartitions, partitionData) = if (partitions.nonEmpty) {
-    val updatedPartitions = partitions.map(splitPartition)
-    (updatedPartitions, updatePartitions(updatedPartitions.map(_._2)))
-  } else {
-    (Map.empty[String, String].toArray, Array.empty)
-  }
-
-  private def splitPartition(p: String) = {
-    val value = p.substring(p.indexOf("=") + 1, p.length)
-    val col = p.substring(0, p.indexOf("="))
-    // NUll handling case. For null hive creates with this special name
-    if (value.equals("__HIVE_DEFAULT_PARTITION__")) {
-      (col, null)
-      // we should replace back the special string with empty value.
-    } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
-      (col, "")
-    } else {
-      (col, value)
-    }
-  }
-
-  lazy val writePath = {
-    val updatedPath = getPartitionPath(path, context, model)
-    // in case of partition location specified by user then search the partitions from the current
-    // partitions to get the corresponding partitions.
-    if (partitions.isEmpty) {
-      val writeSpec = new indexstore.PartitionSpec(null, updatedPath)
-      val index = currPartitions.indexOf(writeSpec)
-      if (index > -1) {
-        val spec = currPartitions.get(index)
-        updatedPartitions = spec.getPartitions.asScala.map(splitPartition).toArray
-        partitionData = updatePartitions(updatedPartitions.map(_._2))
-      }
-    }
-    updatedPath
-  }
-
-  val writable = new ObjectArrayWritable
-
-  private def updatePartitions(partitionData: Seq[String]): Array[AnyRef] = {
-    model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo
-      .getColumnSchemaList.asScala.zipWithIndex.map { case (col, index) =>
-
-      val dataType = if (col.hasEncoding(Encoding.DICTIONARY)) {
-        DataTypes.INT
-      } else if (col.getDataType.equals(DataTypes.TIMESTAMP) ||
-                         col.getDataType.equals(DataTypes.DATE)) {
-        DataTypes.LONG
-      } else {
-        col.getDataType
-      }
-      if (staticPartition != null && staticPartition.get(col.getColumnName.toLowerCase)) {
-        val converetedVal =
-          CarbonScalaUtil.convertStaticPartitions(
-            partitionData(index),
-            col,
-            model.getCarbonDataLoadSchema.getCarbonTable)
-        if (col.hasEncoding(Encoding.DICTIONARY)) {
-          converetedVal.toInt.asInstanceOf[AnyRef]
-        } else {
-          DataTypeUtil.getDataBasedOnDataType(
-            converetedVal,
-            dataType,
-            converter)
-        }
-      } else {
-        DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType, converter)
-      }
-    }.toArray
-  }
-
-  private val recordWriter: CarbonRecordWriter = {
-    context.getConfiguration.set("carbon.outputformat.taskno", taskNo)
-    context.getConfiguration.set("carbon.outputformat.writepath",
-      writePath + "/" + model.getSegmentId + "_" + model.getFactTimeStamp + ".tmp")
-    new CarbonTableOutputFormat() {
-      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-        new Path(path)
-      }
-    }.getRecordWriter(context).asInstanceOf[CarbonRecordWriter]
-  }
-
-  // TODO Implement writesupport interface to support writing Row directly to recordwriter
-  def writeCarbon(row: InternalRow): Unit = {
-    val data = new Array[AnyRef](fieldTypes.length + partitionData.length)
-    var i = 0
-    while (i < fieldTypes.length) {
-      if (!row.isNullAt(i)) {
-        fieldTypes(i) match {
-          case StringType =>
-            data(i) = row.getString(i)
-          case d: DecimalType =>
-            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
-          case other =>
-            data(i) = row.get(i, other)
-        }
-      }
-      i += 1
-    }
-    if (partitionData.length > 0) {
-      System.arraycopy(partitionData, 0, data, fieldTypes.length, partitionData.length)
-    }
-    writable.set(data)
-    recordWriter.write(NullWritable.get(), writable)
-  }
-
-
-  override def writeInternal(row: InternalRow): Unit = {
-    writeCarbon(row)
-  }
-
-  override def close(): Unit = {
-    recordWriter.close(context)
-    // write partition info to new file.
-    val partitonList = new util.ArrayList[String]()
-    val formattedPartitions =
-    // All dynamic partitions need to be converted to proper format
-      CarbonScalaUtil.updatePartitions(
-        updatedPartitions.toMap,
-        model.getCarbonDataLoadSchema.getCarbonTable)
-    formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2))
-    SegmentFileStore.writeSegmentFile(
-      model.getTablePath,
-      taskNo,
-      writePath,
-      model.getSegmentId + "_" + model.getFactTimeStamp + "",
-      partitonList)
-  }
-
-  def getPartitionPath(path: String,
-      attemptContext: TaskAttemptContext,
-      model: CarbonLoadModel): String = {
-    if (updatedPartitions.nonEmpty) {
-      val formattedPartitions =
-      // All dynamic partitions need to be converted to proper format
-        CarbonScalaUtil.updatePartitions(
-          updatedPartitions.toMap,
-          model.getCarbonDataLoadSchema.getCarbonTable)
-      val partitionstr = formattedPartitions.map{p =>
-        ExternalCatalogUtils.escapePathName(p._1) + "=" + ExternalCatalogUtils.escapePathName(p._2)
-      }.mkString(CarbonCommonConstants.FILE_SEPARATOR)
-      model.getCarbonDataLoadSchema.getCarbonTable.getTablePath +
-        CarbonCommonConstants.FILE_SEPARATOR + partitionstr
-    } else {
-      var updatedPath = FileFactory.getUpdatedFilePath(path)
-      updatedPath.substring(0, updatedPath.lastIndexOf("/"))
-    }
-  }
-
-  def getPartitionsFromPath(
-      path: String,
-      attemptContext: TaskAttemptContext,
-      model: CarbonLoadModel): Array[String] = {
-    var attemptId = attemptContext.getTaskAttemptID.toString + "/"
-    if (path.indexOf(attemptId) > -1) {
-      val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/"))
-      if (str.length > 0) {
-        str.split("/")
-      } else {
-        Array.empty
-      }
-    } else {
-      Array.empty
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
new file mode 100644
index 0000000..fa54e0d
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.net.URI
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.text.TextOutputWriter
+import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types.{AtomicType, StructField, StructType}
+
+import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment}
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, ColumnarFormatVersion}
+import org.apache.carbondata.core.reader.CarbonHeaderReader
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.core.scan.model.QueryModel
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats}
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, DataMapJob}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+class SparkCarbonFileFormat extends FileFormat
+  with DataSourceRegister
+  with Logging
+  with Serializable {
+
+  @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def inferSchema(sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    val filePaths = CarbonUtil.getFilePathExternalFilePath(
+      options.get("path").get)
+    if (filePaths.size() == 0) {
+      throw new SparkException("CarbonData file is not present in the location mentioned in DDL")
+    }
+    val carbonHeaderReader: CarbonHeaderReader = new CarbonHeaderReader(filePaths.get(0))
+    val fileHeader = carbonHeaderReader.readHeader
+    val table_columns: java.util.List[org.apache.carbondata.format.ColumnSchema] = fileHeader
+      .getColumn_schema
+    var colArray = ArrayBuffer[StructField]()
+    for (i <- 0 to table_columns.size() - 1) {
+      val col = CarbonUtil.thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i))
+      colArray += (new StructField(col.getColumnName,
+        CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType), false))
+    }
+    colArray.+:(Nil)
+
+    Some(StructType(colArray))
+  }
+
+  override def prepareWrite(sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        new TextOutputWriter(path, dataSchema, context)
+      }
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        CarbonTablePath.CARBON_DATA_EXT
+      }
+    }
+  }
+
+  override def shortName(): String = "Carbonfile"
+
+  override def toString: String = "Carbonfile"
+
+  override def hashCode(): Int = getClass.hashCode()
+
+  override def equals(other: Any): Boolean = other.isInstanceOf[SparkCarbonFileFormat]
+
+  def supportVector(sparkSession: SparkSession, schema: StructType): Boolean = {
+    val vectorizedReader = {
+      if (sparkSession.sqlContext.sparkSession.conf
+        .contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) {
+        sparkSession.sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER)
+      } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
+        System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
+      } else {
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+          CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
+      }
+    }
+    vectorizedReader.toBoolean
+  }
+
+
+  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
+    val conf = sparkSession.sessionState.conf
+    conf.wholeStageEnabled &&
+    schema.length <= conf.wholeStageMaxNumFields &&
+    schema.forall(_.dataType.isInstanceOf[AtomicType])
+  }
+
+
+  def createVectorizedCarbonRecordReader(queryModel: QueryModel,
+      inputMetricsStats: InputMetricsStats, enableBatch: String): RecordReader[Void, Object] = {
+    val name = "org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader"
+    try {
+      val cons = Class.forName(name).getDeclaredConstructors
+      cons.head.setAccessible(true)
+      cons.head.newInstance(queryModel, inputMetricsStats, enableBatch)
+        .asInstanceOf[RecordReader[Void, Object]]
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e)
+        null
+    }
+  }
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+
+    val filter : Option[Expression] = filters.flatMap { filter =>
+      CarbonFilters.createCarbonFilter(dataSchema, filter)
+    }.reduceOption(new AndExpression(_, _))
+
+    val projection = requiredSchema.map(_.name).toArray
+    val carbonProjection = new CarbonProjection
+    projection.foreach(carbonProjection.addColumn)
+
+    val conf = new Configuration()
+    val jobConf = new JobConf(conf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val job = Job.getInstance(jobConf)
+    var supportBatchValue: Boolean = false
+
+    val readVector = supportVector(sparkSession, dataSchema)
+    if (readVector) {
+      supportBatchValue = supportBatch(sparkSession, dataSchema)
+    }
+
+    CarbonFileInputFormat.setTableName(job.getConfiguration, "externaldummy")
+    CarbonFileInputFormat.setDatabaseName(job.getConfiguration, "default")
+    CarbonMetadata.getInstance.removeTable("default_externaldummy")
+    val dataMapJob: DataMapJob = CarbonFileInputFormat.getDataMapJob(job.getConfiguration)
+    val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
+
+    (file: PartitionedFile) => {
+      assert(file.partitionValues.numFields == partitionSchema.size)
+
+      if (file.filePath.endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+        val fileSplit =
+          new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
+
+        val path: String = options.get("path").get
+        val endindex: Int = path.indexOf("Fact") - 1
+        val tablePath = path.substring(0, endindex)
+        lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
+          tablePath,
+          "default",
+          "externaldummy")
+        val split = CarbonInputSplit.from("null", "0", fileSplit, ColumnarFormatVersion.V3, null)
+
+
+        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+        val conf1 = new Configuration()
+        conf1.set("mapreduce.input.carboninputformat.tableName", "externaldummy")
+        conf1.set("mapreduce.input.carboninputformat.databaseName", "default")
+        conf1.set("mapreduce.input.fileinputformat.inputdir", tablePath)
+        CarbonFileInputFormat.setColumnProjection(conf1, carbonProjection)
+        filter match {
+          case Some(c) => CarbonFileInputFormat.setFilterPredicates(conf1, c)
+          case None => None
+        }
+        val attemptContext = new TaskAttemptContextImpl(conf1, attemptId)
+
+        val model = format.createQueryModel(split, attemptContext)
+
+        var segments = new java.util.ArrayList[Segment]()
+        val seg = new Segment("null", null)
+        segments.add(seg)
+        var partition : java.util.List[PartitionSpec] = new java.util.ArrayList[PartitionSpec]()
+
+
+        val segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null")
+        val indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentPath)
+        if (indexFiles.size() == 0) {
+          throw new SparkException("Index file not present to read the carbondata file")
+        }
+
+        val tab = model.getTable
+        DataMapStoreManager.getInstance().clearDataMaps(identifier)
+        val dataMapExprWrapper = DataMapChooser.get
+          .choose(tab, model.getFilterExpressionResolverTree)
+
+        // TODO : handle the partition for CarbonFileLevelFormat
+        val prunedBlocklets = dataMapExprWrapper.prune(segments, null)
+
+        val detailInfo = prunedBlocklets.get(0).getDetailInfo
+        detailInfo.readColumnSchema(detailInfo.getColumnSchemaBinary)
+        split.setDetailInfo(detailInfo)
+
+        val carbonReader = if (readVector) {
+          val vectorizedReader = createVectorizedCarbonRecordReader(model,
+            null,
+            supportBatchValue.toString)
+          vectorizedReader.initialize(split, attemptContext)
+          logDebug(s"Appending $partitionSchema ${ file.partitionValues }")
+          vectorizedReader
+        } else {
+          val reader = new CarbonRecordReader(model,
+            format.getReadSupportClass(attemptContext.getConfiguration), null)
+          reader.initialize(split, attemptContext)
+          reader
+        }
+
+        val iter = new RecordReaderIterator(carbonReader)
+        Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+
+        iter.asInstanceOf[Iterator[InternalRow]]
+      }
+      else {
+        Iterator.empty
+      }
+    }
+  }
+}
+
+
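
The supportVector helper in the new SparkCarbonFileFormat resolves the vectorized-reader switch in three steps: the Spark session conf first, then the JVM system property, then the CarbonProperties default. A stand-alone sketch of that precedence (the key string and the Map-based session conf are assumptions for illustration only):

    object VectorReaderToggleSketch {
      def resolveVectorReader(sessionConf: Map[String, String],
          carbonPropertyDefault: String,
          key: String = "enable.vector.reader"): Boolean = {
        sessionConf.get(key)                          // 1. Spark session conf
          .orElse(Option(System.getProperty(key)))    // 2. JVM system property
          .getOrElse(carbonPropertyDefault)           // 3. carbon property default
          .toBoolean
      }

      def main(args: Array[String]): Unit = {
        // Nothing set in the session conf or system properties, so the default wins:
        println(resolveVectorReader(Map.empty, carbonPropertyDefault = "true")) // true
      }
    }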

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
new file mode 100644
index 0000000..d34b201
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.util
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverterImpl, DataTypeUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
+
+class SparkCarbonTableFormat
+  extends FileFormat
+    with DataSourceRegister
+    with Logging
+    with Serializable {
+
+  override def shortName(): String = "carbondata"
+
+  override def inferSchema(sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    None
+  }
+
+  SparkSession.getActiveSession.get.sessionState.conf.setConfString(
+    "spark.sql.sources.commitProtocolClass",
+    "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
+
+  override def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    val conf = job.getConfiguration
+    conf.setClass(
+      SQLConf.OUTPUT_COMMITTER_CLASS.key,
+      classOf[CarbonOutputCommitter],
+      classOf[CarbonOutputCommitter])
+    conf.set("carbon.commit.protocol", "carbon.commit.protocol")
+    job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
+    val table = CarbonEnv.getCarbonTable(
+      TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
+    val model = new CarbonLoadModel
+    val carbonProperty = CarbonProperties.getInstance()
+    val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
+    val tableProperties = table.getTableInfo.getFactTable.getTableProperties
+    optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
+      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+        carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+    val partitionStr =
+      table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map(
+        _.getColumnName.toLowerCase).mkString(",")
+    optionsFinal.put(
+      "fileheader",
+      dataSchema.fields.map(_.name.toLowerCase).mkString(",") + "," + partitionStr)
+    val optionsLocal = new mutable.HashMap[String, String]()
+    optionsLocal ++= options
+    optionsLocal += (("header", "false"))
+    new CarbonLoadModelBuilder(table).build(
+      optionsLocal.toMap.asJava,
+      optionsFinal,
+      model,
+      conf)
+    model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
+    model.setDictionaryServerHost(options.getOrElse("dicthost", null))
+    model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
+    CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
+    model.setPartitionLoad(true)
+
+    val staticPartition = options.getOrElse("staticpartition", null)
+    if (staticPartition != null) {
+      conf.set("carbon.staticpartition", staticPartition)
+    }
+    // In case of an update query the older segments may be removed, so here we can mark
+    // the to-be-deleted segments as deleted while updating the table status
+    val segemntsTobeDeleted = options.get("segmentsToBeDeleted")
+    if (segemntsTobeDeleted.isDefined) {
+      conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segemntsTobeDeleted.get)
+    }
+
+    val currPartition = options.getOrElse("currentpartition", null)
+    if (currPartition != null) {
+      conf.set("carbon.currentpartition", currPartition)
+    }
+    // Update with the current in progress load.
+    val currEntry = options.getOrElse("currentloadentry", null)
+    if (currEntry != null) {
+      val loadEntry =
+        ObjectSerializationUtil.convertStringToObject(currEntry).asInstanceOf[LoadMetadataDetails]
+      val details =
+        SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath))
+      model.setSegmentId(loadEntry.getLoadName)
+      model.setFactTimeStamp(loadEntry.getLoadStartTime)
+      val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava)
+      list.add(loadEntry)
+      model.setLoadMetadataDetails(list)
+    }
+    // Set the update timestamp if the user provides one for an update query. It needs to be
+    // reflected in the load status update time
+    val updateTimeStamp = options.get("updatetimestamp")
+    if (updateTimeStamp.isDefined) {
+      conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
+    }
+    CarbonTableOutputFormat.setLoadModel(conf, model)
+
+    new OutputWriterFactory {
+
+      /**
+       * counter used for generating task numbers. This is used to generate unique partition numbers
+       * in case of partitioning
+       */
+      val counter = new AtomicLong()
+      val taskIdMap = new ConcurrentHashMap[String, java.lang.Long]()
+
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
+        val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
+        var storeLocation: Array[String] = Array[String]()
+        val isCarbonUseLocalDir = CarbonProperties.getInstance()
+          .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
+
+
+        val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
+        val tmpLocationSuffix =
+          File.separator + "carbon" + System.nanoTime() + File.separator + taskNumber
+        if (isCarbonUseLocalDir) {
+          val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
+          if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
+            // use single dir
+            storeLocation = storeLocation :+
+              (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix)
+            if (storeLocation == null || storeLocation.isEmpty) {
+              storeLocation = storeLocation :+
+                (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
+            }
+          } else {
+            // use all the yarn dirs
+            storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
+          }
+        } else {
+          storeLocation =
+            storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
+        }
+        CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
+        new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber, model)
+      }
+
+      /**
+       * Generate the task id using the task id of the task context and the path. It should be
+       * unique in case of partitioned tables.
+       */
+      private def generateTaskNumber(path: String,
+          context: TaskAttemptContext, segmentId: String): String = {
+        var partitionNumber: java.lang.Long = taskIdMap.get(path)
+        if (partitionNumber == null) {
+          partitionNumber = counter.incrementAndGet()
+          // Generate taskid using the combination of taskid and partition number to make it unique.
+          taskIdMap.put(path, partitionNumber)
+        }
+        val taskID = context.getTaskAttemptID.getTaskID.getId
+        CarbonScalaUtil.generateUniqueNumber(taskID, segmentId, partitionNumber)
+      }
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        CarbonTablePath.CARBON_DATA_EXT
+      }
+
+    }
+  }
+}
+
+case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
+  extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {
+  override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext,
+      absoluteDir: String,
+      ext: String): String = {
+    val carbonFlow = taskContext.getConfiguration.get("carbon.commit.protocol")
+    if (carbonFlow != null) {
+      super.newTaskTempFile(taskContext, Some(absoluteDir), ext)
+    } else {
+      super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
+    }
+  }
+}
+
+/**
+ * It is just a class to make the code compile with both Spark 2.1 and 2.2
+ */
+private trait AbstractCarbonOutputWriter {
+  def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+  def writeInternal(row: InternalRow): Unit = {
+    writeCarbon(row)
+  }
+  def write(row: InternalRow): Unit = {
+    writeCarbon(row)
+  }
+  def writeCarbon(row: InternalRow): Unit
+}
+
+private class CarbonOutputWriter(path: String,
+    context: TaskAttemptContext,
+    fieldTypes: Seq[DataType],
+    taskNo : String,
+    model: CarbonLoadModel)
+  extends OutputWriter with AbstractCarbonOutputWriter {
+
+  val converter = new DataTypeConverterImpl
+
+  val partitions =
+    getPartitionsFromPath(path, context, model).map(ExternalCatalogUtils.unescapePathName)
+  val staticPartition: util.HashMap[String, Boolean] = {
+    val staticPart = context.getConfiguration.get("carbon.staticpartition")
+    if (staticPart != null) {
+      ObjectSerializationUtil.convertStringToObject(
+        staticPart).asInstanceOf[util.HashMap[String, Boolean]]
+    } else {
+      null
+    }
+  }
+  lazy val currPartitions: util.List[indexstore.PartitionSpec] = {
+    val currParts = context.getConfiguration.get("carbon.currentpartition")
+    if (currParts != null) {
+      ObjectSerializationUtil.convertStringToObject(
+        currParts).asInstanceOf[util.List[indexstore.PartitionSpec]]
+    } else {
+      new util.ArrayList[indexstore.PartitionSpec]()
+    }
+  }
+  var (updatedPartitions, partitionData) = if (partitions.nonEmpty) {
+    val updatedPartitions = partitions.map(splitPartition)
+    (updatedPartitions, updatePartitions(updatedPartitions.map(_._2)))
+  } else {
+    (Map.empty[String, String].toArray, Array.empty)
+  }
+
+  private def splitPartition(p: String) = {
+    val value = p.substring(p.indexOf("=") + 1, p.length)
+    val col = p.substring(0, p.indexOf("="))
+    // Null handling case. For null values Hive creates a partition with this special name
+    if (value.equals("__HIVE_DEFAULT_PARTITION__")) {
+      (col, null)
+      // we should replace the special string back with an empty value.
+    } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+      (col, "")
+    } else {
+      (col, value)
+    }
+  }
+
+  lazy val writePath = {
+    val updatedPath = getPartitionPath(path, context, model)
+    // if the user specified a partition location, then search the current partitions
+    // to find the corresponding partition spec.
+    if (partitions.isEmpty) {
+      val writeSpec = new indexstore.PartitionSpec(null, updatedPath)
+      val index = currPartitions.indexOf(writeSpec)
+      if (index > -1) {
+        val spec = currPartitions.get(index)
+        updatedPartitions = spec.getPartitions.asScala.map(splitPartition).toArray
+        partitionData = updatePartitions(updatedPartitions.map(_._2))
+      }
+    }
+    updatedPath
+  }
+
+  val writable = new ObjectArrayWritable
+
+  private def updatePartitions(partitionData: Seq[String]): Array[AnyRef] = {
+    model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo
+      .getColumnSchemaList.asScala.zipWithIndex.map { case (col, index) =>
+
+      val dataType = if (col.hasEncoding(Encoding.DICTIONARY)) {
+        DataTypes.INT
+      } else if (col.getDataType.equals(DataTypes.TIMESTAMP) ||
+                         col.getDataType.equals(DataTypes.DATE)) {
+        DataTypes.LONG
+      } else {
+        col.getDataType
+      }
+      if (staticPartition != null && staticPartition.get(col.getColumnName.toLowerCase)) {
+        val converetedVal =
+          CarbonScalaUtil.convertStaticPartitions(
+            partitionData(index),
+            col,
+            model.getCarbonDataLoadSchema.getCarbonTable)
+        if (col.hasEncoding(Encoding.DICTIONARY)) {
+          converetedVal.toInt.asInstanceOf[AnyRef]
+        } else {
+          DataTypeUtil.getDataBasedOnDataType(
+            converetedVal,
+            dataType,
+            converter)
+        }
+      } else {
+        DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType, converter)
+      }
+    }.toArray
+  }
+
+  private val recordWriter: CarbonRecordWriter = {
+    context.getConfiguration.set("carbon.outputformat.taskno", taskNo)
+    context.getConfiguration.set("carbon.outputformat.writepath",
+      writePath + "/" + model.getSegmentId + "_" + model.getFactTimeStamp + ".tmp")
+    new CarbonTableOutputFormat() {
+      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+        new Path(path)
+      }
+    }.getRecordWriter(context).asInstanceOf[CarbonRecordWriter]
+  }
+
+  // TODO Implement writesupport interface to support writing Row directly to recordwriter
+  def writeCarbon(row: InternalRow): Unit = {
+    val data = new Array[AnyRef](fieldTypes.length + partitionData.length)
+    var i = 0
+    while (i < fieldTypes.length) {
+      if (!row.isNullAt(i)) {
+        fieldTypes(i) match {
+          case StringType =>
+            data(i) = row.getString(i)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case other =>
+            data(i) = row.get(i, other)
+        }
+      }
+      i += 1
+    }
+    if (partitionData.length > 0) {
+      System.arraycopy(partitionData, 0, data, fieldTypes.length, partitionData.length)
+    }
+    writable.set(data)
+    recordWriter.write(NullWritable.get(), writable)
+  }
+
+
+  override def writeInternal(row: InternalRow): Unit = {
+    writeCarbon(row)
+  }
+
+  override def close(): Unit = {
+    recordWriter.close(context)
+    // write partition info to new file.
+    val partitonList = new util.ArrayList[String]()
+    val formattedPartitions =
+    // All dynamic partitions need to be converted to proper format
+      CarbonScalaUtil.updatePartitions(
+        updatedPartitions.toMap,
+        model.getCarbonDataLoadSchema.getCarbonTable)
+    formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2))
+    SegmentFileStore.writeSegmentFile(
+      model.getTablePath,
+      taskNo,
+      writePath,
+      model.getSegmentId + "_" + model.getFactTimeStamp + "",
+      partitonList)
+  }
+
+  def getPartitionPath(path: String,
+      attemptContext: TaskAttemptContext,
+      model: CarbonLoadModel): String = {
+    if (updatedPartitions.nonEmpty) {
+      val formattedPartitions =
+      // All dynamic partitions need to be converted to proper format
+        CarbonScalaUtil.updatePartitions(
+          updatedPartitions.toMap,
+          model.getCarbonDataLoadSchema.getCarbonTable)
+      val partitionstr = formattedPartitions.map{p =>
+        ExternalCatalogUtils.escapePathName(p._1) + "=" + ExternalCatalogUtils.escapePathName(p._2)
+      }.mkString(CarbonCommonConstants.FILE_SEPARATOR)
+      model.getCarbonDataLoadSchema.getCarbonTable.getTablePath +
+        CarbonCommonConstants.FILE_SEPARATOR + partitionstr
+    } else {
+      var updatedPath = FileFactory.getUpdatedFilePath(path)
+      updatedPath.substring(0, updatedPath.lastIndexOf("/"))
+    }
+  }
+
+  def getPartitionsFromPath(
+      path: String,
+      attemptContext: TaskAttemptContext,
+      model: CarbonLoadModel): Array[String] = {
+    var attemptId = attemptContext.getTaskAttemptID.toString + "/"
+    if (path.indexOf(attemptId) > -1) {
+      val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/"))
+      if (str.length > 0) {
+        str.split("/")
+      } else {
+        Array.empty
+      }
+    } else {
+      Array.empty
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index ec20ec2..d85ef68 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -110,7 +110,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           .tableExists(TableIdentifier(alterTableChangeDataTypeModel.tableName,
             alterTableChangeDataTypeModel.databaseName))(sparkSession)
         if (isCarbonTable) {
-          ExecutedCommandExec(dataTypeChange) :: Nil
+          val carbonTable = CarbonEnv.getCarbonTable(alterTableChangeDataTypeModel.databaseName,
+            alterTableChangeDataTypeModel.tableName)(sparkSession)
+          if (carbonTable != null && carbonTable.isFileLevelExternalTable) {
+            throw new MalformedCarbonCommandException(
+              "Unsupported alter operation on Carbon external fileformat table")
+          } else {
+            ExecutedCommandExec(dataTypeChange) :: Nil
+          }
         } else {
           throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }
@@ -119,7 +126,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           .tableExists(TableIdentifier(alterTableAddColumnsModel.tableName,
             alterTableAddColumnsModel.databaseName))(sparkSession)
         if (isCarbonTable) {
-          ExecutedCommandExec(addColumn) :: Nil
+          val carbonTable = CarbonEnv.getCarbonTable(alterTableAddColumnsModel.databaseName,
+            alterTableAddColumnsModel.tableName)(sparkSession)
+          if (carbonTable != null && carbonTable.isFileLevelExternalTable) {
+            throw new MalformedCarbonCommandException(
+              "Unsupported alter operation on Carbon external fileformat table")
+          } else {
+            ExecutedCommandExec(addColumn) :: Nil
+          }
         } else {
           throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }
@@ -128,7 +142,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           .tableExists(TableIdentifier(alterTableDropColumnModel.tableName,
             alterTableDropColumnModel.databaseName))(sparkSession)
         if (isCarbonTable) {
-          ExecutedCommandExec(dropColumn) :: Nil
+          val carbonTable = CarbonEnv.getCarbonTable(alterTableDropColumnModel.databaseName,
+            alterTableDropColumnModel.tableName)(sparkSession)
+          if (carbonTable != null && carbonTable.isFileLevelExternalTable) {
+            throw new MalformedCarbonCommandException(
+              "Unsupported alter operation on Carbon external fileformat table")
+          } else {
+            ExecutedCommandExec(dropColumn) :: Nil
+          }
         } else {
           throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 4996bec..b2f4505 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand
-import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index e0fff08..69fd366 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -33,7 +33,9 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.util.SchemaReader
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
@@ -144,19 +146,24 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
       .getOrElse(Map.empty)
   }
 
-  def createCarbonTable(tableHeader: CreateTableHeaderContext,
-      skewSpecContext: SkewSpecContext,
-      bucketSpecContext: BucketSpecContext,
-      partitionColumns: ColTypeListContext,
-      columns : ColTypeListContext,
-      tablePropertyList : TablePropertyListContext,
-      locationSpecContext: SqlBaseParser.LocationSpecContext,
-      tableComment : Option[String],
-      ctas: TerminalNode,
-      query: QueryContext) : LogicalPlan = {
+  def createCarbonTable(createTableTuple: (CreateTableHeaderContext, SkewSpecContext,
+    BucketSpecContext, ColTypeListContext, ColTypeListContext, TablePropertyListContext,
+    LocationSpecContext, Option[String], TerminalNode, QueryContext, String)): LogicalPlan = {
     // val parser = new CarbonSpark2SqlParser
 
+    val (tableHeader, skewSpecContext,
+      bucketSpecContext,
+      partitionColumns,
+      columns,
+      tablePropertyList,
+      locationSpecContext,
+      tableComment,
+      ctas,
+      query,
+      provider) = createTableTuple
+
     val (tableIdentifier, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
+
     // TODO: implement temporary tables
     if (temp) {
       throw new ParseException(
@@ -256,13 +263,27 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
         CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession),
         tableIdentifier.table)
       val table = try {
-        SchemaReader.getTableInfo(identifier)
-      } catch {
+        val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
+        if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath)) &&
+            provider.equalsIgnoreCase("'Carbonfile'")) {
+          SchemaReader.inferSchema(identifier)
+        }
+        else {
+          SchemaReader.getTableInfo(identifier)
+        }
+      }
+        catch {
         case e: Throwable =>
           operationNotAllowed(s"Invalid table path provided: ${tablePath.get} ", tableHeader)
       }
       // set "_external" property, so that DROP TABLE will not delete the data
-      table.getFactTable.getTableProperties.put("_external", "true")
+      if (provider.equalsIgnoreCase("'Carbonfile'")) {
+        table.getFactTable.getTableProperties.put("_filelevelexternal", "true")
+        table.getFactTable.getTableProperties.put("_external", "false")
+      } else {
+        table.getFactTable.getTableProperties.put("_external", "true")
+        table.getFactTable.getTableProperties.put("_filelevelexternal", "false")
+      }
       table
     } else {
       // prepare table model of the collected tokens

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index ba2fe947..c6bab9e 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -326,18 +326,13 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
     val fileStorage = helper.getFileStorage(ctx.createFileFormat)
 
     if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+        fileStorage.equalsIgnoreCase("'Carbonfile'") ||
         fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      helper.createCarbonTable(
-        tableHeader = ctx.createTableHeader,
-        skewSpecContext = ctx.skewSpec,
-        bucketSpecContext = ctx.bucketSpec,
-        partitionColumns = ctx.partitionColumns,
-        columns = ctx.columns,
-        tablePropertyList = ctx.tablePropertyList,
-        locationSpecContext = ctx.locationSpec(),
-        tableComment = Option(ctx.STRING()).map(string),
-        ctas = ctx.AS,
-        query = ctx.query)
+      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec,
+        ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(),
+        Option(ctx.STRING()).map(string),
+        ctx.AS, ctx.query, fileStorage)
+        helper.createCarbonTable(createTableTuple)
     } else {
       super.visitCreateTable(ctx)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index f033a8e..c28e4ba 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -325,18 +325,12 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
     val fileStorage = helper.getFileStorage(ctx.createFileFormat)
 
     if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+        fileStorage.equalsIgnoreCase("'Carbonfile'") ||
         fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      helper.createCarbonTable(
-        tableHeader = ctx.createTableHeader,
-        skewSpecContext = ctx.skewSpec,
-        bucketSpecContext = ctx.bucketSpec,
-        partitionColumns = ctx.partitionColumns,
-        columns = ctx.columns,
-        tablePropertyList = ctx.tablePropertyList,
-        locationSpecContext = ctx.locationSpec(),
-        tableComment = Option(ctx.STRING()).map(string),
-        ctas = ctx.AS,
-        query = ctx.query)
+      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec,
+        ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList,ctx.locationSpec(),
+        Option(ctx.STRING()).map(string), ctx.AS, ctx.query, fileStorage)
+      helper.createCarbonTable(createTableTuple)
     } else {
       super.visitCreateHiveTable(ctx)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index d09c9b5..5831f3e 100644
--- a/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/integration/spark2/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -14,4 +14,5 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ------------------------------------------------------------------------
-org.apache.spark.sql.CarbonSource
\ No newline at end of file
+org.apache.spark.sql.CarbonSource
+org.apache.spark.sql.SparkCarbonFileFormat
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
index 0ac6f38..b15dafd 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
 
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -113,9 +114,12 @@ public class CSVCarbonWriterSuite {
         writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
       }
       writer.close();
-    } catch (Exception e) {
+    } catch (IOException e) {
       e.printStackTrace();
       Assert.fail(e.getMessage());
+    } catch (InvalidLoadOptionException l) {
+      l.printStackTrace();
+      Assert.fail(l.getMessage());
     }
 
     File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));


[6/6] carbondata git commit: [CARBONDATA-2224][File Level Reader Support] External File level reader support

Posted by ja...@apache.org.
[CARBONDATA-2224][File Level Reader Support] External File level reader support

The file level reader reads any carbondata file placed at any external file path.

This closes #2055
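
A minimal usage sketch of the file level reader, assuming carbondata files have already been
written under /tmp/carbon_external. The table name and location are illustrative; the
'carbonfile' format keyword is the one accepted by the parser change in this patch, and
spark is assumed to be a SparkSession/CarbonSession with the carbondata jars on the classpath:

  // Drop any stale definition of the illustrative table
  spark.sql("DROP TABLE IF EXISTS external_carbon")
  // Register an external table over already written carbondata files. When no schema
  // file exists at the location, the schema is inferred from the carbondata file header.
  spark.sql(
    """
      | CREATE TABLE external_carbon
      | STORED BY 'carbonfile'
      | LOCATION '/tmp/carbon_external'
    """.stripMargin)
  // Reads go through the SparkCarbonFileFormat / CarbonFileInputFormat added in this commit
  spark.sql("SELECT * FROM external_carbon").show()

Note that alter table operations on such a file level external table are rejected by the
DDLStrategy change in this patch.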


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7a124ecd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7a124ecd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7a124ecd

Branch: refs/heads/carbonfile
Commit: 7a124ecd87769c0197ae67a0726c5abb4745d3a8
Parents: a386f1f
Author: sounakr <so...@gmail.com>
Authored: Sat Feb 24 07:55:14 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Mar 16 17:33:43 2018 +0800

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java |   6 +
 .../apache/carbondata/core/util/CarbonUtil.java | 209 +++++-
 .../hadoop/api/CarbonFileInputFormat.java       | 682 +++++++++++++++++++
 .../carbondata/hadoop/util/SchemaReader.java    |  17 +-
 integration/spark-common-test/pom.xml           |   6 +
 ...FileInputFormatWithExternalCarbonTable.scala | 240 +++++++
 ...tCreateTableUsingSparkCarbonFileFormat.scala | 327 +++++++++
 ...tSparkCarbonFileFormatWithSparkSession.scala | 176 +++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  64 +-
 .../VectorizedCarbonRecordReader.java           |  22 +-
 .../management/CarbonLoadDataCommand.scala      |   4 +-
 .../command/table/CarbonDropTableCommand.scala  |   2 +-
 .../datasources/CarbonFileFormat.scala          | 443 ------------
 .../datasources/SparkCarbonFileFormat.scala     | 269 ++++++++
 .../datasources/SparkCarbonTableFormat.scala    | 443 ++++++++++++
 .../sql/execution/strategy/DDLStrategy.scala    |  27 +-
 .../spark/sql/hive/CarbonAnalysisRules.scala    |   2 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  47 +-
 .../spark/sql/hive/CarbonSessionState.scala     |  17 +-
 .../spark/sql/hive/CarbonSessionState.scala     |  16 +-
 ....apache.spark.sql.sources.DataSourceRegister |   3 +-
 .../sdk/file/CSVCarbonWriterSuite.java          |   6 +-
 22 files changed, 2522 insertions(+), 506 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index f14672f..278dc96 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -826,6 +826,12 @@ public class CarbonTable implements Serializable {
     return external != null && external.equalsIgnoreCase("true");
   }
 
+  public boolean isFileLevelExternalTable() {
+    String external = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal");
+    return external != null && external.equalsIgnoreCase("true");
+  }
+
+
   public long size() throws IOException {
     Map<String, Long> dataIndexSize = CarbonUtil.calculateDataIndexSize(this);
     Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b961b60..ff49edf 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
@@ -52,18 +53,26 @@ import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypeAdapter;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
 import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
 import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
 import org.apache.carbondata.core.reader.ThriftReader;
 import org.apache.carbondata.core.reader.ThriftReader.TBaseCreator;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
@@ -77,6 +86,8 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.BlockletHeader;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
+import org.apache.carbondata.format.FileHeader;
+
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -1279,7 +1290,7 @@ public final class CarbonUtil {
     int counter = 0;
     for (int i = 0; i < wrapperColumnSchemaList.size(); i++) {
       if (CarbonUtil.hasEncoding(wrapperColumnSchemaList.get(i).getEncodingList(),
-          org.apache.carbondata.core.metadata.encoder.Encoding.DICTIONARY)) {
+          Encoding.DICTIONARY)) {
         cardinality.add(dictionaryColumnCardinality[counter]);
         counter++;
       } else if (!wrapperColumnSchemaList.get(i).isDimensionColumn()) {
@@ -2068,6 +2079,202 @@ public final class CarbonUtil {
     return tableInfo;
   }
 
+  public static ColumnSchema thriftColumnSchmeaToWrapperColumnSchema(
+      org.apache.carbondata.format.ColumnSchema externalColumnSchema) {
+    ColumnSchema wrapperColumnSchema = new ColumnSchema();
+    wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id());
+    wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name());
+    wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar());
+    DataType dataType = thriftDataTyopeToWrapperDataType(externalColumnSchema.data_type);
+    if (DataTypes.isDecimal(dataType)) {
+      DecimalType decimalType = (DecimalType) dataType;
+      decimalType.setPrecision(externalColumnSchema.getPrecision());
+      decimalType.setScale(externalColumnSchema.getScale());
+    }
+    wrapperColumnSchema.setDataType(dataType);
+    wrapperColumnSchema.setDimensionColumn(externalColumnSchema.isDimension());
+    List<Encoding> encoders = new ArrayList<Encoding>();
+    for (org.apache.carbondata.format.Encoding encoder : externalColumnSchema.getEncoders()) {
+      encoders.add(fromExternalToWrapperEncoding(encoder));
+    }
+    wrapperColumnSchema.setEncodingList(encoders);
+    wrapperColumnSchema.setNumberOfChild(externalColumnSchema.getNum_child());
+    wrapperColumnSchema.setPrecision(externalColumnSchema.getPrecision());
+    wrapperColumnSchema.setColumnGroup(externalColumnSchema.getColumn_group_id());
+    wrapperColumnSchema.setScale(externalColumnSchema.getScale());
+    wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value());
+    wrapperColumnSchema.setSchemaOrdinal(externalColumnSchema.getSchemaOrdinal());
+    Map<String, String> properties = externalColumnSchema.getColumnProperties();
+    if (properties != null) {
+      if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
+        wrapperColumnSchema.setSortColumn(true);
+      }
+    }
+    wrapperColumnSchema.setFunction(externalColumnSchema.getAggregate_function());
+    List<org.apache.carbondata.format.ParentColumnTableRelation> parentColumnTableRelation =
+        externalColumnSchema.getParentColumnTableRelations();
+    if (null != parentColumnTableRelation) {
+      wrapperColumnSchema.setParentColumnTableRelations(
+          fromThriftToWrapperParentTableColumnRelations(parentColumnTableRelation));
+    }
+    return wrapperColumnSchema;
+  }
+
+  static List<ParentColumnTableRelation> fromThriftToWrapperParentTableColumnRelations(
+      List<org.apache.carbondata.format.ParentColumnTableRelation> thirftParentColumnRelation) {
+    List<ParentColumnTableRelation> parentColumnTableRelationList = new ArrayList<>();
+    for (org.apache.carbondata.format.ParentColumnTableRelation carbonTableRelation :
+        thirftParentColumnRelation) {
+      RelationIdentifier relationIdentifier =
+          new RelationIdentifier(carbonTableRelation.getRelationIdentifier().getDatabaseName(),
+              carbonTableRelation.getRelationIdentifier().getTableName(),
+              carbonTableRelation.getRelationIdentifier().getTableId());
+      ParentColumnTableRelation parentColumnTableRelation =
+          new ParentColumnTableRelation(relationIdentifier, carbonTableRelation.getColumnId(),
+              carbonTableRelation.getColumnName());
+      parentColumnTableRelationList.add(parentColumnTableRelation);
+    }
+    return parentColumnTableRelationList;
+  }
+
+  static Encoding fromExternalToWrapperEncoding(
+      org.apache.carbondata.format.Encoding encoderThrift) {
+    switch (encoderThrift) {
+      case DICTIONARY:
+        return Encoding.DICTIONARY;
+      case DELTA:
+        return Encoding.DELTA;
+      case RLE:
+        return Encoding.RLE;
+      case INVERTED_INDEX:
+        return Encoding.INVERTED_INDEX;
+      case BIT_PACKED:
+        return Encoding.BIT_PACKED;
+      case DIRECT_DICTIONARY:
+        return Encoding.DIRECT_DICTIONARY;
+      default:
+        throw new IllegalArgumentException(encoderThrift.toString() + " is not supported");
+    }
+  }
+
+  static DataType thriftDataTyopeToWrapperDataType(
+      org.apache.carbondata.format.DataType dataTypeThrift) {
+    switch (dataTypeThrift) {
+      case BOOLEAN:
+        return DataTypes.BOOLEAN;
+      case STRING:
+        return DataTypes.STRING;
+      case SHORT:
+        return DataTypes.SHORT;
+      case INT:
+        return DataTypes.INT;
+      case LONG:
+        return DataTypes.LONG;
+      case DOUBLE:
+        return DataTypes.DOUBLE;
+      case DECIMAL:
+        return DataTypes.createDefaultDecimalType();
+      case DATE:
+        return DataTypes.DATE;
+      case TIMESTAMP:
+        return DataTypes.TIMESTAMP;
+      case ARRAY:
+        return DataTypes.createDefaultArrayType();
+      case STRUCT:
+        return DataTypes.createDefaultStructType();
+      default:
+        return DataTypes.STRING;
+    }
+  }
+
+  public static List<String> getFilePathExternalFilePath(String path) {
+
+    // return the list of carbondata files in the given path.
+    CarbonFile segment = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+    CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+
+        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+          return true;
+        }
+        return false;
+      }
+    });
+    List<String> filePaths = new ArrayList<>(dataFiles.length);
+    for (CarbonFile dfiles : dataFiles) {
+      filePaths.add(dfiles.getAbsolutePath());
+    }
+    return filePaths;
+  }
+
+  /**
+   * Builds the thrift TableInfo for a file level external table. When no schema file exists,
+   * the schema is inferred from the header of a carbondata file under the table path;
+   * otherwise the thrift schema is read from the given path.
+   * @param carbonDataFilePath
+   * @return the thrift TableInfo
+   */
+  public static org.apache.carbondata.format.TableInfo inferSchemaFileExternalTable(
+      String carbonDataFilePath, AbsoluteTableIdentifier absoluteTableIdentifier,
+      boolean schemaExists) throws IOException {
+    TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+      public org.apache.thrift.TBase<org.apache.carbondata.format.TableInfo,
+          org.apache.carbondata.format.TableInfo._Fields> create() {
+        return new org.apache.carbondata.format.TableInfo();
+      }
+    };
+    if (schemaExists == false) {
+      List<String> filePaths =
+          getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null");
+      String fistFilePath = null;
+      try {
+        fistFilePath = filePaths.get(0);
+      } catch (Exception e) {
+        LOGGER.error("CarbonData file is not present in the table location");
+      }
+      CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(fistFilePath);
+      FileHeader fileHeader = carbonHeaderReader.readHeader();
+      List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+      List<org.apache.carbondata.format.ColumnSchema> table_columns = fileHeader.getColumn_schema();
+      for (int i = 0; i < table_columns.size(); i++) {
+        ColumnSchema col = thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i));
+        col.setColumnReferenceId(col.getColumnUniqueId());
+        columnSchemaList.add(col);
+      }
+      TableSchema tableSchema = new TableSchema();
+      tableSchema.setTableName(absoluteTableIdentifier.getTableName());
+      tableSchema.setBucketingInfo(null);
+      tableSchema.setSchemaEvalution(null);
+      tableSchema.setTableId(UUID.randomUUID().toString());
+      tableSchema.setListOfColumns(columnSchemaList);
+
+      ThriftWrapperSchemaConverterImpl thriftWrapperSchemaConverter =
+          new ThriftWrapperSchemaConverterImpl();
+      SchemaEvolutionEntry schemaEvolutionEntry = new SchemaEvolutionEntry();
+      schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis());
+      SchemaEvolution schemaEvol = new SchemaEvolution();
+      List<SchemaEvolutionEntry> schEntryList = new ArrayList<>();
+      schEntryList.add(schemaEvolutionEntry);
+      schemaEvol.setSchemaEvolutionEntryList(schEntryList);
+      tableSchema.setSchemaEvalution(schemaEvol);
+
+      org.apache.carbondata.format.TableSchema thriftFactTable =
+          thriftWrapperSchemaConverter.fromWrapperToExternalTableSchema(tableSchema);
+      org.apache.carbondata.format.TableInfo tableInfo =
+          new org.apache.carbondata.format.TableInfo(thriftFactTable,
+              new ArrayList<org.apache.carbondata.format.TableSchema>());
+
+      tableInfo.setDataMapSchemas(null);
+      return tableInfo;
+    } else {
+      ThriftReader thriftReader = new ThriftReader(carbonDataFilePath, createTBase);
+      thriftReader.open();
+      org.apache.carbondata.format.TableInfo tableInfo =
+          (org.apache.carbondata.format.TableInfo) thriftReader.read();
+      thriftReader.close();
+      return tableInfo;
+    }
+  }
+
 
   public static void dropDatabaseDirectory(String databasePath)
       throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
new file mode 100644
index 0000000..b86b1cc
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.SingleTableProvider;
+import org.apache.carbondata.core.scan.filter.TableProvider;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeConverter;
+import org.apache.carbondata.core.util.DataTypeConverterImpl;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+
+/**
+ * Input format of CarbonData file.
+ *
+ * @param <T>
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implements Serializable {
+
+  public static final String READ_SUPPORT_CLASS = "carbon.read.support.class";
+  // comma separated list of input segment numbers
+  public static final String INPUT_SEGMENT_NUMBERS =
+      "mapreduce.input.carboninputformat.segmentnumbers";
+  private static final String VALIDATE_INPUT_SEGMENT_IDs =
+      "mapreduce.input.carboninputformat.validsegments";
+  // comma separated list of input files
+  public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
+  private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
+  private static final Log LOG = LogFactory.getLog(CarbonFileInputFormat.class);
+  private static final String FILTER_PREDICATE =
+      "mapreduce.input.carboninputformat.filter.predicate";
+  private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
+  private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
+  private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
+  private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
+  private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
+  public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
+  public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
+  private static final String PARTITIONS_TO_PRUNE =
+      "mapreduce.input.carboninputformat.partitions.to.prune";
+  public static final String UPADTE_T =
+      "mapreduce.input.carboninputformat.partitions.to.prune";
+
+  // a cache for carbon table, it will be used in task side
+  private CarbonTable carbonTable;
+
+  /**
+   * Set the `tableInfo` in `configuration`
+   */
+  public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
+      throws IOException {
+    if (null != tableInfo) {
+      configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize()));
+    }
+  }
+
+  /**
+   * Get TableInfo object from `configuration`
+   */
+  private static TableInfo getTableInfo(Configuration configuration) throws IOException {
+    String tableInfoStr = configuration.get(TABLE_INFO);
+    if (tableInfoStr == null) {
+      return null;
+    } else {
+      TableInfo output = new TableInfo();
+      output.readFields(
+          new DataInputStream(
+              new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr))));
+      return output;
+    }
+  }
+
+
+  public static void setTablePath(Configuration configuration, String tablePath) {
+    configuration.set(FileInputFormat.INPUT_DIR, tablePath);
+  }
+
+  public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
+    configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
+  }
+
+
+  public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
+      throws IOException {
+    if (dataMapJob != null) {
+      String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
+      configuration.set(DATA_MAP_DSTR, toString);
+    }
+  }
+
+  public static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
+    String jobString = configuration.get(DATA_MAP_DSTR);
+    if (jobString != null) {
+      return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
+    }
+    return null;
+  }
+
+  /**
+   * It sets unresolved filter expression.
+   *
+   * @param configuration
+   * @param filterExpression
+   */
+  public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
+    if (filterExpression == null) {
+      return;
+    }
+    try {
+      String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
+      configuration.set(FILTER_PREDICATE, filterString);
+    } catch (Exception e) {
+      throw new RuntimeException("Error while setting filter expression to Job", e);
+    }
+  }
+
+  public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
+    if (projection == null || projection.isEmpty()) {
+      return;
+    }
+    String[] allColumns = projection.getAllColumns();
+    StringBuilder builder = new StringBuilder();
+    for (String column : allColumns) {
+      builder.append(column).append(",");
+    }
+    String columnString = builder.toString();
+    columnString = columnString.substring(0, columnString.length() - 1);
+    configuration.set(COLUMN_PROJECTION, columnString);
+  }
+
+  public static String getColumnProjection(Configuration configuration) {
+    return configuration.get(COLUMN_PROJECTION);
+  }
+
+
+  /**
+   * Set list of segments to access
+   */
+  public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) {
+    configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments));
+  }
+
+  /**
+   * Set `CARBON_INPUT_SEGMENTS` from property to configuration
+   */
+  public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) {
+    String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase();
+    String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase();
+    String segmentNumbersFromProperty = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*");
+    if (!segmentNumbersFromProperty.trim().equals("*")) {
+      CarbonFileInputFormat
+          .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(",")));
+    }
+  }
+
+  /**
+   * set whether the segments to access should be validated
+   */
+  public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) {
+    configuration.set(CarbonFileInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
+  }
+
+  /**
+   * get whether the segments to access should be validated
+   */
+  public static boolean getValidateSegmentsToAccess(Configuration configuration) {
+    return configuration.get(CarbonFileInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true")
+        .equalsIgnoreCase("true");
+  }
+
+  /**
+   * set list of partitions to prune
+   */
+  public static void setPartitionsToPrune(Configuration configuration,
+      List<PartitionSpec> partitions) {
+    if (partitions == null) {
+      return;
+    }
+    try {
+      String partitionString =
+          ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions));
+      configuration.set(PARTITIONS_TO_PRUNE, partitionString);
+    } catch (Exception e) {
+      throw new RuntimeException("Error while setting patition information to Job", e);
+    }
+  }
+
+  /**
+   * get list of partitions to prune
+   */
+  private static List<PartitionSpec> getPartitionsToPrune(Configuration configuration)
+      throws IOException {
+    String partitionString = configuration.get(PARTITIONS_TO_PRUNE);
+    if (partitionString != null) {
+      return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString);
+    }
+    return null;
+  }
+
+  public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+      throws IOException {
+    String tablePath = configuration.get(INPUT_DIR, "");
+    try {
+      return AbsoluteTableIdentifier
+          .from(tablePath, getDatabaseName(configuration), getTableName(configuration));
+    } catch (InvalidConfigurationException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * The configuration FileInputFormat.INPUT_DIR
+   * is used to get the table path to read.
+   *
+   * @param job
+   * @return List<InputSplit> list of CarbonInputSplit
+   * @throws IOException
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
+    CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
+    if (null == carbonTable) {
+      throw new IOException("Missing/Corrupt schema file for table.");
+    }
+    //    TableDataMap blockletMap = DataMapStoreManager.getInstance()
+    //        .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
+
+    if (getValidateSegmentsToAccess(job.getConfiguration())) {
+      // get all valid segments and set them into the configuration
+      // check for externalTable segment (Segment_null)
+      // process and resolve the expression
+      Expression filter = getFilterPredicates(job.getConfiguration());
+      TableProvider tableProvider = new SingleTableProvider(carbonTable);
+      // this will be null in case of corrupt schema file.
+      PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
+      CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
+
+      FilterResolverIntf filterInterface = CarbonInputFormatUtil
+          .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider);
+
+      String segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null");
+      FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
+      if (FileFactory.isFileExist(segmentDir, fileType)) {
+        // if external table Segments are found, add it to the List
+        List<Segment> externalTableSegments = new ArrayList<Segment>();
+        Segment seg = new Segment("null", null);
+        externalTableSegments.add(seg);
+
+        Map<String, String> indexFiles =
+            new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir);
+
+        if (indexFiles.size() == 0) {
+          throw new RuntimeException("Index file not present to read the carbondata file");
+        }
+        // do block filtering and get split
+        List<InputSplit> splits =
+            getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null);
+
+        return splits;
+      }
+    }
+    return null;
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   * The configurations FileInputFormat.INPUT_DIR and CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
+   * are used to get the table path to read.
+   *
+   * @return
+   * @throws IOException
+   */
+  private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
+      List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
+      List<Integer> oldPartitionIdList) throws IOException {
+
+    List<InputSplit> result = new LinkedList<InputSplit>();
+    UpdateVO invalidBlockVOForSegmentId = null;
+    Boolean isIUDTable = false;
+
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+    SegmentUpdateStatusManager updateStatusManager =
+        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+
+    isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
+
+    // for each segment fetch blocks matching filter in Driver BTree
+    List<CarbonInputSplit> dataBlocksOfSegment =
+        getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions,
+            validSegments, partitionInfo, oldPartitionIdList);
+    for (CarbonInputSplit inputSplit : dataBlocksOfSegment) {
+
+      // Get the UpdateVO for those tables on which IUD operations are being performed.
+      if (isIUDTable) {
+        invalidBlockVOForSegmentId =
+            updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId());
+      }
+      String[] deleteDeltaFilePath = null;
+      if (isIUDTable) {
+        // In case IUD is not performed in this table avoid searching for
+        // invalidated blocks.
+        if (CarbonUtil
+            .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(),
+                invalidBlockVOForSegmentId, updateStatusManager)) {
+          continue;
+        }
+        // When iud is done then only get delete delta files for a block
+        try {
+          deleteDeltaFilePath = updateStatusManager
+              .getDeleteDeltaFilePath(inputSplit.getPath().toString(), inputSplit.getSegmentId());
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+      }
+      inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath);
+      result.add(inputSplit);
+    }
+    return result;
+  }
+
+  protected Expression getFilterPredicates(Configuration configuration) {
+    try {
+      String filterExprString = configuration.get(FILTER_PREDICATE);
+      if (filterExprString == null) {
+        return null;
+      }
+      Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
+      return (Expression) filter;
+    } catch (IOException e) {
+      throw new RuntimeException("Error while reading filter expression", e);
+    }
+  }
+
+  /**
+   * get data blocks of given segment
+   */
+  private List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
+      AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
+      BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
+      List<Integer> oldPartitionIdList) throws IOException {
+
+    QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
+    QueryStatistic statistic = new QueryStatistic();
+
+    // get tokens for all the required FileSystem for table path
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+        new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
+    boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
+    DataMapExprWrapper dataMapExprWrapper =
+        DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver);
+    DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
+    List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
+    List<ExtendedBlocklet> prunedBlocklets;
+    if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
+      DistributableDataMapFormat datamapDstr =
+          new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper,
+              segmentIds, partitionsToPrune,
+              BlockletDataMapFactory.class.getName());
+      prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
+      // Apply expression on the blocklets.
+      prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
+    } else {
+      prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
+    }
+
+    List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
+    int partitionIndex = 0;
+    List<Integer> partitionIdList = new ArrayList<>();
+    if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
+      partitionIdList = partitionInfo.getPartitionIds();
+    }
+    for (ExtendedBlocklet blocklet : prunedBlocklets) {
+      long partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
+          CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath()));
+
+      // oldPartitionIdList is only used in the alter table partition command because it changes
+      // the partition info first and then reads data.
+      // Other normal queries should use the newest partitionIdList.
+      if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
+        if (oldPartitionIdList != null) {
+          partitionIndex = oldPartitionIdList.indexOf((int)partitionId);
+        } else {
+          partitionIndex = partitionIdList.indexOf((int)partitionId);
+        }
+      }
+      if (partitionIndex != -1) {
+        // the matchedPartitions variable will be null in two cases:
+        // 1. the table is not a partition table
+        // 2. the table is a partition table, and all partitions are matched by the query
+        // For a partition table, the task id in the carbondata file name is the partition id.
+        // If this partition is not required, skip it here.
+        if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
+          CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet);
+          if (inputSplit != null) {
+            resultFilterredBlocks.add(inputSplit);
+          }
+        }
+      }
+    }
+    statistic
+        .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+    recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+    return resultFilterredBlocks;
+  }
+
+  private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException {
+    CarbonInputSplit split =
+        CarbonInputSplit.from(blocklet.getSegmentId(),
+            blocklet.getBlockletId(), new FileSplit(new Path(blocklet.getPath()), 0,
+                blocklet.getLength(), blocklet.getLocations()),
+            ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()),
+            blocklet.getDataMapWriterPath());
+    split.setDetailInfo(blocklet.getDetailInfo());
+    return split;
+  }
+
+  @Override
+  public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+    Configuration configuration = taskAttemptContext.getConfiguration();
+    QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
+    CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
+    return new CarbonRecordReader<T>(queryModel, readSupport);
+  }
+
+  public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException {
+    Configuration configuration = taskAttemptContext.getConfiguration();
+    CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
+    TableProvider tableProvider = new SingleTableProvider(carbonTable);
+
+    // query plan includes projection column
+    String projectionString = getColumnProjection(configuration);
+    String[] projectionColumnNames = null;
+    if (projectionString != null) {
+      projectionColumnNames = projectionString.split(",");
+    }
+    QueryModel queryModel = carbonTable.createQueryWithProjection(
+        projectionColumnNames, getDataTypeConverter(configuration));
+
+    // set the filter to the query model in order to filter blocklet before scan
+    Expression filter = getFilterPredicates(configuration);
+    boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
+    // getAllMeasures returns list of visible and invisible columns
+    boolean[] isFilterMeasures =
+        new boolean[carbonTable.getAllMeasures().size()];
+    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, isFilterDimensions,
+        isFilterMeasures);
+    queryModel.setIsFilterDimensions(isFilterDimensions);
+    queryModel.setIsFilterMeasures(isFilterMeasures);
+    FilterResolverIntf filterIntf = CarbonInputFormatUtil
+        .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider);
+    queryModel.setFilterExpressionResolverTree(filterIntf);
+
+    // update the file level index store if there are invalid segments
+    if (inputSplit instanceof CarbonMultiBlockSplit) {
+      CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
+      List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
+      if (invalidSegments.size() > 0) {
+        queryModel.setInvalidSegmentIds(invalidSegments);
+      }
+      List<UpdateVO> invalidTimestampRangeList =
+          split.getAllSplits().get(0).getInvalidTimestampRange();
+      if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
+        queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
+      }
+    }
+    return queryModel;
+  }
+
+  private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+    CarbonTable carbonTableTemp;
+    if (carbonTable == null) {
+      // carbon table should be created either from deserialized table info (schema saved in
+      // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
+      TableInfo tableInfo = getTableInfo(configuration);
+      CarbonTable localCarbonTable;
+      if (tableInfo != null) {
+        localCarbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+      } else {
+        String schemaPath = CarbonTablePath
+            .getSchemaFilePath(getAbsoluteTableIdentifier(configuration).getTablePath());
+        if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
+          TableInfo tableInfoInfer =
+              SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration));
+          localCarbonTable = CarbonTable.buildFromTableInfo(tableInfoInfer);
+        } else {
+          localCarbonTable =
+              SchemaReader.readCarbonTableFromStore(getAbsoluteTableIdentifier(configuration));
+        }
+      }
+      this.carbonTable = localCarbonTable;
+      return localCarbonTable;
+    } else {
+      carbonTableTemp = this.carbonTable;
+      return carbonTableTemp;
+    }
+  }
+
+
+  public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
+    String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
+    // By default it uses the dictionary decoder read support class
+    CarbonReadSupport<T> readSupport = null;
+    if (readSupportClass != null) {
+      try {
+        Class<?> myClass = Class.forName(readSupportClass);
+        Constructor<?> constructor = myClass.getConstructors()[0];
+        Object object = constructor.newInstance();
+        if (object instanceof CarbonReadSupport) {
+          readSupport = (CarbonReadSupport) object;
+        }
+      } catch (ClassNotFoundException ex) {
+        LOG.error("Class " + readSupportClass + "not found", ex);
+      } catch (Exception ex) {
+        LOG.error("Error while creating " + readSupportClass, ex);
+      }
+    } else {
+      readSupport = new DictionaryDecodeReadSupport<>();
+    }
+    return readSupport;
+  }
+
+  @Override
+  protected boolean isSplitable(JobContext context, Path filename) {
+    try {
+      // Don't split the file if it is local file system
+      FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
+      if (fileSystem instanceof LocalFileSystem) {
+        return false;
+      }
+    } catch (Exception e) {
+      return true;
+    }
+    return true;
+  }
+
+  /**
+   * return valid segments to access
+   */
+  private String[] getSegmentsToAccess(JobContext job) {
+    String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
+    if (segmentString.trim().isEmpty()) {
+      return new String[0];
+    }
+    return segmentString.split(",");
+  }
+
+  public static DataTypeConverter getDataTypeConverter(Configuration configuration)
+      throws IOException {
+    String converter = configuration.get(CARBON_CONVERTER);
+    if (converter == null) {
+      return new DataTypeConverterImpl();
+    }
+    return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter);
+  }
+
+  public static void setDatabaseName(Configuration configuration, String databaseName) {
+    if (null != databaseName) {
+      configuration.set(DATABASE_NAME, databaseName);
+    }
+  }
+
+  public static String getDatabaseName(Configuration configuration)
+      throws InvalidConfigurationException {
+    String databaseName = configuration.get(DATABASE_NAME);
+    if (null == databaseName) {
+      throw new InvalidConfigurationException("Database name is not set.");
+    }
+    return databaseName;
+  }
+
+  public static void setTableName(Configuration configuration, String tableName) {
+    if (null != tableName) {
+      configuration.set(TABLE_NAME, tableName);
+    }
+  }
+
+  public static String getTableName(Configuration configuration)
+      throws InvalidConfigurationException {
+    String tableName = configuration.get(TABLE_NAME);
+    if (tableName == null) {
+      throw new InvalidConfigurationException("Table name is not set");
+    }
+    return tableName;
+  }
+
+  public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(
+      org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter)
+      throws IOException {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
index dfa8dd1..ab7c333 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
  * TODO: It should be removed after store manager implementation.
@@ -59,6 +58,7 @@ public class SchemaReader {
       throw new IOException("File does not exist: " + schemaFilePath);
     }
   }
+
   /**
    * the method returns the Wrapper TableInfo
    *
@@ -79,4 +79,19 @@ public class SchemaReader {
         carbonTableIdentifier.getTableName(),
         identifier.getTablePath());
   }
+
+
+  public static TableInfo inferSchema(AbsoluteTableIdentifier identifier)
+      throws IOException {
+    // This routine is going to infer schema from the carbondata file footer
+    // Convert the ColumnSchema -> TableSchema -> TableInfo.
+    // Return the TableInfo.
+    org.apache.carbondata.format.TableInfo tableInfo =
+        CarbonUtil.inferSchemaFileExternalTable(identifier.getTablePath(), identifier, false);
+    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+    TableInfo wrapperTableInfo = schemaConverter
+        .fromExternalToWrapperTableInfo(tableInfo, identifier.getDatabaseName(),
+            identifier.getTableName(), identifier.getTablePath());
+    return wrapperTableInfo;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark-common-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index b7f19fd..1c6cee9 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -105,6 +105,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-store-sdk</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
new file mode 100644
index 0000000..8b1f63f
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+
+
+class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with BeforeAndAfterAll {
+
+  var writerPath = new File(this.getClass.getResource("/").getPath
+                            +
+                            "../." +
+                            "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+    .getCanonicalPath
+  // getCanonicalPath gives a path with \, but the code expects /. Need to handle this in code?
+  writerPath = writerPath.replace("\\", "/");
+
+
+  def buildTestData(persistSchema:Boolean) = {
+
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"age\":\"int\"},\n")
+      .append("   {\"height\":\"double\"}\n")
+      .append("]")
+      .toString()
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+      if (persistSchema) {
+        builder.persistSchemaFile(true)
+        builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+      } else {
+        builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+      }
+
+      var i = 0
+      while (i < 100) {
+        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+        i += 1
+      }
+      writer.close()
+    } catch {
+      case ex: Exception => None
+      case _ => None
+    }
+  }
+
+  def cleanTestData() = {
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  def deleteIndexFile(path: String, extension: String) : Unit = {
+    val file: CarbonFile = FileFactory
+      .getCarbonFile(path, FileFactory.getFileType(path))
+
+    for (eachDir <- file.listFiles) {
+      if (!eachDir.isDirectory) {
+        if (eachDir.getName.endsWith(extension)) {
+          CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+        }
+      } else {
+        deleteIndexFile(eachDir.getPath, extension)
+      }
+    }
+  }
+
+  override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    // create carbon table and insert data
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+  }
+
+  //TO DO, need to remove segment dependency and tableIdentifier Dependency
+  test("read carbondata files (sdk Writer Output) using the Carbonfile ") {
+    buildTestData(false)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    //new provider Carbonfile
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("Describe formatted sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable limit 3").show(false)
+
+    sql("select name from sdkOutputTable").show(false)
+
+    sql("select age from sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable where age > 2 and age < 8").show(200, false)
+
+    sql("select * from sdkOutputTable where name = 'robot3'").show(200, false)
+
+    sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200, false)
+
+    sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200, false)
+
+    sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200, false)
+
+    sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200, false)
+
+    sql("select count(*) from sdkOutputTable").show(200, false)
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+  test("should not allow to alter datasource carbontable ") {
+    buildTestData(false)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    //data source file format
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    val exception = intercept[MalformedCarbonCommandException]
+    {
+      sql("Alter table sdkOutputTable change age age BIGINT")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported alter operation on Carbon external fileformat table"))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+  test("Read sdk writer output file without index file should fail") {
+    buildTestData(false)
+    deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    //data source file format
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    //org.apache.spark.SparkException: Index file not present to read the carbondata file
+    val exception = intercept[java.lang.RuntimeException]
+    {
+      sql("select * from sdkOutputTable").show(false)
+    }
+    assert(exception.getMessage().contains("Index file not present to read the carbondata file"))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+
+  test("Read sdk writer output file without Carbondata file should fail") {
+    buildTestData(false)
+    deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    val exception = intercept[Exception] {
+      //    data source file format
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+         |'$writerPath' """.stripMargin)
+    }
+    assert(exception.getMessage()
+      .contains("Operation not allowed: Invalid table path provided:"))
+
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+
+  test("Read sdk writer output file without any file should fail") {
+    buildTestData(false)
+    deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    val exception = intercept[Exception] {
+      //data source file format
+      sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+         |'$writerPath' """.stripMargin)
+
+      sql("select * from sdkOutputTable").show(false)
+    }
+    assert(exception.getMessage()
+        .contains("Operation not allowed: Invalid table path provided:"))
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
new file mode 100644
index 0000000..d284e50
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+
+class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAndAfterAll {
+
+
+  override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+  }
+
+  var writerPath = new File(this.getClass.getResource("/").getPath
+                            +
+                            "../." +
+                            "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+    .getCanonicalPath
+  // getCanonicalPath gives a path with \, but the code expects /. Need to handle this in code?
+  writerPath = writerPath.replace("\\", "/");
+
+  val filePath = writerPath + "/Fact/Part0/Segment_null/"
+
+  def buildTestData(persistSchema:Boolean) = {
+
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"age\":\"int\"},\n")
+      .append("   {\"height\":\"double\"}\n")
+      .append("]")
+      .toString()
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        if (persistSchema) {
+          builder.persistSchemaFile(true)
+          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+        } else {
+          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+        }
+
+      var i = 0
+      while (i < 100) {
+        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+        i += 1
+      }
+      writer.close()
+    } catch {
+      case ex: Exception => None
+      case _ => None
+    }
+  }
+
+  def cleanTestData() = {
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  def deleteIndexFile(path: String, extension: String) : Unit = {
+    val file: CarbonFile = FileFactory
+      .getCarbonFile(path, FileFactory.getFileType(path))
+
+    for (eachDir <- file.listFiles) {
+      if (!eachDir.isDirectory) {
+        if (eachDir.getName.endsWith(extension)) {
+          CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+        }
+      } else {
+        deleteIndexFile(eachDir.getPath, extension)
+      }
+    }
+  }
+
+  //TO DO, need to remove segment dependency and tableIdentifier Dependency
+  test("read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") {
+    buildTestData(false)
+    assert(new File(filePath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    //data source file format
+    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+      //data source file format
+      sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+    } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+           |'$filePath' """.stripMargin)
+    } else{
+      // TO DO
+    }
+
+    sql("Describe formatted sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable limit 3").show(false)
+
+    sql("select name from sdkOutputTable").show(false)
+
+    sql("select age from sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable where age > 2 and age < 8").show(200,false)
+
+    sql("select * from sdkOutputTable where name = 'robot3'").show(200,false)
+
+    sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200,false)
+
+    sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200,false)
+
+    sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200,false)
+
+    sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200,false)
+
+    sql("select count(*) from sdkOutputTable").show(200,false)
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(filePath).exists())
+    cleanTestData()
+  }
+
+
+  test("should not allow to alter datasource carbontable ") {
+    buildTestData(false)
+    assert(new File(filePath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+
+    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+      //data source file format
+      sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+    } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+           |'$filePath' """.stripMargin)
+    } else{
+      // TO DO
+    }
+
+    val exception = intercept[MalformedCarbonCommandException]
+      {
+        sql("Alter table sdkOutputTable change age age BIGINT")
+      }
+    assert(exception.getMessage().contains("Unsupported alter operation on hive table"))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(filePath).exists())
+    cleanTestData()
+  }
+
+  test("Read sdk writer output file without Carbondata file should fail") {
+    buildTestData(false)
+    deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    assert(new File(filePath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    val exception = intercept[org.apache.spark.SparkException] {
+      //    data source file format
+      if (sqlContext.sparkContext.version.startsWith("2.1")) {
+        //data source file format
+        sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+      } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+        //data source file format
+        sql(
+          s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+             |'$filePath' """.stripMargin)
+      } else{
+        // TO DO
+      }
+    }
+    assert(exception.getMessage()
+      .contains("CarbonData file is not present in the location mentioned in DDL"))
+
+    // drop table should not delete the files
+    assert(new File(filePath).exists())
+    cleanTestData()
+  }
+
+
+  test("Read sdk writer output file without any file should fail") {
+    buildTestData(false)
+    deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    assert(new File(filePath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    val exception = intercept[org.apache.spark.SparkException] {
+      //data source file format
+      if (sqlContext.sparkContext.version.startsWith("2.1")) {
+        //data source file format
+        sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+      } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+        //data source file format
+        sql(
+          s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+             |'$filePath' """.stripMargin)
+      } else{
+        // TO DO
+      }
+
+      sql("select * from sdkOutputTable").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("CarbonData file is not present in the location mentioned in DDL"))
+
+    // drop table should not delete the files
+    assert(new File(filePath).exists())
+    cleanTestData()
+  }
+
+  test("Read sdk writer output file withSchema") {
+    buildTestData(true)
+    assert(new File(filePath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    //data source file format
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+      //data source file format
+      sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+    } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+           |'$filePath' """.stripMargin)
+    } else{
+      // TO DO
+    }
+
+    sql("Describe formatted sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable limit 3").show(false)
+
+    sql("select name from sdkOutputTable").show(false)
+
+    sql("select age from sdkOutputTable").show(false)
+
+    sql("select * from sdkOutputTable where age > 2 and age < 8").show(200, false)
+
+    sql("select * from sdkOutputTable where name = 'robot3'").show(200, false)
+
+    sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200, false)
+
+    sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200, false)
+
+    sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200, false)
+
+    sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200, false)
+
+    sql("select count(*) from sdkOutputTable").show(200, false)
+
+    sql("DROP TABLE sdkOutputTable")
+
+    // drop table should not delete the files
+    assert(new File(filePath).exists())
+    cleanTestData()
+  }
+
+  test("Read sdk writer output file without index file should fail") {
+    buildTestData(false)
+    deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    assert(new File(filePath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    if (sqlContext.sparkContext.version.startsWith("2.1")) {
+      //data source file format
+      sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+    } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
+      //data source file format
+      sql(
+        s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+           |'$filePath' """.stripMargin)
+    } else{
+      // TO DO
+    }
+    //org.apache.spark.SparkException: Index file not present to read the carbondata file
+    val exception = intercept[org.apache.spark.SparkException]
+      {
+        sql("select * from sdkOutputTable").show(false)
+      }
+    assert(exception.getMessage().contains("Index file not present to read the carbondata file"))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(filePath).exists())
+    cleanTestData()
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
new file mode 100644
index 0000000..9a46676
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+
+
+object TestSparkCarbonFileFormatWithSparkSession {
+
+  var writerPath = new File(this.getClass.getResource("/").getPath
+                            +
+                            "../." +
+                            "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+    .getCanonicalPath
+  // getCanonicalPath gives a path with \, but the code expects /. Need to handle this in code?
+  writerPath = writerPath.replace("\\", "/");
+
+  val filePath = writerPath + "/Fact/Part0/Segment_null/"
+
+  def buildTestData(persistSchema:Boolean) = {
+
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"age\":\"int\"},\n")
+      .append("   {\"height\":\"double\"}\n")
+      .append("]")
+      .toString()
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        if (persistSchema) {
+          builder.persistSchemaFile(true)
+          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+        } else {
+          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+        }
+
+      var i = 0
+      while (i < 100) {
+        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+        i += 1
+      }
+      writer.close()
+    } catch {
+      case ex: Exception => None
+      case _ => None
+    }
+  }
+
+  def cleanTestData() = {
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  def deleteIndexFile(path: String, extension: String) : Unit = {
+    val file: CarbonFile = FileFactory
+      .getCarbonFile(path, FileFactory.getFileType(path))
+
+    for (eachDir <- file.listFiles) {
+      if (!eachDir.isDirectory) {
+        if (eachDir.getName.endsWith(extension)) {
+          CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+        }
+      } else {
+        deleteIndexFile(eachDir.getPath, extension)
+      }
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
+
+    // clean data folder
+    if (true) {
+      val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
+      clean(storeLocation)
+      clean(warehouse)
+      clean(metastoredb)
+    }
+
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .appName("TestSparkCarbonFileFormatWithSparkSession")
+      .enableHiveSupport()
+      .config("spark.sql.warehouse.dir", warehouse)
+      .config("javax.jdo.option.ConnectionURL",
+        s"jdbc:derby:;databaseName=$metastoredb;create=true")
+      .getOrCreate()
+
+    CarbonProperties.getInstance()
+      .addProperty("carbon.storelocation", storeLocation)
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss")
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
+    buildTestData(false)
+    assert(new File(filePath).exists())
+    //data source file format
+    if (spark.sparkContext.version.startsWith("2.1")) {
+      //data source file format
+      spark.sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+    } else if (spark.sparkContext.version.startsWith("2.2")) {
+      //data source file format
+      spark.sql(
+        s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+           |'$filePath' """.stripMargin)
+    } else{
+      // TO DO
+    }
+
+    spark.sql("Describe formatted sdkOutputTable").show(false)
+
+    spark.sql("select * from sdkOutputTable").show(false)
+
+    spark.sql("select * from sdkOutputTable limit 3").show(false)
+
+    spark.sql("select name from sdkOutputTable").show(false)
+
+    spark.sql("select age from sdkOutputTable").show(false)
+
+    spark.sql("select * from sdkOutputTable where age > 2 and age < 8").show(200,false)
+
+    spark.sql("select * from sdkOutputTable where name = 'robot3'").show(200,false)
+
+    spark.sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200,false)
+
+    spark.sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200,false)
+
+    spark.sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200,false)
+
+    spark.sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200,false)
+
+    spark.sql("select count(*) from sdkOutputTable").show(200,false)
+
+    spark.sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(filePath).exists())
+    cleanTestData()
+
+    spark.stop()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 49a8023..6afd2c0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -42,7 +42,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.filter.FilterUtil
 import org.apache.carbondata.core.scan.model.QueryModel
@@ -50,7 +50,7 @@ import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstant
 import org.apache.carbondata.core.statusmanager.FileFormat
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.hadoop._
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
@@ -90,13 +90,21 @@ class CarbonScanRDD(
     val jobConf = new JobConf(conf)
     SparkHadoopUtil.get.addCredentials(jobConf)
     val job = Job.getInstance(jobConf)
-    val format = prepareInputFormatForDriver(job.getConfiguration)
-
+    val fileLevelExternal = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal")
+    val format = if (fileLevelExternal != null && fileLevelExternal.equalsIgnoreCase("true")) {
+      prepareFileInputFormatForDriver(job.getConfiguration)
+    } else {
+      prepareInputFormatForDriver(job.getConfiguration)
+    }
     // initialise query_id for job
     job.getConfiguration.set("query.id", queryId)
 
     // get splits
     val splits = format.getSplits(job)
+    if ((splits == null) && format.isInstanceOf[CarbonFileInputFormat[Object]]) {
+      throw new SparkException(
+        "CarbonData file not exist in the segment_null (SDK writer Output) path")
+    }
 
     // separate split
     // 1. for batch splits, invoke distributeSplits method to create partitions
@@ -113,7 +121,7 @@ class CarbonScanRDD(
     }
     val batchPartitions = distributeColumnarSplits(columnarSplits)
     // check and remove InExpression from filterExpression
-    checkAndRemoveInExpressinFromFilterExpression(format, batchPartitions)
+    checkAndRemoveInExpressinFromFilterExpression(batchPartitions)
     if (streamSplits.isEmpty) {
       batchPartitions.toArray
     } else {
@@ -354,7 +362,9 @@ class CarbonScanRDD(
         case _ =>
           // create record reader for CarbonData file format
           if (vectorReader) {
-            val carbonRecordReader = createVectorizedCarbonRecordReader(model, inputMetricsStats)
+            val carbonRecordReader = createVectorizedCarbonRecordReader(model,
+              inputMetricsStats,
+              "true")
             if (carbonRecordReader == null) {
               new CarbonRecordReader(model,
                 format.getReadSupportClass(attemptContext.getConfiguration), inputMetricsStats)
@@ -431,6 +441,16 @@ class CarbonScanRDD(
     createInputFormat(conf)
   }
 
+  def prepareFileInputFormatForDriver(conf: Configuration): CarbonFileInputFormat[Object] = {
+    CarbonFileInputFormat.setTableInfo(conf, tableInfo)
+    CarbonFileInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
+    CarbonFileInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
+    if (partitionNames != null) {
+      CarbonFileInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
+    }
+    createFileInputFormat(conf)
+  }
+
   private def prepareInputFormatForExecutor(conf: Configuration): CarbonTableInputFormat[Object] = {
     CarbonTableInputFormat.setCarbonReadSupport(conf, readSupport)
     val tableInfo1 = getTableInfo
@@ -441,6 +461,32 @@ class CarbonScanRDD(
     createInputFormat(conf)
   }
 
+  private def createFileInputFormat(conf: Configuration): CarbonFileInputFormat[Object] = {
+    val format = new CarbonFileInputFormat[Object]
+    CarbonFileInputFormat.setTablePath(conf,
+      identifier.appendWithLocalPrefix(identifier.getTablePath))
+    CarbonFileInputFormat.setQuerySegment(conf, identifier)
+    CarbonFileInputFormat.setFilterPredicates(conf, filterExpression)
+    CarbonFileInputFormat.setColumnProjection(conf, columnProjection)
+    CarbonFileInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+    if (CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
+      CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+    }
+
+    // when validate segments is disabled in the thread local, propagate it to CarbonTableInputFormat
+    val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+    if (carbonSessionInfo != null) {
+      CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
+        .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+                     identifier.getCarbonTableIdentifier.getDatabaseName + "." +
+                     identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean)
+    }
+    format
+  }
+
+
   private def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = {
     val format = new CarbonTableInputFormat[Object]
     CarbonTableInputFormat.setTablePath(conf,
@@ -485,7 +531,6 @@ class CarbonScanRDD(
    * @param identifiedPartitions
    */
   private def checkAndRemoveInExpressinFromFilterExpression(
-      format: CarbonTableInputFormat[Object],
       identifiedPartitions: mutable.Buffer[Partition]) = {
     if (null != filterExpression) {
       if (identifiedPartitions.nonEmpty &&
@@ -533,12 +578,13 @@ class CarbonScanRDD(
   }
 
   def createVectorizedCarbonRecordReader(queryModel: QueryModel,
-      inputMetricsStats: InputMetricsStats): RecordReader[Void, Object] = {
+      inputMetricsStats: InputMetricsStats, enableBatch: String): RecordReader[Void, Object] = {
     val name = "org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader"
     try {
       val cons = Class.forName(name).getDeclaredConstructors
       cons.head.setAccessible(true)
-      cons.head.newInstance(queryModel, inputMetricsStats).asInstanceOf[RecordReader[Void, Object]]
+      cons.head.newInstance(queryModel, inputMetricsStats, enableBatch)
+        .asInstanceOf[RecordReader[Void, Object]]
     } catch {
       case e: Exception =>
         LOGGER.error(e)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 73da878..903bf44 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -91,10 +91,21 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
 
   private InputMetricsStats inputMetricsStats;
 
-  public VectorizedCarbonRecordReader(QueryModel queryModel, InputMetricsStats inputMetricsStats) {
+  public VectorizedCarbonRecordReader(QueryModel queryModel, InputMetricsStats inputMetricsStats,
+      String enableBatch) {
     this.queryModel = queryModel;
     this.inputMetricsStats = inputMetricsStats;
-    enableReturningBatches();
+    if (enableBatch.equals("true")) {
+      enableReturningBatches();
+    }
+  }
+
+
+  /*
+   * Can be called before any rows are returned to enable returning columnar batches directly.
+   */
+  public void enableReturningBatches() {
+    returnColumnarBatch = true;
   }
 
   /**
@@ -273,12 +284,7 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
     if (columnarBatch == null) initBatch();
   }
 
-  /*
-   * Can be called before any rows are returned to enable returning columnar batches directly.
-   */
-  private void enableReturningBatches() {
-    returnColumnarBatch = true;
-  }
+
 
   /**
    * Advances to the next batch of rows. Returns false if there are no more.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index eb00ebf..5fd9639 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
-import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.types._
@@ -1015,7 +1015,7 @@ case class CarbonLoadDataCommand(
       partitionSchema = partitionSchema,
       dataSchema = dataSchema,
       bucketSpec = catalogTable.bucketSpec,
-      fileFormat = new CarbonFileFormat,
+      fileFormat = new SparkCarbonTableFormat,
       options = options.toMap)(sparkSession = sparkSession)
 
     CarbonReflectionUtils.getLogicalRelation(hdfsRelation,


[4/6] carbondata git commit: [CARBONDATA-2244]fix creating pre-aggregate table bug when there are invisibility INSERT_IN_PROGRESS/INSERT_OVERWRITE_IN_PROGRESS segments on main table

Posted by ja...@apache.org.
[CARBONDATA-2244]fix creating pre-aggregate table bug when there are invisibility INSERT_IN_PROGRESS/INSERT_OVERWRITE_IN_PROGRESS segments on main table

When there are some invisible INSERT_IN_PROGRESS/INSERT_OVERWRITE_IN_PROGRESS segments on the main table, a pre-aggregate table cannot be created on it.

This closes #2050
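
For readers skimming the diff below: the old guard read only the visible load metadata, which never lists invisible in-progress segments, so the check could pass while a load was still running. The new guard asks SegmentStatusManager directly. A minimal sketch of that pattern follows (the wrapper method is illustrative; only the isLoadInProgressInTable call mirrors the patch):

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.statusmanager.SegmentStatusManager

    // Sketch: fail fast when any load (including invisible in-progress segments)
    // is still running on the main table, before building the pre-aggregate table.
    def assertNoLoadInProgress(parentTable: CarbonTable): Unit = {
      if (SegmentStatusManager.isLoadInProgressInTable(parentTable)) {
        throw new UnsupportedOperationException(
          "Cannot create pre-aggregate table when insert is in progress on main table")
      }
    }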


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a386f1f4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a386f1f4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a386f1f4

Branch: refs/heads/carbonfile
Commit: a386f1f4e220210e031f10a73c26a7f56f57a603
Parents: 31011fc
Author: Zhang Zhichao <44...@qq.com>
Authored: Fri Mar 9 23:36:08 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Mar 16 14:43:06 2018 +0800

----------------------------------------------------------------------
 .../command/preaaggregate/PreAggregateTableHelper.scala      | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a386f1f4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
index 1f1e1e6..b64c91e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -165,12 +165,12 @@ case class PreAggregateTableHelper(
     // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
     // table.
     SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
-    val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
-    if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
-      load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
+    if (SegmentStatusManager.isLoadInProgressInTable(parentTable)) {
       throw new UnsupportedOperationException(
         "Cannot create pre-aggregate table when insert is in progress on main table")
-    } else if (loadAvailable.nonEmpty) {
+    }
+    val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
+    if (loadAvailable.nonEmpty) {
       // Passing segmentToLoad as * because we want to load all the segments into the
       // pre-aggregate table even if the user has set some segments on the parent table.
       loadCommand.dataFrame = Some(PreAggregateUtil


[2/6] carbondata git commit: [CARBONDATA-2248]Fixed Memory leak in parser/CarbonSparkSqlParser.scala

Posted by ja...@apache.org.
[CARBONDATA-2248]Fixed Memory leak in parser/CarbonSparkSqlParser.scala

In some scenarios where many sessions are created, parser failure objects accumulate in memory inside thread locals, causing a memory leak in the long run.

Solution: remove the parser object from the thread local after parsing the query.

This closes #2057
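
The diff below applies the cleanup inside CarbonSparkSqlParser itself. As a generic sketch of the same idea (the holder object and helper below are illustrative, not CarbonData APIs): any parser state parked in a ThreadLocal must be removed once parsing finishes, otherwise failed parser objects pile up on long-lived session threads.

    // Hypothetical thread-local holder; the patch uses CarbonScalaUtil.cleanParserThreadLocals instead.
    object ParserThreadLocal {
      private val current = new ThreadLocal[AnyRef]()
      def set(parser: AnyRef): Unit = current.set(parser)
      def clear(): Unit = current.remove()   // the cleanup the fix adds after parsing
    }

    // Run a parse function and always clear the per-thread state afterwards.
    def parseWithCleanup[T](sqlText: String)(parse: String => T): T = {
      try {
        parse(sqlText)                       // success path
      } finally {
        ParserThreadLocal.clear()            // runs on both success and failure
      }
    }

Note that the actual patch, shown in the diff, clears the thread local explicitly on the success path and on the MalformedCarbonCommandException path rather than wrapping everything in a finally block.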


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5b48e70a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5b48e70a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5b48e70a

Branch: refs/heads/carbonfile
Commit: 5b48e70a8ab7bd662d85790aca8e3a8975e82bfe
Parents: d4f9003
Author: kumarvishal <ku...@gmail.com>
Authored: Mon Mar 12 21:00:22 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Mar 15 17:40:47 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/parser/CarbonSparkSqlParser.scala    | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b48e70a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index ef4836e..e0fff08 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -36,7 +36,7 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.hadoop.util.SchemaReader
 import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
 
 /**
  * Concrete parser for Spark SQL statements and carbon specific
@@ -52,9 +52,12 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab
   override def parsePlan(sqlText: String): LogicalPlan = {
     CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
     try {
-      super.parsePlan(sqlText)
+      val parsedPlan = super.parsePlan(sqlText)
+      CarbonScalaUtil.cleanParserThreadLocals
+      parsedPlan
     } catch {
       case ce: MalformedCarbonCommandException =>
+        CarbonScalaUtil.cleanParserThreadLocals
         throw ce
       case ex =>
         try {


[3/6] carbondata git commit: [CARBONDATA-2250][DataLoad] Reduce massive object generation in global sort

Posted by ja...@apache.org.
[CARBONDATA-2250][DataLoad] Reduce massive object generation in global sort

Generate the comparator outside the compare function; otherwise a new comparator object is created for every row comparison.
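
As a standalone illustration of the optimization (simplified, hypothetical comparison logic; not the CarbonData classes), the comparator is constructed once and the Ordering only delegates to it for each row pair:

    import java.util.Comparator

    object GlobalSortOrderingSketch {
      // Built once, before the sort starts, instead of once per compare() call.
      val rowComparator: Comparator[Array[AnyRef]] = new Comparator[Array[AnyRef]] {
        override def compare(a: Array[AnyRef], b: Array[AnyRef]): Int =
          Integer.compare(a.length, b.length) // placeholder comparison logic
      }

      // Invoked for every pair of rows during the sort; it now only delegates
      // and allocates nothing.
      object RowOrdering extends Ordering[Array[AnyRef]] {
        def compare(rowA: Array[AnyRef], rowB: Array[AnyRef]): Int =
          rowComparator.compare(rowA, rowB)
      }
    }

Hoisting the allocation out of compare() matters because a sort invokes compare() on the order of n log n times.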

This closes #2059


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/31011fc2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/31011fc2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/31011fc2

Branch: refs/heads/carbonfile
Commit: 31011fc29f85d949bdabaa3551b7513eb1183ee5
Parents: 5b48e70
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Wed Mar 14 14:45:53 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Mar 15 17:43:39 2018 +0800

----------------------------------------------------------------------
 .../spark/load/DataLoadProcessBuilderOnSpark.scala     | 13 ++++++-------
 .../processing/sort/sortdata/NewRowComparator.java     |  4 +++-
 .../sort/sortdata/NewRowComparatorForNormalDims.java   |  5 ++++-
 3 files changed, 13 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/31011fc2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 1062cd7..dc238fb 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -80,15 +80,14 @@ object DataLoadProcessBuilderOnSpark {
     // 3. Sort
     val configuration = DataLoadProcessBuilder.createConfiguration(model)
     val sortParameters = SortParameters.createSortParameters(configuration)
+    val rowComparator: Comparator[Array[AnyRef]] =
+      if (sortParameters.getNoDictionaryCount > 0) {
+        new NewRowComparator(sortParameters.getNoDictionaryDimnesionColumn)
+      } else {
+        new NewRowComparatorForNormalDims(sortParameters.getDimColCount)
+      }
     object RowOrdering extends Ordering[Array[AnyRef]] {
       def compare(rowA: Array[AnyRef], rowB: Array[AnyRef]): Int = {
-        val rowComparator: Comparator[Array[AnyRef]] =
-          if (sortParameters.getNoDictionaryCount > 0) {
-            new NewRowComparator(sortParameters.getNoDictionaryDimnesionColumn)
-          } else {
-            new NewRowComparatorForNormalDims(sortParameters.getDimColCount)
-          }
-
         rowComparator.compare(rowA, rowB)
       }
     }
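
The two Java-side changes that follow mark the comparators Serializable. The commit message does not spell out why, but a plausible reading is that the comparator instance is now created on the driver and captured by the Ordering that Spark ships to executors for the distributed sort, so it must survive serialization. A minimal Scala sketch of that requirement (hypothetical class, not the CarbonData comparator):

    import java.util.Comparator

    // Anything captured by the sort's Ordering travels with the Spark closure,
    // so the comparator must also be Serializable.
    class RowLengthComparator extends Comparator[Array[AnyRef]] with Serializable {
      override def compare(a: Array[AnyRef], b: Array[AnyRef]): Int =
        Integer.compare(a.length, b.length) // placeholder comparison logic
    }

Pinning a serialVersionUID, as both diffs below do, keeps serialized instances compatible if the classes evolve later.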

http://git-wip-us.apache.org/repos/asf/carbondata/blob/31011fc2/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
index 3f94533..f47ecc7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
@@ -17,11 +17,13 @@
 
 package org.apache.carbondata.processing.sort.sortdata;
 
+import java.io.Serializable;
 import java.util.Comparator;
 
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 
-public class NewRowComparator implements Comparator<Object[]> {
+public class NewRowComparator implements Comparator<Object[]>, Serializable {
+  private static final long serialVersionUID = -1739874611112709436L;
 
   /**
    * mapping of dictionary dimensions and no dictionary of sort_column.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/31011fc2/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
index 7538c92..aea83ba 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
@@ -16,13 +16,16 @@
  */
 package org.apache.carbondata.processing.sort.sortdata;
 
+import java.io.Serializable;
 import java.util.Comparator;
 
 /**
  * This class is used as comparator for comparing dims which are non high cardinality dims.
  * Here the dims will be in form of int[] (surrogates) so directly comparing the integers.
  */
-public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
+public class NewRowComparatorForNormalDims implements Comparator<Object[]>, Serializable {
+  private static final long serialVersionUID = -1749874611112709432L;
+
   /**
    * dimension count
    */