Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/02/17 14:01:31 UTC

[7/9] incubator-carbondata git commit: CARBONDATA-697: single_pass option is not used during data load

CARBONDATA-697: single_pass option is not used during data load

Dictionary values are now written to file on shutdown of the dictionary server.
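
In outline, a single-pass load now starts a DictionaryServer on a background thread before the load runs, and shuts it down once the load finishes; the shutdown is the point at which the accumulated dictionary values are flushed to file. A condensed sketch of that flow, assembled from the types and calls in the diff below (error handling and the CarbonProperties lookup are omitted, and the port value is illustrative):

    import java.util.concurrent.{Callable, Executors}
    import org.apache.carbondata.core.dictionary.server.DictionaryServer

    val dictionaryServerPort = 2030 // illustrative; the real code reads it from CarbonProperties
    val executorService = Executors.newFixedThreadPool(1)
    // start the dictionary server on a dedicated thread
    val result = executorService.submit(new Callable[DictionaryServer]() {
      def call: DictionaryServer = {
        val server = new DictionaryServer
        server.startServer(dictionaryServerPort)
        server
      }
    })
    // ... run the single-pass data load against the server ...
    result.get().shutdown()      // writes the generated dictionary values to file
    executorService.shutdownNow()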


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e0016e28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e0016e28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e0016e28

Branch: refs/heads/branch-1.0
Commit: e0016e2850ea4b4d9162d66be8a191e9e1bd3949
Parents: 78881d9
Author: BJangir <ba...@gmail.com>
Authored: Mon Feb 6 22:13:55 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Feb 17 19:29:36 2017 +0530

----------------------------------------------------------------------
 .../dataload/TestLoadDataGeneral.scala          |  18 +++
 .../spark/rdd/CarbonDataRDDFactory.scala        |  14 ++
 .../execution/command/carbonTableSchema.scala   | 158 ++++++++++++++-----
 3 files changed, 152 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e0016e28/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 5d9f750..aa18b8f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -130,6 +130,24 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE load_test")
   }
 
+  test("test data loading into table with Single Pass") {
+    sql("DROP TABLE IF EXISTS load_test_singlepass")
+    sql(""" CREATE TABLE load_test_singlepass(id int, name string, city string, age int)
+        STORED BY 'org.apache.carbondata.format' """)
+    val testData = s"$resourcesPath/sample.csv"
+    try {
+      sql(s"LOAD DATA LOCAL INPATH '$testData' into table load_test_singlepass options ('USE_KETTLE'='FALSE','SINGLE_PASS'='TRUE')")
+    } catch {
+      case ex: Exception =>
+        assert(false, ex.getMessage)
+    }
+    checkAnswer(
+      sql("SELECT id,name FROM load_test_singlepass where name='eason'"),
+      Seq(Row(2,"eason"))
+    )
+    sql("DROP TABLE load_test_singlepass")
+  }
+
   override def afterAll {
     sql("DROP TABLE if exists loadtest")
     sql("drop table if exists invalidMeasures")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e0016e28/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 9024d57..c7f22cc 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -40,6 +40,7 @@ import org.apache.spark.util.SparkUtil
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
+import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -364,6 +365,7 @@ object CarbonDataRDDFactory {
       columnar: Boolean,
       partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
       useKettle: Boolean,
+      result: Future[DictionaryServer],
       dataFrame: Option[DataFrame] = None,
       updateModel: Option[UpdateTableModel] = None): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -962,6 +964,18 @@ object CarbonDataRDDFactory {
           // TODO : Handle it
           LOGGER.info("********Database updated**********")
         }
+
+        // write dictionary file and shutdown dictionary server
+        if (carbonLoadModel.getUseOnePass) {
+          try {
+            result.get().shutdown()
+          } catch {
+            case ex: Exception =>
+              LOGGER.error("Error while close dictionary server and write dictionary file for " +
+                s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+              throw new Exception("Dataload failed due to error while write dictionary file!")
+          }
+        }
         LOGGER.audit("Data load is successful for " +
             s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {

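In the carbonTableSchema change that follows, SINGLE_PASS is honoured only under specific conditions; the validation in the diff reduces to roughly this predicate (a paraphrase of the match expression, which additionally logs why the option was ignored):

    // single_pass takes effect only without kettle and without a pre-built all-dictionary path
    val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase == "true" &&
      !useKettle && StringUtils.isEmpty(allDictionaryPath)
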
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e0016e28/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 6fba830..d1f1771 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -18,6 +18,10 @@
 package org.apache.spark.sql.execution.command
 
 import java.io.File
+import java.util.concurrent.Callable
+import java.util.concurrent.Executors
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Future
 
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
@@ -37,6 +41,7 @@ import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.encoder.Encoding
@@ -440,6 +445,23 @@ case class LoadTable(
         .setBadRecordsAction(
           TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsLoggerRedirect)
 
+      val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
+        case "true" =>
+          if (!useKettle && StringUtils.isEmpty(allDictionaryPath)) {
+            true
+          } else {
+            LOGGER.error("Can't use single_pass, because SINGLE_PASS and ALL_DICTIONARY_PATH" +
+              "can not be used together, and USE_KETTLE must be set as false")
+            false
+          }
+        case "false" =>
+          false
+        case illegal =>
+          LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " +
+            "Please set it as 'true' or 'false'")
+          false
+      }
+      carbonLoadModel.setUseOnePass(useOnePass)
       if (delimiter.equalsIgnoreCase(complex_delimiter_level_1) ||
           complex_delimiter_level_1.equalsIgnoreCase(complex_delimiter_level_2) ||
           delimiter.equalsIgnoreCase(complex_delimiter_level_2)) {
@@ -455,6 +477,9 @@ case class LoadTable(
       carbonLoadModel.setAllDictPath(allDictionaryPath)
 
       val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+      var result: Future[DictionaryServer] = null
+      var executorService: ExecutorService = null
+
       try {
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
@@ -466,54 +491,105 @@ case class LoadTable(
         carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
         GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata
 
-        val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
-          val fields = dataFrame.get.schema.fields
-          import org.apache.spark.sql.functions.udf
-          // extracting only segment from tupleId
-          val getSegIdUDF = udf((tupleId: String) =>
-            CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
-          // getting all fields except tupleId field as it is not required in the value
-          var otherFields = fields.toSeq
-            .filter(field => !field.name
-              .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-            .map(field => {
-              if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) {
-                new Column(field.name
-                  .substring(0,
-                    field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)))
-              } else {
-
-                new Column(field.name)
-              }
-            })
-
-          // extract tupleId field which will be used as a key
-          val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
-            .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).as("segId")
-          // use dataFrameWithoutTupleId as dictionaryDataFrame
-          val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
-          otherFields = otherFields :+ segIdColumn
-          // use dataFrameWithTupleId as loadDataFrame
-          val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
-          (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
-        } else {
-          (dataFrame, dataFrame)
-        }
-        GlobalDictionaryUtil
-          .generateGlobalDictionary(
-            sparkSession.sqlContext,
+        if (carbonLoadModel.getUseOnePass) {
+          val colDictFilePath = carbonLoadModel.getColDictFilePath
+          if (colDictFilePath != null) {
+            val storePath = relation.tableMeta.storePath
+            val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+            val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+              .getCarbonTableIdentifier
+            val carbonTablePath = CarbonStorePath
+              .getCarbonTablePath(storePath, carbonTableIdentifier)
+            val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
+            val dimensions = carbonTable.getDimensionByTableName(
+              carbonTable.getFactTableName).asScala.toArray
+            carbonLoadModel.initPredefDictMap()
+            // generate predefined dictionary
+            GlobalDictionaryUtil
+              .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
+                dimensions, carbonLoadModel, sparkSession.sqlContext, storePath, dictFolderPath)
+          }
+          // dictionaryServerClient dictionary generator
+          val dictionaryServerPort = CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
+              CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
+          carbonLoadModel.setDictionaryServerPort(Integer.parseInt(dictionaryServerPort))
+          val sparkDriverHost = sparkSession.sqlContext.sparkContext.
+            getConf.get("spark.driver.host")
+          carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
+          // start dictionary server when use one pass load.
+          executorService = Executors.newFixedThreadPool(1)
+          result = executorService.submit(new Callable[DictionaryServer]() {
+            @throws[Exception]
+            def call: DictionaryServer = {
+              Thread.currentThread().setName("Dictionary server")
+              val server: DictionaryServer = new DictionaryServer
+              server.startServer(dictionaryServerPort.toInt)
+              server
+            }
+          })
+          CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
             carbonLoadModel,
             relation.tableMeta.storePath,
-            dictionaryDataFrame)
-        CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+            kettleHomePath,
+            columnar,
+            partitionStatus,
+            useKettle,
+            result,
+            dataFrame,
+            updateModel)
+        } else {
+          val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
+            val fields = dataFrame.get.schema.fields
+            import org.apache.spark.sql.functions.udf
+            // extracting only segment from tupleId
+            val getSegIdUDF = udf((tupleId: String) =>
+              CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
+            // getting all fields except tupleId field as it is not required in the value
+            var otherFields = fields.toSeq
+              .filter(field => !field.name
+                .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+              .map(field => {
+                // the trailing '&& false' short-circuits this rename branch, leaving it disabled
+                if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) {
+                  new Column(field.name
+                    .substring(0,
+                      field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)))
+                } else {
+
+                  new Column(field.name)
+                }
+              })
+
+            // extract tupleId field which will be used as a key
+            val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
+              .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).as("segId")
+            // use dataFrameWithoutTupleId as dictionaryDataFrame
+            val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
+            otherFields = otherFields :+ segIdColumn
+            // use dataFrameWithTupleId as loadDataFrame
+            val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
+            (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
+          } else {
+            (dataFrame, dataFrame)
+          }
+          GlobalDictionaryUtil
+            .generateGlobalDictionary(
+              sparkSession.sqlContext,
+              carbonLoadModel,
+              relation.tableMeta.storePath,
+              dictionaryDataFrame)
+          CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
             carbonLoadModel,
             relation.tableMeta.storePath,
             kettleHomePath,
             columnar,
             partitionStatus,
             useKettle,
+            result,
             loadDataFrame,
             updateModel)
+        }
       } catch {
         case ex: Exception =>
           LOGGER.error(ex)
@@ -522,6 +598,12 @@ case class LoadTable(
       } finally {
         // Once the data load is successful delete the unwanted partition files
         try {
+
+          // shutdown dictionary server thread (guard against a failure before it was created)
+          if (carbonLoadModel.getUseOnePass && executorService != null) {
+            executorService.shutdownNow()
+          }
+
           val fileType = FileFactory.getFileType(partitionLocation)
           if (FileFactory.isFileExist(partitionLocation, fileType)) {
             val file = FileFactory