You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@carbondata.apache.org by QiangCai <gi...@git.apache.org> on 2016/09/27 21:24:01 UTC

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Dataloading support ...

GitHub user QiangCai opened a pull request:

    https://github.com/apache/incubator-carbondata/pull/203

    [CARBONDATA-279]Dataloading support loading RDD directly without writing CSV file

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/QiangCai/incubator-carbondata loadRDDToCarbonData

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-carbondata/pull/203.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #203
    
----
commit fd4632eacf751b2bb1f868c1b905aa699fef6a23
Author: c00318382 <c0...@huaweiobz.com>
Date:   2016-09-27T15:14:04Z

    support loading RDD

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r82363963
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala ---
    @@ -36,6 +38,8 @@ class CarbonOption(options: Map[String, String]) {
           "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl")
       }
     
    +  def tempCSV: String = options.getOrElse("tempCSV", "false")
    --- End diff --
    
    I think set default to true is better, it complies to current behavior


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r80929523
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java ---
    @@ -343,41 +349,57 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K
         } catch (NumberFormatException exc) {
           numberOfNodes = NUM_CORES_DEFAULT_VAL;
         }
    +    if ( rddIteratorKey == null ) {
    +      BlockDetails[] blocksInfo = GraphGenerator.blockInfo.get(meta.getBlocksID());
    +      if (blocksInfo.length == 0) {
    +        //if isDirectLoad = true, and partition number > file num
    +        //then blocksInfo will get empty in some partition processing, so just return
    +        setOutputDone();
    +        return false;
    +      }
     
    -    BlockDetails[] blocksInfo = GraphGenerator.blockInfo.get(meta.getBlocksID());
    -    if (blocksInfo.length == 0) {
    -      //if isDirectLoad = true, and partition number > file num
    -      //then blocksInfo will get empty in some partition processing, so just return
    -      setOutputDone();
    -      return false;
    -    }
    -
    -    if (numberOfNodes > blocksInfo.length) {
    -      numberOfNodes = blocksInfo.length;
    -    }
    +      if (numberOfNodes > blocksInfo.length) {
    +        numberOfNodes = blocksInfo.length;
    +      }
     
    -    //new the empty lists
    -    for (int pos = 0; pos < numberOfNodes; pos++) {
    -      threadBlockList.add(new ArrayList<BlockDetails>());
    -    }
    +      //new the empty lists
    +      for (int pos = 0; pos < numberOfNodes; pos++) {
    +        threadBlockList.add(new ArrayList<BlockDetails>());
    +      }
     
    -    //block balance to every thread
    -    for (int pos = 0; pos < blocksInfo.length; ) {
    -      for (int threadNum = 0; threadNum < numberOfNodes; threadNum++) {
    -        if (pos < blocksInfo.length) {
    -          threadBlockList.get(threadNum).add(blocksInfo[pos++]);
    +      //block balance to every thread
    +      for (int pos = 0; pos < blocksInfo.length; ) {
    +        for (int threadNum = 0; threadNum < numberOfNodes; threadNum++) {
    +          if (pos < blocksInfo.length) {
    +            threadBlockList.get(threadNum).add(blocksInfo[pos++]);
    +          }
             }
           }
    +      LOGGER.info("*****************Started all csv reading***********");
    +      startProcess(numberOfNodes);
    +      LOGGER.info("*****************Completed all csv reading***********");
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime(
    +              meta.getPartitionID(), System.currentTimeMillis());
    +    } else {
    +      scanRddIterator();
         }
    -    LOGGER.info("*****************Started all csv reading***********");
    -    startProcess(numberOfNodes);
    -    LOGGER.info("*****************Completed all csv reading***********");
    -    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime(
    -        meta.getPartitionID(), System.currentTimeMillis());
         setOutputDone();
         return false;
       }
     
    +  private void scanRddIterator() throws RuntimeException {
    +    Iterator<String[]> iterator = RddInputUtils.getAndRemove(rddIteratorKey);
    +    if (iterator != null) {
    +      try{
    +        while(iterator.hasNext()){
    +          putRow(data.outputRowMeta, iterator.next());
    --- End diff --
    
    One suggestion:
    Instead of calling putRow here, perhaps we can execute more steps here until reaching merge step which need to wait. 
    I suggest to raise another PR to implement this improvement.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r80922605
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---
    @@ -462,3 +478,133 @@ class CarbonDataLoadRDD[K, V](
       }
     }
     
    +class CarbonRDDDataLoadRDD[K, V](
    +                               sc: SparkContext,
    +                               result: DataLoadResult[K, V],
    +                               carbonLoadModel: CarbonLoadModel,
    +                               var storeLocation: String,
    +                               hdfsStoreLocation: String,
    +                               kettleHomePath: String,
    +                               columinar: Boolean,
    +                               loadCount: Integer,
    +                               tableCreationTime: Long,
    +                               schemaLastUpdatedTime: Long,
    +                               prev: RDD[Row])
    +  extends RDD[(K, V)](prev)
    +    with Logging {
    +
    +  sc.setLocalProperty("spark.scheduler.pool", "DDL")
    +
    +  @DeveloperApi
    +  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
    +    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    +    val resultIter = new Iterator[(K, V)] {
    +      var partitionID = "0"
    +      val loadMetadataDetails = new LoadMetadataDetails()
    +      var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
    +        theSplit.index
    +      try {
    +        loadMetadataDetails.setPartitionCount(partitionID)
    +        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
    +        carbonLoadModel.setPartitionId(partitionID)
    +        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
    +        carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
    +
    +        storeLocation = CarbonDataLoadRDD.initialize(carbonLoadModel, theSplit.index)
    +        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
    +        val rddIteratorKey = UUID.randomUUID().toString
    +        try{
    +          RddInputUtils.put(rddIteratorKey,
    +            new RddIterator(firstParent[Row].iterator(theSplit, context), carbonLoadModel))
    +          carbonLoadModel.setRddIteratorKey(rddIteratorKey)
    --- End diff --
    
    Why can't you just set the iterator object itself in the carbonLoadModel instead of putting it in a global map?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r82367638
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala ---
    @@ -17,6 +17,8 @@
     
     package org.apache.carbondata.spark
     
    +import org.apache.spark.sql.{getDB, SQLContext}
    --- End diff --
    
    this import is not needed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r82370864
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---
    @@ -462,3 +478,155 @@ class CarbonDataLoadRDD[K, V](
       }
     }
     
    +/**
    + * Use this RDD class to load RDD
    + * @param sc
    + * @param result
    + * @param carbonLoadModel
    + * @param storeLocation
    + * @param hdfsStoreLocation
    + * @param kettleHomePath
    + * @param columinar
    + * @param loadCount
    + * @param tableCreationTime
    + * @param schemaLastUpdatedTime
    + * @param prev
    + * @tparam K
    + * @tparam V
    + */
    +class CarbonRDDDataLoadRDD[K, V](
    --- End diff --
    
    suggest to change to `DataFrameLoaderRDD`, it means the RDD to load the dataframe into carbon.
    change `CarbonDataLoadRDD` to `DataFileLoaderRDD` or 'CSVFileLoaderRDD`. It matches loadDataFrame and loadDataFile function name accordinality.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r80903639
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java ---
    @@ -343,41 +349,57 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K
         } catch (NumberFormatException exc) {
           numberOfNodes = NUM_CORES_DEFAULT_VAL;
         }
    +    if ( rddIteratorKey == null ) {
    --- End diff --
    
    remove extra space


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r82369947
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---
    @@ -72,8 +76,91 @@ class CarbonNodePartition(rddId: Int, val idx: Int, host: String,
       override def hashCode(): Int = 41 * (41 + rddId) + idx
     }
     
    +object CarbonDataLoadRDDFuncs extends Logging{
    --- End diff --
    
    Can we have a more meaningful name for this?
    I suggest `SparkPartitionLoader` (it loads one spark partition into carbon using kettle), and make it a class instead of object, so that it can keep the storeLocation internally instead of returning it in initialize function. One is using the returned storeLocation variable other than run function.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r82413317
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---
    @@ -72,8 +76,91 @@ class CarbonNodePartition(rddId: Int, val idx: Int, host: String,
       override def hashCode(): Int = 41 * (41 + rddId) + idx
     }
     
    +object CarbonDataLoadRDDFuncs extends Logging{
    --- End diff --
    
    fixed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r80921202
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---
    @@ -462,3 +478,133 @@ class CarbonDataLoadRDD[K, V](
       }
     }
     
    +class CarbonRDDDataLoadRDD[K, V](
    --- End diff --
    
    what is the difference from CarbonDataLoadRDD, can you merge them into one?
    At least there should be some comment to describe this RDD


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r80932289
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---
    @@ -742,7 +742,8 @@ object GlobalDictionaryUtil extends Logging {
        */
       def generateGlobalDictionary(sqlContext: SQLContext,
                                    carbonLoadModel: CarbonLoadModel,
    -                               hdfsLocation: String): Unit = {
    +                               hdfsLocation: String,
    +                               dataFrame: Option[DataFrame] = None): Unit = {
    --- End diff --
    
    I think we can call dataFrame.rdd.cache inside this function, because there are 2 scan, dataframe can be re-used after the first scan.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r82413313
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---
    @@ -462,3 +478,155 @@ class CarbonDataLoadRDD[K, V](
       }
     }
     
    +/**
    + * Use this RDD class to load RDD
    + * @param sc
    + * @param result
    + * @param carbonLoadModel
    + * @param storeLocation
    + * @param hdfsStoreLocation
    + * @param kettleHomePath
    + * @param columinar
    + * @param loadCount
    + * @param tableCreationTime
    + * @param schemaLastUpdatedTime
    + * @param prev
    + * @tparam K
    + * @tparam V
    + */
    +class CarbonRDDDataLoadRDD[K, V](
    --- End diff --
    
    fixed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r80900701
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala ---
    @@ -17,13 +17,15 @@
     
     package org.apache.carbondata.spark
     
    +import org.apache.spark.sql.{getDB, SQLContext}
    +
     /**
      * Contains all options for Spark data source
      */
    -class CarbonOption(options: Map[String, String]) {
    +class CarbonOption(options: Map[String, String], cc: SQLContext) {
       def tableIdentifier: String = options.getOrElse("tableName", s"$dbName.$tableName")
     
    -  def dbName: String = options.getOrElse("dbName", "default")
    +  def dbName: String = getDB.getDatabaseName(options.get("dbName"), cc)
    --- End diff --
    
    Is this modification required for this PR?
    It is better not adding cc into CarbonOption which will restrict its usage


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-carbondata/pull/203


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r82387284
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---
    @@ -742,7 +742,8 @@ object GlobalDictionaryUtil extends Logging {
        */
       def generateGlobalDictionary(sqlContext: SQLContext,
                                    carbonLoadModel: CarbonLoadModel,
    -                               hdfsLocation: String): Unit = {
    +                               hdfsLocation: String,
    +                               dataFrame: Option[DataFrame] = None): Unit = {
    --- End diff --
    
    Can you also add a option in CarbonOption to cache the dataframe? So that user can decide whether to cache it. The name of the option can be `cache`, default is `false`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r80927972
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java ---
    @@ -85,6 +87,8 @@
     
       private ExecutorService exec;
     
    +  private String rddIteratorKey = null;
    --- End diff --
    
    can we make another input class instead of CsvInput? Like SparkRowInput?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r80915025
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---
    @@ -145,92 +232,28 @@ class CarbonDataLoadRDD[K, V](
       override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
         val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
         val iter = new Iterator[(K, V)] {
    -      var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
           var partitionID = "0"
    +      val loadMetadataDetails = new LoadMetadataDetails()
           var model: CarbonLoadModel = _
           var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
                                    theSplit.index
           try {
    -        val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
    -        if (null == carbonPropertiesFilePath) {
    -          System.setProperty("carbon.properties.filepath",
    -            System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
    -        }
    +        loadMetadataDetails.setPartitionCount(partitionID)
    +        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
    +
             carbonLoadModel.setSegmentId(String.valueOf(loadCount))
             setModelAndBlocksInfo()
    -        CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
    -        CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
    -        CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
    -        CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true")
    -        CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true")
    -        CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true")
    -        CarbonProperties.getInstance().addProperty("high.cardinality.value", "100000")
    -        CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false")
    -        CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000")
    -
    -        // this property is used to determine whether temp location for carbon is inside
    -        // container temp dir or is yarn application directory.
    -        val carbonUseLocalDir = CarbonProperties.getInstance()
    -          .getProperty("carbon.use.local.dir", "false")
    -
    -        if(carbonUseLocalDir.equalsIgnoreCase("true")) {
    -          val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
    -          if (null != storeLocations && storeLocations.nonEmpty) {
    -            storeLocation = storeLocations(Random.nextInt(storeLocations.length))
    -          }
    -          if (storeLocation == null) {
    -            storeLocation = System.getProperty("java.io.tmpdir")
    -          }
    -        }
    -        else {
    -          storeLocation = System.getProperty("java.io.tmpdir")
    -        }
    -        storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
    -        dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
    -
    +        storeLocation = CarbonDataLoadRDD.initialize(model, theSplit.index)
    +        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
             if (model.isRetentionRequest) {
               recreateAggregationTableForRetention
             }
             else if (model.isAggLoadRequest) {
    --- End diff --
    
    Is this required? If it is not required, remove it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r82413331
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala ---
    @@ -36,6 +38,8 @@ class CarbonOption(options: Map[String, String]) {
           "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl")
       }
     
    +  def tempCSV: String = options.getOrElse("tempCSV", "false")
    --- End diff --
    
    fixed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r82334270
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---
    @@ -462,3 +478,133 @@ class CarbonDataLoadRDD[K, V](
       }
     }
     
    +class CarbonRDDDataLoadRDD[K, V](
    --- End diff --
    
    I will add some comment to describe the difference points.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r82333906
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala ---
    @@ -17,13 +17,15 @@
     
     package org.apache.carbondata.spark
     
    +import org.apache.spark.sql.{getDB, SQLContext}
    +
     /**
      * Contains all options for Spark data source
      */
    -class CarbonOption(options: Map[String, String]) {
    +class CarbonOption(options: Map[String, String], cc: SQLContext) {
       def tableIdentifier: String = options.getOrElse("tableName", s"$dbName.$tableName")
     
    -  def dbName: String = options.getOrElse("dbName", "default")
    +  def dbName: String = getDB.getDatabaseName(options.get("dbName"), cc)
    --- End diff --
    
    fixed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by chenliang613 <gi...@git.apache.org>.
Github user chenliang613 commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r81246241
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---
    @@ -462,3 +478,133 @@ class CarbonDataLoadRDD[K, V](
       }
     }
     
    +class CarbonRDDDataLoadRDD[K, V](
    --- End diff --
    
    Please add needful annotate for new class CarbonRDDDataLoadRDD. (it is difficult to know what is the different of CarbonRDDDataLoadRDD, CarbonDataLoadRDD)
    Same comments for other place of this PR :  please provide needful annotate for new classes and new functions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r82334120
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java ---
    @@ -343,41 +349,57 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K
         } catch (NumberFormatException exc) {
           numberOfNodes = NUM_CORES_DEFAULT_VAL;
         }
    +    if ( rddIteratorKey == null ) {
    +      BlockDetails[] blocksInfo = GraphGenerator.blockInfo.get(meta.getBlocksID());
    +      if (blocksInfo.length == 0) {
    +        //if isDirectLoad = true, and partition number > file num
    +        //then blocksInfo will get empty in some partition processing, so just return
    +        setOutputDone();
    +        return false;
    +      }
     
    -    BlockDetails[] blocksInfo = GraphGenerator.blockInfo.get(meta.getBlocksID());
    -    if (blocksInfo.length == 0) {
    -      //if isDirectLoad = true, and partition number > file num
    -      //then blocksInfo will get empty in some partition processing, so just return
    -      setOutputDone();
    -      return false;
    -    }
    -
    -    if (numberOfNodes > blocksInfo.length) {
    -      numberOfNodes = blocksInfo.length;
    -    }
    +      if (numberOfNodes > blocksInfo.length) {
    +        numberOfNodes = blocksInfo.length;
    +      }
     
    -    //new the empty lists
    -    for (int pos = 0; pos < numberOfNodes; pos++) {
    -      threadBlockList.add(new ArrayList<BlockDetails>());
    -    }
    +      //new the empty lists
    +      for (int pos = 0; pos < numberOfNodes; pos++) {
    +        threadBlockList.add(new ArrayList<BlockDetails>());
    +      }
     
    -    //block balance to every thread
    -    for (int pos = 0; pos < blocksInfo.length; ) {
    -      for (int threadNum = 0; threadNum < numberOfNodes; threadNum++) {
    -        if (pos < blocksInfo.length) {
    -          threadBlockList.get(threadNum).add(blocksInfo[pos++]);
    +      //block balance to every thread
    +      for (int pos = 0; pos < blocksInfo.length; ) {
    +        for (int threadNum = 0; threadNum < numberOfNodes; threadNum++) {
    +          if (pos < blocksInfo.length) {
    +            threadBlockList.get(threadNum).add(blocksInfo[pos++]);
    +          }
             }
           }
    +      LOGGER.info("*****************Started all csv reading***********");
    +      startProcess(numberOfNodes);
    +      LOGGER.info("*****************Completed all csv reading***********");
    +      CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime(
    +              meta.getPartitionID(), System.currentTimeMillis());
    +    } else {
    +      scanRddIterator();
         }
    -    LOGGER.info("*****************Started all csv reading***********");
    -    startProcess(numberOfNodes);
    -    LOGGER.info("*****************Completed all csv reading***********");
    -    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime(
    -        meta.getPartitionID(), System.currentTimeMillis());
         setOutputDone();
         return false;
       }
     
    +  private void scanRddIterator() throws RuntimeException {
    +    Iterator<String[]> iterator = RddInputUtils.getAndRemove(rddIteratorKey);
    +    if (iterator != null) {
    +      try{
    +        while(iterator.hasNext()){
    +          putRow(data.outputRowMeta, iterator.next());
    --- End diff --
    
    I agree, we can merge kettle steps.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r82413334
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala ---
    @@ -17,6 +17,8 @@
     
     package org.apache.carbondata.spark
     
    +import org.apache.spark.sql.{getDB, SQLContext}
    --- End diff --
    
    fixed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r80912853
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---
    @@ -72,6 +76,89 @@ class CarbonNodePartition(rddId: Int, val idx: Int, host: String,
       override def hashCode(): Int = 41 * (41 + rddId) + idx
     }
     
    +object CarbonDataLoadRDD extends Logging{
    --- End diff --
    
    This object is not a RDD. I think what you want is a helper function to do the loading


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r82334174
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---
    @@ -72,6 +76,89 @@ class CarbonNodePartition(rddId: Int, val idx: Int, host: String,
       override def hashCode(): Int = 41 * (41 + rddId) + idx
     }
     
    +object CarbonDataLoadRDD extends Logging{
    --- End diff --
    
    Rename to CarbonDataLoadRDDFuncs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #203: [CARBONDATA-279]Load RDD to carbonda...

Posted by QiangCai <gi...@git.apache.org>.
Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/203#discussion_r82344172
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala ---
    @@ -462,3 +478,133 @@ class CarbonDataLoadRDD[K, V](
       }
     }
     
    +class CarbonRDDDataLoadRDD[K, V](
    --- End diff --
    
    fixed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---