Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/09/01 09:44:47 UTC

[GitHub] [iceberg] fcvr1010 opened a new issue #3055: Concurrent append hangs in Spark 3

fcvr1010 opened a new issue #3055:
URL: https://github.com/apache/iceberg/issues/3055


   Hi, I ran into a strange behaviour that I could not really explain, and I'd appreciate your feedback.
   
   I'm running 5 concurrent PySpark 3.0 jobs on AWS EMR whose code is essentially the following:
   
   ```python
   import time

   # `customer` is a variable of the job; each job has a different value.
   # `spark` (the SparkSession) and `TABLE_NAME` are defined elsewhere in the job.
   
   data = [(customer, "abcde")]
   df = (
       spark.createDataFrame(data, ["customer", "data"])
       .repartition(1)
       .sortWithinPartitions("customer")
       .cache()
   )
   df.count()
   
   trials = []
   for i in range(20):
       start = time.time()
       df.write.saveAsTable(TABLE_NAME, mode="append")
       end = time.time()
       trials.append(end - start)
       time.sleep(1)
   ```
   
   Basically, I want to measure the throughput (in commits per minute) we can get when writing concurrently to the same table but to different partitions. The table I write to is partitioned by `customer`.
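
As a rough sketch, the per-commit timings collected in `trials` could be turned into a commits-per-minute figure as follows. The helper name `commits_per_minute` is hypothetical, and counting the 1-second pause between writes as elapsed time is an assumption about how throughput should be measured here.

```python
def commits_per_minute(trials, pause_seconds=1.0):
    """Estimate commits per minute from per-commit durations in seconds.

    Assumes every commit is followed by a fixed pause, as in the loop above,
    and that the pause counts toward the elapsed wall-clock time.
    """
    if not trials:
        return 0.0
    total_seconds = sum(trials) + pause_seconds * len(trials)
    return 60.0 * len(trials) / total_seconds


# Example: 20 commits of 2 s each, with 1 s pauses, span 60 s in total.
print(commits_per_minute([2.0] * 20))
```

Passing `pause_seconds=0.0` would instead report the rate based on commit time alone.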
   
   I have set up my environment in AWS as per the [Iceberg docs](https://iceberg.apache.org/aws/), in particular using the Glue Catalog and DynamoDB to obtain the lock on the catalog. I'm using version 0.12 of Iceberg.
   
   What I noticed is that after some time (typically minutes), one or more of the jobs would hang. To understand where, I ran `sudo -u hadoop jstack ApplicationMasterPID`. This is, I think, the relevant portion of the output:
   
   ```
   "iceberg-worker-pool-71" #382 daemon prio=5 os_prio=0 tid=0x00007f0c55c30000 nid=0xbf9e waiting on condition [0x00007f0c3ab48000]
   "iceberg-worker-pool-70" #381 daemon prio=5 os_prio=0 tid=0x00007f0c55c2e000 nid=0xbf9d waiting on condition [0x00007f0c3ac49000]
   [… several more like this]
   
   "Thread-6" #34 daemon prio=5 os_prio=0 tid=0x00007f0c6c028800 nid=0xb08a runnable [0x00007f0c584ad000]
      java.lang.Thread.State: RUNNABLE
   	at java.io.FilterInputStream.read(FilterInputStream.java:133)
   	at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3454)
   	at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3447)
   	at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3435)
   	at java.io.FilterInputStream.read(FilterInputStream.java:83)
   	at software.amazon.awssdk.services.s3.checksums.ChecksumValidatingInputStream.read(ChecksumValidatingInputStream.java:125)
   	at java.io.FilterInputStream.read(FilterInputStream.java:133)
   	at software.amazon.awssdk.core.io.SdkFilterInputStream.read(SdkFilterInputStream.java:66)
   	at org.apache.iceberg.aws.s3.S3InputStream.read(S3InputStream.java:93)
   	at org.apache.iceberg.avro.AvroIO$AvroInputStreamAdapter.read(AvroIO.java:120)
   	at org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:61)
   	at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
   	at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
   	at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37)
   	at org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320)
   	at org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:237)
   	at org.apache.iceberg.ManifestLists.read(ManifestLists.java:46)
   	at org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:127)
   	at org.apache.iceberg.BaseSnapshot.allManifests(BaseSnapshot.java:141)
   	at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:307)
   	at org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:194)
   	at org.apache.iceberg.spark.source.SparkWrite.access$1200(SparkWrite.java:87)
   	at org.apache.iceberg.spark.source.SparkWrite$BatchAppend.commit(SparkWrite.java:251)
   	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:396)
   	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
   	at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:253)
   	at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:259)
   	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
   	- locked <0x0000000730b3fa68> (a org.apache.spark.sql.execution.datasources.v2.AppendDataExec)
   	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
   	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:54)
   	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
   	at org.apache.spark.sql.execution.SparkPlan$$Lambda$1848/1701024125.apply(Unknown Source)
   	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
   	at org.apache.spark.sql.execution.SparkPlan$$Lambda$1849/213791705.apply(Unknown Source)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
   	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
   	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:124)
   	- locked <0x000000072f18b268> (a org.apache.spark.sql.execution.QueryExecution)
   	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:123)
   	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
   	at org.apache.spark.sql.DataFrameWriter$$Lambda$3022/93072724.apply(Unknown Source)
   	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
   	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:227)
   	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:107)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:132)
   	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1668/344209694.apply(Unknown Source)
   	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
   	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:227)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:132)
   	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1667/2017859747.apply(Unknown Source)
   	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:248)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:131)
   	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1663/388091508.apply(Unknown Source)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
   	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
   	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:660)
   	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:596)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
   	at py4j.Gateway.invoke(Gateway.java:282)
   	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   	at py4j.commands.CallCommand.execute(CallCommand.java:79)
   	at py4j.GatewayConnection.run(GatewayConnection.java:238)
   	at java.lang.Thread.run(Thread.java:748)
   ```
   
   The full output is here: [stack.log](https://github.com/apache/iceberg/files/7082344/stack.log)
   
   So, if I understand correctly, all Iceberg worker threads are waiting for jobs to process, and `Thread-6` is actually running and trying to read data from S3 in order to perform the commit. I presume this data is the Iceberg metadata. Now, if that is correct, why would such a read hang? Should I perhaps try a different library? Any guidance is deeply appreciated. Also, please let me know if there are better ways to get more information about the blocked threads.
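
One possible way to get a quicker overview of a thread dump is to summarize it with a small script. The following is a minimal sketch, assuming the dump is in the usual `jstack` text format shown above; the function name `summarize_jstack` and the default filter string are hypothetical choices for illustration.

```python
import re
from collections import Counter

# A jstack thread header starts with the quoted thread name.
HEADER = re.compile(r'^"(?P<name>[^"]+)"')
# The state line that follows, e.g. "java.lang.Thread.State: RUNNABLE".
STATE = re.compile(r"java\.lang\.Thread\.State: (?P<state>\w+)")


def summarize_jstack(dump, frame_filter="S3InputStream"):
    """Count thread states and list threads whose stack mentions frame_filter.

    `frame_filter` is an illustrative default, not something jstack defines.
    """
    states = Counter()
    matching = []
    current = None
    for raw in dump.splitlines():
        line = raw.strip()
        header = HEADER.match(line)
        if header:
            current = header.group("name")
            continue
        if current is None:
            continue
        state = STATE.search(line)
        if state:
            states[state.group("state")] += 1
        elif frame_filter in line and current not in matching:
            matching.append(current)
    return states, matching
```

For example, `states, threads = summarize_jstack(open("stack.log").read())` would report how many threads are in each state and which ones have an `S3InputStream` frame on their stack. Comparing two dumps taken some time apart can show whether the same thread is still sitting in the same read.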


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] fcvr1010 commented on issue #3055: Concurrent append hangs in Spark 3

Posted by GitBox <gi...@apache.org>.
fcvr1010 commented on issue #3055:
URL: https://github.com/apache/iceberg/issues/3055#issuecomment-1002637717


   @rdblue thanks for your feedback and really sorry about my super late reply but we had to pause our Iceberg investigations for a bit. As soon as I get the chance to test this again, I'll let you know.




[GitHub] [iceberg] rdblue edited a comment on issue #3055: Concurrent append hangs in Spark 3

Posted by GitBox <gi...@apache.org>.
rdblue edited a comment on issue #3055:
URL: https://github.com/apache/iceberg/issues/3055#issuecomment-920998395


   This is the issue I'd track. We think this is a connection leak and I have a stack trace for it:
   
   ```
   21/09/05 16:11:49 WARN S3InputStream: Unclosed input stream created by:
   	org.apache.iceberg.aws.s3.S3InputStream.<init>(S3InputStream.java:60)
   	org.apache.iceberg.aws.s3.S3InputFile.newStream(S3InputFile.java:52)
   	org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101)
   	org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
   	org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37)
   	org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320)
   	org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:237)
   	org.apache.iceberg.ManifestLists.read(ManifestLists.java:46)
   	org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:137)
   	org.apache.iceberg.BaseSnapshot.dataManifests(BaseSnapshot.java:159)
   	org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:365)
   	org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:164)
   	org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:283)
   	org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:405)
   	org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:214)
   	org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:198)
   	org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:190)
   	org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:282)
   	org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:201)
   	org.apache.iceberg.spark.source.SparkWrite.access$1300(SparkWrite.java:92)
   	org.apache.iceberg.spark.source.SparkWrite$BatchAppend.commit(SparkWrite.java:258)
   	org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)
   	org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336)
   	org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:218)
   ```




[GitHub] [iceberg] rdblue commented on issue #3055: Concurrent append hangs in Spark 3

Posted by GitBox <gi...@apache.org>.
rdblue commented on issue #3055:
URL: https://github.com/apache/iceberg/issues/3055#issuecomment-909358112


   I think we had a recent report of an Avro issue resurfacing, where readers could hang in certain cases. We'll have to look into this more.




[GitHub] [iceberg] rdblue commented on issue #3055: Concurrent append hangs in Spark 3

Posted by GitBox <gi...@apache.org>.
rdblue commented on issue #3055:
URL: https://github.com/apache/iceberg/issues/3055#issuecomment-925402232


   @fcvr1010, are there any related exceptions in logs when you see this behavior? I'm looking into this and it looks like everything is getting correctly closed in this path to me. The only leaks I think could happen would result from exceptions in Avro classes. Can you check?
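
A quick way to check the logs for related entries might look like the sketch below. It is only a heuristic: the warning text is the one from the `S3InputStream` trace quoted earlier in this thread, the exception pattern is a generic guess at how Java exception class names appear in logs, and the function name `find_suspicious_lines` is hypothetical.

```python
import re

# Warning text as it appears in the S3InputStream trace quoted in this thread.
LEAK_WARNING = "Unclosed input stream created by"
# Heuristic (an assumption): a class-like name ending in Exception or Error.
EXCEPTION = re.compile(r"\b[\w.$]+(?:Exception|Error)\b")


def find_suspicious_lines(lines):
    """Return (line_number, text) pairs for leak warnings and exception names."""
    hits = []
    for number, line in enumerate(lines, start=1):
        if LEAK_WARNING in line or EXCEPTION.search(line):
            hits.append((number, line.rstrip()))
    return hits
```

Running it over a log file, for example with `find_suspicious_lines(open(path))` where `path` points at a driver or executor log, lists candidate lines to inspect by hand; it will miss anything that does not follow these naming patterns.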




[GitHub] [iceberg] fcvr1010 commented on issue #3055: Concurrent append hangs in Spark 3

Posted by GitBox <gi...@apache.org>.
fcvr1010 commented on issue #3055:
URL: https://github.com/apache/iceberg/issues/3055#issuecomment-920680998


   @rdblue thanks for your comment. Are there any updates on this issue, or is there another ticket I could track?

