Posted to issues@spark.apache.org by "Yunjian Zhang (JIRA)" <ji...@apache.org> on 2017/06/02 23:07:04 UTC
[jira] [Comment Edited] (SPARK-20973) insert table fail caused by unable to fetch data definition file from remote hdfs
[ https://issues.apache.org/jira/browse/SPARK-20973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035597#comment-16035597 ]
Yunjian Zhang edited comment on SPARK-20973 at 6/2/17 11:06 PM:
----------------------------------------------------------------
I checked the source code and wrote a patch that fixes the insert issue, shown below. I am unable to attach a file here, so I have pasted the patch content instead.
----------------------------------------------
--- a/./workspace1/spark-2.1.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/./workspace/git/gdr/spark/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -57,7 +57,7 @@ private[hive] class SparkHiveWriterContainer(
   extends Logging
   with HiveInspectors
   with Serializable {
-
+
   private val now = new Date()
   private val tableDesc: TableDesc = fileSinkConf.getTableInfo
   // Add table properties from storage handler to jobConf, so any custom storage
@@ -154,6 +154,12 @@ private[hive] class SparkHiveWriterContainer(
     conf.value.setBoolean("mapred.task.is.map", true)
     conf.value.setInt("mapred.task.partition", splitID)
   }
+
+  def newSerializer(tableDesc: TableDesc): Serializer = {
+    val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
+    serializer.initialize(null, tableDesc.getProperties)
+    serializer
+  }

   def newSerializer(jobConf: JobConf, tableDesc: TableDesc): Serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
@@ -162,10 +168,11 @@ private[hive] class SparkHiveWriterContainer(
   }

   protected def prepareForWrite() = {
-    val serializer = newSerializer(jobConf, fileSinkConf.getTableInfo)
+    val serializer = newSerializer(conf.value, fileSinkConf.getTableInfo)
+    logInfo("CHECK table deser:" + fileSinkConf.getTableInfo.getDeserializer(conf.value))
     val standardOI = ObjectInspectorUtils
       .getStandardObjectInspector(
-        fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
+        fileSinkConf.getTableInfo.getDeserializer(conf.value).getObjectInspector,
         ObjectInspectorCopyOption.JAVA)
       .asInstanceOf[StructObjectInspector]
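In essence, the pre-patch path ends in serializer.initialize(null, tableDesc.getProperties) and the no-argument getDeserializer, so the SerDe never receives a Hadoop configuration; the patch threads conf.value, the JobConf the writer container keeps in serializable form, through both call sites. A minimal sketch of why that configuration matters, with a hypothetical helper name (illustration only, not code from the patch):

import java.io.InputStream
import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// What a configuration-dependent SerDe effectively does while initializing:
// turn its TBLPROPERTIES path into a FileSystem handle. Path.getFileSystem
// fails on a null Configuration, so initialize(null, props) can never reach
// a definition file on a remote HDFS.
def openDefinition(conf: Configuration, props: Properties): InputStream = {
  val dml = new Path(props.getProperty("com.ebay.dss.dml.file"))
  dml.getFileSystem(conf).open(dml)
}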
was (Author: daniel.yj.zhang@gmail.com):
I checked the source code and added a patch to fix the insert issue
> insert table fail caused by unable to fetch data definition file from remote hdfs
> ----------------------------------------------------------------------------------
>
> Key: SPARK-20973
> URL: https://issues.apache.org/jira/browse/SPARK-20973
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: Yunjian Zhang
> Labels: patch
>
> I implemented my own Hive SerDe to handle special data files; it needs to read a data definition file during initialization.
> The process includes:
> 1. read the definition file location from TBLPROPERTIES
> 2. read the file content from the location found in step 1
> 3. initialize the SerDe based on the content from step 2 (a sketch follows the stack trace below)
> // DDL of the table is as follows:
> ---------------------------------------------
> CREATE EXTERNAL TABLE dw_user_stg_txt_out
> ROW FORMAT SERDE 'com.ebay.dss.gdr.hive.serde.abvro.AbvroSerDe'
> STORED AS
> INPUTFORMAT 'com.ebay.dss.gdr.mapred.AbAsAvroInputFormat'
> OUTPUTFORMAT 'com.ebay.dss.gdr.hive.ql.io.ab.AvroAsAbOutputFormat'
> LOCATION 'hdfs://${remote_hdfs}/user/data'
> TBLPROPERTIES (
> 'com.ebay.dss.dml.file' = 'hdfs://${remote_hdfs}/dml/user.dml'
> )
> // insert statement
> insert overwrite table dw_user_stg_txt_out select * from dw_user_stg_txt_avro;
> // fails with the following ERROR:
> 17/06/02 15:46:34 ERROR SparkSQLDriver: Failed in [insert overwrite table dw_user_stg_txt_out select * from dw_user_stg_txt_avro]
> java.lang.RuntimeException: FAILED to get dml file from: hdfs://${remote-hdfs}/dml/user.dml
> at com.ebay.dss.gdr.hive.serde.abvro.AbvroSerDe.initialize(AbvroSerDe.java:109)
> at org.apache.spark.sql.hive.SparkHiveWriterContainer.newSerializer(hiveWriterContainers.scala:160)
> at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:258)
> at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:170)
> at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:347)
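Reading the trace against the three steps above: the exception is raised inside AbvroSerDe.initialize, invoked from SparkHiveWriterContainer.newSerializer (hiveWriterContainers.scala:160), i.e. step 2 runs without a usable configuration and cannot open the remote definition file. A compact sketch of that init sequence (names invented here, since AbvroSerDe itself is not public):

import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch of the three-step initialization described in this report. A null
// `conf` breaks step 2, which matches the RuntimeException in the trace.
def initFromDml(conf: Configuration, props: Properties): String = {
  // step 1: read the definition file location from TBLPROPERTIES
  val location = props.getProperty("com.ebay.dss.dml.file")
  // step 2: read the file content from that location
  val path = new Path(location)
  val in = path.getFileSystem(conf).open(path)
  val dml = try scala.io.Source.fromInputStream(in).mkString finally in.close()
  // step 3: the real SerDe would now initialize itself (object inspectors
  // etc.) from the definition read in step 2
  dml
}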