Posted to dev@hudi.apache.org by "gaofeng5096@capinfo.com.cn" <ga...@capinfo.com.cn> on 2020/06/06 10:15:20 UTC

Hudi incompatibility with Spark 2.3

Our big data cluster runs Spark 2.3, and running the Hudi code below fails with:
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.avro.Schema.createUnion([Lorg/apache/avro/Schema;)Lorg/apache/avro/Schema;


The complete code is:
// Imports needed by the snippets below
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{SaveMode, SparkSession}

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "spark")
    val spark = SparkSession.builder.appName("Demo")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .master("local[3]")
      .getOrCreate()
    insert(spark)
    // update(spark)
    // query(spark)
    // incrementalQueryPermalink(spark)

    spark.stop()
  }

def insert(spark: SparkSession): Unit = {
  val tableName = "hudi_archive_test"
  val pathRoot = "/Users/tangxiuhong"
  val basePath = pathRoot + "/deltalake/hudi/"
  val inserts = List(
    """{"id" : 1,  "name": "iteblog", "age" : 101, "ts" : 1, "dt" : "20191212"}""",
    """{"id" : 2, "name": "iteblog_hadoop", "age" : 102, "ts" : 1, "dt" : "20191213"}""",
    """{"id" : 3, "name": "hudi", "age" : 103, "ts" : 2, "dt" : "20191212"}""")

  //  val inserts = List(
  //    """{"id" : 4,  "name": "iteblog", "age" : 102, "ts" : 2, "dt" : "20191212","addr" : "云南"}""",
  //    """{"id" : 5, "name": "iteblog_hadoop", "age" : 103, "ts" : 2, "dt" : "20191213","addr" : "浙江"}""",
  //    """{"id" : 6,  "name": "hudi", "age" : 104, "ts" : 2, "dt" : "20191212","addr" : "云南"}""")
  val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
  df.show(10)
  df.write.format("org.apache.hudi")
    // Precombine field: the column holding the record update time ("ts")
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
    // Record key (primary key) column
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
    // For multi-level partitioning the key generator must be org.apache.hudi.keygen.ComplexKeyGenerator
    .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator")
    // Multi-level partition columns
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt,ts")
    // Update a record's partition path when its partition value changes (requires a global index)
    .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
    // Index type: HBASE, INMEMORY, BLOOM or GLOBAL_BLOOM; GLOBAL_BLOOM is needed so records can still be found after a partition change
    .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
    // Shuffle parallelism
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    // Table name
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .mode(SaveMode.Append)
    .save(basePath)
}

Error screenshot:
How should I handle this problem?





gaofeng5096@capinfo.com.cn

Re: Hudi incompatibility with Spark 2.3

Posted by lamber-ken <la...@163.com>.


Hi,


Starting with Hudi 0.5.2, only Spark 2.4.4 is supported and the Avro version is 1.8.2, so please upgrade to Spark 2.4.4 before using Hudi.
Also, would you mind adding me on WeChat? WeChat ID: xleesf; you can then join the largest WeChat group in China.
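For reference, here is a minimal build.sbt sketch for running the job above against Spark 2.4.4. The Hudi release (0.5.2-incubating), Scala binary version (2.11) and dependency scopes below are assumptions, so adjust them to your environment:

// build.sbt -- a sketch; the versions here are assumptions, align them with your cluster
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // Spark 2.4.4 pulls in Avro 1.8.2, which provides Schema.createUnion(Schema...)
  "org.apache.spark" %% "spark-sql"         % "2.4.4",
  // Hudi Spark bundle built against Spark 2.4.x / Avro 1.8.2
  "org.apache.hudi"  %% "hudi-spark-bundle" % "0.5.2-incubating",
  // spark-avro matching the Spark version, used by the Hudi datasource
  "org.apache.spark" %% "spark-avro"        % "2.4.4"
)

Note that if the driver still picks up an older Avro (1.7.x) jar from the cluster classpath at runtime, the same NoSuchMethodError will reappear, so make sure nothing shadows Avro 1.8.2.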



