You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/23 03:54:22 UTC

[GitHub] [iceberg] zhangdove opened a new issue #1230: How to read/write iceberg in Spark Structed Streaming

zhangdove opened a new issue #1230:
URL: https://github.com/apache/iceberg/issues/1230


   I did some test consume kafka message, write to iceberg table by Spark structed streaming. I'm having some trouble.
   
   1.My environment
   ```
   Spark version:3.0.0
   Iceberg version:0.9.0
   ```
   
   2.Create Iceberg table
   ```
     def createPartitionTable(catalog: HadoopCatalog, tableIdentifier: TableIdentifier): Unit = {
       val columns: List[Types.NestedField] = new ArrayList[Types.NestedField]
       columns.add(Types.NestedField.of(1, true, "id", Types.IntegerType.get, "id doc"))
       columns.add(Types.NestedField.of(2, true, "name", Types.StringType.get, "name doc"))
   
       val schema: Schema = new Schema(columns)
       val table = catalog.createTable(tableIdentifier, schema, PartitionSpec.unpartitioned())
     }
   ```
   
   3.The pseudocode is as follows
   ```scala
       val (dbName, tbName, kafkatopic, bootstrapServers) = ("testNs", "doveTb", "topic", "ip:9092....")
   
       val spark = SparkSession.builder()
         .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
         .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
         .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://nameservice1/iceberg/warehouse")
         .getOrCreate()
   
       // 1. read kafka data
       val streamingDF = spark.readStream.format("kafka")
         .option("kafka.bootstrap.servers", bootstrapServers)
         .option("subscribe", kafkatopic)
         .load()
   
       // 2. consume message
       streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
         executorBatchDf(spark, batchDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").toDF("key", "value"), batchId)
       }
   
       def executorBatchDf(spark: SparkSession, batchDF: DataFrame, batchId: Long): Unit = {
         batchDF.persist()
         val icebergHadoopWarehouse = spark.sparkContext.getConf.get("spark.sql.catalog.hadoop_prod.warehouse")
   
         val selectArray = Array("database", "table", "type", "data")
         val kafkaSourceDF = batchDF.filter(_.get(1) != null)
           .select(json_tuple(batchDF("value"), selectArray: _*))
           .toDF(selectArray: _*)
   
         println(s"kafkaSourceDF println(batchId:${batchId})")
         kafkaSourceDF.show(false)
   
         // case one : read table by spark.table("prod.db.table")
         // val icebergTableDF = spark.table(s"hadoop_prod.${schemaName}.${tableName}")
         // case two : read table by spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table")
         val icebergTableDF = spark.read.format("iceberg").load(s"${icebergHadoopWarehouse}/${dbName}/${tbName}")
   
         println(s"icebergTableDF println(batchId:${batchId})")
         icebergTableDF.show(false)
   
         val insertDf = kafkaSourceDF
           .filter(kafkaSourceDF("type") === "insert")
           .select(from_json(kafkaSourceDF("data"), icebergTableDF.schema))
           .toDF("struct")
           .selectExpr(icebergTableDF.schema.fieldNames.map(row => "struct." + row): _*)
   
         val df = insertDf.union(icebergTableDF)
   
         df.writeTo(s"hadoop_prod.${dbName}.${tbName}")
           .overwrite(lit(true))
         batchDF.unpersist()
       }
   ```
   4.Kafka Message
   ```
   {"database": "testNs","table": "doveTb","type": "insert","data": {"id": 1,"name": "dove1"}}
   {"database": "testNs","table": "doveTb","type": "insert","data": {"id": 2,"name": "dove2"}}
   ```
   5.Result
   case one : read table by spark.table("prod.db.table").
   Result : Read iceberg table is error(table is empty) when the second batch .
   ```
   kafkaSourceDF println(batchId:1)
   +--------+------+------+-----------------------+
   |database|table |type  |data                   |
   +--------+------+------+-----------------------+
   |testNs  |doveTb|insert|{"id":1,"name":"dove1"}|
   +--------+------+------+-----------------------+
   
   icebergTableDF println(batchId:1)
   +---+----+
   |id |name|
   +---+----+
   +---+----+
   
   kafkaSourceDF println(batchId:2)
   +--------+------+------+-----------------------+
   |database|table |type  |data                   |
   +--------+------+------+-----------------------+
   |testNs  |doveTb|insert|{"id":2,"name":"dove2"}|
   +--------+------+------+-----------------------+
   
   icebergTableDF println(batchId:2)
   +---+----+
   |id |name|
   +---+----+
   +---+----+
   ```
   case two : read table by spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table").
   Result : normal.
   ```
   kafkaSourceDF println(batchId:1)
   +--------+------+------+-----------------------+
   |database|table |type  |data                   |
   +--------+------+------+-----------------------+
   |testNs  |doveTb|insert|{"id":1,"name":"dove1"}|
   +--------+------+------+-----------------------+
   
   icebergTableDF println(batchId:1)
   +---+----+
   |id |name|
   +---+----+
   +---+----+
   
   kafkaSourceDF println(batchId:2)
   +--------+------+------+-----------------------+
   |database|table |type  |data                   |
   +--------+------+------+-----------------------+
   |testNs  |doveTb|insert|{"id":2,"name":"dove2"}|
   +--------+------+------+-----------------------+
   
   icebergTableDF println(batchId:2)
   +---+-----+
   |id |name |
   +---+-----+
   |1  |dove1|
   +---+-----+
   ```
   
   6.Question
   The phenomenon is `spark.table("prod.db.table")` is not refreshed iceberg table when the next batch. However, `spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table")` does the opposite by automatically refreshing.
   Is there a difference between `spark.table("prod.db.table")` and `spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table")`?
   
   I'm not sure if I'm using it the wrong way.
   
   Link: https://iceberg.apache.org/spark/#querying-with-dataframes
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangdove commented on issue #1230: How to read/write iceberg in Spark Structed Streaming

Posted by GitBox <gi...@apache.org>.
zhangdove commented on issue #1230:
URL: https://github.com/apache/iceberg/issues/1230#issuecomment-713373058


   At least I can close the current issue now.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangdove commented on issue #1230: How to read/write iceberg in Spark Structed Streaming

Posted by GitBox <gi...@apache.org>.
zhangdove commented on issue #1230:
URL: https://github.com/apache/iceberg/issues/1230#issuecomment-713372597


   When Analyzing Iceberg's Catalog, I find that There is still an issue left here, and I have made some new discoveries:
   
   `spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table")` By this way, Iceberg table loading does not use the Iceberg Catalog. Of course, Iceberg's metadata information will not be cached. Instead, Iceberg Table will be obtained directly by using `IcebergSource.findTable(options,conf)`.
   
   However, when Iceberg table is loaded using `spark.table("prod.db.table")`, CachingCatalog(`cache-enabled`default value is true) automatically looks for Iceberg table from the cache(Caffeine Cache).
   
   Finally, whether it is incorrect that I find that the description of the document [in this place](https://github.com/apache/iceberg/blob/master/site/docs/spark.md#querying-with-dataframes)?
   
   The correct description should not be this ?
   `Using spark.table("prod.db.table") loads an isolated table reference that is not refreshed when other queries update the table.`
   
   @rdblue How do you think this description? Should we update this place?
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangdove commented on issue #1230: How to read/write iceberg in Spark Structed Streaming

Posted by GitBox <gi...@apache.org>.
zhangdove commented on issue #1230:
URL: https://github.com/apache/iceberg/issues/1230#issuecomment-663332474


   @HeartSaVioR Thansks for your reply.Maybe there's something wrong with the description of my issue name.
   
   I have tested some differences between `spark.table("prod.db.table")` and `spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table")`(Spark Structed Streaming), I wonder why


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangdove closed issue #1230: How to read/write iceberg in Spark Structed Streaming

Posted by GitBox <gi...@apache.org>.
zhangdove closed issue #1230:
URL: https://github.com/apache/iceberg/issues/1230


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] HeartSaVioR commented on issue #1230: How to read/write iceberg in Spark Structed Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on issue #1230:
URL: https://github.com/apache/iceberg/issues/1230#issuecomment-663349963


   Sorry for that case I have no idea. I'm also starting to explore the project.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] HeartSaVioR commented on issue #1230: How to read/write iceberg in Spark Structed Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on issue #1230:
URL: https://github.com/apache/iceberg/issues/1230#issuecomment-663320473


   Document is missing, but micro-batch sink is available for Spark structured streaming so you can just write directly without overwriting table (which means you're rewriting all records per batch).
   
   This is the python code I'm experimenting with Iceberg. I've just written it to python to avoid long compilation - there's nothing specific to python/pyspark, so you can simply do the same with Scala as well.
   
   https://github.com/HeartSaVioR/structured_streaming_experiments/blob/master/src/rate_data_source_to_iceberg.py


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org