You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Shardul Mahadik (Jira)" <ji...@apache.org> on 2021/07/13 21:28:00 UTC

[jira] [Comment Edited] (SPARK-28266) data duplication when `path` serde property is present

    [ https://issues.apache.org/jira/browse/SPARK-28266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380170#comment-17380170 ] 

Shardul Mahadik edited comment on SPARK-28266 at 7/13/21, 9:27 PM:
-------------------------------------------------------------------

I would like to propose another angle to look at the issue.

In this case, Spark can be reading Hive tables created by other products like Hive, Trino or any external system. In such a case, Spark should not be interpreting {{path}} property as it can controlled by an end user in these system, it is not a restricted property and does not have a meaning in Hive itself.

So here, the issue is not that {{spark.sql.sources.provider}} is missing. It was never expected to be set since the table was not created through Spark.

An easy example to demonstrate this issue is by creating an empty table with {{path}} property through Hive, and then trying to read it through Spark.
{code:java}
hive (default)> CREATE TABLE test (id bigint)
              > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
              > WITH SERDEPROPERTIES ('path'='someRandomValue')
              > STORED AS PARQUET;
OK
Time taken: 0.069 seconds
{code}
{code:java}
scala> spark.sql("SELECT * FROM test")
org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://user/username/someRandomValue
  at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:803)
  at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:800)
  at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
  at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
  at scala.util.Success.$anonfun$map$1(Try.scala:255)
  at scala.util.Success.map(Try.scala:213)
  at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
  at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
  at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
  at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
  at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
  at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
  at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
  at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
{code}

In the case the path property points to a valid location, it may result in incorrect data
{code}
 hive (default)> CREATE TABLE test1 (id bigint)
              > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
              > WITH SERDEPROPERTIES ('path'='/user/username/test1'')
              > STORED AS PARQUET LOCATION '/user/username/test1'';
OK
Time taken: 0.046 seconds

hive (default)> INSERT INTO test1 VALUES (1);
1 Rows loaded to test1
OK
Time taken: 59.979 seconds
{code}

{code}
scala> spark.sql("SELECT * FROM test1").show()
+---+
| id|
+---+
|  1|
|  1|
+---+
{code}



was (Author: shardulm):
I would like to propose another angle to look at the issue.

In this case, Spark can be reading Hive tables created by other products like Hive, Trino or any external system. In such a case, Spark should not be interpreting {{path}} property as it can controlled by an end user in these system, it is not a restricted property and does not have a meaning in Hive itself.

So here, the issue is not that {{spark.sql.sources.provider}} is missing. It was never expected to be set since the table was not created through Spark.

An easy example to demonstrate this issue is by creating an empty table with {{path}} property through Hive, and then trying to read it through Spark.
{code:java}
hive (default)> CREATE TABLE test (id bigint)
              > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
              > WITH SERDEPROPERTIES ('path'='someRandomValue')
              > STORED AS PARQUET;
OK
Time taken: 0.069 seconds
{code}
{code:java}
scala> spark.sql("SELECT * FROM test")
org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://user/username/someRandomValue
  at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:803)
  at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:800)
  at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
  at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
  at scala.util.Success.$anonfun$map$1(Try.scala:255)
  at scala.util.Success.map(Try.scala:213)
  at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
  at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
  at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
  at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
  at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
  at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
  at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
  at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
{code}

> data duplication when `path` serde property is present
> ------------------------------------------------------
>
>                 Key: SPARK-28266
>                 URL: https://issues.apache.org/jira/browse/SPARK-28266
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0, 2.2.1, 2.2.2
>            Reporter: Ruslan Dautkhanov
>            Priority: Major
>              Labels: correctness
>
> Spark duplicates returned datasets when `path` serde is present in a parquet table. 
> Confirmed versions affected: Spark 2.2, Spark 2.3, Spark 2.4.
> Confirmed unaffected versions: Spark 2.1 and earlier (tested with Spark 1.6 at least).
> Reproducer:
> {code:python}
> >>> spark.sql("create table ruslan_test.test55 as select 1 as id")
> DataFrame[]
> >>> spark.table("ruslan_test.test55").explain()
> == Physical Plan ==
> HiveTableScan [id#16], HiveTableRelation `ruslan_test`.`test55`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#16]
> >>> spark.table("ruslan_test.test55").count()
> 1
> {code}
> (all is good at this point, now exist session and run in Hive for example - )
> {code:sql}
> ALTER TABLE ruslan_test.test55 SET SERDEPROPERTIES ( 'path'='hdfs://epsdatalake/hivewarehouse/ruslan_test.db/test55' )
> {code}
> So LOCATION and serde `path` property would point to the same location.
> Now see count returns two records instead of one:
> {code:python}
> >>> spark.table("ruslan_test.test55").count()
> 2
> >>> spark.table("ruslan_test.test55").explain()
> == Physical Plan ==
> *(1) FileScan parquet ruslan_test.test55[id#9] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://epsdatalake/hivewarehouse/ruslan_test.db/test55, hdfs://epsdatalake/hive..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
> >>>
> {code}
> Also notice that the presence of `path` serde property makes TABLE location 
> show up twice - 
> {quote}
> InMemoryFileIndex[hdfs://epsdatalake/hivewarehouse/ruslan_test.db/test55, hdfs://epsdatalake/hive..., 
> {quote}
> We have some applications that create parquet tables in Hive with `path` serde property
> and it makes data duplicate in query results. 
> Hive, Impala etc and Spark version 2.1 and earlier read such tables fine, but not Spark 2.2 and later releases.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org