Posted to user@spark.apache.org by Zhang Victor <zh...@outlook.com> on 2020/03/17 01:58:48 UTC
Re: [PySpark] How to write HFiles as an 'append' to the same directory?
Maybe set spark.hadoop.validateOutputSpecs=false?
________________________________
From: Gautham Acharya <ga...@alleninstitute.org>
Sent: March 15, 2020 3:23
To: user@spark.apache.org <us...@spark.apache.org>
Subject: [PySpark] How to write HFiles as an 'append' to the same directory?
I have a process in Apache Spark that writes HFiles to S3 in batches. I want the resulting HFiles in the same directory, since they belong to the same column family. However, I'm getting a 'directory already exists' error when I run this on AWS EMR. How can I write HFiles via Spark as an 'append', the way I can with a CSV?
The batch writing function looks like this:
for col_group in split_cols:
    processed_chunk = batch_write_pandas_udf_for_col_aggregation(joined_dataframe, col_group, pandas_udf_func, group_by_args)
    hfile_writer.write_hfiles(processed_chunk, output_path,
                              zookeeper_ip, table_name, constants.DEFAULT_COL_FAMILY)
The actual call that writes the HFiles is this:
rdd.saveAsNewAPIHadoopFile(output_path,
                           constants.OUTPUT_FORMAT_CLASS,
                           keyClass=constants.KEY_CLASS,
                           valueClass=constants.VALUE_CLASS,
                           keyConverter=constants.KEY_CONVERTER,
                           valueConverter=constants.VALUE_CONVERTER,
                           conf=conf)
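An alternative worth noting (my suggestion, not from the thread): since FileOutputFormat refuses to write into a directory that already exists, each batch can be given its own fresh subdirectory under the column-family path, and the resulting directories bulk-loaded into HBase together. The helper below is a hypothetical sketch; `batch_label` would be something like the column-group name from the loop above.

```python
import posixpath
import uuid

def unique_output_path(base_path, batch_label=None):
    """Build a per-batch output directory under base_path so that no two
    saveAsNewAPIHadoopFile calls ever target the same directory.

    If no label is supplied, a random UUID is used instead.
    """
    suffix = batch_label if batch_label is not None else uuid.uuid4().hex
    return posixpath.join(base_path, suffix)
```

Each loop iteration would then pass `unique_output_path(output_path, col_group_name)` to the writer; HBase's bulk-load step can consume the resulting sibling directories one by one.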
The exception I’m getting:
Called with arguments: Namespace(job_args=['matrix_path=/tmp/matrix.csv', 'metadata_path=/tmp/metadata.csv', 'output_path=s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles', 'group_by_args=cluster_id', 'zookeeper_ip=ip-172-30-5-36.ec2.internal', 'table_name=test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a'], job_name='matrix_transformations')
job_args_tuples: [['matrix_path', '/tmp/matrix.csv'], ['metadata_path', '/tmp/metadata.csv'], ['output_path', 's3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles'], ['group_by_args', 'cluster_id'], ['zookeeper_ip', 'ip-172-30-5-36.ec2.internal'], ['table_name', 'test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a']]
Traceback (most recent call last):
File "/mnt/var/lib/hadoop/steps/s-2ZIOR335HH9TR/main.py", line 56, in <module>
job_module.transform(spark, **job_args)
File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py", line 93, in transform
File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py", line 73, in write_split_columnwise_transform
File "/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/output_handler/hfile_writer.py", line 44, in write_hfiles
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1438, in saveAsNewAPIHadoopFile
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles/median already exists
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:393)
at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1000)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:991)
at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:584)
at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)