You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/02/15 09:17:08 UTC
[1/2] incubator-carbondata git commit: [CARBONDATA-697]
single_pass option is not used while doing data load
Repository: incubator-carbondata
Updated Branches:
refs/heads/master f3496ee0b -> 35afe80e3
CARBONDATA-697 Jira single_pass is not used while doing data load
Write dictionary values to file on shutdown of the dictionary server.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/1017384c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/1017384c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/1017384c
Branch: refs/heads/master
Commit: 1017384ce512cd59abd13c874f5df81514cda19d
Parents: f3496ee
Author: BJangir <ba...@gmail.com>
Authored: Mon Feb 6 22:13:55 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Feb 15 14:45:10 2017 +0530
----------------------------------------------------------------------
.../dataload/TestLoadDataGeneral.scala | 18 +++
.../spark/rdd/CarbonDataRDDFactory.scala | 14 ++
.../execution/command/carbonTableSchema.scala | 158 ++++++++++++++-----
3 files changed, 152 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1017384c/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 5d9f750..aa18b8f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -130,6 +130,24 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE load_test")
}
+ test("test data loading into table with Single Pass") {
+ sql("DROP TABLE IF EXISTS load_test_singlepass")
+ sql(""" CREATE TABLE load_test_singlepass(id int, name string, city string, age int)
+ STORED BY 'org.apache.carbondata.format' """)
+ val testData = s"$resourcesPath/sample.csv"
+ try {
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table load_test_singlepass options ('USE_KETTLE'='FALSE','SINGLE_PASS'='TRUE')")
+ } catch {
+ case ex: Exception =>
+ assert(false)
+ }
+ checkAnswer(
+ sql("SELECT id,name FROM load_test_singlepass where name='eason'"),
+ Seq(Row(2,"eason"))
+ )
+ sql("DROP TABLE load_test_singlepass")
+ }
+
override def afterAll {
sql("DROP TABLE if exists loadtest")
sql("drop table if exists invalidMeasures")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1017384c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 9024d57..c7f22cc 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -40,6 +40,7 @@ import org.apache.spark.util.SparkUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
+import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -364,6 +365,7 @@ object CarbonDataRDDFactory {
columnar: Boolean,
partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
useKettle: Boolean,
+ result: Future[DictionaryServer],
dataFrame: Option[DataFrame] = None,
updateModel: Option[UpdateTableModel] = None): Unit = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -962,6 +964,18 @@ object CarbonDataRDDFactory {
// TODO : Handle it
LOGGER.info("********Database updated**********")
}
+
+ // write dictionary file and shutdown dictionary server
+ if (carbonLoadModel.getUseOnePass) {
+ try {
+ result.get().shutdown()
+ } catch {
+ case ex: Exception =>
+ LOGGER.error("Error while close dictionary server and write dictionary file for " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ throw new Exception("Dataload failed due to error while write dictionary file!")
+ }
+ }
LOGGER.audit("Data load is successful for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1017384c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 6fba830..d1f1771 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -18,6 +18,10 @@
package org.apache.spark.sql.execution.command
import java.io.File
+import java.util.concurrent.Callable
+import java.util.concurrent.Executors
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Future
import scala.collection.JavaConverters._
import scala.language.implicitConversions
@@ -37,6 +41,7 @@ import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.encoder.Encoding
@@ -440,6 +445,23 @@ case class LoadTable(
.setBadRecordsAction(
TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsLoggerRedirect)
+ val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
+ case "true" =>
+ if (!useKettle && StringUtils.isEmpty(allDictionaryPath)) {
+ true
+ } else {
+ LOGGER.error("Can't use single_pass, because SINGLE_PASS and ALL_DICTIONARY_PATH" +
+ "can not be used together, and USE_KETTLE must be set as false")
+ false
+ }
+ case "false" =>
+ false
+ case illegal =>
+ LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " +
+ "Please set it as 'true' or 'false'")
+ false
+ }
+ carbonLoadModel.setUseOnePass(useOnePass)
if (delimiter.equalsIgnoreCase(complex_delimiter_level_1) ||
complex_delimiter_level_1.equalsIgnoreCase(complex_delimiter_level_2) ||
delimiter.equalsIgnoreCase(complex_delimiter_level_2)) {
@@ -455,6 +477,9 @@ case class LoadTable(
carbonLoadModel.setAllDictPath(allDictionaryPath)
val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+ var result: Future[DictionaryServer] = null
+ var executorService: ExecutorService = null
+
try {
// First system has to partition the data first and then call the load data
LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
@@ -466,54 +491,105 @@ case class LoadTable(
carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata
- val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
- val fields = dataFrame.get.schema.fields
- import org.apache.spark.sql.functions.udf
- // extracting only segment from tupleId
- val getSegIdUDF = udf((tupleId: String) =>
- CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
- // getting all fields except tupleId field as it is not required in the value
- var otherFields = fields.toSeq
- .filter(field => !field.name
- .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
- .map(field => {
- if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) {
- new Column(field.name
- .substring(0,
- field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)))
- } else {
-
- new Column(field.name)
- }
- })
-
- // extract tupleId field which will be used as a key
- val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
- .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).as("segId")
- // use dataFrameWithoutTupleId as dictionaryDataFrame
- val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
- otherFields = otherFields :+ segIdColumn
- // use dataFrameWithTupleId as loadDataFrame
- val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
- (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
- } else {
- (dataFrame, dataFrame)
- }
- GlobalDictionaryUtil
- .generateGlobalDictionary(
- sparkSession.sqlContext,
+ if (carbonLoadModel.getUseOnePass) {
+ val colDictFilePath = carbonLoadModel.getColDictFilePath
+ if (colDictFilePath != null) {
+ val storePath = relation.tableMeta.storePath
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+ .getCarbonTableIdentifier
+ val carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(storePath, carbonTableIdentifier)
+ val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
+ val dimensions = carbonTable.getDimensionByTableName(
+ carbonTable.getFactTableName).asScala.toArray
+ carbonLoadModel.initPredefDictMap()
+ // generate predefined dictionary
+ GlobalDictionaryUtil
+ .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
+ dimensions, carbonLoadModel, sparkSession.sqlContext, storePath, dictFolderPath)
+ }
+ // dictionaryServerClient dictionary generator
+ val dictionaryServerPort = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
+ CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
+ carbonLoadModel.setDictionaryServerPort(Integer.parseInt(dictionaryServerPort))
+ val sparkDriverHost = sparkSession.sqlContext.sparkContext.
+ getConf.get("spark.driver.host")
+ carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
+ // start dictionary server when use one pass load.
+ executorService = Executors.newFixedThreadPool(1)
+ result = executorService.submit(new Callable[DictionaryServer]() {
+ @throws[Exception]
+ def call: DictionaryServer = {
+ Thread.currentThread().setName("Dictionary server")
+ val server: DictionaryServer = new DictionaryServer
+ server.startServer(dictionaryServerPort.toInt)
+ server
+ }
+ })
+ CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
carbonLoadModel,
relation.tableMeta.storePath,
- dictionaryDataFrame)
- CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+ kettleHomePath,
+ columnar,
+ partitionStatus,
+ useKettle,
+ result,
+ dataFrame,
+ updateModel)
+ }
+ else {
+ val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
+ val fields = dataFrame.get.schema.fields
+ import org.apache.spark.sql.functions.udf
+ // extracting only segment from tupleId
+ val getSegIdUDF = udf((tupleId: String) =>
+ CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
+ // getting all fields except tupleId field as it is not required in the value
+ var otherFields = fields.toSeq
+ .filter(field => !field.name
+ .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+ .map(field => {
+ if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) {
+ new Column(field.name
+ .substring(0,
+ field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)))
+ } else {
+
+ new Column(field.name)
+ }
+ })
+
+ // extract tupleId field which will be used as a key
+ val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
+ .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).as("segId")
+ // use dataFrameWithoutTupleId as dictionaryDataFrame
+ val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
+ otherFields = otherFields :+ segIdColumn
+ // use dataFrameWithTupleId as loadDataFrame
+ val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
+ (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
+ } else {
+ (dataFrame, dataFrame)
+ }
+ GlobalDictionaryUtil
+ .generateGlobalDictionary(
+ sparkSession.sqlContext,
+ carbonLoadModel,
+ relation.tableMeta.storePath,
+ dictionaryDataFrame)
+ CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
carbonLoadModel,
relation.tableMeta.storePath,
kettleHomePath,
columnar,
partitionStatus,
useKettle,
+ result,
loadDataFrame,
updateModel)
+ }
} catch {
case ex: Exception =>
LOGGER.error(ex)
@@ -522,6 +598,12 @@ case class LoadTable(
} finally {
// Once the data load is successful delete the unwanted partition files
try {
+
+ // shutdown dictionary server thread
+ if (carbonLoadModel.getUseOnePass) {
+ executorService.shutdownNow()
+ }
+
val fileType = FileFactory.getFileType(partitionLocation)
if (FileFactory.isFileExist(partitionLocation, fileType)) {
val file = FileFactory
[2/2] incubator-carbondata git commit: [CARBONDATA-697] Jira
single_pass is not used while doing data load This closes #588
Posted by ra...@apache.org.
[CARBONDATA-697] Jira single_pass is not used while doing data load This closes #588
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/35afe80e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/35afe80e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/35afe80e
Branch: refs/heads/master
Commit: 35afe80e335ead86baa3b7b76fc364d110598d4e
Parents: f3496ee 1017384
Author: ravipesala <ra...@gmail.com>
Authored: Wed Feb 15 14:46:50 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Feb 15 14:46:50 2017 +0530
----------------------------------------------------------------------
.../dataload/TestLoadDataGeneral.scala | 18 +++
.../spark/rdd/CarbonDataRDDFactory.scala | 14 ++
.../execution/command/carbonTableSchema.scala | 158 ++++++++++++++-----
3 files changed, 152 insertions(+), 38 deletions(-)
----------------------------------------------------------------------