Posted to issues@iceberg.apache.org by "suisenkotoba (via GitHub)" <gi...@apache.org> on 2023/03/03 06:45:02 UTC

[GitHub] [iceberg] suisenkotoba opened a new issue, #6998: Iceberg with Dataproc Metastore: MERGE INTO TABLE is not supported temporarily

suisenkotoba opened a new issue, #6998:
URL: https://github.com/apache/iceberg/issues/6998

   ### Query engine
   
   PySpark on Dataproc
   
   ### Question
   
   I use Spark 3.3 on Dataproc ([image version 2.1](https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.1)) with Iceberg 1.1.0. The Dataproc cluster already has a Dataproc Metastore attached. I have already added the Iceberg extensions to my Spark config and even used table format version 2, but I still get the error `MERGE INTO TABLE is not supported temporarily`.
   
   This is my python code:
   ```python
   from pyspark.sql import SparkSession
   from pyspark import SparkConf
   from pyspark.sql.types import LongType, BooleanType, StructField, StructType
   
   
   conf = (
       SparkConf()
       .setAppName('test_iceberg')
       # Iceberg SQL extensions
       .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
       # Session catalog backed by the Hive metastore
       .set('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog')
       .set('spark.sql.catalog.spark_catalog.type', 'hive')
       # Separate Iceberg catalog named "dev"
       .set('spark.sql.catalog.dev', 'org.apache.iceberg.spark.SparkCatalog')
       .set('spark.sql.catalog.dev.type', 'hive')
   )
   spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()
   
   df = spark.read.option("mergeSchema", "true").parquet('gs://poc-kafka/mis').sort("timestamp")
   
   # Create the Iceberg table
   schema = df.select("value.after.*").schema
   additional_fields = [StructField('start_at', LongType(), True),
                        StructField('end_at', LongType(), True),
                        StructField('is_current', BooleanType(), True)]
   schema = StructType(schema.fields + additional_fields)
   empty_df = spark.createDataFrame([], schema)
   spark.sql("CREATE DATABASE IF NOT EXISTS dev.dummy")
   empty_df.createOrReplaceTempView("empty")
   spark.sql("CREATE TABLE IF NOT EXISTS dev.dummy.fruit USING iceberg "
             "TBLPROPERTIES ('format-version'='2') "
             "AS (SELECT * FROM empty) ")
   
   # Merge table
   df.createOrReplaceTempView("changelog")
   sql = ("MERGE INTO dev.dummy.fruit d "
          "USING (SELECT value.after.*, value.op op, "
          "            value.source.ts_ms start_at, True is_current, "
          "            NULL end_at "
          "        FROM changelog) s "
          "ON d.id = s.id "
          "WHEN MATCHED THEN UPDATE SET d.is_current = False, d.end_at = s.start_at "
          "WHEN NOT MATCHED THEN INSERT * ")
   spark.sql(sql)
   ```
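For reference, the Iceberg-related settings in the script above can be collected as a plain dict, which makes them easy to print or compare against `spark.conf.get(...)` on a running session. This is a minimal sketch: `iceberg_spark_conf` and `ICEBERG_EXTENSIONS` are hypothetical names, and the catalog name `dev` and type `hive` are simply taken from the script. Note that `spark.sql.extensions` is a static SQL configuration, so it only takes effect if it is set before the SparkSession is first created.

```python
# Hypothetical helper: collects the Iceberg-related Spark settings used in the
# script into a plain dict so they can be inspected or passed to SparkConf.
ICEBERG_EXTENSIONS = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"


def iceberg_spark_conf(catalog_name: str, catalog_type: str = "hive") -> dict:
    """Return Spark settings for the session catalog plus one named Iceberg catalog."""
    return {
        # Static SQL config: only honoured when the SparkSession is first created.
        "spark.sql.extensions": ICEBERG_EXTENSIONS,
        # Wrap the built-in session catalog so it can also load Iceberg tables.
        "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
        "spark.sql.catalog.spark_catalog.type": catalog_type,
        # Separate Iceberg catalog, addressed in SQL as <catalog_name>.<db>.<table>.
        f"spark.sql.catalog.{catalog_name}": "org.apache.iceberg.spark.SparkCatalog",
        f"spark.sql.catalog.{catalog_name}.type": catalog_type,
    }


if __name__ == "__main__":
    # Print the settings in key order, one per line.
    for key, value in sorted(iceberg_spark_conf("dev").items()):
        print(f"{key}={value}")
```

Keeping the settings in one dict means the same values can be passed to `SparkConf().setAll(iceberg_spark_conf("dev").items())` and later checked key by key against the live session.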
   
   [This document](https://iceberg.apache.org/docs/latest/spark-writes/#merge-into) clearly states that `MERGE INTO` is supported in Spark 3, so I'm wondering whether my configuration is wrong, whether the table I created is not actually in Iceberg format, or whether this has something to do with Dataproc or Dataproc Metastore. In case it helps, this is how I created the Dataproc cluster:
   
   ```shell
   gcloud dataproc clusters create de-dev-dp --image-version=2.1 --region asia-southeast2 \
    --project de-dev --dataproc-metastore projects/de-dev/locations/asia-southeast2/services/de-dev-dpms  \
    --max-idle=30m --num-masters=1 --master-machine-type n2-standard-2  \
    --properties='^#^dataproc:pip.packages=google-cloud-secret-manager==2.15.0,numpy==1.24.1#spark:spark.jars=https://jdbc.postgresql.org/download/postgresql-42.5.1.jar,https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-3.3_2.13/1.1.0/iceberg-spark-runtime-3.3_2.13-1.1.0.jar#core:fs.defaultFS=gs://de-dev-dataproc-fs'  \
    --subnet projects/de-dev/regions/asia-southeast2/subnetworks/subnet-de-dev-gke-dp  \
    --service-account=de-dev-ga-dataproc@de-dev.iam.gserviceaccount.com  \
    --master-boot-disk-type pd-ssd --master-boot-disk-size 30GB --num-workers=2 \
    --worker-machine-type n2-standard-2   --worker-boot-disk-type pd-ssd \
    --worker-boot-disk-size 30GB   --bucket de-dev-adhoc --temp-bucket de-dev-adhoc \
    --no-address   --optional-components=JUPYTER --enable-component-gateway
   ```
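The `--properties` value above uses gcloud's alternate-delimiter syntax: the leading `^#^` makes `#` the separator between properties, so commas can appear inside values such as `spark.jars`. Each entry has the form `prefix:property=value`, where the prefix selects the target configuration (for example `spark:` for `spark-defaults.conf` and `core:` for `core-site.xml`). The sketch below, built around a hypothetical helper `parse_dataproc_properties`, shows how the string breaks down; it only approximates gcloud's parsing and is not its implementation.

```python
# Hypothetical helper: approximates how a Dataproc --properties argument is
# split into individual properties; it is not gcloud's own parser.
PROPS = (
    "^#^dataproc:pip.packages=google-cloud-secret-manager==2.15.0,numpy==1.24.1"
    "#spark:spark.jars=https://jdbc.postgresql.org/download/postgresql-42.5.1.jar,"
    "https://search.maven.org/remotecontent?filepath=org/apache/iceberg/"
    "iceberg-spark-runtime-3.3_2.13/1.1.0/iceberg-spark-runtime-3.3_2.13-1.1.0.jar"
    "#core:fs.defaultFS=gs://de-dev-dataproc-fs"
)


def parse_dataproc_properties(arg: str) -> dict:
    """Return {(prefix, property_name): value} for a --properties string."""
    delimiter = ","
    # Alternate-delimiter syntax: a leading ^X^ makes X the list separator.
    if arg.startswith("^"):
        end = arg.index("^", 1)
        delimiter, arg = arg[1:end], arg[end + 1:]
    result = {}
    for item in arg.split(delimiter):
        if not item:
            continue  # tolerate a trailing separator
        # Split on the first "=" only: pip specs and URLs may contain "=".
        key, _, value = item.partition("=")
        # Keys look like <prefix>:<property>, e.g. spark:spark.jars.
        prefix, _, name = key.partition(":")
        result[(prefix, name)] = value
    return result


if __name__ == "__main__":
    for (prefix, name), value in parse_dataproc_properties(PROPS).items():
        print(f"[{prefix}] {name}")
        # Values such as spark.jars and pip.packages are comma-separated lists;
        # print one item per line.
        for part in value.split(","):
            print(f"    {part}")
```

Splitting only on the first `=` matters here, because both the pip version pins and the Maven URL contain `=` characters of their own.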
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] suisenkotoba closed issue #6998: Iceberg with Dataproc Metastore: MERGE INTO TABLE is not supported temporarily

Posted by "suisenkotoba (via GitHub)" <gi...@apache.org>.
suisenkotoba closed issue #6998: Iceberg with Dataproc Metastore: MERGE INTO TABLE is not supported temporarily
URL: https://github.com/apache/iceberg/issues/6998

