Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/30 09:27:38 UTC
[1/3] carbondata git commit: [CARBONDATA-1628][Streaming] Refactor
LoadTableCommand to reuse code for streaming ingest in the future [Forced
Update!]
Repository: carbondata
Updated Branches:
refs/heads/streaming_ingest a833fd47a -> 8c9873927 (forced update)
[CARBONDATA-1628][Streaming] Refactor LoadTableCommand to reuse code for streaming ingest in the future
Refactor LoadTableCommand to reuse code for streaming ingest in the future
This closes #1439
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5936e7fb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5936e7fb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5936e7fb
Branch: refs/heads/streaming_ingest
Commit: 5936e7fb25c9d5125a5863aa7648cffe5a1d8f7c
Parents: 4d70a21
Author: QiangCai <qi...@qq.com>
Authored: Fri Oct 27 16:06:06 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Oct 30 14:55:00 2017 +0530
----------------------------------------------------------------------
.../carbondata/spark/util/DataLoadingUtil.scala | 300 ++++++++++++
.../command/management/LoadTableCommand.scala | 464 ++++++-------------
2 files changed, 452 insertions(+), 312 deletions(-)
----------------------------------------------------------------------
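[Editor's note] The refactoring moves option resolution and CarbonLoadModel construction out of LoadTableCommand into a shared DataLoadingUtil, so a future streaming writer can build the same load model without going through the SQL command. A minimal sketch of the intended reuse, assuming a resolved CarbonTable; note that buildCarbonLoadModel reads a "sort_scope" entry that getDataLoadingOptions does not fill (LoadTableCommand derives it from table properties), so the default supplied here is an assumption:

  import scala.collection.immutable

  import org.apache.carbondata.core.metadata.schema.table.CarbonTable
  import org.apache.carbondata.core.util.CarbonProperties
  import org.apache.carbondata.processing.loading.model.CarbonLoadModel
  import org.apache.carbondata.spark.util.DataLoadingUtil

  def prepareLoadModel(
      table: CarbonTable,
      options: immutable.Map[String, String]): CarbonLoadModel = {
    val carbonProperty = CarbonProperties.getInstance()
    // resolve user options against carbon properties and built-in defaults
    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
    // LoadTableCommand fills sort_scope from table properties before building;
    // a plain default is assumed here
    optionsFinal.put("sort_scope", "LOCAL_SORT")
    val model = new CarbonLoadModel()
    DataLoadingUtil.buildCarbonLoadModel(table, carbonProperty, options, optionsFinal, model)
    model
  }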
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5936e7fb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
new file mode 100644
index 0000000..445fdbb
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import scala.collection.immutable
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.TableOptionConstant
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.load.ValidateUtil
+
+/**
+ * Utility object for data loading
+ */
+object DataLoadingUtil {
+
+ /**
+ * Get data loading options, initialising defaults for missing entries
+ */
+ def getDataLoadingOptions(
+ carbonProperty: CarbonProperties,
+ options: immutable.Map[String, String]): mutable.Map[String, String] = {
+ val optionsFinal = scala.collection.mutable.Map[String, String]()
+ optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
+ optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
+ optionsFinal.put("fileheader", options.getOrElse("fileheader", ""))
+ optionsFinal.put("escapechar", options.getOrElse("escapechar", "\\"))
+ optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
+ optionsFinal.put("columndict", options.getOrElse("columndict", null))
+
+ optionsFinal.put(
+ "serialization_null_format",
+ options.getOrElse("serialization_null_format", "\\N"))
+
+ optionsFinal.put(
+ "bad_records_logger_enable",
+ options.getOrElse(
+ "bad_records_logger_enable",
+ carbonProperty.getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)))
+
+ val badRecordActionValue = carbonProperty.getProperty(
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
+
+ optionsFinal.put(
+ "bad_records_action",
+ options.getOrElse(
+ "bad_records_action",
+ carbonProperty.getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+ badRecordActionValue)))
+
+ optionsFinal.put(
+ "is_empty_data_bad_record",
+ options.getOrElse(
+ "is_empty_data_bad_record",
+ carbonProperty.getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)))
+
+ optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", ""))
+
+ optionsFinal.put(
+ "complex_delimiter_level_1",
+ options.getOrElse("complex_delimiter_level_1", "\\$"))
+
+ optionsFinal.put(
+ "complex_delimiter_level_2",
+ options.getOrElse("complex_delimiter_level_2", "\\:"))
+
+ optionsFinal.put(
+ "dateformat",
+ options.getOrElse(
+ "dateformat",
+ carbonProperty.getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)))
+
+ optionsFinal.put(
+ "global_sort_partitions",
+ options.getOrElse(
+ "global_sort_partitions",
+ carbonProperty.getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+ null)))
+
+ optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null))
+
+ optionsFinal.put(
+ "batch_sort_size_inmb",
+ options.getOrElse(
+ "batch_sort_size_inmb",
+ carbonProperty.getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+ carbonProperty.getProperty(
+ CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+ CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))))
+
+ optionsFinal.put(
+ "bad_record_path",
+ options.getOrElse(
+ "bad_record_path",
+ carbonProperty.getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+ carbonProperty.getProperty(
+ CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))))
+
+ val useOnePass = options.getOrElse(
+ "single_pass",
+ carbonProperty.getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match {
+ case "true" =>
+ true
+ case "false" =>
+ // when single_pass = false, if either all_dictionary_path
+ // or columndict is configured, do not allow the load
+ if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) ||
+ StringUtils.isNotEmpty(optionsFinal("columndict"))) {
+ throw new MalformedCarbonCommandException(
+ "Can not use all_dictionary_path or columndict without single_pass.")
+ } else {
+ false
+ }
+ case illegal =>
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ LOGGER.error(s"Can't use single_pass, because illegal syntax found: [$illegal]. " +
+ "Please set it as 'true' or 'false'")
+ false
+ }
+ optionsFinal.put("single_pass", useOnePass.toString)
+ optionsFinal
+ }
+
+ /**
+ * Return the default when the given value is empty, otherwise the value itself
+ */
+ private def checkDefaultValue(value: String, default: String) = {
+ if (StringUtils.isEmpty(value)) {
+ default
+ } else {
+ value
+ }
+ }
+
+ /**
+ * build CarbonLoadModel for data loading
+ */
+ def buildCarbonLoadModel(
+ table: CarbonTable,
+ carbonProperty: CarbonProperties,
+ options: immutable.Map[String, String],
+ optionsFinal: mutable.Map[String, String],
+ carbonLoadModel: CarbonLoadModel): Unit = {
+ carbonLoadModel.setTableName(table.getFactTableName)
+ carbonLoadModel.setDatabaseName(table.getDatabaseName)
+ carbonLoadModel.setStorePath(table.getStorePath)
+ val dataLoadSchema = new CarbonDataLoadSchema(table)
+ // Need to fill dimension relation
+ carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+ val sort_scope = optionsFinal("sort_scope")
+ val single_pass = optionsFinal("single_pass")
+ val bad_records_logger_enable = optionsFinal("bad_records_logger_enable")
+ val bad_records_action = optionsFinal("bad_records_action")
+ val bad_record_path = optionsFinal("bad_record_path")
+ val global_sort_partitions = optionsFinal("global_sort_partitions")
+ val dateFormat = optionsFinal("dateformat")
+ val delimiter = optionsFinal("delimiter")
+ val complex_delimiter_level1 = optionsFinal("complex_delimiter_level_1")
+ val complex_delimiter_level2 = optionsFinal("complex_delimiter_level_2")
+ val all_dictionary_path = optionsFinal("all_dictionary_path")
+ val column_dict = optionsFinal("columndict")
+ ValidateUtil.validateDateFormat(dateFormat, table, table.getFactTableName)
+ ValidateUtil.validateSortScope(table, sort_scope)
+
+ if (bad_records_logger_enable.toBoolean ||
+ LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
+ if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
+ sys.error("Invalid bad records location.")
+ }
+ }
+ carbonLoadModel.setBadRecordsLocation(bad_record_path)
+
+ ValidateUtil.validateGlobalSortPartitions(global_sort_partitions)
+ carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\"))
+ carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\""))
+ carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#"))
+
+ // if there is no file header in the csv file and the load sql doesn't provide the
+ // FILEHEADER option, we should use the table schema to generate the file header.
+ var fileHeader = optionsFinal("fileheader")
+ val headerOption = options.get("header")
+ if (headerOption.isDefined) {
+ // whether the csv file has file header
+ // the default value is true
+ val header = try {
+ headerOption.get.toBoolean
+ } catch {
+ case ex: IllegalArgumentException =>
+ throw new MalformedCarbonCommandException(
+ "'header' option should be either 'true' or 'false'. " + ex.getMessage)
+ }
+ if (header) {
+ if (fileHeader.nonEmpty) {
+ throw new MalformedCarbonCommandException(
+ "When 'header' option is true, 'fileheader' option is not required.")
+ }
+ } else {
+ if (fileHeader.isEmpty) {
+ fileHeader = table.getCreateOrderColumn(table.getFactTableName)
+ .asScala.map(_.getColName).mkString(",")
+ }
+ }
+ }
+
+ carbonLoadModel.setDateFormat(dateFormat)
+ carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
+ CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+
+ carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
+ CarbonCommonConstants.CARBON_DATE_FORMAT,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
+
+ carbonLoadModel.setSerializationNullFormat(
+ TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
+ optionsFinal("serialization_null_format"))
+
+ carbonLoadModel.setBadRecordsLoggerEnable(
+ TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable)
+
+ carbonLoadModel.setBadRecordsAction(
+ TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action)
+
+ carbonLoadModel.setIsEmptyDataBadRecord(
+ DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
+ optionsFinal("is_empty_data_bad_record"))
+
+ carbonLoadModel.setSortScope(sort_scope)
+ carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb"))
+ carbonLoadModel.setGlobalSortPartitions(global_sort_partitions)
+ carbonLoadModel.setUseOnePass(single_pass.toBoolean)
+
+ if (delimiter.equalsIgnoreCase(complex_delimiter_level1) ||
+ complex_delimiter_level1.equalsIgnoreCase(complex_delimiter_level2) ||
+ delimiter.equalsIgnoreCase(complex_delimiter_level2)) {
+ sys.error("Field delimiter and complex types delimiter must be different")
+ } else {
+ carbonLoadModel.setComplexDelimiterLevel1(
+ CarbonUtil.delimiterConverter(complex_delimiter_level1))
+ carbonLoadModel.setComplexDelimiterLevel2(
+ CarbonUtil.delimiterConverter(complex_delimiter_level2))
+ }
+ // set local dictionary path, and dictionary file extension
+ carbonLoadModel.setAllDictPath(all_dictionary_path)
+ carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter))
+ carbonLoadModel.setCsvHeader(fileHeader)
+ carbonLoadModel.setColDictFilePath(column_dict)
+ carbonLoadModel.setDirectLoad(true)
+ carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
+
+ val validatedMaxColumns = CommonUtil.validateMaxColumns(
+ carbonLoadModel.getCsvHeaderColumns,
+ optionsFinal("maxcolumns"))
+
+ carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
+ if (null == carbonLoadModel.getLoadMetadataDetails) {
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+ }
+ }
+}
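[Editor's note] The 'header'/'fileheader' handling in buildCarbonLoadModel above encodes a small contract; some hedged example option maps (column and table names hypothetical):

  // CSV carries its own header row: nothing else needed
  val ok1 = Map("header" -> "true")
  // caller supplies the column list explicitly
  val ok2 = Map("header" -> "false", "fileheader" -> "id,name,city")
  // no header anywhere: falls back to the table's create-order columns
  val ok3 = Map("header" -> "false")
  // rejected with MalformedCarbonCommandException: header row plus explicit list
  val bad = Map("header" -> "true", "fileheader" -> "id,name,city")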
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5936e7fb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 9018f7b..630ee27 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, DataPro
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.util.{CausedBy, FileUtils}
-import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -39,14 +38,11 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format
import org.apache.carbondata.processing.exception.DataLoadingException
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
import org.apache.carbondata.processing.loading.exception.NoRetryException
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.TableOptionConstant
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.ValidateUtil
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CommonUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
case class LoadTableCommand(
databaseNameOp: Option[String],
@@ -60,89 +56,6 @@ case class LoadTableCommand(
updateModel: Option[UpdateTableModel] = None)
extends RunnableCommand with DataProcessCommand {
- private def getFinalOptions(carbonProperty: CarbonProperties):
- scala.collection.mutable.Map[String, String] = {
- val optionsFinal = scala.collection.mutable.Map[String, String]()
- optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
- optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
- optionsFinal.put("fileheader", options.getOrElse("fileheader", ""))
- optionsFinal.put("escapechar", options.getOrElse("escapechar", "\\"))
- optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
- optionsFinal.put("columndict", options.getOrElse("columndict", null))
- optionsFinal
- .put("serialization_null_format", options.getOrElse("serialization_null_format", "\\N"))
- optionsFinal.put("bad_records_logger_enable", options.getOrElse("bad_records_logger_enable",
- carbonProperty
- .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
- CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)))
- val badRecordActionValue = carbonProperty
- .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
- optionsFinal.put("bad_records_action", options.getOrElse("bad_records_action", carbonProperty
- .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
- badRecordActionValue)))
- optionsFinal
- .put("is_empty_data_bad_record", options.getOrElse("is_empty_data_bad_record", carbonProperty
- .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
- CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)))
- optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", ""))
- optionsFinal
- .put("complex_delimiter_level_1", options.getOrElse("complex_delimiter_level_1", "\\$"))
- optionsFinal
- .put("complex_delimiter_level_2", options.getOrElse("complex_delimiter_level_2", "\\:"))
- optionsFinal.put("dateformat", options.getOrElse("dateformat",
- carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
- CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)))
-
- optionsFinal.put("global_sort_partitions", options.getOrElse("global_sort_partitions",
- carbonProperty
- .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null)))
-
- optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null))
-
- optionsFinal.put("batch_sort_size_inmb", options.getOrElse("batch_sort_size_inmb",
- carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
- carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
- CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))))
-
- optionsFinal.put("bad_record_path", options.getOrElse("bad_record_path",
- carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
- carbonProperty.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))))
-
- val useOnePass = options.getOrElse("single_pass",
- carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
- CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match {
- case "true" =>
- true
- case "false" =>
- // when single_pass = false and if either alldictionarypath
- // or columnDict is configured the do not allow load
- if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) ||
- StringUtils.isNotEmpty(optionsFinal("columndict"))) {
- throw new MalformedCarbonCommandException(
- "Can not use all_dictionary_path or columndict without single_pass.")
- } else {
- false
- }
- case illegal =>
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " +
- "Please set it as 'true' or 'false'")
- false
- }
- optionsFinal.put("single_pass", useOnePass.toString)
- optionsFinal
- }
-
- private def checkDefaultValue(value: String, default: String) = {
- if (StringUtils.isEmpty(value)) {
- default
- } else {
- value
- }
- }
-
override def run(sparkSession: SparkSession): Seq[Row] = {
processData(sparkSession)
}
@@ -158,7 +71,6 @@ case class LoadTableCommand(
}
val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-
val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
if (relation == null) {
@@ -172,7 +84,7 @@ case class LoadTableCommand(
val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
carbonProperty.addProperty("zookeeper.enable.lock", "false")
- val optionsFinal = getFinalOptions(carbonProperty)
+ val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
val tableProperties = relation.tableMeta.carbonTable.getTableInfo
.getFactTable.getTableProperties
@@ -183,133 +95,26 @@ case class LoadTableCommand(
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
try {
+ val table = relation.tableMeta.carbonTable
+ val carbonLoadModel = new CarbonLoadModel()
val factPath = if (dataFrame.isDefined) {
""
} else {
FileUtils.getPaths(
CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
}
- val carbonLoadModel = new CarbonLoadModel()
- carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
- carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
-
- val table = relation.tableMeta.carbonTable
- carbonLoadModel.setTableName(table.getFactTableName)
- val dataLoadSchema = new CarbonDataLoadSchema(table)
- // Need to fill dimension relation
- carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-
- val partitionLocation = relation.tableMeta.storePath + "/partition/" +
- relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
- relation.tableMeta.carbonTableIdentifier.getTableName + "/"
- val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
- val sort_scope = optionsFinal("sort_scope")
- val single_pass = optionsFinal("single_pass")
- val bad_records_logger_enable = optionsFinal("bad_records_logger_enable")
- val bad_records_action = optionsFinal("bad_records_action")
- val bad_record_path = optionsFinal("bad_record_path")
- val global_sort_partitions = optionsFinal("global_sort_partitions")
- val dateFormat = optionsFinal("dateformat")
- val delimeter = optionsFinal("delimiter")
- val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1")
- val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2")
- val all_dictionary_path = optionsFinal("all_dictionary_path")
- val column_dict = optionsFinal("columndict")
- ValidateUtil.validateDateFormat(dateFormat, table, tableName)
- ValidateUtil.validateSortScope(table, sort_scope)
-
- if (bad_records_logger_enable.toBoolean ||
- LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
- if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
- sys.error("Invalid bad records location.")
- }
- }
- carbonLoadModel.setBadRecordsLocation(bad_record_path)
-
- ValidateUtil.validateGlobalSortPartitions(global_sort_partitions)
- carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\"))
- carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\""))
- carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#"))
-
- // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
- // we should use table schema to generate file header.
- var fileHeader = optionsFinal("fileheader")
- val headerOption = options.get("header")
- if (headerOption.isDefined) {
- // whether the csv file has file header
- // the default value is true
- val header = try {
- headerOption.get.toBoolean
- } catch {
- case ex: IllegalArgumentException =>
- throw new MalformedCarbonCommandException(
- "'header' option should be either 'true' or 'false'. " + ex.getMessage)
- }
- if (header) {
- if (fileHeader.nonEmpty) {
- throw new MalformedCarbonCommandException(
- "When 'header' option is true, 'fileheader' option is not required.")
- }
- } else {
- if (fileHeader.isEmpty) {
- fileHeader = table.getCreateOrderColumn(table.getFactTableName)
- .asScala.map(_.getColName).mkString(",")
- }
- }
- }
-
- carbonLoadModel.setDateFormat(dateFormat)
- carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
- CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
- CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
- carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
- CarbonCommonConstants.CARBON_DATE_FORMAT,
- CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
- carbonLoadModel
- .setSerializationNullFormat(
- TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
- optionsFinal("serialization_null_format"))
- carbonLoadModel
- .setBadRecordsLoggerEnable(
- TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable)
- carbonLoadModel
- .setBadRecordsAction(
- TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action)
- carbonLoadModel
- .setIsEmptyDataBadRecord(
- DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
- optionsFinal("is_empty_data_bad_record"))
- carbonLoadModel.setSortScope(sort_scope)
- carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb"))
- carbonLoadModel.setGlobalSortPartitions(global_sort_partitions)
- carbonLoadModel.setUseOnePass(single_pass.toBoolean)
- if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
- complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
- delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
- sys.error(s"Field Delimiter & Complex types delimiter are same")
- } else {
- carbonLoadModel.setComplexDelimiterLevel1(
- CarbonUtil.delimiterConverter(complex_delimeter_level1))
- carbonLoadModel.setComplexDelimiterLevel2(
- CarbonUtil.delimiterConverter(complex_delimeter_level2))
- }
- // set local dictionary path, and dictionary file extension
- carbonLoadModel.setAllDictPath(all_dictionary_path)
-
- val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
- try {
+ carbonLoadModel.setFactFilePath(factPath)
+ DataLoadingUtil.buildCarbonLoadModel(
+ table,
+ carbonProperty,
+ options,
+ optionsFinal,
+ carbonLoadModel
+ )
+
+ try {
// First system has to partition the data first and then call the load data
LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
- carbonLoadModel.setFactFilePath(factPath)
- carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
- carbonLoadModel.setCsvHeader(fileHeader)
- carbonLoadModel.setColDictFilePath(column_dict)
- carbonLoadModel.setDirectLoad(true)
- carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
- val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns,
- optionsFinal("maxcolumns"))
- carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
val storePath = relation.tableMeta.storePath
// add the start entry for the new load in the table status file
@@ -320,11 +125,9 @@ case class LoadTableCommand(
if (isOverwriteTable) {
LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
}
- if (null == carbonLoadModel.getLoadMetadataDetails) {
- CommonUtil.readLoadMetadataDetails(carbonLoadModel)
- }
if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
- StringUtils.isEmpty(column_dict) && StringUtils.isEmpty(all_dictionary_path)) {
+ StringUtils.isEmpty(carbonLoadModel.getColDictFilePath) &&
+ StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
carbonLoadModel.setUseOnePass(false)
@@ -337,111 +140,21 @@ case class LoadTableCommand(
if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
FileFactory.mkdirs(metadataDirectoryPath, fileType)
}
+ val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+ val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
if (carbonLoadModel.getUseOnePass) {
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- .getCarbonTableIdentifier
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(storePath, carbonTableIdentifier)
- val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
- val dimensions = carbonTable.getDimensionByTableName(
- carbonTable.getFactTableName).asScala.toArray
- val colDictFilePath = carbonLoadModel.getColDictFilePath
- if (!StringUtils.isEmpty(colDictFilePath)) {
- carbonLoadModel.initPredefDictMap()
- // generate predefined dictionary
- GlobalDictionaryUtil
- .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
- dimensions, carbonLoadModel, sparkSession.sqlContext, storePath, dictFolderPath)
- }
- if (!StringUtils.isEmpty(all_dictionary_path)) {
- carbonLoadModel.initPredefDictMap()
- GlobalDictionaryUtil
- .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
- carbonLoadModel,
- storePath,
- carbonTableIdentifier,
- dictFolderPath,
- dimensions,
- all_dictionary_path)
- }
- // dictionaryServerClient dictionary generator
- val dictionaryServerPort = carbonProperty
- .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
- CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
- val sparkDriverHost = sparkSession.sqlContext.sparkContext.
- getConf.get("spark.driver.host")
- carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
- // start dictionary server when use one pass load and dimension with DICTIONARY
- // encoding is present.
- val allDimensions = table.getAllDimensions.asScala.toList
- val createDictionary = allDimensions.exists {
- carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
- !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
- }
- val server: Option[DictionaryServer] = if (createDictionary) {
- val dictionaryServer = DictionaryServer
- .getInstance(dictionaryServerPort.toInt, carbonTable)
- carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
- sparkSession.sparkContext.addSparkListener(new SparkListener() {
- override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
- dictionaryServer.shutdown()
- }
- })
- Some(dictionaryServer)
- } else {
- None
- }
- CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+ loadDataUsingOnePass(
+ sparkSession,
+ carbonProperty,
carbonLoadModel,
- relation.tableMeta.storePath,
columnar,
- partitionStatus,
- server,
- isOverwriteTable,
- dataFrame,
- updateModel)
+ partitionStatus)
} else {
- val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
- val fields = dataFrame.get.schema.fields
- import org.apache.spark.sql.functions.udf
- // extracting only segment from tupleId
- val getSegIdUDF = udf((tupleId: String) =>
- CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
- // getting all fields except tupleId field as it is not required in the value
- var otherFields = fields.toSeq
- .filter(field => !field.name
- .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
- .map(field => new Column(field.name))
-
- // extract tupleId field which will be used as a key
- val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
- .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
- as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
- // use dataFrameWithoutTupleId as dictionaryDataFrame
- val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
- otherFields = otherFields :+ segIdColumn
- // use dataFrameWithTupleId as loadDataFrame
- val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
- (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
- } else {
- (dataFrame, dataFrame)
- }
-
- GlobalDictionaryUtil.generateGlobalDictionary(
- sparkSession.sqlContext,
- carbonLoadModel,
- relation.tableMeta.storePath,
- dictionaryDataFrame)
- CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+ loadData(
+ sparkSession,
carbonLoadModel,
- relation.tableMeta.storePath,
columnar,
- partitionStatus,
- None,
- isOverwriteTable,
- loadDataFrame,
- updateModel)
+ partitionStatus)
}
} catch {
case CausedBy(ex: NoRetryException) =>
@@ -454,6 +167,9 @@ case class LoadTableCommand(
} finally {
// Once the data load is successful delete the unwanted partition files
try {
+ val partitionLocation = table.getStorePath + "/partition/" +
+ table.getDatabaseName + "/" +
+ table.getFactTableName + "/"
val fileType = FileFactory.getFileType(partitionLocation)
if (FileFactory.isFileExist(partitionLocation, fileType)) {
val file = FileFactory
@@ -480,6 +196,130 @@ case class LoadTableCommand(
Seq.empty
}
+ private def loadDataUsingOnePass(
+ sparkSession: SparkSession,
+ carbonProperty: CarbonProperties,
+ carbonLoadModel: CarbonLoadModel,
+ columnar: Boolean,
+ partitionStatus: String): Unit = {
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+ .getCarbonTableIdentifier
+ val carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(carbonLoadModel.getStorePath, carbonTableIdentifier)
+ val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
+ val dimensions = carbonTable.getDimensionByTableName(
+ carbonTable.getFactTableName).asScala.toArray
+ val colDictFilePath = carbonLoadModel.getColDictFilePath
+ if (!StringUtils.isEmpty(colDictFilePath)) {
+ carbonLoadModel.initPredefDictMap()
+ // generate predefined dictionary
+ GlobalDictionaryUtil.generatePredefinedColDictionary(
+ colDictFilePath,
+ carbonTableIdentifier,
+ dimensions,
+ carbonLoadModel,
+ sparkSession.sqlContext,
+ carbonLoadModel.getStorePath,
+ dictFolderPath)
+ }
+ if (!StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
+ carbonLoadModel.initPredefDictMap()
+ GlobalDictionaryUtil
+ .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
+ carbonLoadModel,
+ carbonLoadModel.getStorePath,
+ carbonTableIdentifier,
+ dictFolderPath,
+ dimensions,
+ carbonLoadModel.getAllDictPath)
+ }
+ // dictionaryServerClient dictionary generator
+ val dictionaryServerPort = carbonProperty
+ .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
+ CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
+ val sparkDriverHost = sparkSession.sqlContext.sparkContext.
+ getConf.get("spark.driver.host")
+ carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
+ // start dictionary server when use one pass load and dimension with DICTIONARY
+ // encoding is present.
+ val allDimensions =
+ carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAllDimensions.asScala.toList
+ val createDictionary = allDimensions.exists {
+ carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+ !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
+ }
+ val server: Option[DictionaryServer] = if (createDictionary) {
+ val dictionaryServer = DictionaryServer
+ .getInstance(dictionaryServerPort.toInt, carbonTable)
+ carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+ sparkSession.sparkContext.addSparkListener(new SparkListener() {
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+ dictionaryServer.shutdown()
+ }
+ })
+ Some(dictionaryServer)
+ } else {
+ None
+ }
+ CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+ carbonLoadModel,
+ carbonLoadModel.getStorePath,
+ columnar,
+ partitionStatus,
+ server,
+ isOverwriteTable,
+ dataFrame,
+ updateModel)
+ }
+
+ private def loadData(
+ sparkSession: SparkSession,
+ carbonLoadModel: CarbonLoadModel,
+ columnar: Boolean,
+ partitionStatus: String): Unit = {
+ val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
+ val fields = dataFrame.get.schema.fields
+ import org.apache.spark.sql.functions.udf
+ // extracting only segment from tupleId
+ val getSegIdUDF = udf((tupleId: String) =>
+ CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
+ // getting all fields except tupleId field as it is not required in the value
+ var otherFields = fields.toSeq
+ .filter(field => !field.name
+ .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+ .map(field => new Column(field.name))
+
+ // extract tupleId field which will be used as a key
+ val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
+ .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
+ as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
+ // use dataFrameWithoutTupleId as dictionaryDataFrame
+ val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
+ otherFields = otherFields :+ segIdColumn
+ // use dataFrameWithTupleId as loadDataFrame
+ val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
+ (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
+ } else {
+ (dataFrame, dataFrame)
+ }
+
+ GlobalDictionaryUtil.generateGlobalDictionary(
+ sparkSession.sqlContext,
+ carbonLoadModel,
+ carbonLoadModel.getStorePath,
+ dictionaryDataFrame)
+ CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+ carbonLoadModel,
+ carbonLoadModel.getStorePath,
+ columnar,
+ partitionStatus,
+ None,
+ isOverwriteTable,
+ loadDataFrame,
+ updateModel)
+ }
+
private def updateTableMetadata(
carbonLoadModel: CarbonLoadModel,
sqlContext: SQLContext,
[2/3] carbondata git commit: [CARBONDATA-1173] Stream ingestion -
write path framework
Posted by ja...@apache.org.
[CARBONDATA-1173] Stream ingestion - write path framework
This closes #1064
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a9d951ea
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a9d951ea
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a9d951ea
Branch: refs/heads/streaming_ingest
Commit: a9d951eaa3cdcb0b1b55dfbe8a94819a45250989
Parents: 5936e7f
Author: Aniket Adnaik <an...@gmail.com>
Authored: Thu Jun 15 11:57:43 2017 -0700
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Oct 30 14:56:57 2017 +0530
----------------------------------------------------------------------
.../streaming/CarbonStreamingCommitInfo.java | 108 ++++++++++
.../streaming/CarbonStreamingConstants.java | 25 +++
.../streaming/CarbonStreamingMetaStore.java | 40 ++++
.../streaming/CarbonStreamingMetaStoreImpl.java | 56 ++++++
.../core/util/path/CarbonTablePath.java | 10 +
.../streaming/CarbonStreamingOutputFormat.java | 66 +++++++
.../streaming/CarbonStreamingRecordWriter.java | 196 +++++++++++++++++++
.../org/apache/spark/sql/CarbonSource.scala | 39 +++-
.../CarbonStreamingOutpurWriteFactory.scala | 88 +++++++++
.../streaming/CarbonStreamingOutputWriter.scala | 98 ++++++++++
10 files changed, 719 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
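[Editor's note] At a high level, this framework lets Spark Structured Streaming write into a carbondata table through CarbonSource (registered under the short name "carbondata") and the streaming output writer and record writer added below. A hedged end-to-end sketch; the paths are hypothetical and the option keys are assumptions for illustration:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.streaming.StreamingQuery

  // df is any streaming DataFrame matching the target table schema
  def startIngest(df: DataFrame): StreamingQuery = {
    df.writeStream
      .format("carbondata")                            // CarbonSource.shortName()
      .option("checkpointLocation", "/tmp/checkpoints/sales")
      .option("tablePath", "/tmp/store/default/sales") // assumed: same key CarbonSource reads
      .start()
  }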
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
new file mode 100644
index 0000000..6cf303a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.streaming;
+
+/**
+ * Commit info for streaming writes.
+ * The commit info can be used to recover the valid offset in a file
+ * in the case of a write failure.
+ */
+public class CarbonStreamingCommitInfo {
+
+ private String dataBase;
+
+ private String table;
+
+ private long commitTime;
+
+ private long segmentID;
+
+ private String partitionID;
+
+ private long batchID;
+
+ private String fileOffset;
+
+ private long transactionID; // future use
+
+ public CarbonStreamingCommitInfo(
+
+ String dataBase,
+
+ String table,
+
+ long commitTime,
+
+ long segmentID,
+
+ String partitionID,
+
+ long batchID) {
+
+ this.dataBase = dataBase;
+
+ this.table = table;
+
+ this.commitTime = commitTime;
+
+ this.segmentID = segmentID;
+
+ this.partitionID = partitionID;
+
+ this.batchID = batchID;
+
+ this.transactionID = -1;
+ }
+
+ public String getDataBase() {
+ return dataBase;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public long getCommitTime() {
+ return commitTime;
+ }
+
+ public long getSegmentID() {
+ return segmentID;
+ }
+
+ public String getPartitionID() {
+ return partitionID;
+ }
+
+ public long getBatchID() {
+ return batchID;
+ }
+
+ public String getFileOffset() {
+ return fileOffset;
+ }
+
+ public long getTransactionID() {
+ return transactionID;
+ }
+
+ @Override
+ public String toString() {
+ return dataBase + "." + table + "." + segmentID + "$" + partitionID;
+ }
+}
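[Editor's note] A hedged construction example (database, table, and IDs hypothetical); the toString form doubles as a natural metastore key:

  // record a commit point for segment 0, partition "0", batch 7
  val info = new CarbonStreamingCommitInfo(
    "default",                  // dataBase
    "sales",                    // table
    System.currentTimeMillis(), // commitTime
    0L,                         // segmentID
    "0",                        // partitionID
    7L)                         // batchID
  println(info)  // prints "default.sales.0$0"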
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java
new file mode 100644
index 0000000..db7186f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.streaming;
+
+public class CarbonStreamingConstants {
+
+ public static final long DEFAULT_CARBON_STREAM_FILE_BLOCK_SIZE = 1024 * 1024 * 1024; // 1GB
+
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java
new file mode 100644
index 0000000..fa3746c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.streaming;
+
+
+import java.io.IOException;
+
+/**
+ * Generic interface for storing commit info for streaming ingest
+ */
+public interface CarbonStreamingMetaStore {
+
+ public CarbonStreamingCommitInfo getStreamingCommitInfo(
+ String dataBase,
+ String table,
+ long segmentID,
+ String partitionID) throws IOException;
+
+ public void updateStreamingCommitInfo(
+ CarbonStreamingCommitInfo commitInfo) throws IOException;
+
+ public void recoverStreamingData(
+ CarbonStreamingCommitInfo commitInfo) throws IOException;
+
+}
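[Editor's note] A hedged sketch of how a restarting writer might use this interface (the wiring and null-check policy are assumptions; the implementation below is still a stub):

  def recoverIfNeeded(store: CarbonStreamingMetaStore): Unit = {
    // look up the last commit point for the segment/partition being resumed
    val info = store.getStreamingCommitInfo("default", "sales", 0L, "0")
    if (info != null) {
      // replay/truncate the stream file back to info.getFileOffset
      store.recoverStreamingData(info)
    }
  }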
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java
new file mode 100644
index 0000000..0afe962
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.streaming;
+
+import java.io.IOException;
+
+/**
+ * JSON format can be used to store the metadata
+ */
+public class CarbonStreamingMetaStoreImpl implements CarbonStreamingMetaStore {
+
+ /**
+ * get commit info from metastore
+ */
+ public CarbonStreamingCommitInfo getStreamingCommitInfo(
+ String dataBase,
+ String table,
+ long segmentID,
+ String partitionID) throws IOException {
+
+ return null;
+
+ }
+
+ /**
+ * Update commit info in metastore
+ */
+ public void updateStreamingCommitInfo(
+ CarbonStreamingCommitInfo commitInfo) throws IOException {
+
+ }
+
+ /**
+ * Recover streaming data using valid offset in commit info
+ */
+ public void recoverStreamingData(
+ CarbonStreamingCommitInfo commitInfo) throws IOException {
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 7be9c76..3c6bb3c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -51,6 +51,16 @@ public class CarbonTablePath extends Path {
protected static final String INDEX_FILE_EXT = ".carbonindex";
protected static final String DELETE_DELTA_FILE_EXT = ".deletedelta";
+ /**
+ * Streaming ingest related paths
+ */
+ protected static final String STREAM_PREFIX = "Streaming";
+ protected static final String STREAM_FILE_NAME_EXT = ".carbondata.stream";
+ protected static final String STREAM_FILE_BEING_WRITTEN = "in-progress.carbondata.stream";
+ protected static final String STREAM_FILE_BEING_WRITTEN_META = "in-progress.meta";
+ protected static final String STREAM_COMPACTION_STATUS = "streaming_compaction_status";
+ protected static final String STREAM_FILE_LOCK = "streaming_in_use.lock";
+
protected String tablePath;
protected CarbonTableIdentifier carbonTableIdentifier;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java
new file mode 100644
index 0000000..350684e
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.streaming.CarbonStreamingConstants;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+
+/**
+ * Output format to write streaming data to a carbondata file
+ *
+ * @param <K> - type of key
+ * @param <V> - type of record
+ */
+public class CarbonStreamingOutputFormat<K, V> extends FileOutputFormat<K, V> {
+
+ public static long getBlockSize(Configuration conf) {
+ return conf.getLong("dfs.block.size",
+ CarbonStreamingConstants.DEFAULT_CARBON_STREAM_FILE_BLOCK_SIZE);
+ }
+
+ public static void setBlockSize(Configuration conf, long blockSize) {
+ conf.setLong("dfs.block.size", blockSize);
+ }
+
+ /**
+ * getRecordWriter may need to be overridden later
+ * to provide the correct path, including the streaming segment name
+ */
+ @Override
+ public CarbonStreamingRecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
+ throws IOException, InterruptedException {
+
+ Configuration conf = job.getConfiguration();
+
+ String keyValueSeparator = conf.get(
+ CSVInputFormat.DELIMITER,
+ CSVInputFormat.DELIMITER_DEFAULT);
+
+ return new CarbonStreamingRecordWriter<K, V>(
+ conf,
+ getDefaultWorkFile(job, null),
+ keyValueSeparator);
+ }
+
+}
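[Editor's note] A hedged usage sketch inside a Hadoop task (the Text key/value types and the 512 MB block size are assumptions):

  import org.apache.carbondata.hadoop.streaming.{CarbonStreamingOutputFormat, CarbonStreamingRecordWriter}
  import org.apache.hadoop.io.Text
  import org.apache.hadoop.mapreduce.TaskAttemptContext

  def openWriter(context: TaskAttemptContext): CarbonStreamingRecordWriter[Text, Text] = {
    // cap stream files at 512 MB instead of the 1 GB default
    CarbonStreamingOutputFormat.setBlockSize(context.getConfiguration, 512L * 1024 * 1024)
    val format = new CarbonStreamingOutputFormat[Text, Text]()
    format.getRecordWriter(context)
  }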
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java
new file mode 100644
index 0000000..9d1951f
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+
+public class CarbonStreamingRecordWriter<K,V> extends RecordWriter<K, V> {
+
+ private static final String utf8 = "UTF-8";
+
+ private static final byte[] newline;
+
+ static {
+
+ try {
+
+ newline = "\n".getBytes(utf8);
+
+ } catch (UnsupportedEncodingException uee) {
+
+ throw new IllegalArgumentException("Can't find " + utf8 + " encoding");
+ }
+ }
+
+ private FSDataOutputStream outputStream;
+
+ private FileSystem fs;
+
+ private Path file;
+
+ private volatile boolean isClosed;
+
+ private final byte[] keyValueSeparator;
+
+ public void initOut() throws IOException {
+
+ outputStream = fs.create(file, false);
+
+ isClosed = false;
+ }
+
+ public CarbonStreamingRecordWriter(
+ Configuration conf,
+ Path file,
+ String keyValueSeparator) throws IOException {
+
+ this.file = file;
+
+ fs = FileSystem.get(conf);
+
+ outputStream = fs.create(file, false);
+
+ isClosed = false;
+
+ try {
+
+ this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
+
+ } catch (UnsupportedEncodingException uee) {
+
+ throw new IllegalArgumentException("Can't find " + utf8 + " encoding");
+
+ }
+
+ }
+
+ public CarbonStreamingRecordWriter(
+ Configuration conf,
+ Path file) throws IOException {
+
+ this(conf, file, ",");
+
+ }
+
+ /**
+ * Write Object to byte stream.
+ */
+
+ private void writeObject(Object o) throws IOException {
+
+ if (o instanceof Text) {
+ Text to = (Text)o;
+
+ outputStream.write(to.getBytes(), 0, to.getLength());
+
+ } else {
+
+ outputStream.write(o.toString().getBytes(utf8));
+
+ }
+ }
+
+ /**
+ * Write streaming data as text file (temporary)
+ */
+
+ @Override
+ public synchronized void write(K key, V value) throws IOException {
+
+ boolean isNULLKey = key == null || key instanceof NullWritable;
+
+ boolean isNULLValue = value == null || value instanceof NullWritable;
+
+ if (isNULLKey && isNULLValue) {
+
+ return;
+ }
+
+ if (!isNULLKey) {
+
+ writeObject(key);
+ }
+
+ if (!isNULLKey && !isNULLValue) {
+
+ outputStream.write(keyValueSeparator);
+ }
+
+ if (!isNULLValue) {
+
+ writeObject(value);
+ }
+
+ outputStream.write(newline);
+ }
+
+ private void closeInternal() throws IOException {
+
+ if (!isClosed) {
+
+ outputStream.close();
+
+ isClosed = true;
+ }
+
+ }
+
+ public void flush() throws IOException {
+
+ outputStream.hflush();
+ }
+
+ public long getOffset() throws IOException {
+
+ return outputStream.getPos();
+ }
+
+ public void commit(boolean finalCommit) throws IOException {
+
+ closeInternal();
+
+ Path commitFile = new Path(file.getParent(),
+ CarbonTablePath.getCarbonDataPrefix() + System.currentTimeMillis());
+
+ fs.rename(file, commitFile);
+
+ if (!finalCommit) {
+ initOut();
+ }
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+
+ closeInternal();
+ }
+
+}
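For orientation, a minimal Scala sketch of driving the record writer above directly
(the Configuration, target path, and sample row are assumptions for illustration,
not part of the commit):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}

import org.apache.carbondata.hadoop.streaming.CarbonStreamingRecordWriter

object RecordWriterSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // hypothetical work file; commit() renames it to a carbondata-prefixed file
    val workFile = new Path("/tmp/stream_table/part-streaming-0")
    val writer = new CarbonStreamingRecordWriter[NullWritable, Text](conf, workFile, ",")

    writer.write(NullWritable.get(), new Text("1,name_ABC,city_XYZ,10000.0"))
    writer.flush()                 // hflush so concurrent readers can see the data
    val offset = writer.getOffset  // current position, usable as a commit offset
    writer.commit(true)            // final commit: close and rename the work file
    println("committed at offset " + offset)
  }
}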
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index f4f8b75..d496de2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -21,16 +21,19 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.commons.lang.StringUtils
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
import org.apache.spark.sql.execution.strategy.CarbonLateDecodeStrategy
import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.streaming.CarbonStreamingOutputWriterFactory
+import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -41,12 +44,13 @@ import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
/**
* Carbon relation provider compliant to data source api.
* Creates carbon relations
*/
class CarbonSource extends CreatableRelationProvider with RelationProvider
- with SchemaRelationProvider with DataSourceRegister {
+ with SchemaRelationProvider with DataSourceRegister with FileFormat {
override def shortName(): String = "carbondata"
@@ -54,7 +58,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
CarbonEnv.getInstance(sqlContext.sparkSession)
- // if path is provided we can directly create Hadoop relation. \
+ // if path is provided we can directly create Hadoop relation.
// Otherwise create datasource relation
parameters.get("tablePath") match {
case Some(path) => CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
@@ -178,7 +182,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
/**
* Returns the path of the table
*
- * @param sparkSession
+ * @param sparkSession
* @param dbName
* @param tableName
* @return
@@ -203,11 +207,32 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
(relation.tableMeta.tablePath, parameters)
}
} catch {
- case ex: Exception =>
- throw new Exception(s"Do not have $dbName and $tableName", ex)
+ case ex: Exception =>
+ throw new Exception(s"Do not have $dbName and $tableName", ex)
}
}
+ /**
+ * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can
+ * be put here. For example, user defined output committer can be configured here
+ * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
+ */
+ def prepareWrite(
+ sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
+
+ /**
+ * When possible, this method should return the schema of the given `files`. When the format
+ * does not support inference, or no valid files are given should return None. In these cases
+ * Spark will require that user specify the schema manually.
+ */
+ def inferSchema(
+ sparkSession: SparkSession,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = Some(new StructType().add("value", StringType))
+
}
object CarbonSource {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
new file mode 100644
index 0000000..be69885
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
@@ -0,0 +1,88 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.streaming
+
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+
+class CarbonStreamingOutputWriterFactory extends OutputWriterFactory {
+
+ /**
+ * When writing to a [[org.apache.spark.sql.execution.datasources.HadoopFsRelation]],
+ * this method gets called by each task on executor side
+ * to instantiate new [[org.apache.spark.sql.execution.datasources.OutputWriter]]s.
+ *
+ * @param path Path to write the file.
+ * @param dataSchema Schema of the rows to be written. Partition columns are not
+ * included in the schema if the relation being written is
+ * partitioned.
+ * @param context The Hadoop MapReduce task context.
+ */
+
+ override def newInstance(
+ path: String,
+
+ dataSchema: StructType,
+
+ context: TaskAttemptContext) : CarbonStreamingOutputWriter = {
+
+ new CarbonStreamingOutputWriter(path, context)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+
+ CarbonTablePath.STREAM_FILE_NAME_EXT
+ }
+
+}
+
+object CarbonStreamingOutpurWriterFactory {
+
+ private[this] val writers = new ConcurrentHashMap[String, CarbonStreamingOutputWriter]()
+
+ def addWriter(path: String, writer: CarbonStreamingOutputWriter): Unit = {
+
+ if (writers.containsKey(path)) {
+ throw new IllegalArgumentException(path + " writer already exists")
+ }
+
+ writers.put(path, writer)
+ }
+
+ def getWriter(path: String): CarbonStreamingOutputWriter = {
+
+ writers.get(path)
+ }
+
+ def containsWriter(path: String): Boolean = {
+
+ writers.containsKey(path)
+ }
+
+ def removeWriter(path: String): Unit = {
+
+ writers.remove(path)
+ }
+}
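A short sketch of how the path-keyed registry above (note the "Outpur" typo in the
object name, renamed later in this series) could be used to obtain or register one
writer per output path; the check-then-add pair is not atomic, so this is
illustrative only:

package org.apache.spark.sql.streaming

// Sketch: obtain-or-register one writer per output path. The containsWriter/
// addWriter pair is not atomic; a real implementation might use putIfAbsent.
object WriterRegistrySketch {
  def registerOnce(
      path: String,
      writer: CarbonStreamingOutputWriter): CarbonStreamingOutputWriter = {
    if (!CarbonStreamingOutpurWriterFactory.containsWriter(path)) {
      CarbonStreamingOutpurWriterFactory.addWriter(path, writer)
    }
    CarbonStreamingOutpurWriterFactory.getWriter(path)
  }
}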
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9d951ea/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala
new file mode 100644
index 0000000..dfc8ff3
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{NullWritable, Text}
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.OutputWriter
+import org.apache.spark.sql.Row
+
+import org.apache.carbondata.hadoop.streaming.{CarbonStreamingOutputFormat, CarbonStreamingRecordWriter}
+
+class CarbonStreamingOutputWriter (
+ path: String,
+ context: TaskAttemptContext)
+ extends OutputWriter {
+
+ private[this] val buffer = new Text()
+
+ private val recordWriter: CarbonStreamingRecordWriter[NullWritable, Text] = {
+
+ val outputFormat = new CarbonStreamingOutputFormat[NullWritable, Text] () {
+
+ override def getDefaultWorkFile(context: TaskAttemptContext, extension: String) : Path = {
+ new Path(path)
+ }
+
+ /*
+ May need to override
+ def getOutputCommitter(c: TaskAttemptContext): OutputCommitter = {
+ null
+ }
+ */
+
+ }
+
+ outputFormat.
+ getRecordWriter(context).asInstanceOf[CarbonStreamingRecordWriter[NullWritable, Text]]
+ }
+
+ override def write(row: Row): Unit = {
+
+ throw new UnsupportedOperationException("call writeInternal")
+
+ }
+
+ override protected [sql] def writeInternal(row: InternalRow): Unit = {
+
+ val utf8string = row.getUTF8String(0)
+
+ buffer.set(utf8string.getBytes)
+
+ recordWriter.write(NullWritable.get(), buffer)
+
+ }
+
+ def getPath: String = path
+
+ override def close(): Unit = {
+
+ recordWriter.close(context)
+
+ }
+
+ def flush(): Unit = {
+
+ recordWriter.flush()
+
+ }
+
+ def getPos(): Long = {
+
+ recordWriter.getOffset()
+
+ }
+
+ def commit(finalCommit: Boolean): Unit = {
+
+ recordWriter.commit(finalCommit)
+
+ }
+}
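To see the writer classes above end to end, a small driver sketch (placed under
org.apache.spark.sql.streaming so the protected[sql] writeInternal is reachable;
the synthetic task context and file path are assumptions, not part of the commit):

package org.apache.spark.sql.streaming

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.TaskAttemptID
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

object OutputWriterSketch {
  def main(args: Array[String]): Unit = {
    // a synthetic task attempt context, standing in for what Spark supplies
    val context = new TaskAttemptContextImpl(new Configuration(), new TaskAttemptID())
    // hypothetical stream file path inside a carbon table directory
    val writer = new CarbonStreamingOutputWriter("/tmp/store/default/t1/part-0", context)

    // writeInternal expects a single string column, as delivered by a socket source
    writer.writeInternal(InternalRow(UTF8String.fromString("1,name_ABC,city_XYZ,10000.0")))
    writer.flush()
    writer.commit(finalCommit = true)  // close and rename to the final stream file
  }
}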
[3/3] carbondata git commit: [CARBONDATA-1174] Streaming Ingestion -
schema validation and streaming examples
Posted by ja...@apache.org.
[CARBONDATA-1174] Streaming Ingestion - schema validation and streaming examples
1. Schema validation of input data if it is from a file source when a schema is specified.
2. Added streaming examples - for file stream and socket stream sources.
This closes #1352
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8c987392
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8c987392
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8c987392
Branch: refs/heads/streaming_ingest
Commit: 8c98739279e5ffe49b70ae6664d19a7f596cf18c
Parents: a9d951e
Author: Aniket Adnaik <an...@gmail.com>
Authored: Thu Jun 15 11:57:43 2017 -0700
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Oct 30 14:56:57 2017 +0530
----------------------------------------------------------------------
.../streaming/CarbonStreamingCommitInfo.java | 6 +-
...CarbonStreamingIngestFileSourceExample.scala | 146 +++++++++++++
...rbonStreamingIngestSocketSourceExample.scala | 160 ++++++++++++++
.../examples/utils/StreamingExampleUtil.scala | 145 +++++++++++++
.../org/apache/spark/sql/CarbonSource.scala | 210 +++++++++++++++++--
.../CarbonStreamingOutpurWriteFactory.scala | 88 --------
.../CarbonStreamingOutputWriteFactory.scala | 88 ++++++++
.../CarbonSourceSchemaValidationTest.scala | 61 ++++++
8 files changed, 801 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
index 6cf303a..2027566 100644
--- a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
@@ -36,7 +36,7 @@ public class CarbonStreamingCommitInfo {
private long batchID;
- private String fileOffset;
+ private long fileOffset;
private long transactionID; // future use
@@ -67,6 +67,8 @@ public class CarbonStreamingCommitInfo {
this.batchID = batchID;
this.transactionID = -1;
+
+ this.fileOffset = 0;
}
public String getDataBase() {
@@ -93,7 +95,7 @@ public class CarbonStreamingCommitInfo {
return batchID;
}
- public String getFileOffset() {
+ public long getFileOffset() {
return fileOffset;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala
new file mode 100644
index 0000000..ebe0a5c
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestFileSourceExample.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.streaming.ProcessingTime
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.examples.utils.StreamingExampleUtil
+
+/**
+ * Covers the Spark Structured Streaming scenario where the user streams data
+ * from a file source (input source) and writes into a carbondata table (output sink).
+ * This example uses a csv file as the input source and writes
+ * into the target carbon table. The target carbon table must exist.
+ */
+
+object CarbonStreamingIngestFileSourceExample {
+
+ def main(args: Array[String]) {
+
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val storeLocation = s"$rootPath/examples/spark2/target/store"
+ val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+ val metastoredb = s"$rootPath/examples/spark2/target"
+ val csvDataDir = s"$rootPath/examples/spark2/resources/csvDataDir"
+ val streamTableName = s"_carbon_file_stream_table_"
+ val streamTablePath = s"$storeLocation/default/$streamTableName"
+ val ckptLocation = s"$rootPath/examples/spark2/resources/ckptDir"
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+ // cleanup residual files, if any
+ StreamingExampleUtil.cleanUpDir(csvDataDir, ckptLocation)
+
+ import org.apache.spark.sql.CarbonSession._
+ val spark = SparkSession
+ .builder()
+ .master("local")
+ .appName("CarbonFileStreamingExample")
+ .config("spark.sql.warehouse.dir", warehouse)
+ .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+ spark.sparkContext.setLogLevel("ERROR")
+
+ // Writes Dataframe to CarbonData file:
+ import spark.implicits._
+ import org.apache.spark.sql.types._
+ // drop table if exists previously
+ spark.sql(s"DROP TABLE IF EXISTS $streamTableName")
+ // Create target carbon table
+ spark.sql(
+ s"""
+ | CREATE TABLE $streamTableName(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT
+ | )
+ | STORED BY 'carbondata'""".stripMargin)
+
+ // Generate CSV data and write to CSV file
+ StreamingExampleUtil.generateCSVDataFile(spark, 1, csvDataDir, SaveMode.Overwrite)
+
+ // scalastyle:off
+ spark.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$csvDataDir'
+ | INTO TABLE $streamTableName
+ | OPTIONS('FILEHEADER'='id,name,city,salary'
+ | )""".stripMargin)
+ // scalastyle:on
+
+ // check initial table data
+ spark.sql(s""" SELECT * FROM $streamTableName """).show()
+
+ // define custom schema
+ val inputSchema = new StructType().
+ add("id", "integer").
+ add("name", "string").
+ add("city", "string").
+ add("salary", "float")
+
+ // setup csv file as a input streaming source
+ val csvReadDF = spark.readStream.
+ format("csv").
+ option("sep", ",").
+ schema(inputSchema).
+ option("path", csvDataDir).
+ option("header", "true").
+ load()
+
+ // Write data from csv format streaming source to carbondata target format
+ // set trigger to every 1 second
+ val qry = csvReadDF.writeStream
+ .format("carbondata")
+ .trigger(ProcessingTime("1 seconds"))
+ .option("checkpointLocation", ckptLocation)
+ .option("path", streamTablePath)
+ .start()
+
+ // In a separate thread append data every 2 seconds to existing csv
+ val gendataThread: Thread = new Thread() {
+ override def run(): Unit = {
+ for (i <- 1 to 5) {
+ Thread.sleep(2000)
+ StreamingExampleUtil.
+ generateCSVDataFile(spark, i * 10 + 1, csvDataDir, SaveMode.Append)
+ }
+ }
+ }
+ gendataThread.start()
+ gendataThread.join()
+
+ // stop streaming execution after 5 sec delay
+ Thread.sleep(5000)
+ qry.stop()
+
+ // verify streaming data is added into the table
+ spark.sql(s""" SELECT * FROM $streamTableName """).show()
+
+ // Cleanup residual files and table data
+ StreamingExampleUtil.cleanUpDir(csvDataDir, ckptLocation)
+ spark.sql(s"DROP TABLE IF EXISTS $streamTableName")
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestSocketSourceExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestSocketSourceExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestSocketSourceExample.scala
new file mode 100644
index 0000000..bbf7ef2
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/streaming/CarbonStreamingIngestSocketSourceExample.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.streaming.ProcessingTime
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.examples.utils.StreamingExampleUtil
+
+
+/**
+ * This example reads stream data from a socket source (input) and writes into
+ * an existing carbon table (output).
+ *
+ * It uses localhost and port (9999) to create a socket and write to it.
+ * The example uses two threads: one to write data to the socket and another
+ * to receive data from the socket and write it into the carbon table.
+ */
+
+// scalastyle:off println
+object CarbonStreamingIngestSocketSourceExample {
+
+ def main(args: Array[String]) {
+
+ // setup localhost and port number
+ val host = "localhost"
+ val port = 9999
+ // setup paths
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val storeLocation = s"$rootPath/examples/spark2/target/store"
+ val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+ val metastoredb = s"$rootPath/examples/spark2/target"
+ val csvDataDir = s"$rootPath/examples/spark2/resources/csvDataDir"
+ val streamTableName = s"_carbon_socket_stream_table_"
+ val streamTablePath = s"$storeLocation/default/$streamTableName"
+ val ckptLocation = s"$rootPath/examples/spark2/resources/ckptDir"
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+ // cleanup residual files, if any
+ StreamingExampleUtil.cleanUpDir(csvDataDir, ckptLocation)
+
+ import org.apache.spark.sql.CarbonSession._
+ val spark = SparkSession
+ .builder()
+ .master("local[2]")
+ .appName("CarbonNetworkStreamingExample")
+ .config("spark.sql.warehouse.dir", warehouse)
+ .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+ spark.sparkContext.setLogLevel("ERROR")
+
+ // Writes Dataframe to CarbonData file:
+ import spark.implicits._
+
+ // drop table if exists previously
+ spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")
+
+ // Create target carbon table and populate with initial data
+ spark.sql(
+ s"""
+ | CREATE TABLE ${streamTableName}(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT
+ | )
+ | STORED BY 'carbondata'""".stripMargin)
+
+ // Generate CSV data and write to CSV file
+ StreamingExampleUtil.generateCSVDataFile(spark, 1, csvDataDir, SaveMode.Overwrite)
+
+ // load the table
+ // scalastyle:off
+ spark.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$csvDataDir'
+ | INTO TABLE ${streamTableName}
+ | OPTIONS('FILEHEADER'='id,name,city,salary'
+ | )""".stripMargin)
+
+
+ spark.sql(s""" SELECT * FROM ${streamTableName} """).show()
+
+ // Create server socket in main thread
+ val serverSocket = StreamingExampleUtil.createServerSocket(host, port)
+
+ // Start client thread to receive streaming data and write into carbon
+ val streamWriterThread: Thread = new Thread() {
+ override def run(): Unit = {
+
+ try {
+ // Setup read stream to read input data from socket
+ val readSocketDF = spark.readStream
+ .format("socket")
+ .option("host", host)
+ .option("port", port)
+ .load()
+
+ // Write data from socket stream to carbondata file
+ val qry = readSocketDF.writeStream
+ .format("carbondata")
+ .trigger(ProcessingTime("2 seconds"))
+ .option("checkpointLocation", ckptLocation)
+ .option("path", streamTablePath)
+ .start()
+
+ qry.awaitTermination()
+ } catch {
+ case e: InterruptedException => println("Done reading and writing streaming data")
+ }
+ }
+ }
+ streamWriterThread.start()
+
+ // wait for the client connection request and accept it
+ val clientSocket = StreamingExampleUtil.waitForClientConnection(serverSocket.get)
+
+ // Write to client's connected socket every 2 seconds, for 5 times
+ StreamingExampleUtil.writeToSocket(clientSocket, 5, 2, 11)
+
+ Thread.sleep(2000)
+ // interrupt client thread to stop streaming query
+ streamWriterThread.interrupt()
+ // wait for the client thread to finish
+ streamWriterThread.join()
+
+ // close the server socket
+ serverSocket.get.close()
+
+ // verify streaming data is added into the table
+ // spark.sql(s""" SELECT * FROM ${streamTableName} """).show()
+
+ // Cleanup residual files and table data
+ StreamingExampleUtil.cleanUpDir(csvDataDir, ckptLocation)
+ spark.sql(s"DROP TABLE IF EXISTS ${streamTableName}")
+ }
+}
+// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingExampleUtil.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingExampleUtil.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingExampleUtil.scala
new file mode 100644
index 0000000..6eab491
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/utils/StreamingExampleUtil.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples.utils
+
+import java.io.{IOException, PrintWriter}
+import java.net.{ServerSocket, Socket}
+
+import scala.tools.nsc.io.Path
+
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+
+
+/**
+ * Utility functions for streaming ingest examples
+ */
+
+// scalastyle:off println
+object StreamingExampleUtil {
+
+ // Clean up directories recursively, accepts variable arguments
+ def cleanUpDir(dirPaths: String*): Unit = {
+
+ if (dirPaths.isEmpty) {
+ System.err.println("cleanUpDir: at least one directory path is required")
+ System.exit(1)
+ }
+
+ dirPaths.foreach { dirPath =>
+ try {
+ Path(dirPath).deleteRecursively()
+ } catch {
+ case ioe: IOException => println("IO Exception while deleting files recursively: " + ioe)
+ }
+ }
+ }
+
+ // Generates csv data and write to csv files at given path
+ def generateCSVDataFile(spark: SparkSession,
+ idStart: Int,
+ csvDirPath: String,
+ saveMode: SaveMode): Unit = {
+ // Create csv data frame file
+ val csvRDD = spark.sparkContext.parallelize(idStart until idStart + 10)
+ .map(id => (id, "name_ABC", "city_XYZ", 10000.00 * id))
+ val csvDataDF = spark.createDataFrame(csvRDD).toDF("id", "name", "city", "salary")
+
+
+ csvDataDF.write
+ .option("header", "false")
+ .mode(saveMode)
+ .csv(csvDirPath)
+ }
+
+ // Generates csv data frame and returns to caller
+ def generateCSVDataDF(spark: SparkSession,
+ idStart: Int): DataFrame = {
+ // Create csv data frame file
+ val csvRDD = spark.sparkContext.parallelize(idStart until idStart + 10)
+ .map(id => (id, "name_ABC", "city_XYZ", 10000.00 * id))
+ val csvDataDF = spark.createDataFrame(csvRDD).toDF("id", "name", "city", "salary")
+ csvDataDF
+ }
+
+ // Create server socket for socket streaming source
+ def createServerSocket(host: String, port: Int): Option[ServerSocket] = {
+ try {
+ Some(new ServerSocket(port))
+ } catch {
+ case e: IOException =>
+ println("Error creating server socket on " + host + ":" + port + ": " + e)
+ None
+ }
+ }
+
+ // Wait for a client connection request and accept it
+ def waitForClientConnection(serverSocket: ServerSocket): Socket = {
+ serverSocket.accept()
+ }
+
+ // Close the server socket
+ def closeServerSocket(serverSocket: ServerSocket): Unit = {
+ serverSocket.close()
+ }
+
+ // write periodically on given socket
+ def writeToSocket(clientSocket: Socket,
+ iterations: Int,
+ delay: Int,
+ startID: Int): Unit = {
+
+ var nItr = 10
+ var nDelay = 5
+
+ // iterations range check
+ if (iterations >= 1 && iterations <= 50) {
+ nItr = iterations
+ } else {
+ println("Number of iterations exceeds limit. Setting to default 10 iterations")
+ }
+
+ // delay range check (1 second to 60 seconds)
+ if (delay >= 1 && delay <= 60) {
+ nDelay = delay
+ } else {
+ println("Delay exceeds the limit. Setting it to default 2 seconds")
+ }
+
+ val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+
+ var j = startID
+
+ for (i <- startID until startID + nItr) {
+ // write 5 records per iteration
+ for (id <- j until j + 5) {
+ socketWriter.println(id.toString + ", name_" + i
+ + ", city_" + i + ", " + (i*10000.00).toString)
+ }
+ j = j + 5
+ socketWriter.flush()
+ Thread.sleep(nDelay*1000)
+ }
+ socketWriter.close()
+ }
+}
+// scalastyle:on println
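As a quick sanity check on the utility above, the generated CSV can be read back
with the schema the examples declare (a sketch; the directory path and app name
are placeholders, not part of the commit):

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.StructType

import org.apache.carbondata.examples.utils.StreamingExampleUtil

object GenerateAndReadBackSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("CSVSketch").getOrCreate()
    val csvDataDir = "/tmp/csvDataDir"  // placeholder directory

    // write one batch starting at id 1, then read it back
    StreamingExampleUtil.generateCSVDataFile(spark, 1, csvDataDir, SaveMode.Overwrite)

    val schema = new StructType()
      .add("id", "integer")
      .add("name", "string")
      .add("city", "string")
      .add("salary", "float")

    spark.read.schema(schema).csv(csvDataDir).show(5)
    spark.stop()
  }
}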
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index d496de2..6eacb19 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -17,14 +17,20 @@
package org.apache.spark.sql
+import java.io.{BufferedWriter, FileWriter, IOException}
+import java.util.UUID
+
import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.language.implicitConversions
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.Job
+import org.apache.parquet.schema.InvalidSchemaException
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
import org.apache.spark.sql.execution.strategy.CarbonLateDecodeStrategy
@@ -33,10 +39,12 @@ import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.CarbonStreamingOutputWriterFactory
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.types._
+import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -44,7 +52,6 @@ import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
/**
* Carbon relation provider compliant to data source api.
* Creates carbon relations
@@ -52,7 +59,11 @@ import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
class CarbonSource extends CreatableRelationProvider with RelationProvider
with SchemaRelationProvider with DataSourceRegister with FileFormat {
- override def shortName(): String = "carbondata"
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ override def shortName(): String = {
+ "carbondata"
+ }
// will be called if hive supported create table command is provided
override def createRelation(sqlContext: SQLContext,
@@ -181,8 +192,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
/**
* Returns the path of the table
- *
- * @param sparkSession
+ * @param sparkSession
* @param dbName
* @param tableName
* @return
@@ -217,22 +227,196 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
* be put here. For example, user defined output committer can be configured here
* by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
*/
- def prepareWrite(
+ override def prepareWrite(
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
- dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
+ dataSchema: StructType): OutputWriterFactory = {
+
+ // Check if table with given path exists
+ validateTable(options("path"))
+
+ /* Check if the streaming data schema matches the carbon table schema.
+ * Data from a socket source does not have a schema attached to it,
+ * so the following check skips schema validation for the socket source.
+ */
+ if (!(dataSchema.size.equals(1) &&
+ dataSchema.fields(0).dataType.equals(StringType))) {
+ val path = options.get("path")
+ val tablePath: String = path match {
+ case Some(value) => value
+ case None => ""
+ }
+
+ val carbonTableSchema: org.apache.carbondata.format.TableSchema =
+ getTableSchema(sparkSession, tablePath)
+ val isSchemaValid = validateSchema(carbonTableSchema, dataSchema)
+
+ if (!isSchemaValid) {
+ LOGGER.error("Schema Validation Failed: streaming data schema "
+ + "does not match with carbon table schema")
+ throw new InvalidSchemaException("Schema Validation Failed : " +
+ "streaming data schema does not match with carbon table schema")
+ }
+ }
+ new CarbonStreamingOutputWriterFactory()
+ }
/**
- * When possible, this method should return the schema of the given `files`. When the format
- * does not support inference, or no valid files are given should return None. In these cases
- * Spark will require that user specify the schema manually.
+ * Read schema from existing carbon table
+ * @param sparkSession
+ * @param tablePath carbon table path
+ * @return TableSchema read from provided table path
*/
- def inferSchema(
+ private def getTableSchema(
+ sparkSession: SparkSession,
+ tablePath: String): org.apache.carbondata.format.TableSchema = {
+
+ val formattedTablePath = tablePath.replace('\\', '/')
+ val names = formattedTablePath.split("/")
+ if (names.length < 3) {
+ throw new IllegalArgumentException("invalid table path: " + tablePath)
+ }
+ val tableName : String = names(names.length - 1)
+ val dbName : String = names(names.length - 2)
+ val storePath = formattedTablePath.substring(0,
+ formattedTablePath.lastIndexOf
+ (dbName.concat(CarbonCommonConstants.FILE_SEPARATOR)
+ .concat(tableName)) - 1)
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val thriftTableInfo: org.apache.carbondata.format.TableInfo =
+ metastore.getThriftTableInfo(new CarbonTablePath(storePath, dbName, tableName))(sparkSession)
+
+ val factTable: org.apache.carbondata.format.TableSchema = thriftTableInfo.getFact_table
+ factTable
+ }
+
+ /**
+ * Validates streamed schema against existing table schema
+ * @param carbonTableSchema existing carbon table schema
+ * @param dataSchema streamed data schema
+ * @return true if schema validation is successful else false
+ */
+ private def validateSchema(
+ carbonTableSchema: org.apache.carbondata.format.TableSchema,
+ dataSchema: StructType): Boolean = {
+
+ val columnnSchemaValues = carbonTableSchema.getTable_columns
+ .asScala.sortBy(_.schemaOrdinal)
+
+ val columnDataTypes = new ListBuffer[String]()
+ columnnSchemaValues.foreach(columnDataType =>
+ columnDataTypes.append(columnDataType.data_type.toString))
+ val tableColumnDataTypeList = columnDataTypes.toList
+
+ val streamSchemaDataTypes = new ListBuffer[String]()
+ dataSchema.fields.foreach(item => streamSchemaDataTypes
+ .append(mapStreamingDataTypeToString(item.dataType.toString)))
+ val streamedDataTypeList = streamSchemaDataTypes.toList
+
+ val isValid = tableColumnDataTypeList == streamedDataTypeList
+ isValid
+ }
+
+ /**
+ * Maps streamed datatype to carbon datatype
+ * @param dataType
+ * @return String
+ */
+ def mapStreamingDataTypeToString(dataType: String): String = {
+ import org.apache.carbondata.format.DataType
+ dataType match {
+ case "IntegerType" => DataType.INT.toString
+ case "StringType" => DataType.STRING.toString
+ case "DateType" => DataType.DATE.toString
+ case "DoubleType" => DataType.DOUBLE.toString
+ case "FloatType" => DataType.DOUBLE.toString
+ case "LongType" => DataType.LONG.toString
+ case "ShortType" => DataType.SHORT.toString
+ case "TimestampType" => DataType.TIMESTAMP.toString
+ case other => throw new InvalidSchemaException("unsupported streaming data type: " + other)
+ }
+ }
+
+ /**
+ * Validates if given table exists or throws exception
+ * @param tablePath existing carbon table path
+ * @return None
+ */
+ private def validateTable(tablePath: String): Unit = {
+
+ val formattedTablePath = tablePath.replace('\\', '/')
+ val names = formattedTablePath.split("/")
+ if (names.length < 3) {
+ throw new IllegalArgumentException("invalid table path: " + tablePath)
+ }
+ val tableName : String = names(names.length - 1)
+ val dbName : String = names(names.length - 2)
+ val storePath = formattedTablePath.substring(0,
+ formattedTablePath.lastIndexOf
+ (dbName.concat(CarbonCommonConstants.FILE_SEPARATOR)
+ .concat(tableName)) - 1)
+ val absoluteTableIdentifier: AbsoluteTableIdentifier =
+ new AbsoluteTableIdentifier(storePath,
+ new CarbonTableIdentifier(dbName, tableName,
+ UUID.randomUUID().toString))
+
+ if (!checkIfTableExists(absoluteTableIdentifier)) {
+ throw new NoSuchTableException(dbName, tableName)
+ }
+ }
+
+ /**
+ * Checks if table exists by checking its schema file
+ * @param absoluteTableIdentifier
+ * @return Boolean
+ */
+ private def checkIfTableExists(absoluteTableIdentifier: AbsoluteTableIdentifier): Boolean = {
+ val carbonTablePath: CarbonTablePath = CarbonStorePath
+ .getCarbonTablePath(absoluteTableIdentifier)
+ val schemaFilePath: String = carbonTablePath.getSchemaFilePath
+ FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
+ FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
+ FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)
+ }
+
+ /**
+ * If the user wants to stream data from a carbondata table source
+ * and the following conditions are true:
+ * 1. No schema is provided by the user in readStream()
+ * 2. spark.sql.streaming.schemaInference is set to true
+ * then carbondata can infer the table schema from a valid table path.
+ * The schema inference is not mandatory, but good to have.
+ * When possible, this method should return the schema of the given `files`.
+ * If the format does not support schema inference, or no valid files
+ * are given, it should return None. In these cases Spark will require that
+ * the user specify the schema manually.
+ */
+ override def inferSchema(
sparkSession: SparkSession,
options: Map[String, String],
- files: Seq[FileStatus]): Option[StructType] = Some(new StructType().add("value", StringType))
+ files: Seq[FileStatus]): Option[StructType] = {
+
+ val path = options.get("path")
+ val tablePath: String = path match {
+ case Some(value) => value
+ case None => ""
+ }
+ // Check if table with given path exists
+ validateTable(tablePath)
+ val carbonTableSchema: org.apache.carbondata.format.TableSchema =
+ getTableSchema(sparkSession, tablePath)
+ val columnnSchemaValues = carbonTableSchema.getTable_columns
+ .asScala.sortBy(_.schemaOrdinal)
+
+ val structFields = new ArrayBuffer[StructField]()
+ columnnSchemaValues.foreach(columnSchema =>
+ structFields.append(StructField(columnSchema.column_name,
+ CatalystSqlParser.parseDataType(columnSchema.data_type.toString), true)))
+ Some(new StructType(structFields.toArray))
+ }
}
object CarbonSource {
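Per the inferSchema doc comment above, a streaming read can omit an explicit schema
once spark.sql.streaming.schemaInference is enabled. A sketch of the intended usage
(the table path is hypothetical and the table must already exist; whether carbondata
resolves as a streaming source end to end depends on the rest of this series):

import org.apache.spark.sql.SparkSession

object SchemaInferenceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("CarbonSchemaInference")
      .config("spark.sql.streaming.schemaInference", "true")
      .getOrCreate()

    // no .schema(...) call: CarbonSource.inferSchema reads the schema
    // from the table metadata under the given (hypothetical) table path
    val df = spark.readStream
      .format("carbondata")
      .option("path", "/tmp/store/default/_carbon_stream_table_")
      .load()

    df.printSchema()
    spark.stop()
  }
}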
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
deleted file mode 100644
index be69885..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.streaming
-
-
-import java.util.concurrent.ConcurrentHashMap
-
-import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.spark.sql.execution.datasources.OutputWriterFactory
-import org.apache.spark.sql.types.StructType
-
-import org.apache.carbondata.core.util.path.CarbonTablePath
-
-
-class CarbonStreamingOutputWriterFactory extends OutputWriterFactory {
-
- /**
- * When writing to a [[org.apache.spark.sql.execution.datasources.HadoopFsRelation]],
- * this method gets called by each task on executor side
- * to instantiate new [[org.apache.spark.sql.execution.datasources.OutputWriter]]s.
- *
- * @param path Path to write the file.
- * @param dataSchema Schema of the rows to be written. Partition columns are not
- * included in the schema if the relation being written is
- * partitioned.
- * @param context The Hadoop MapReduce task context.
- */
-
- override def newInstance(
- path: String,
-
- dataSchema: StructType,
-
- context: TaskAttemptContext) : CarbonStreamingOutputWriter = {
-
- new CarbonStreamingOutputWriter(path, context)
- }
-
- override def getFileExtension(context: TaskAttemptContext): String = {
-
- CarbonTablePath.STREAM_FILE_NAME_EXT
- }
-
-}
-
-object CarbonStreamingOutpurWriterFactory {
-
- private[this] val writers = new ConcurrentHashMap[String, CarbonStreamingOutputWriter]()
-
- def addWriter(path: String, writer: CarbonStreamingOutputWriter): Unit = {
-
- if (writers.containsKey(path)) {
- throw new IllegalArgumentException(path + " writer already exists")
- }
-
- writers.put(path, writer)
- }
-
- def getWriter(path: String): CarbonStreamingOutputWriter = {
-
- writers.get(path)
- }
-
- def containsWriter(path: String): Boolean = {
-
- writers.containsKey(path)
- }
-
- def removeWriter(path: String): Unit = {
-
- writers.remove(path)
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriteFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriteFactory.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriteFactory.scala
new file mode 100644
index 0000000..c5e4226
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriteFactory.scala
@@ -0,0 +1,88 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.streaming
+
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+
+class CarbonStreamingOutputWriterFactory extends OutputWriterFactory {
+
+ /**
+ * When writing to a [[org.apache.spark.sql.execution.datasources.HadoopFsRelation]],
+ * this method gets called by each task on executor side
+ * to instantiate new [[org.apache.spark.sql.execution.datasources.OutputWriter]]s.
+ *
+ * @param path Path to write the file.
+ * @param dataSchema Schema of the rows to be written. Partition columns are not
+ * included in the schema if the relation being written is
+ * partitioned.
+ * @param context The Hadoop MapReduce task context.
+ */
+
+ override def newInstance(
+ path: String,
+
+ dataSchema: StructType,
+
+ context: TaskAttemptContext) : CarbonStreamingOutputWriter = {
+
+ new CarbonStreamingOutputWriter(path, context)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+
+ CarbonTablePath.STREAM_FILE_NAME_EXT
+ }
+
+}
+
+object CarbonStreamingOutputWriterFactory {
+
+ private[this] val writers = new ConcurrentHashMap[String, CarbonStreamingOutputWriter]()
+
+ def addWriter(path: String, writer: CarbonStreamingOutputWriter): Unit = {
+
+ if (writers.containsKey(path)) {
+ throw new IllegalArgumentException(path + " writer already exists")
+ }
+
+ writers.put(path, writer)
+ }
+
+ def getWriter(path: String): CarbonStreamingOutputWriter = {
+
+ writers.get(path)
+ }
+
+ def containsWriter(path: String): Boolean = {
+
+ writers.containsKey(path)
+ }
+
+ def removeWriter(path: String): Unit = {
+
+ writers.remove(path)
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c987392/integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala
new file mode 100644
index 0000000..f00eea5
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/streaming/CarbonSourceSchemaValidationTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata.streaming
+
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.sql.common.util.Spark2QueryTest
+import org.apache.spark.sql.{CarbonSource, SparkSession}
+import org.apache.spark.sql.streaming.CarbonStreamingOutputWriterFactory
+import org.apache.spark.sql.test.TestQueryExecutor
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Test for schema validation during streaming ingestion
+ * Validates streamed schema(source) against existing table(target) schema.
+ */
+
+class CarbonSourceSchemaValidationTest extends Spark2QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll() {
+ sql("DROP TABLE IF EXISTS _carbon_stream_table_")
+ }
+
+ test("Testing validate schema method with correct values ") {
+
+ val spark = SparkSession.builder
+ .appName("StreamIngestSchemaValidation")
+ .master("local")
+ .getOrCreate()
+
+ val carbonSource = new CarbonSource
+ val job = new Job()
+ val warehouseLocation = TestQueryExecutor.warehouse
+
+ sql("CREATE TABLE _carbon_stream_table_(id int,name string)STORED BY 'carbondata'")
+ val tablePath: String = s"$warehouseLocation/default/_carbon_stream_table_"
+ val dataSchema = StructType(Array(
+ StructField("id", IntegerType, true),
+ StructField("name", StringType, true)))
+ val res = carbonSource.prepareWrite(spark, job, Map("path" -> tablePath), dataSchema)
+ assert(res.isInstanceOf[CarbonStreamingOutputWriterFactory])
+ }
+
+ override def afterAll() {
+ sql("DROP TABLE IF EXISTS _carbon_stream_table_")
+ }
+
+}