Posted to dev@hudi.apache.org by "gaofeng5096@capinfo.com.cn" <ga...@capinfo.com.cn> on 2020/05/26 07:23:02 UTC

Question about the spark-avro version matching Hudi



Spark version: 2.3.2.3.1.0.0-78. The submitted code is:
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{SaveMode, SparkSession}

def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder.appName("Demo")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .master("local[3]")
    .getOrCreate()
  //    insert(spark)
  update(spark)
  query(spark)
  //    incrementalQueryPermalink(spark)

  spark.stop()
}

/**
 * Insert data
 *
 * @param spark
 */
def insert(spark: SparkSession): Unit = {
  val tableName = "hudi_archive_test"
  val pathRoot = "/Users/tangxiuhong"
  val basePath = pathRoot + "/deltalake/hudi/"
  val inserts = List(
    """{"id" : 1,  "name": "iteblog", "age" : 101, "ts" : 1, "dt" : "20191212"}""",
    """{"id" : 2, "name": "iteblog_hadoop", "age" : 102, "ts" : 1, "dt" : "20191213"}""",
    """{"id" : 3, "name": "hudi", "age" : 103, "ts" : 2, "dt" : "20191212"}""")

  //    val inserts = List(
  //      """{"id" : 4,  "name": "iteblog", "age" : 102, "ts" : 2, "dt" : "20191212","addr" : "云南"}""",
  //      """{"id" : 5, "name": "iteblog_hadoop", "age" : 103, "ts" : 2, "dt" : "20191213","addr" : "浙江"}""",
  //      """{"id" : 6,  "name": "hudi", "age" : 104, "ts" : 2, "dt" : "20191212","addr" : "云南"}""")
  val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

  df.write.format("org.apache.hudi")
    // Pre-combine column: when two records share the same key, the one with the larger value wins
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
    // Record key (primary key) column
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
    // For multi-level (composite) partitioning the key generator must be org.apache.hudi.keygen.ComplexKeyGenerator
    .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator")
    // Multi-level partition columns
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt,ts")
    // Allow a record's partition path to be updated when it changes; this requires a global index (GLOBAL_BLOOM)
    .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
    // Index type; currently HBASE, INMEMORY, BLOOM and GLOBAL_BLOOM are available
    .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
    // Shuffle parallelism settings
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    // Table name
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .mode(SaveMode.Append)
    .save(basePath)
}
What is causing the error above?


gaofeng5096@capinfo.com.cn

Re: Question about the spark-avro version matching Hudi

Posted by Lamber Ken <la...@apache.org>.
Hi, since hudi-0.5.2, Spark 2.4.4 is the recommended version. In Spark 2.4.4, spark-avro was split out into a separate module and has to be added to the jars directory manually; Hudi is not compatible with Spark 2.3.3.
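
For reference, one common way to pull in both the Hudi bundle and the external spark-avro module is to pass them when launching Spark 2.4.4 (the coordinates below follow the Hudi quickstart; adjust the Hudi/Scala versions to your environment):

spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

The same --packages and --conf flags also work with spark-submit, or you can copy the spark-avro jar into the jars directory as mentioned above.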

If you have other questions, contact me on WeChat (19941866946) to join the Chinese-language WeChat group.

Re: Question about the spark-avro version matching Hudi

Posted by Lamber Ken <la...@apache.org>.
One more small detail: images sent to the Apache mailing list are not delivered, so the error screenshot did not come through.

Thanks
