You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by ajithme <gi...@git.apache.org> on 2018/07/12 04:06:55 UTC

[GitHub] carbondata pull request #2495: Added for kafka integration with Carbon Strea...

GitHub user ajithme opened a pull request:

    https://github.com/apache/carbondata/pull/2495

    Added for kafka integration with Carbon StreamSQL

    1. Pass source table properties to streamReader.load()
    2. Do not pass schema when sparkSession.readStream
    3. Remove querySchema validation against sink as dataFrame made from kafka source will not have schema ( its written in value column of schema )
    4. Extract the dataframe from kafka source which contain actual data schema @ writeStream
    
    Be sure to do all of the following checklist to help us incorporate 
    your contribution quickly and easily:
    
     - [ ] Any interfaces changed? NO
     
     - [ ] Any backward compatibility impacted? NO
     
     - [ ] Document update required? Yes: Need to use CSV parser
    
     - [ ] Testing done Done
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. 
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ajithme/carbondata kafkaStreamSQLIntegration

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2495.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2495
    
----
commit 0560c5e69c61d6594a91994da918493335bd0cb4
Author: Ajith <aj...@...>
Date:   2018-07-12T03:47:22Z

    Added for kafka integration with Carbon StreamSQL
    1. Pass source table properties to streamReader.load()
    2. Do not pass schema when sparkSession.readStream
    3. Remove querySchema validation against sink as dataFrame made from kafka source will not have schema ( its written in value column of schema )
    4. Extract the dataframe from kafka source which contain actual data schema @ writeStream

----


---

[GitHub] carbondata pull request #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integrat...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2495#discussion_r202222509
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala ---
    @@ -53,20 +52,22 @@ case class CarbonCreateStreamCommand(
       override def processData(sparkSession: SparkSession): Seq[Row] = {
         val df = sparkSession.sql(query)
         var sourceTable: CarbonTable = null
    +    var dataFrame: Option[DataFrame] = None
     
         // find the streaming source table in the query
         // and replace it with StreamingRelation
    -    val streamLp = df.logicalPlan transform {
    +    df.logicalPlan transform {
           case r: LogicalRelation
             if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
                r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
    -        val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
    +        val (source, resolvedFrame) = prepareDataFrame(sparkSession, r)
    --- End diff --
    
    please add comment here to describe what is done inside prepareDataFrame


---

[GitHub] carbondata issue #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integration wit...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:

    https://github.com/apache/carbondata/pull/2495
  
    LGTM. Merging into carbonstore branch
    Thanks for working on this.


---

[GitHub] carbondata pull request #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integrat...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2495#discussion_r202222238
  
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
    @@ -58,14 +58,16 @@ object StreamJobManager {
                                                     "streaming sink table " +
                                                     "('streaming' tblproperty is not 'sink' or 'true')")
         }
    +    // TODO: validate query schema against sink ( as in kafka we cannot get schema directly)
    +    /*
    --- End diff --
    
    can we move this validation before, so that we still do validation for non-kafka format?


---

[GitHub] carbondata issue #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integration wit...

Posted by ajithme <gi...@git.apache.org>.
Github user ajithme commented on the issue:

    https://github.com/apache/carbondata/pull/2495
  
    Merged https://github.com/apache/carbondata/commit/9ac55a5a656ebe106697ca76a04916bea2ef3109


---

[GitHub] carbondata pull request #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integrat...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2495#discussion_r202222412
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala ---
    @@ -53,20 +52,22 @@ case class CarbonCreateStreamCommand(
       override def processData(sparkSession: SparkSession): Seq[Row] = {
         val df = sparkSession.sql(query)
         var sourceTable: CarbonTable = null
    +    var dataFrame: Option[DataFrame] = None
     
         // find the streaming source table in the query
         // and replace it with StreamingRelation
    --- End diff --
    
    please modify this comment to describe the updated code


---

[GitHub] carbondata pull request #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integrat...

Posted by ajithme <gi...@git.apache.org>.
Github user ajithme closed the pull request at:

    https://github.com/apache/carbondata/pull/2495


---

[GitHub] carbondata pull request #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integrat...

Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2495#discussion_r202222321
  
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
    @@ -102,14 +104,23 @@ object StreamJobManager {
         }
     
         validateSourceTable(sourceTable)
    -    validateSinkTable(streamDf.schema, sinkTable)
    +
    +    // kafka surce always have fixed schema, need to get actual schema
    +    val isKafka = Option(sourceTable.getTableInfo.getFactTable.getTableProperties
    --- End diff --
    
    Please add a function in CarbonTable to get the underlying format


---

[GitHub] carbondata pull request #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integrat...

Posted by ajithme <gi...@git.apache.org>.
Github user ajithme commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2495#discussion_r202232048
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala ---
    @@ -53,20 +52,22 @@ case class CarbonCreateStreamCommand(
       override def processData(sparkSession: SparkSession): Seq[Row] = {
         val df = sparkSession.sql(query)
         var sourceTable: CarbonTable = null
    +    var dataFrame: Option[DataFrame] = None
     
         // find the streaming source table in the query
         // and replace it with StreamingRelation
    -    val streamLp = df.logicalPlan transform {
    +    df.logicalPlan transform {
           case r: LogicalRelation
             if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
                r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
    -        val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
    +        val (source, resolvedFrame) = prepareDataFrame(sparkSession, r)
    --- End diff --
    
    Added method comments


---

[GitHub] carbondata issue #2495: Added for kafka integration with Carbon StreamSQL

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2495
  
    Can one of the admins verify this patch?


---

[GitHub] carbondata issue #2495: Added for kafka integration with Carbon StreamSQL

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2495
  
    Can one of the admins verify this patch?


---

[GitHub] carbondata issue #2495: Added for kafka integration with Carbon StreamSQL

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2495
  
    Can one of the admins verify this patch?


---