Posted to issues@iceberg.apache.org by "xiongliang1989 (via GitHub)" <gi...@apache.org> on 2023/03/30 02:35:48 UTC

[GitHub] [iceberg] xiongliang1989 commented on issue #5515: "Table not found" error while using rewriteDataFiles

xiongliang1989 commented on issue #5515:
URL: https://github.com/apache/iceberg/issues/5515#issuecomment-1489598133

   Query engine
   Spark 3.1.1, Iceberg 1.1.0
   
   Question
   Hi all,
   I created an Iceberg table with the Hadoop catalog, using Java code like this:
   `
   // Get the SparkSession
           SparkSession session = SparkSession
                   .builder()
                   .appName("spark-iceberg")
                   .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
                   .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
                   .config("spark.sql.catalog.local.type", "hadoop")
                   .config("spark.sql.catalog.local.warehouse", "hdfs://bigdata/bigdata/warehouse")
                   .master("local[1]")
                   .getOrCreate();
   
           Configuration hadoopConf = new Configuration();
           HadoopCatalog catalog = new HadoopCatalog(hadoopConf, "hdfs://bigdata/bigdata/warehouse");
           TableIdentifier name = TableIdentifier.of("db", "n1_log");
   
           // create table
           if (!catalog.tableExists(name)) {
               session.sql("CREATE table local.db.n1_log (ran_ip string, ran_port int, amf_ip string, amf_port int, procedure_id int, procedure_bitmap int,cause_type int, cause_value int, ue_id_type  int, ue_id string, start_time long,end_time long, ran_ue_ngap_id string, amf_ue_ngap_id string, switch_off int, date_time string) USING iceberg PARTITIONED BY (date_time) TBLPROPERTIES ('write.metadata.delete-after-commit.enabled'='true', 'write.metadata.previous-versions-max'='1')");
           }
   
           // get Data source
           Dataset<Row> data = session.readStream()
                   .format("kafka")
                   .option("kafka.bootstrap.servers", "127.0.0.1:9092")
                   .option("kafka.group.id", "groupId")
                   .option("startingOffsets", "latest")
                   .option("failOnDataLoss", "false")
                   .option("maxOffsetsPerTrigger", "15000000")
                   .option("subscribe", "n1_log")
                   .load()
                   .selectExpr("CAST(value AS STRING)")
                   .toDF("data")
                   .map(new N1LogMapFunction(),
                           Encoders.bean(N1Log.class))
                   .select("ranIp", "ranPort", "amfIp", "amfPort", "procedureId", "procedureBitmap", "causeType", "causeValue", "ueIdType", "ueId", "startTime", "endTime", "ranUeNgapId", "amfUeNgapId", "switchOff", "dateTime")
                   .toDF("ran_ip", "ran_port", "amf_ip", "amf_port", "procedure_id", "procedure_bitmap", "cause_type", "cause_value", "ue_id_type", "ue_id", "start_time", "end_time", "ran_ue_ngap_id", "amf_ue_ngap_id", "switch_off", "date_time");
   
           // write
           StreamingQuery query = data.writeStream()
                   .format("iceberg")
                   .outputMode("append")
                   .partitionBy("date_time")
                   .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
                   .option("path", "hdfs://bigdata/bigdata/warehouse/db/n1_log")
                   .option("checkpointLocation", "hdfs://bigdata/bigdata/checkpointLocation/n1_log")
                   .start();
   
           query.awaitTermination();
           query.stop();
   `
   
   I can query data from the Iceberg table, but when I run the small-file merge (rewriteDataFiles), I get errors. This is the code:
   `
   SparkSession session = SparkSession
                   .builder()
                   .appName("spark-iceberg")
                   .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
                   .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
                   .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
                   .config("spark.sql.catalog.local.type", "hadoop")
                   .config("spark.sql.catalog.local.warehouse", "hdfs://sugon-bigdata/sugon-bigdata/warehouse")
                   .master("local[1]")
                   .getOrCreate();
   
   HadoopCatalog hadoopCatalog = new HadoopCatalog(new Configuration(), "hdfs://sugon-bigdata/sugon-bigdata/warehouse");
   Table table = hadoopCatalog.loadTable(TableIdentifier.of("db", "n1_log"));
   
   SparkActions.get(session)
                       .rewriteDataFiles(table)
                       .filter(Expressions.equal("date_time", "2023-03-30"))
                       .option("target-file-size-bytes", Long.toString(500 * 1024 * 1024))
                       .execute();
   `
   
   After I run the code, I get this error:
   `
   23/03/30 10:32:38 WARN Query: Query for candidates of org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics and subclasses resulted in no possible candidates
   Required table missing : "CDS" in Catalog "" Schema "". DataNucleus requires this table to perform its persistence operations. Either your MetaData is incorrect, or you need to enable "datanucleus.schema.autoCreateTables"
   org.datanucleus.store.rdbms.exceptions.MissingTableException: Required table missing : "CDS" in Catalog "" Schema "". DataNucleus requires this table to perform its persistence operations. Either your MetaData is incorrect, or you need to enable "datanucleus.schema.autoCreateTables"
   
   23/03/30 10:32:38 WARN MetaStoreDirectSql: Self-test query [select "DB_ID" from "DBS"] failed; direct SQL is disabled
   javax.jdo.JDODataStoreException: Error executing SQL query "select "DB_ID" from "DBS"".
   
   NestedThrowablesStackTrace:
   java.sql.SQLSyntaxErrorException: Table/View 'DBS' does not exist.
   `
   Could you give me some help?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

