You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/30 09:27:38 UTC

[1/3] carbondata git commit: [CARBONDATA-1628][Streaming] Re-factory LoadTableCommand to reuse code for streaming ingest in the future [Forced Update!]

Repository: carbondata
Updated Branches:
  refs/heads/streaming_ingest a833fd47a -> 8c9873927 (forced update)


[CARBONDATA-1628][Streaming] Re-factory LoadTableCommand to reuse code for streaming ingest in the future

Re-factory LoadTableCommand to reuse code for streaming ingest in the future

This closes #1439


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5936e7fb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5936e7fb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5936e7fb

Branch: refs/heads/streaming_ingest
Commit: 5936e7fb25c9d5125a5863aa7648cffe5a1d8f7c
Parents: 4d70a21
Author: QiangCai <qi...@qq.com>
Authored: Fri Oct 27 16:06:06 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Oct 30 14:55:00 2017 +0530

----------------------------------------------------------------------
 .../carbondata/spark/util/DataLoadingUtil.scala | 300 ++++++++++++
 .../command/management/LoadTableCommand.scala   | 464 ++++++-------------
 2 files changed, 452 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5936e7fb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
new file mode 100644
index 0000000..445fdbb
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import scala.collection.immutable
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.TableOptionConstant
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.load.ValidateUtil
+
+/**
+ * the util object of data loading
+ */
+object DataLoadingUtil {
+
+  /**
+   * get data loading options and initialise default value
+   */
+  def getDataLoadingOptions(
+      carbonProperty: CarbonProperties,
+      options: immutable.Map[String, String]): mutable.Map[String, String] = {
+    val optionsFinal = scala.collection.mutable.Map[String, String]()
+    optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
+    optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
+    optionsFinal.put("fileheader", options.getOrElse("fileheader", ""))
+    optionsFinal.put("escapechar", options.getOrElse("escapechar", "\\"))
+    optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
+    optionsFinal.put("columndict", options.getOrElse("columndict", null))
+
+    optionsFinal.put(
+      "serialization_null_format",
+      options.getOrElse("serialization_null_format", "\\N"))
+
+    optionsFinal.put(
+      "bad_records_logger_enable",
+      options.getOrElse(
+        "bad_records_logger_enable",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)))
+
+    val badRecordActionValue = carbonProperty.getProperty(
+      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
+
+    optionsFinal.put(
+      "bad_records_action",
+      options.getOrElse(
+        "bad_records_action",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+          badRecordActionValue)))
+
+    optionsFinal.put(
+      "is_empty_data_bad_record",
+      options.getOrElse(
+        "is_empty_data_bad_record",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)))
+
+    optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", ""))
+
+    optionsFinal.put(
+      "complex_delimiter_level_1",
+      options.getOrElse("complex_delimiter_level_1", "\\$"))
+
+    optionsFinal.put(
+      "complex_delimiter_level_2",
+      options.getOrElse("complex_delimiter_level_2", "\\:"))
+
+    optionsFinal.put(
+      "dateformat",
+      options.getOrElse(
+        "dateformat",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)))
+
+    optionsFinal.put(
+      "global_sort_partitions",
+      options.getOrElse(
+        "global_sort_partitions",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+          null)))
+
+    optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null))
+
+    optionsFinal.put(
+      "batch_sort_size_inmb",
+      options.getOrElse(
+        "batch_sort_size_inmb",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+          carbonProperty.getProperty(
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))))
+
+    optionsFinal.put(
+      "bad_record_path",
+      options.getOrElse(
+        "bad_record_path",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+          carbonProperty.getProperty(
+            CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+            CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))))
+
+    val useOnePass = options.getOrElse(
+      "single_pass",
+      carbonProperty.getProperty(
+        CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+        CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match {
+      case "true" =>
+        true
+      case "false" =>
+        // when single_pass = false  and if either alldictionarypath
+        // or columnDict is configured the do not allow load
+        if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) ||
+            StringUtils.isNotEmpty(optionsFinal("columndict"))) {
+          throw new MalformedCarbonCommandException(
+            "Can not use all_dictionary_path or columndict without single_pass.")
+        } else {
+          false
+        }
+      case illegal =>
+        val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+        LOGGER.error(s"Can't use single_pass, because illegal syntax found: [$illegal] " +
+                     "Please set it as 'true' or 'false'")
+        false
+    }
+    optionsFinal.put("single_pass", useOnePass.toString)
+    optionsFinal
+  }
+
+  /**
+   * check whether using default value or not
+   */
+  private def checkDefaultValue(value: String, default: String) = {
+    if (StringUtils.isEmpty(value)) {
+      default
+    } else {
+      value
+    }
+  }
+
+  /**
+   * build CarbonLoadModel for data loading
+   */
+  def buildCarbonLoadModel(
+      table: CarbonTable,
+      carbonProperty: CarbonProperties,
+      options: immutable.Map[String, String],
+      optionsFinal: mutable.Map[String, String],
+      carbonLoadModel: CarbonLoadModel): Unit = {
+    carbonLoadModel.setTableName(table.getFactTableName)
+    carbonLoadModel.setDatabaseName(table.getDatabaseName)
+    carbonLoadModel.setStorePath(table.getStorePath)
+    carbonLoadModel.setTableName(table.getFactTableName)
+    val dataLoadSchema = new CarbonDataLoadSchema(table)
+    // Need to fill dimension relation
+    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+    val sort_scope = optionsFinal("sort_scope")
+    val single_pass = optionsFinal("single_pass")
+    val bad_records_logger_enable = optionsFinal("bad_records_logger_enable")
+    val bad_records_action = optionsFinal("bad_records_action")
+    val bad_record_path = optionsFinal("bad_record_path")
+    val global_sort_partitions = optionsFinal("global_sort_partitions")
+    val dateFormat = optionsFinal("dateformat")
+    val delimeter = optionsFinal("delimiter")
+    val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1")
+    val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2")
+    val all_dictionary_path = optionsFinal("all_dictionary_path")
+    val column_dict = optionsFinal("columndict")
+    ValidateUtil.validateDateFormat(dateFormat, table, table.getFactTableName)
+    ValidateUtil.validateSortScope(table, sort_scope)
+
+    if (bad_records_logger_enable.toBoolean ||
+        LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
+      if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
+        sys.error("Invalid bad records location.")
+      }
+    }
+    carbonLoadModel.setBadRecordsLocation(bad_record_path)
+
+    ValidateUtil.validateGlobalSortPartitions(global_sort_partitions)
+    carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\"))
+    carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\""))
+    carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#"))
+
+    // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
+    // we should use table schema to generate file header.
+    var fileHeader = optionsFinal("fileheader")
+    val headerOption = options.get("header")
+    if (headerOption.isDefined) {
+      // whether the csv file has file header
+      // the default value is true
+      val header = try {
+        headerOption.get.toBoolean
+      } catch {
+        case ex: IllegalArgumentException =>
+          throw new MalformedCarbonCommandException(
+            "'header' option should be either 'true' or 'false'. " + ex.getMessage)
+      }
+      if (header) {
+        if (fileHeader.nonEmpty) {
+          throw new MalformedCarbonCommandException(
+            "When 'header' option is true, 'fileheader' option is not required.")
+        }
+      } else {
+        if (fileHeader.isEmpty) {
+          fileHeader = table.getCreateOrderColumn(table.getFactTableName)
+            .asScala.map(_.getColName).mkString(",")
+        }
+      }
+    }
+
+    carbonLoadModel.setDateFormat(dateFormat)
+    carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+
+    carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
+      CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
+
+    carbonLoadModel.setSerializationNullFormat(
+        TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
+        optionsFinal("serialization_null_format"))
+
+    carbonLoadModel.setBadRecordsLoggerEnable(
+        TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable)
+
+    carbonLoadModel.setBadRecordsAction(
+        TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action)
+
+    carbonLoadModel.setIsEmptyDataBadRecord(
+        DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
+        optionsFinal("is_empty_data_bad_record"))
+
+    carbonLoadModel.setSortScope(sort_scope)
+    carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb"))
+    carbonLoadModel.setGlobalSortPartitions(global_sort_partitions)
+    carbonLoadModel.setUseOnePass(single_pass.toBoolean)
+
+    if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
+        complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
+        delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
+      sys.error(s"Field Delimiter & Complex types delimiter are same")
+    } else {
+      carbonLoadModel.setComplexDelimiterLevel1(
+        CarbonUtil.delimiterConverter(complex_delimeter_level1))
+      carbonLoadModel.setComplexDelimiterLevel2(
+        CarbonUtil.delimiterConverter(complex_delimeter_level2))
+    }
+    // set local dictionary path, and dictionary file extension
+    carbonLoadModel.setAllDictPath(all_dictionary_path)
+    carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
+    carbonLoadModel.setCsvHeader(fileHeader)
+    carbonLoadModel.setColDictFilePath(column_dict)
+    carbonLoadModel.setDirectLoad(true)
+    carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
+
+    val validatedMaxColumns = CommonUtil.validateMaxColumns(
+      carbonLoadModel.getCsvHeaderColumns,
+      optionsFinal("maxcolumns"))
+
+    carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
+    if (null == carbonLoadModel.getLoadMetadataDetails) {
+      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5936e7fb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 9018f7b..630ee27 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, DataPro
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.{CausedBy, FileUtils}
 
-import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -39,14 +38,11 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format
 import org.apache.carbondata.processing.exception.DataLoadingException
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.TableOptionConstant
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.ValidateUtil
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CommonUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
 
 case class LoadTableCommand(
     databaseNameOp: Option[String],
@@ -60,89 +56,6 @@ case class LoadTableCommand(
     updateModel: Option[UpdateTableModel] = None)
   extends RunnableCommand with DataProcessCommand {
 
-  private def getFinalOptions(carbonProperty: CarbonProperties):
-  scala.collection.mutable.Map[String, String] = {
-    val optionsFinal = scala.collection.mutable.Map[String, String]()
-    optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
-    optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
-    optionsFinal.put("fileheader", options.getOrElse("fileheader", ""))
-    optionsFinal.put("escapechar", options.getOrElse("escapechar", "\\"))
-    optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
-    optionsFinal.put("columndict", options.getOrElse("columndict", null))
-    optionsFinal
-      .put("serialization_null_format", options.getOrElse("serialization_null_format", "\\N"))
-    optionsFinal.put("bad_records_logger_enable", options.getOrElse("bad_records_logger_enable",
-      carbonProperty
-        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
-          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)))
-    val badRecordActionValue = carbonProperty
-      .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-    optionsFinal.put("bad_records_action", options.getOrElse("bad_records_action", carbonProperty
-      .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
-        badRecordActionValue)))
-    optionsFinal
-      .put("is_empty_data_bad_record", options.getOrElse("is_empty_data_bad_record", carbonProperty
-        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)))
-    optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", ""))
-    optionsFinal
-      .put("complex_delimiter_level_1", options.getOrElse("complex_delimiter_level_1", "\\$"))
-    optionsFinal
-      .put("complex_delimiter_level_2", options.getOrElse("complex_delimiter_level_2", "\\:"))
-    optionsFinal.put("dateformat", options.getOrElse("dateformat",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
-        CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)))
-
-    optionsFinal.put("global_sort_partitions", options.getOrElse("global_sort_partitions",
-      carbonProperty
-        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null)))
-
-    optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null))
-
-    optionsFinal.put("batch_sort_size_inmb", options.getOrElse("batch_sort_size_inmb",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
-        carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-          CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))))
-
-    optionsFinal.put("bad_record_path", options.getOrElse("bad_record_path",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-        carbonProperty.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))))
-
-    val useOnePass = options.getOrElse("single_pass",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
-        CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match {
-      case "true" =>
-        true
-      case "false" =>
-        // when single_pass = false  and if either alldictionarypath
-        // or columnDict is configured the do not allow load
-        if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) ||
-            StringUtils.isNotEmpty(optionsFinal("columndict"))) {
-          throw new MalformedCarbonCommandException(
-            "Can not use all_dictionary_path or columndict without single_pass.")
-        } else {
-          false
-        }
-      case illegal =>
-        val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-        LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " +
-                     "Please set it as 'true' or 'false'")
-        false
-    }
-    optionsFinal.put("single_pass", useOnePass.toString)
-    optionsFinal
-  }
-
-  private def checkDefaultValue(value: String, default: String) = {
-    if (StringUtils.isEmpty(value)) {
-      default
-    } else {
-      value
-    }
-  }
-
   override def run(sparkSession: SparkSession): Seq[Row] = {
     processData(sparkSession)
   }
@@ -158,7 +71,6 @@ case class LoadTableCommand(
     }
 
     val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
     if (relation == null) {
@@ -172,7 +84,7 @@ case class LoadTableCommand(
 
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
-    val optionsFinal = getFinalOptions(carbonProperty)
+    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
 
     val tableProperties = relation.tableMeta.carbonTable.getTableInfo
       .getFactTable.getTableProperties
@@ -183,133 +95,26 @@ case class LoadTableCommand(
           CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
 
     try {
+      val table = relation.tableMeta.carbonTable
+      val carbonLoadModel = new CarbonLoadModel()
       val factPath = if (dataFrame.isDefined) {
         ""
       } else {
         FileUtils.getPaths(
           CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
       }
-      val carbonLoadModel = new CarbonLoadModel()
-      carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
-      carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-      carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
-
-      val table = relation.tableMeta.carbonTable
-      carbonLoadModel.setTableName(table.getFactTableName)
-      val dataLoadSchema = new CarbonDataLoadSchema(table)
-      // Need to fill dimension relation
-      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-
-      val partitionLocation = relation.tableMeta.storePath + "/partition/" +
-                              relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
-                              relation.tableMeta.carbonTableIdentifier.getTableName + "/"
-      val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
-      val sort_scope = optionsFinal("sort_scope")
-      val single_pass = optionsFinal("single_pass")
-      val bad_records_logger_enable = optionsFinal("bad_records_logger_enable")
-      val bad_records_action = optionsFinal("bad_records_action")
-      val bad_record_path = optionsFinal("bad_record_path")
-      val global_sort_partitions = optionsFinal("global_sort_partitions")
-      val dateFormat = optionsFinal("dateformat")
-      val delimeter = optionsFinal("delimiter")
-      val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1")
-      val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2")
-      val all_dictionary_path = optionsFinal("all_dictionary_path")
-      val column_dict = optionsFinal("columndict")
-      ValidateUtil.validateDateFormat(dateFormat, table, tableName)
-      ValidateUtil.validateSortScope(table, sort_scope)
-
-      if (bad_records_logger_enable.toBoolean ||
-          LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
-        if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
-          sys.error("Invalid bad records location.")
-        }
-      }
-      carbonLoadModel.setBadRecordsLocation(bad_record_path)
-
-      ValidateUtil.validateGlobalSortPartitions(global_sort_partitions)
-      carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\"))
-      carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\""))
-      carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#"))
-
-      // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
-      // we should use table schema to generate file header.
-      var fileHeader = optionsFinal("fileheader")
-      val headerOption = options.get("header")
-      if (headerOption.isDefined) {
-        // whether the csv file has file header
-        // the default value is true
-        val header = try {
-          headerOption.get.toBoolean
-        } catch {
-          case ex: IllegalArgumentException =>
-            throw new MalformedCarbonCommandException(
-              "'header' option should be either 'true' or 'false'. " + ex.getMessage)
-        }
-        if (header) {
-          if (fileHeader.nonEmpty) {
-            throw new MalformedCarbonCommandException(
-              "When 'header' option is true, 'fileheader' option is not required.")
-          }
-        } else {
-          if (fileHeader.isEmpty) {
-            fileHeader = table.getCreateOrderColumn(table.getFactTableName)
-              .asScala.map(_.getColName).mkString(",")
-          }
-        }
-      }
-
-      carbonLoadModel.setDateFormat(dateFormat)
-      carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
-        CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-      carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
-        CarbonCommonConstants.CARBON_DATE_FORMAT,
-        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
-      carbonLoadModel
-        .setSerializationNullFormat(
-          TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
-          optionsFinal("serialization_null_format"))
-      carbonLoadModel
-        .setBadRecordsLoggerEnable(
-          TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable)
-      carbonLoadModel
-        .setBadRecordsAction(
-          TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action)
-      carbonLoadModel
-        .setIsEmptyDataBadRecord(
-          DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
-          optionsFinal("is_empty_data_bad_record"))
-      carbonLoadModel.setSortScope(sort_scope)
-      carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb"))
-      carbonLoadModel.setGlobalSortPartitions(global_sort_partitions)
-      carbonLoadModel.setUseOnePass(single_pass.toBoolean)
-      if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
-          complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
-          delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
-        sys.error(s"Field Delimiter & Complex types delimiter are same")
-      } else {
-        carbonLoadModel.setComplexDelimiterLevel1(
-          CarbonUtil.delimiterConverter(complex_delimeter_level1))
-        carbonLoadModel.setComplexDelimiterLevel2(
-          CarbonUtil.delimiterConverter(complex_delimeter_level2))
-      }
-      // set local dictionary path, and dictionary file extension
-      carbonLoadModel.setAllDictPath(all_dictionary_path)
-
-      val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-      try {
+      carbonLoadModel.setFactFilePath(factPath)
+      DataLoadingUtil.buildCarbonLoadModel(
+        table,
+        carbonProperty,
+        options,
+        optionsFinal,
+        carbonLoadModel
+      )
+
+      try{
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
-        carbonLoadModel.setFactFilePath(factPath)
-        carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
-        carbonLoadModel.setCsvHeader(fileHeader)
-        carbonLoadModel.setColDictFilePath(column_dict)
-        carbonLoadModel.setDirectLoad(true)
-        carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
-        val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns,
-          optionsFinal("maxcolumns"))
-        carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
         GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
         val storePath = relation.tableMeta.storePath
         // add the start entry for the new load in the table status file
@@ -320,11 +125,9 @@ case class LoadTableCommand(
         if (isOverwriteTable) {
           LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
         }
-        if (null == carbonLoadModel.getLoadMetadataDetails) {
-          CommonUtil.readLoadMetadataDetails(carbonLoadModel)
-        }
         if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
-            StringUtils.isEmpty(column_dict) && StringUtils.isEmpty(all_dictionary_path)) {
+            StringUtils.isEmpty(carbonLoadModel.getColDictFilePath) &&
+            StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
           LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
           LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
           carbonLoadModel.setUseOnePass(false)
@@ -337,111 +140,21 @@ case class LoadTableCommand(
         if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
           FileFactory.mkdirs(metadataDirectoryPath, fileType)
         }
+        val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+        val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
         if (carbonLoadModel.getUseOnePass) {
-          val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-          val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-            .getCarbonTableIdentifier
-          val carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(storePath, carbonTableIdentifier)
-          val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
-          val dimensions = carbonTable.getDimensionByTableName(
-            carbonTable.getFactTableName).asScala.toArray
-          val colDictFilePath = carbonLoadModel.getColDictFilePath
-          if (!StringUtils.isEmpty(colDictFilePath)) {
-            carbonLoadModel.initPredefDictMap()
-            // generate predefined dictionary
-            GlobalDictionaryUtil
-              .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
-                dimensions, carbonLoadModel, sparkSession.sqlContext, storePath, dictFolderPath)
-          }
-          if (!StringUtils.isEmpty(all_dictionary_path)) {
-            carbonLoadModel.initPredefDictMap()
-            GlobalDictionaryUtil
-              .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
-                carbonLoadModel,
-                storePath,
-                carbonTableIdentifier,
-                dictFolderPath,
-                dimensions,
-                all_dictionary_path)
-          }
-          // dictionaryServerClient dictionary generator
-          val dictionaryServerPort = carbonProperty
-            .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
-              CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
-          val sparkDriverHost = sparkSession.sqlContext.sparkContext.
-            getConf.get("spark.driver.host")
-          carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
-          // start dictionary server when use one pass load and dimension with DICTIONARY
-          // encoding is present.
-          val allDimensions = table.getAllDimensions.asScala.toList
-          val createDictionary = allDimensions.exists {
-            carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-                               !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
-          }
-          val server: Option[DictionaryServer] = if (createDictionary) {
-            val dictionaryServer = DictionaryServer
-              .getInstance(dictionaryServerPort.toInt, carbonTable)
-            carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
-            sparkSession.sparkContext.addSparkListener(new SparkListener() {
-              override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-                dictionaryServer.shutdown()
-              }
-            })
-            Some(dictionaryServer)
-          } else {
-            None
-          }
-          CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+          loadDataUsingOnePass(
+            sparkSession,
+            carbonProperty,
             carbonLoadModel,
-            relation.tableMeta.storePath,
             columnar,
-            partitionStatus,
-            server,
-            isOverwriteTable,
-            dataFrame,
-            updateModel)
+            partitionStatus)
         } else {
-          val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
-            val fields = dataFrame.get.schema.fields
-            import org.apache.spark.sql.functions.udf
-            // extracting only segment from tupleId
-            val getSegIdUDF = udf((tupleId: String) =>
-              CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
-            // getting all fields except tupleId field as it is not required in the value
-            var otherFields = fields.toSeq
-              .filter(field => !field.name
-                .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-              .map(field => new Column(field.name))
-
-            // extract tupleId field which will be used as a key
-            val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
-              .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
-              as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
-            // use dataFrameWithoutTupleId as dictionaryDataFrame
-            val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
-            otherFields = otherFields :+ segIdColumn
-            // use dataFrameWithTupleId as loadDataFrame
-            val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
-            (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
-          } else {
-            (dataFrame, dataFrame)
-          }
-
-          GlobalDictionaryUtil.generateGlobalDictionary(
-            sparkSession.sqlContext,
-            carbonLoadModel,
-            relation.tableMeta.storePath,
-            dictionaryDataFrame)
-          CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+          loadData(
+            sparkSession,
             carbonLoadModel,
-            relation.tableMeta.storePath,
             columnar,
-            partitionStatus,
-            None,
-            isOverwriteTable,
-            loadDataFrame,
-            updateModel)
+            partitionStatus)
         }
       } catch {
         case CausedBy(ex: NoRetryException) =>
@@ -454,6 +167,9 @@ case class LoadTableCommand(
       } finally {
         // Once the data load is successful delete the unwanted partition files
         try {
+          val partitionLocation = table.getStorePath + "/partition/" +
+                                  table.getDatabaseName + "/" +
+                                  table.getFactTableName + "/"
           val fileType = FileFactory.getFileType(partitionLocation)
           if (FileFactory.isFileExist(partitionLocation, fileType)) {
             val file = FileFactory
@@ -480,6 +196,130 @@ case class LoadTableCommand(
     Seq.empty
   }
 
+  private def loadDataUsingOnePass(
+      sparkSession: SparkSession,
+      carbonProperty: CarbonProperties,
+      carbonLoadModel: CarbonLoadModel,
+      columnar: Boolean,
+      partitionStatus: String): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      .getCarbonTableIdentifier
+    val carbonTablePath = CarbonStorePath
+      .getCarbonTablePath(carbonLoadModel.getStorePath, carbonTableIdentifier)
+    val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
+    val dimensions = carbonTable.getDimensionByTableName(
+      carbonTable.getFactTableName).asScala.toArray
+    val colDictFilePath = carbonLoadModel.getColDictFilePath
+    if (!StringUtils.isEmpty(colDictFilePath)) {
+      carbonLoadModel.initPredefDictMap()
+      // generate predefined dictionary
+      GlobalDictionaryUtil.generatePredefinedColDictionary(
+        colDictFilePath,
+        carbonTableIdentifier,
+        dimensions,
+        carbonLoadModel,
+        sparkSession.sqlContext,
+        carbonLoadModel.getStorePath,
+        dictFolderPath)
+    }
+    if (!StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
+      carbonLoadModel.initPredefDictMap()
+      GlobalDictionaryUtil
+        .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
+          carbonLoadModel,
+          carbonLoadModel.getStorePath,
+          carbonTableIdentifier,
+          dictFolderPath,
+          dimensions,
+          carbonLoadModel.getAllDictPath)
+    }
+    // dictionaryServerClient dictionary generator
+    val dictionaryServerPort = carbonProperty
+      .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
+        CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
+    val sparkDriverHost = sparkSession.sqlContext.sparkContext.
+      getConf.get("spark.driver.host")
+    carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
+    // start dictionary server when use one pass load and dimension with DICTIONARY
+    // encoding is present.
+    val allDimensions =
+      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAllDimensions.asScala.toList
+    val createDictionary = allDimensions.exists {
+      carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+                         !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
+    }
+    val server: Option[DictionaryServer] = if (createDictionary) {
+      val dictionaryServer = DictionaryServer
+        .getInstance(dictionaryServerPort.toInt, carbonTable)
+      carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+      sparkSession.sparkContext.addSparkListener(new SparkListener() {
+        override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+          dictionaryServer.shutdown()
+        }
+      })
+      Some(dictionaryServer)
+    } else {
+      None
+    }
+    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+      carbonLoadModel,
+      carbonLoadModel.getStorePath,
+      columnar,
+      partitionStatus,
+      server,
+      isOverwriteTable,
+      dataFrame,
+      updateModel)
+  }
+
+  private def loadData(
+      sparkSession: SparkSession,
+      carbonLoadModel: CarbonLoadModel,
+      columnar: Boolean,
+      partitionStatus: String): Unit = {
+    val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
+      val fields = dataFrame.get.schema.fields
+      import org.apache.spark.sql.functions.udf
+      // extracting only segment from tupleId
+      val getSegIdUDF = udf((tupleId: String) =>
+        CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
+      // getting all fields except tupleId field as it is not required in the value
+      var otherFields = fields.toSeq
+        .filter(field => !field.name
+          .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+        .map(field => new Column(field.name))
+
+      // extract tupleId field which will be used as a key
+      val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
+        .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
+        as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
+      // use dataFrameWithoutTupleId as dictionaryDataFrame
+      val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
+      otherFields = otherFields :+ segIdColumn
+      // use dataFrameWithTupleId as loadDataFrame
+      val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
+      (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
+    } else {
+      (dataFrame, dataFrame)
+    }
+
+    GlobalDictionaryUtil.generateGlobalDictionary(
+      sparkSession.sqlContext,
+      carbonLoadModel,
+      carbonLoadModel.getStorePath,
+      dictionaryDataFrame)
+    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+      carbonLoadModel,
+      carbonLoadModel.getStorePath,
+      columnar,
+      partitionStatus,
+      None,
+      isOverwriteTable,
+      loadDataFrame,
+      updateModel)
+  }
+
   private def updateTableMetadata(
       carbonLoadModel: CarbonLoadModel,
       sqlContext: SQLContext,


[2/3] carbondata git commit: [CARBONDATA-1173] Stream ingestion - write path framework

Posted by ja...@apache.org.
[CARBONDATA-1173] Stream ingestion - write path framework

This closes #1064


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a9d951ea
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a9d951ea
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a9d951ea

Branch: refs/heads/streaming_ingest
Commit: a9d951eaa3cdcb0b1b55dfbe8a94819a45250989
Parents: 5936e7f
Author: Aniket Adnaik <an...@gmail.com>
Authored: Thu Jun 15 11:57:43 2017 -0700
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Oct 30 14:56:57 2017 +0530

----------------------------------------------------------------------
 .../streaming/CarbonStreamingCommitInfo.java    | 108 ++++++++++
 .../streaming/CarbonStreamingConstants.java     |  25 +++
 .../streaming/CarbonStreamingMetaStore.java     |  40 ++++
 .../streaming/CarbonStreamingMetaStoreImpl.java |  56 ++++++
 .../core/util/path/CarbonTablePath.java         |  10 +
 .../streaming/CarbonStreamingOutputFormat.java  |  66 +++++++
 .../streaming/CarbonStreamingRecordWriter.java  | 196 +++++++++++++++++++
 .../org/apache/spark/sql/CarbonSource.scala     |  39 +++-
 .../CarbonStreamingOutpurWriteFactory.scala     |  88 +++++++++
 .../streaming/CarbonStreamingOutputWriter.scala |  98 ++++++++++
 10 files changed, 719 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
new file mode 100644
index 0000000..6cf303a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.streaming;
+
+/**
+ * Commit info for streaming writes
+ * The commit info can be used to recover valid offset in the file
+ * in the case of write failure.
+ */
+public class CarbonStreamingCommitInfo {
+
+  private String dataBase;
+
+  private String table;
+
+  private long commitTime;
+
+  private long segmentID;
+
+  private String partitionID;
+
+  private long batchID;
+
+  private String fileOffset;
+
+  private long transactionID;     // future use
+
+  public  CarbonStreamingCommitInfo(
+
+      String dataBase,
+
+      String table,
+
+      long commitTime,
+
+      long segmentID,
+
+      String partitionID,
+
+      long batchID) {
+
+    this.dataBase = dataBase;
+
+    this.table = table;
+
+    this.commitTime = commitTime;
+
+    this.segmentID = segmentID;
+
+    this.partitionID = partitionID;
+
+    this.batchID = batchID;
+
+    this.transactionID = -1;
+  }
+
+  public String getDataBase() {
+    return dataBase;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public long getCommitTime() {
+    return commitTime;
+  }
+
+  public long getSegmentID() {
+    return segmentID;
+  }
+
+  public String getPartitionID() {
+    return partitionID;
+  }
+
+  public long getBatchID() {
+    return batchID;
+  }
+
+  public String getFileOffset() {
+    return fileOffset;
+  }
+
+  public long getTransactionID() {
+    return transactionID;
+  }
+
+  @Override
+  public String toString() {
+    return dataBase + "." + table + "." + segmentID + "$" + partitionID;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java
new file mode 100644
index 0000000..db7186f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.streaming;
+
+public class CarbonStreamingConstants {
+
+  public static final long DEFAULT_CARBON_STREAM_FILE_BLOCK_SIZE = 1024 * 1024 * 1024; // 1GB
+
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java
new file mode 100644
index 0000000..fa3746c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.streaming;
+
+
+import java.io.IOException;
+
+/**
+ * Generic interface for storing commit info for streaming ingest
+ */
+public interface CarbonStreamingMetaStore {
+
+  public CarbonStreamingCommitInfo getStreamingCommitInfo(
+          String dataBase,
+          String table,
+          long segmentID,
+          String partitionID) throws IOException;
+
+  public void updateStreamingCommitInfo(
+          CarbonStreamingMetaStore commitInfo) throws IOException;
+
+  public void recoverStreamingData(
+          CarbonStreamingCommitInfo commitInfo) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java
new file mode 100644
index 0000000..0afe962
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.streaming;
+
+import java.io.IOException;
+
+/**
+ *  JSON format can be used to store the metadata
+ */
+public class CarbonStreamingMetaStoreImpl implements CarbonStreamingMetaStore {
+
+  /**
+   * get commit info from metastore
+   */
+  public CarbonStreamingCommitInfo getStreamingCommitInfo(
+          String dataBase,
+          String table,
+          long segmentID,
+          String partitionID) throws IOException {
+
+    return null;
+
+  }
+
+  /**
+   * Update commit info in metastore
+   */
+  public void updateStreamingCommitInfo(
+          CarbonStreamingMetaStore commitInfo) throws IOException {
+
+  }
+
+  /**
+   * Recover streaming data using valid offset in commit info
+   */
+  public void recoverStreamingData(
+          CarbonStreamingCommitInfo commitInfo) throws IOException {
+
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 7be9c76..3c6bb3c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -51,6 +51,16 @@ public class CarbonTablePath extends Path {
   protected static final String INDEX_FILE_EXT = ".carbonindex";
   protected static final String DELETE_DELTA_FILE_EXT = ".deletedelta";
 
+  /**
+   * Streaming ingest related paths
+   */
+  protected static final String STREAM_PREFIX = "Streaming";
+  protected static final String STREAM_FILE_NAME_EXT = ".carbondata.stream";
+  protected static final String STREAM_FILE_BEING_WRITTEN = "in-progress.carbondata.stream";
+  protected static final String STREAM_FILE_BEING_WRITTEN_META = "in-progress.meta";
+  protected static final String STREAM_COMPACTION_STATUS = "streaming_compaction_status";
+  protected static final String STREAM_FILE_LOCK = "streaming_in_use.lock";
+
   protected String tablePath;
   protected CarbonTableIdentifier carbonTableIdentifier;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java
new file mode 100644
index 0000000..350684e
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.streaming.CarbonStreamingConstants;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+
+/**
+ * Output format to write streaming data to carbondata file
+ *
+ * @param <V> - type of record
+ */
+public class CarbonStreamingOutputFormat<K, V> extends FileOutputFormat<K, V> {
+
+  public static long getBlockSize(Configuration conf) {
+    return conf.getLong("dfs.block.size",
+            CarbonStreamingConstants.DEFAULT_CARBON_STREAM_FILE_BLOCK_SIZE);
+  }
+
+  public static void setBlockSize(Configuration conf, long blockSize) {
+    conf.setLong("dfs.block.size", blockSize);
+  }
+
+  /**
+   * When getRecordWriter may need to override
+   * to provide correct path including streaming segment name
+   */
+  @Override
+  public CarbonStreamingRecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
+          throws IOException, InterruptedException {
+
+    Configuration conf = job.getConfiguration();
+
+    String keyValueSeparator = conf.get(
+            CSVInputFormat.DELIMITER,
+            CSVInputFormat.DELIMITER_DEFAULT);
+
+    return new CarbonStreamingRecordWriter<K, V>(
+            conf,
+            getDefaultWorkFile(job, null),
+            keyValueSeparator);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java
new file mode 100644
index 0000000..9d1951f
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+
+public class CarbonStreamingRecordWriter<K,V> extends RecordWriter<K, V> {
+
+  private static final String utf8 = "UTF-8";
+
+  private static final byte[] newline;
+
+  static {
+
+    try {
+
+      newline = "\n".getBytes(utf8);
+
+    } catch (UnsupportedEncodingException uee) {
+
+      throw new IllegalArgumentException("Can't find " + utf8 + " encoding");
+    }
+  }
+
+  private FSDataOutputStream outputStream;
+
+  private FileSystem fs;
+
+  private Path file;
+
+  private volatile boolean isClosed;
+
+  private final byte[] keyValueSeparator;
+
+  public void initOut() throws IOException {
+
+    outputStream = fs.create(file, false);
+
+    isClosed = false;
+  }
+
+  public CarbonStreamingRecordWriter(
+          Configuration conf,
+          Path file,
+          String keyValueSeparator) throws IOException {
+
+    this.file = file;
+
+    fs = FileSystem.get(conf);
+
+    outputStream = fs.create(file, false);
+
+    isClosed = false;
+
+    try {
+
+      this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
+
+    } catch (UnsupportedEncodingException uee) {
+
+      throw new IllegalArgumentException("Can't find " + utf8 + "encoding");
+
+    }
+
+  }
+
+  public CarbonStreamingRecordWriter(
+          Configuration conf,
+          Path file) throws IOException {
+
+    this(conf, file, ",");
+
+  }
+
+  /**
+   *  Write Object to byte stream.
+   */
+
+  private void writeObject(Object o) throws IOException {
+
+    if (o instanceof Text) {
+      Text to = (Text)o;
+
+      outputStream.write(to.getBytes(), 0, to.getLength());
+
+    } else {
+
+      outputStream.write(o.toString().getBytes(utf8));
+
+    }
+  }
+
+  /**
+   * Write streaming data as text file (temporary)
+   */
+
+  @Override
+  public synchronized void write(K key, V value) throws IOException {
+
+    boolean isNULLKey = key == null || key instanceof NullWritable;
+
+    boolean isNULLValue = value == null || value instanceof NullWritable;
+
+    if (isNULLKey && isNULLValue) {
+
+      return;
+    }
+
+    if (!isNULLKey) {
+
+      writeObject(key);
+    }
+
+    if (!isNULLKey || !isNULLValue) {
+
+      outputStream.write(keyValueSeparator);
+    }
+
+    if (!isNULLValue) {
+
+      writeObject(value);
+    }
+
+    outputStream.write(newline);
+  }
+
+  private void closeInternal() throws IOException {
+
+    if (!isClosed) {
+
+      outputStream.close();
+
+      isClosed = true;
+    }
+
+  }
+
+  public void flush() throws IOException {
+
+    outputStream.hflush();
+  }
+
+  public long getOffset() throws IOException {
+
+    return outputStream.getPos();
+  }
+
+  public void commit(boolean finalCommit) throws IOException {
+
+    closeInternal();
+
+    Path commitFile = new Path(file.getParent(),
+            CarbonTablePath.getCarbonDataPrefix() + System.currentTimeMillis());
+
+    fs.rename(file, commitFile);
+
+    if (!finalCommit) {
+      initOut();
+    }
+  }
+
+  @Override
+  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+
+    closeInternal();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index f4f8b75..d496de2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -21,16 +21,19 @@ import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
 import org.apache.commons.lang.StringUtils
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
 import org.apache.spark.sql.execution.strategy.CarbonLateDecodeStrategy
 import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.streaming.CarbonStreamingOutputWriterFactory
+import org.apache.spark.sql.types.{StringType, StructType}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -41,12 +44,13 @@ import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
+
 /**
  * Carbon relation provider compliant to data source api.
  * Creates carbon relations
  */
 class CarbonSource extends CreatableRelationProvider with RelationProvider
-  with SchemaRelationProvider with DataSourceRegister {
+  with SchemaRelationProvider with DataSourceRegister with FileFormat  {
 
   override def shortName(): String = "carbondata"
 
@@ -54,7 +58,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
   override def createRelation(sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
     CarbonEnv.getInstance(sqlContext.sparkSession)
-    // if path is provided we can directly create Hadoop relation. \
+    // if path is provided we can directly create Hadoop relation.
     // Otherwise create datasource relation
     parameters.get("tablePath") match {
       case Some(path) => CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
@@ -178,7 +182,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
   /**
    * Returns the path of the table
    *
-   * @param sparkSession
+     * @param sparkSession
    * @param dbName
    * @param tableName
    * @return
@@ -203,11 +207,32 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         (relation.tableMeta.tablePath, parameters)
       }
     } catch {
-      case ex: Exception =>
-        throw new Exception(s"Do not have $dbName and $tableName", ex)
+        case ex: Exception =>
+          throw new Exception(s"Do not have $dbName and $tableName", ex)
     }
   }
 
+  /**
+   * Prepares a write job and returns an [[OutputWriterFactory]].  Client side job preparation can
+   * be put here.  For example, user defined output committer can be configured here
+   * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
+   */
+  def prepareWrite(
+    sparkSession: SparkSession,
+    job: Job,
+    options: Map[String, String],
+    dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
+
+  /**
+   * When possible, this method should return the schema of the given `files`.  When the format
+   * does not support inference, or no valid files are given should return None.  In these cases
+   * Spark will require that user specify the schema manually.
+   */
+  def inferSchema(
+    sparkSession: SparkSession,
+    options: Map[String, String],
+    files: Seq[FileStatus]): Option[StructType] = Some(new StructType().add("value", StringType))
+
 }
 
 object CarbonSource {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
new file mode 100644
index 0000000..be69885
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
@@ -0,0 +1,88 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.streaming
+
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+
+class CarbonStreamingOutputWriterFactory extends OutputWriterFactory {
+
+ /**
+  * When writing to a [[org.apache.spark.sql.execution.datasources.HadoopFsRelation]],
+  * this method gets called by each task on executor side
+  * to instantiate new [[org.apache.spark.sql.execution.datasources.OutputWriter]]s.
+  *
+  * @param path Path to write the file.
+  * @param dataSchema Schema of the rows to be written. Partition columns are not
+  *                   included in the schema if the relation being written is
+  *                   partitioned.
+  * @param context The Hadoop MapReduce task context.
+  */
+
+  override def newInstance(
+    path: String,
+
+    dataSchema: StructType,
+
+    context: TaskAttemptContext) : CarbonStreamingOutputWriter = {
+
+        new CarbonStreamingOutputWriter(path, context)
+  }
+
+  override def getFileExtension(context: TaskAttemptContext): String = {
+
+    CarbonTablePath.STREAM_FILE_NAME_EXT
+  }
+
+}
+
+object CarbonStreamingOutpurWriterFactory {
+
+  private[this] val writers = new ConcurrentHashMap[String, CarbonStreamingOutputWriter]()
+
+  def addWriter(path: String, writer: CarbonStreamingOutputWriter): Unit = {
+
+    if (writers.contains(path)) {
+      throw new IllegalArgumentException(path + "writer already exists")
+    }
+
+    writers.put(path, writer)
+  }
+
+  def getWriter(path: String): CarbonStreamingOutputWriter = {
+
+    writers.get(path)
+  }
+
+  def containsWriter(path: String): Boolean = {
+
+    writers.containsKey(path)
+  }
+
+  def removeWriter(path: String): Unit = {
+
+    writers.remove(path)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala
new file mode 100644
index 0000000..dfc8ff3
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{NullWritable, Text}
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.OutputWriter
+import org.apache.spark.sql.Row
+
+import org.apache.carbondata.hadoop.streaming.{CarbonStreamingOutputFormat, CarbonStreamingRecordWriter}
+
+class CarbonStreamingOutputWriter (
+    path: String,
+    context: TaskAttemptContext)
+    extends OutputWriter {
+
+  private[this] val buffer = new Text()
+
+  private val recordWriter: CarbonStreamingRecordWriter[NullWritable, Text] = {
+
+    val outputFormat = new CarbonStreamingOutputFormat[NullWritable, Text] () {
+
+      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String) : Path = {
+        new Path(path)
+      }
+
+    /*
+     May need to override
+     def getOutputCommiter(c: TaskAttemptContext): OutputCommitter = {
+      null
+    }
+    */
+
+    }
+
+    outputFormat.
+      getRecordWriter(context).asInstanceOf[CarbonStreamingRecordWriter[NullWritable, Text]]
+  }
+
+  override def write(row: Row): Unit = {
+
+    throw new UnsupportedOperationException("call writeInternal")
+
+  }
+
+  override protected [sql] def writeInternal(row: InternalRow): Unit = {
+
+    val utf8string = row.getUTF8String(0)
+
+    buffer.set(utf8string.getBytes)
+
+    recordWriter.write(NullWritable.get(), buffer)
+
+  }
+
+  def getpath: String = path
+
+  override def close(): Unit = {
+
+    recordWriter.close(context)
+
+  }
+
+  def flush(): Unit = {
+
+    recordWriter.flush()
+
+  }
+
+  def getPos(): Long = {
+
+    recordWriter.getOffset()
+
+  }
+
+  def commit(finalCommit: Boolean): Unit = {
+
+    recordWriter.commit(finalCommit)
+
+  }
+}


[3/3] carbondata git commit: [CARBONDATA-1174] Streaming Ingestion - schema validation and streaming examples

Posted by ja...@apache.org.
[CARBONDATA-1174] Streaming Ingestion - schema validation and streaming examples

1.schema validation of input data if its from a file source when schema is specified.
2.added streaming examples - for file stream and socket stream sources

This closes #1352


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8c987392
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8c987392
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8c987392

Branch: refs/heads/streaming_ingest
Commit: 8c98739279e5ffe49b70ae6664d19a7f596cf18c
Parents: a9d951e
Author: Aniket Adnaik <an...@gmail.com>
Authored: Thu Jun 15 11:57:43 2017 -0700
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Oct 30 14:56:57 2017 +0530

----------------------------------------------------------------------
 .../streaming/CarbonStreamingCommitInfo.java    |   6 +-
 ...CarbonStreamingIngestFileSourceExample.scala | 146 +++++++++++++
 ...rbonStreamingIngestSocketSourceExample.scala | 160 ++++++++++++++
 .../examples/utils/StreamingExampleUtil.scala   | 145 +++++++++++++
 .../org/apache/spark/sql/CarbonSource.scala     | 210 +++++++++++++++++--
 .../CarbonStreamingOutpurWriteFactory.scala     |  88 --------
 .../CarbonStreamingOutputWriteFactory.scala     |  88 ++++++++
 .../CarbonSourceSchemaValidationTest.scala      |  61 ++++++
 8 files changed, 801 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
index 6cf303a..2027566 100644
--- a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
@@ -36,7 +36,7 @@ public class CarbonStreamingCommitInfo {
 
   private long batchID;
 
-  private String fileOffset;
+  private long fileOffset;
 
   private long transactionID;     // future use
 
@@ -67,6 +67,8 @@ public class CarbonStreamingCommitInfo {
     this.batchID = batchID;
 
     this.transactionID = -1;
+
+    this.fileOffset = 0;
   }
 
   public String getDataBase() {
@@ -93,7 +95,7 @@ public class CarbonStreamingCommitInfo {
     return batchID;
   }
 
-  public String getFileOffset() {
+  public long getFileOffset() {
     return fileOffset;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala
new file mode 100644
index 0000000..ebe0a5c
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.streaming.ProcessingTime
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.examples.utils.{StreamingExampleUtil}
+
+/**
+ * Covers spark structured streaming scenario where user streams data
+ * from a file source (input source) and write into carbondata table(output sink).
+ * This example uses csv file as a input source and writes
+ * into target carbon table. The target carbon table must exist.
+ */
+
+object CarbonStreamingIngestFileSourceExample {
+
+  def main(args: Array[String]) {
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target"
+    val csvDataDir = s"$rootPath/examples/spark2/resources/csvDataDir"
+    val streamTableName = s"_carbon_file_stream_table_"
+    val streamTablePath = s"$storeLocation/default/$streamTableName"
+    val ckptLocation = s"$rootPath/examples/spark2/resources/ckptDir"
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    // cleanup residual files, if any
+    StreamingExampleUtil.cleanUpDir(csvDataDir, ckptLocation)
+
+    import org.apache.spark.sql.CarbonSession._
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .appName("CarbonFileStreamingExample")
+      .config("spark.sql.warehouse.dir", warehouse)
+      .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+    spark.sparkContext.setLogLevel("ERROR")
+
+    // Writes Dataframe to CarbonData file:
+    import spark.implicits._
+    import org.apache.spark.sql.types._
+    // drop table if exists previously
+    spark.sql(s"DROP TABLE IF EXISTS $streamTableName")
+    // Create target carbon table
+    spark.sql(
+      s"""
+         | CREATE TABLE $streamTableName(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT
+         | )
+         | STORED BY 'carbondata'""".stripMargin)
+
+    // Generate CSV data and write to CSV file
+    StreamingExampleUtil.generateCSVDataFile(spark, 1, csvDataDir, SaveMode.Overwrite)
+
+    // scalastyle:off
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$csvDataDir'
+         | INTO TABLE $streamTableName
+         | OPTIONS('FILEHEADER'='id,name,city,salary'
+         | )""".stripMargin)
+    // scalastyle:on
+
+    // check initial table data
+    spark.sql(s""" SELECT * FROM $streamTableName """).show()
+
+    // define custom schema
+    val inputSchema = new StructType().
+      add("id", "integer").
+      add("name", "string").
+      add("city", "string").
+      add("salary", "float")
+
+    // setup csv file as a input streaming source
+    val csvReadDF = spark.readStream.
+      format("csv").
+      option("sep", ",").
+      schema(inputSchema).
+      option("path", csvDataDir).
+      option("header", "true").
+      load()
+
+    // Write data from csv format streaming source to carbondata target format
+    // set trigger to every 1 second
+    val qry = csvReadDF.writeStream
+      .format("carbondata")
+      .trigger(ProcessingTime("1 seconds"))
+      .option("checkpointLocation", ckptLocation)
+      .option("path", streamTablePath)
+      .start()
+
+    // In a separate thread append data every 2 seconds to existing csv
+    val gendataThread: Thread = new Thread() {
+      override def run(): Unit = {
+        for (i <- 1 to 5) {
+          Thread.sleep(2)
+          StreamingExampleUtil.
+            generateCSVDataFile(spark, i * 10 + 1, csvDataDir, SaveMode.Append)
+        }
+      }
+    }
+    gendataThread.start()
+    gendataThread.join()
+
+    // stop streaming execution after 5 sec delay
+    Thread.sleep(5000)
+    qry.stop()
+
+    // verify streaming data is added into the table
+    spark.sql(s""" SELECT * FROM $streamTableName """).show()
+
+    // Cleanup residual files and table data
+    StreamingExampleUtil.cleanUpDir(csvDataDir, ckptLocation)
+    spark.sql(s"DROP TABLE IF EXISTS $streamTableName")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestSocketSourceExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestSocketSourceExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestSocketSourceExample.scala
new file mode 100644
index 0000000..bbf7ef2
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestSocketSourceExample.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.streaming.ProcessingTime
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.examples.utils.StreamingExampleUtil
+
+
+/**
+ * This example reads stream data from socket source (input) and write into
+ * existing carbon table(output).
+ *
+ * It uses localhost and port (9999) to create a socket and write to it.
+ * Exmaples uses two threads one to write data to socket and other thread
+ * to receive data from socket and write into carbon table.
+ */
+
+// scalastyle:off println
+object CarbonStreamingIngestSocketSourceExample {
+
+  def main(args: Array[String]) {
+
+    // setup localhost and port number
+    val host = "localhost"
+    val port = 9999
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target"
+    val csvDataDir = s"$rootPath/examples/spark2/resources/csvDataDir"
+    val streamTableName = s"_carbon_socket_stream_table_"
+    val streamTablePath = s"$storeLocation/default/$streamTableName"
+    val ckptLocation = s"$rootPath/examples/spark2/resources/ckptDir"
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    // cleanup residual files, if any
+    StreamingExampleUtil.cleanUpDir(csvDataDir, ckptLocation)
+
+    import org.apache.spark.sql.CarbonSession._
+    val spark = SparkSession
+      .builder()
+      .master("local[2]")
+      .appName("CarbonNetworkStreamingExample")
+      .config("spark.sql.warehouse.dir", warehouse)
+      .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+    spark.sparkContext.setLogLevel("ERROR")
+
+    // Writes Dataframe to CarbonData file:
+    import spark.implicits._
+
+    // drop table if exists previously
+    spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")
+
+    // Create target carbon table and populate with initial data
+    spark.sql(
+      s"""
+         | CREATE TABLE ${streamTableName}(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT
+         | )
+         | STORED BY 'carbondata'""".stripMargin)
+
+    // Generate CSV data and write to CSV file
+    StreamingExampleUtil.generateCSVDataFile(spark, 1, csvDataDir, SaveMode.Overwrite)
+
+    // load the table
+    // scalastyle:off
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$csvDataDir'
+         | INTO TABLE ${streamTableName}
+         | OPTIONS('FILEHEADER'='id,name,city,salary'
+         | )""".stripMargin)
+
+
+    spark.sql(s""" SELECT * FROM ${streamTableName} """).show()
+
+    // Create server socket in main thread
+    val serverSocket = StreamingExampleUtil.createserverSocket(host, port)
+
+    // Start client thread to receive streaming data and write into carbon
+    val streamWriterThread: Thread = new Thread() {
+      override def run(): Unit= {
+
+        try {
+          // Setup read stream to read input data from socket
+          val readSocketDF = spark.readStream
+            .format("socket")
+            .option("host", host)
+            .option("port", port)
+            .load()
+
+          // Write data from socket stream to carbondata file
+          val qry = readSocketDF.writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime("2 seconds"))
+            .option("checkpointLocation", ckptLocation)
+            .option("path", streamTablePath)
+            .start()
+
+          qry.awaitTermination()
+        } catch {
+          case e: InterruptedException => println("Done reading and writing streaming data")
+        }
+      }
+    }
+    streamWriterThread.start()
+
+    // wait for client to connection request and accept
+    val clientSocket = StreamingExampleUtil.waitToForClientConnection(serverSocket.get)
+
+    // Write to client's connected socket every 2 seconds, for 5 times
+    StreamingExampleUtil.writeToSocket(clientSocket, 5, 2, 11)
+
+    Thread.sleep(2000)
+    // interrupt client thread to stop streaming query
+    streamWriterThread.interrupt()
+    //wait for client thread to finish
+    streamWriterThread.join()
+
+    //Close the server socket
+    serverSocket.get.close()
+
+    // verify streaming data is added into the table
+    // spark.sql(s""" SELECT * FROM ${streamTableName} """).show()
+
+    // Cleanup residual files and table data
+    StreamingExampleUtil.cleanUpDir(csvDataDir, ckptLocation)
+    spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingExampleUtil.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingExampleUtil.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingExampleUtil.scala
new file mode 100644
index 0000000..6eab491
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingExampleUtil.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples.utils
+
+import java.io.{IOException, PrintWriter}
+import java.net.{ServerSocket, Socket}
+
+import scala.tools.nsc.io.Path
+
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+
+
+/**
+ * Utility functions for streaming ingest examples
+ */
+
+// scalastyle:off println
+object StreamingExampleUtil {
+
+  // Clean up directories recursively, accepts variable arguments
+  def cleanUpDir(dirPaths: String*): Unit = {
+
+      // if (args.length < 1) {
+    if (dirPaths.size < 1) {
+      System.err.println("Usage: StreamingCleanupUtil <dirPath> [dirpath]...")
+      System.exit(1)
+    }
+
+    var i = 0
+    while (i < dirPaths.size) {
+      try {
+        val path: Path = Path(dirPaths(i))
+        path.deleteRecursively()
+      } catch {
+        case ioe: IOException => println("IO Exception while deleting files recursively" + ioe)
+      }
+      i = i + 1
+    }
+  }
+
+  // Generates csv data and write to csv files at given path
+  def generateCSVDataFile(spark: SparkSession,
+                        idStart: Int,
+                        csvDirPath: String,
+                        saveMode: SaveMode): Unit = {
+    // Create csv data frame file
+    val csvRDD = spark.sparkContext.parallelize(1 to 10)
+      .map(id => (id, "name_ABC", "city_XYZ", 10000.00*id))
+      val csvDataDF = spark.createDataFrame(csvRDD).toDF("id", "name", "city", "salary")
+
+
+    csvDataDF.write
+      .option("header", "false")
+      .mode(saveMode)
+      .csv(csvDirPath)
+  }
+
+  // Generates csv data frame and returns to caller
+  def generateCSVDataDF(spark: SparkSession,
+                        idStart: Int): DataFrame = {
+    // Create csv data frame file
+    val csvRDD = spark.sparkContext.parallelize(1 to 10)
+      .map(id => (id, "name_ABC", "city_XYZ", 10000.00*id))
+    val csvDataDF = spark.createDataFrame(csvRDD).toDF("id", "name", "city", "salary")
+    csvDataDF
+  }
+
+  // Create server socket for socket streaming source
+  def createserverSocket(host: String, port: Int): Option[ServerSocket] = {
+    try {
+      Some(new ServerSocket(port))
+    } catch {
+      case e: java.net.ConnectException =>
+        println("Error Connecting to" + host + ":" + port, e)
+        None
+    }
+  }
+
+  // Create server socket for socket streaming source
+  def waitToForClientConnection(serverSocket: ServerSocket): Socket = {
+    serverSocket.accept()
+  }
+
+  // Create server socket for socket streaming source
+  def closeServerSocket(serverSocket: ServerSocket): Unit = {
+    serverSocket.close()
+  }
+
+  // write periodically on given socket
+  def writeToSocket(clientSocket: Socket,
+                   iterations: Int,
+                   delay: Int,
+                   startID: Int): Unit = {
+
+    var nItr = 10
+    var nDelay = 5
+
+    // iterations range check
+    if (iterations >= 1 || iterations <= 50) {
+      nItr = iterations
+    } else {
+      println("Number of iterations exceeds limit. Setting to default 10 iterations")
+    }
+
+    // delay range check (1 second to 60 seconds)
+    if (delay >= 1 || delay <= 60) {
+      nDelay = delay
+    } else {
+      println("Delay exceeds the limit. Setting it to default 2 seconds")
+    }
+
+    val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+
+    var j = startID
+
+    for (i <- startID to startID + nItr) {
+      // write 5 records per iteration
+      for (id <- j to j + 5 ) {
+        socketWriter.println(id.toString + ", name_" + i
+          + ", city_" + i + ", " + (i*10000.00).toString)
+      }
+      j = j + 5
+      socketWriter.flush()
+      Thread.sleep(nDelay*1000)
+    }
+    socketWriter.close()
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index d496de2..6eacb19 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -17,14 +17,20 @@
 
 package org.apache.spark.sql
 
+import java.io.{BufferedWriter, FileWriter, IOException}
+import java.util.UUID
+
 import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.language.implicitConversions
 
 import org.apache.commons.lang.StringUtils
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.Job
+import org.apache.parquet.schema.InvalidSchemaException
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
 import org.apache.spark.sql.execution.strategy.CarbonLateDecodeStrategy
@@ -33,10 +39,12 @@ import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.CarbonStreamingOutputWriterFactory
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.types._
 
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -44,7 +52,6 @@ import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
-
 /**
  * Carbon relation provider compliant to data source api.
  * Creates carbon relations
@@ -52,7 +59,11 @@ import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 class CarbonSource extends CreatableRelationProvider with RelationProvider
   with SchemaRelationProvider with DataSourceRegister with FileFormat  {
 
-  override def shortName(): String = "carbondata"
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  override def shortName(): String = {
+    "carbondata"
+  }
 
   // will be called if hive supported create table command is provided
   override def createRelation(sqlContext: SQLContext,
@@ -181,8 +192,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
 
   /**
    * Returns the path of the table
-   *
-     * @param sparkSession
+   * @param sparkSession
    * @param dbName
    * @param tableName
    * @return
@@ -217,22 +227,196 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
    * be put here.  For example, user defined output committer can be configured here
    * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
    */
-  def prepareWrite(
+  override def prepareWrite(
     sparkSession: SparkSession,
     job: Job,
     options: Map[String, String],
-    dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
+    dataSchema: StructType): OutputWriterFactory = {
+
+    // Check if table with given path exists
+    // validateTable(options.get("path").get)
+    validateTable(options("path"))
+
+    /* Check id streaming data schema matches with carbon table schema
+     * Data from socket source does not have schema attached to it,
+     * Following check is to ignore schema validation for socket source.
+     */
+    if (!(dataSchema.size.equals(1) &&
+      dataSchema.fields(0).dataType.equals(StringType))) {
+      val path = options.get("path")
+      val tablePath: String = path match {
+        case Some(value) => value
+        case None => ""
+      }
+
+      val carbonTableSchema: org.apache.carbondata.format.TableSchema =
+        getTableSchema(sparkSession: SparkSession, tablePath: String)
+      val isSchemaValid = validateSchema(carbonTableSchema, dataSchema)
+
+      if(!isSchemaValid) {
+        LOGGER.error("Schema Validation Failed: streaming data schema"
+          + "does not match with carbon table schema")
+        throw new InvalidSchemaException("Schema Validation Failed : " +
+          "streaming data schema does not match with carbon table schema")
+      }
+    }
+    new CarbonStreamingOutputWriterFactory()
+  }
 
   /**
-   * When possible, this method should return the schema of the given `files`.  When the format
-   * does not support inference, or no valid files are given should return None.  In these cases
-   * Spark will require that user specify the schema manually.
+   * Read schema from existing carbon table
+   * @param sparkSession
+   * @param tablePath carbon table path
+   * @return TableSchema read from provided table path
    */
-  def inferSchema(
+  private def getTableSchema(
+    sparkSession: SparkSession,
+    tablePath: String): org.apache.carbondata.format.TableSchema = {
+
+    val formattedTablePath = tablePath.replace('\\', '/')
+    val names = formattedTablePath.split("/")
+    if (names.length < 3) {
+      throw new IllegalArgumentException("invalid table path: " + tablePath)
+    }
+    val tableName : String = names(names.length - 1)
+    val dbName : String = names(names.length - 2)
+    val storePath = formattedTablePath.substring(0,
+      formattedTablePath.lastIndexOf
+      (dbName.concat(CarbonCommonConstants.FILE_SEPARATOR)
+        .concat(tableName)) - 1)
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val thriftTableInfo: org.apache.carbondata.format.TableInfo =
+      metastore.getThriftTableInfo(new CarbonTablePath(storePath, dbName, tableName))(sparkSession)
+
+    val factTable: org.apache.carbondata.format.TableSchema = thriftTableInfo.getFact_table
+    factTable
+  }
+
+  /**
+   * Validates streamed schema against existing table schema
+   * @param carbonTableSchema existing carbon table schema
+   * @param dataSchema streamed data schema
+   * @return true if schema validation is successful else false
+   */
+  private def validateSchema(
+      carbonTableSchema: org.apache.carbondata.format.TableSchema,
+      dataSchema: StructType): Boolean = {
+
+    val columnnSchemaValues = carbonTableSchema.getTable_columns
+      .asScala.sortBy(_.schemaOrdinal)
+
+    var columnDataTypes = new ListBuffer[String]()
+    columnnSchemaValues.foreach(columnDataType =>
+      columnDataTypes.append(columnDataType.data_type.toString))
+    val tableColumnDataTypeList = columnDataTypes.toList
+
+    var streamSchemaDataTypes = new ListBuffer[String]()
+    dataSchema.fields.foreach(item => streamSchemaDataTypes
+      .append(mapStreamingDataTypeToString(item.dataType.toString)))
+    val streamedDataTypeList = streamSchemaDataTypes.toList
+
+    val isValid = tableColumnDataTypeList == streamedDataTypeList
+    isValid
+  }
+
+  /**
+   * Maps streamed datatype to carbon datatype
+   * @param dataType
+   * @return String
+   */
+  def mapStreamingDataTypeToString(dataType: String): String = {
+    import org.apache.carbondata.format.DataType
+    dataType match {
+      case "IntegerType" => DataType.INT.toString
+      case "StringType" => DataType.STRING.toString
+      case "DateType" => DataType.DATE.toString
+      case "DoubleType" => DataType.DOUBLE.toString
+      case "FloatType" => DataType.DOUBLE.toString
+      case "LongType" => DataType.LONG.toString
+      case "ShortType" => DataType.SHORT.toString
+      case "TimestampType" => DataType.TIMESTAMP.toString
+    }
+  }
+
+  /**
+   * Validates if given table exists or throws exception
+   * @param tablePath existing carbon table path
+   * @return None
+   */
+  private def validateTable(tablePath: String): Unit = {
+
+    val formattedTablePath = tablePath.replace('\\', '/')
+    val names = formattedTablePath.split("/")
+    if (names.length < 3) {
+      throw new IllegalArgumentException("invalid table path: " + tablePath)
+    }
+    val tableName : String = names(names.length - 1)
+    val dbName : String = names(names.length - 2)
+    val storePath = formattedTablePath.substring(0,
+      formattedTablePath.lastIndexOf
+      (((dbName.concat(CarbonCommonConstants.FILE_SEPARATOR).toString)
+        .concat(tableName)).toString) - 1)
+    val absoluteTableIdentifier: AbsoluteTableIdentifier =
+      new AbsoluteTableIdentifier(storePath,
+        new CarbonTableIdentifier(dbName, tableName,
+          UUID.randomUUID().toString))
+
+    if (!checkIfTableExists(absoluteTableIdentifier)) {
+      throw new NoSuchTableException(dbName, tableName)
+    }
+  }
+
+  /**
+   * Checks if table exists by checking its schema file
+   * @param absoluteTableIdentifier
+   * @return Boolean
+   */
+  private def checkIfTableExists(absoluteTableIdentifier: AbsoluteTableIdentifier): Boolean = {
+    val carbonTablePath: CarbonTablePath = CarbonStorePath
+      .getCarbonTablePath(absoluteTableIdentifier)
+    val schemaFilePath: String = carbonTablePath.getSchemaFilePath
+    FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
+      FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
+      FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)
+  }
+
+  /**
+   * If user wants to stream data from carbondata table source
+   * and if following conditions are true:
+   *    1. No schema provided by the user in readStream()
+   *    2. spark.sql.streaming.schemaInference is set to true
+   * carbondata can infer table schema from a valid table path
+   * The schema inference is not mandatory, but good have.
+   * When possible, this method should return the schema of the given `files`.
+   * If the format does not support schema inference, or no valid files
+   * are given it should return None. In these cases Spark will require that
+   * user specify the schema manually.
+   */
+  override def inferSchema(
     sparkSession: SparkSession,
     options: Map[String, String],
-    files: Seq[FileStatus]): Option[StructType] = Some(new StructType().add("value", StringType))
+    files: Seq[FileStatus]): Option[StructType] = {
+
+    val path = options.get("path")
+    val tablePath: String = path match {
+      case Some(value) => value
+      case None => ""
+    }
+    // Check if table with given path exists
+    validateTable(tablePath)
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonTableSchema: org.apache.carbondata.format.TableSchema =
+      getTableSchema(sparkSession: SparkSession, tablePath: String)
+    val columnnSchemaValues = carbonTableSchema.getTable_columns
+      .asScala.sortBy(_.schemaOrdinal)
+
+    var structFields = new ArrayBuffer[StructField]()
+    columnnSchemaValues.foreach(columnSchema =>
+      structFields.append(StructField(columnSchema.column_name,
+        CatalystSqlParser.parseDataType(columnSchema.data_type.toString), true)))
 
+    Some(new StructType(structFields.toArray))
+  }
 }
 
 object CarbonSource {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
deleted file mode 100644
index be69885..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.streaming
-
-
-import java.util.concurrent.ConcurrentHashMap
-
-import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.spark.sql.execution.datasources.OutputWriterFactory
-import org.apache.spark.sql.types.StructType
-
-import org.apache.carbondata.core.util.path.CarbonTablePath
-
-
-class CarbonStreamingOutputWriterFactory extends OutputWriterFactory {
-
- /**
-  * When writing to a [[org.apache.spark.sql.execution.datasources.HadoopFsRelation]],
-  * this method gets called by each task on executor side
-  * to instantiate new [[org.apache.spark.sql.execution.datasources.OutputWriter]]s.
-  *
-  * @param path Path to write the file.
-  * @param dataSchema Schema of the rows to be written. Partition columns are not
-  *                   included in the schema if the relation being written is
-  *                   partitioned.
-  * @param context The Hadoop MapReduce task context.
-  */
-
-  override def newInstance(
-    path: String,
-
-    dataSchema: StructType,
-
-    context: TaskAttemptContext) : CarbonStreamingOutputWriter = {
-
-        new CarbonStreamingOutputWriter(path, context)
-  }
-
-  override def getFileExtension(context: TaskAttemptContext): String = {
-
-    CarbonTablePath.STREAM_FILE_NAME_EXT
-  }
-
-}
-
-object CarbonStreamingOutpurWriterFactory {
-
-  private[this] val writers = new ConcurrentHashMap[String, CarbonStreamingOutputWriter]()
-
-  def addWriter(path: String, writer: CarbonStreamingOutputWriter): Unit = {
-
-    if (writers.contains(path)) {
-      throw new IllegalArgumentException(path + "writer already exists")
-    }
-
-    writers.put(path, writer)
-  }
-
-  def getWriter(path: String): CarbonStreamingOutputWriter = {
-
-    writers.get(path)
-  }
-
-  def containsWriter(path: String): Boolean = {
-
-    writers.containsKey(path)
-  }
-
-  def removeWriter(path: String): Unit = {
-
-    writers.remove(path)
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriteFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriteFactory.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriteFactory.scala
new file mode 100644
index 0000000..c5e4226
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriteFactory.scala
@@ -0,0 +1,88 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.streaming
+
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+
+class CarbonStreamingOutputWriterFactory extends OutputWriterFactory {
+
+ /**
+  * When writing to a [[org.apache.spark.sql.execution.datasources.HadoopFsRelation]],
+  * this method gets called by each task on executor side
+  * to instantiate new [[org.apache.spark.sql.execution.datasources.OutputWriter]]s.
+  *
+  * @param path Path to write the file.
+  * @param dataSchema Schema of the rows to be written. Partition columns are not
+  *                   included in the schema if the relation being written is
+  *                   partitioned.
+  * @param context The Hadoop MapReduce task context.
+  */
+
+  override def newInstance(
+    path: String,
+
+    dataSchema: StructType,
+
+    context: TaskAttemptContext) : CarbonStreamingOutputWriter = {
+
+        new CarbonStreamingOutputWriter(path, context)
+  }
+
+  override def getFileExtension(context: TaskAttemptContext): String = {
+
+    CarbonTablePath.STREAM_FILE_NAME_EXT
+  }
+
+}
+
+object CarbonStreamingOutputWriterFactory {
+
+  private[this] val writers = new ConcurrentHashMap[String, CarbonStreamingOutputWriter]()
+
+  def addWriter(path: String, writer: CarbonStreamingOutputWriter): Unit = {
+
+    if (writers.contains(path)) {
+      throw new IllegalArgumentException(path + "writer already exists")
+    }
+
+    writers.put(path, writer)
+  }
+
+  def getWriter(path: String): CarbonStreamingOutputWriter = {
+
+    writers.get(path)
+  }
+
+  def containsWriter(path: String): Boolean = {
+
+    writers.containsKey(path)
+  }
+
+  def removeWriter(path: String): Unit = {
+
+    writers.remove(path)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala
new file mode 100644
index 0000000..f00eea5
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata.streaming
+
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.sql.common.util.Spark2QueryTest
+import org.apache.spark.sql.{CarbonSource, SparkSession}
+import org.apache.spark.sql.streaming.CarbonStreamingOutputWriterFactory
+import org.apache.spark.sql.test.TestQueryExecutor
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Test for schema validation during streaming ingestion
+ * Validates streamed schema(source) against existing table(target) schema.
+ */
+
+class CarbonSourceSchemaValidationTest extends Spark2QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll() {
+    sql("DROP TABLE IF EXISTS _carbon_stream_table_")
+  }
+
+  test("Testing validate schema method with correct values ") {
+
+    val spark = SparkSession.builder
+      .appName("StreamIngestSchemaValidation")
+      .master("local")
+      .getOrCreate()
+
+    val carbonSource = new CarbonSource
+    val job = new Job()
+    val warehouseLocation = TestQueryExecutor.warehouse
+
+    sql("CREATE TABLE _carbon_stream_table_(id int,name string)STORED BY 'carbondata'")
+    val tablePath: String = s"$warehouseLocation/default/_carbon_stream_table_"
+    val dataSchema = StructType(Array(StructField("id", IntegerType, true), StructField("name", StringType, true)))
+    val res = carbonSource.prepareWrite(spark, job, Map("path" -> tablePath), dataSchema)
+    assert(res.isInstanceOf[CarbonStreamingOutputWriterFactory])
+  }
+
+  override def afterAll() {
+    sql("DROP TABLE IF EXISTS _carbon_stream_table_")
+  }
+
+}