You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "angerszhu (Jira)" <ji...@apache.org> on 2020/08/13 15:28:00 UTC
[jira] [Updated] (SPARK-32607) Script Transformation no-serde read
line should respect `TOK_TABLEROWFORMATLINES`
[ https://issues.apache.org/jira/browse/SPARK-32607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
angerszhu updated SPARK-32607:
------------------------------
Description:
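reader.readLine() should respect `TOK_TABLEROWFORMATLINES`: BufferedReader.readLine() always ends a record at \n, \r or \r\n, so a custom line delimiter is ignored.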
{code:java}
/**
 * Returns an iterator over the rows the transformation script writes to `inputStream` when no
 * SerDe is used.
 *
 * How the output is parsed:
 *  - The stream is decoded as UTF-8.
 *  - It is split into records on the `TOK_TABLEROWFORMATLINES` delimiter.
 *  - Each record is split into fields on the `TOK_TABLEROWFORMATFIELD` delimiter.
 *
 * @param writerThread writer thread associated with the script; forwarded to
 *                     `checkFailureAndPropagate` when output ends or reading fails
 * @param inputStream  output of the script process, read as UTF-8 text
 * @param proc         the script process, forwarded to `checkFailureAndPropagate`
 * @param stderrBuffer buffered stderr of the script, forwarded to `checkFailureAndPropagate`
 * @return a lazy iterator producing one `InternalRow` per record
 * @throws NoSuchElementException from `next()` once the output is exhausted
 */
protected def createOutputIteratorWithoutSerde(
    writerThread: BaseScriptTransformationWriterThread,
    inputStream: InputStream,
    proc: Process,
    stderrBuffer: CircularBuffer): Iterator[InternalRow] = {
  new Iterator[InternalRow] {
    // Record fetched by hasNext but not yet handed out by next(); null when none is buffered.
    var curLine: String = null
    val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
    val outputRowFormat = ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD")
    // Record terminator from `LINES TERMINATED BY`. Falls back to "\n" when the key is absent,
    // which keeps the previous readLine()-based behavior.
    val lineDelimiter: String =
      ioschema.outputRowFormatMap.getOrElse("TOK_TABLEROWFORMATLINES", "\n")

    // NOTE(review): String.split interprets `outputRowFormat` as a regular expression.
    val processRowWithoutSerde = if (!ioschema.schemaLess) {
      line: String =>
        new GenericInternalRow(
          line.split(outputRowFormat)
            .zip(outputFieldWriters)
            .map { case (data, writer) => writer(data) })
    } else {
      // In schema-less mode, Hive's default SerDe takes the first two output columns as the
      // output; if there are fewer than two columns, it throws ArrayIndexOutOfBoundsException.
      // Spark follows Hive's default SerDe here. Hive's TRANSFORM in schema-less mode still
      // behaves like the original Spark; aligning the two is tracked in SPARK-32388.
      val kvWriter = CatalystTypeConverters.createToCatalystConverter(StringType)
      line: String =>
        new GenericInternalRow(
          line.split(outputRowFormat).slice(0, 2)
            .map(kvWriter))
    }

    // Reads the next record without its terminator; returns null at end of stream.
    private def readRecord(): String = {
      if (lineDelimiter == null || lineDelimiter.isEmpty || lineDelimiter == "\n") {
        // Default terminator: keep using readLine(), which also treats "\r" and "\r\n" as
        // record ends, exactly as before.
        reader.readLine()
      } else {
        readUntilDelimiter()
      }
    }

    // Accumulates characters until `lineDelimiter` has been read. Like readLine(), a final
    // record with no terminator is still returned; null means no characters were left.
    private def readUntilDelimiter(): String = {
      val record = new java.lang.StringBuilder
      var sawData = false
      var done = false
      while (!done) {
        val c = reader.read()
        if (c == -1) {
          done = true
        } else {
          sawData = true
          record.append(c.toChar)
          val start = record.length() - lineDelimiter.length
          // Only the tail can newly match, so check just that position.
          if (start >= 0 && record.indexOf(lineDelimiter, start) == start) {
            record.setLength(start)
            done = true
          }
        }
      }
      if (sawData) record.toString else null
    }

    override def hasNext: Boolean = {
      try {
        if (curLine == null) {
          curLine = readRecord()
        }
        if (curLine == null) {
          // Output exhausted: surface any script/writer failure before reporting the end.
          checkFailureAndPropagate(writerThread, null, proc, stderrBuffer)
          false
        } else {
          true
        }
      } catch {
        case NonFatal(e) =>
          // If this exception is due to abrupt / unclean termination of `proc`,
          // then detect it and propagate a better exception message for end users
          checkFailureAndPropagate(writerThread, e, proc, stderrBuffer)
          throw e
      }
    }

    override def next(): InternalRow = {
      if (!hasNext) {
        throw new NoSuchElementException
      }
      val line = curLine
      // Defer the next read to hasNext so that read failures go through its error handling.
      curLine = null
      processRowWithoutSerde(line)
    }
  }
}
{code}
> Script Transformation no-serde read line should respect `TOK_TABLEROWFORMATLINES`
> ---------------------------------------------------------------------------------
>
> Key: SPARK-32607
> URL: https://issues.apache.org/jira/browse/SPARK-32607
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: angerszhu
> Priority: Major
>
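> reader.readLine() should respect `TOK_TABLEROWFORMATLINES`: BufferedReader.readLine() always ends a record at \n, \r or \r\n, so a custom line delimiter is ignored.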
> {code:java}
> // Builds an iterator over rows parsed from the script's UTF-8 output when no SerDe is used:
> // one row per line from reader.readLine(), fields split on TOK_TABLEROWFORMATFIELD.
> // NOTE(review): readLine() ends a record at "\n", "\r" or "\r\n", so a custom
> // TOK_TABLEROWFORMATLINES delimiter is not honored (the problem reported in SPARK-32607).
> protected def createOutputIteratorWithoutSerde(
> writerThread: BaseScriptTransformationWriterThread,
> inputStream: InputStream,
> proc: Process,
> stderrBuffer: CircularBuffer): Iterator[InternalRow] = {
> new Iterator[InternalRow] {
> var curLine: String = null
> val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
> val outputRowFormat = ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD")
> val processRowWithoutSerde = if (!ioschema.schemaLess) {
> prevLine: String =>
> new GenericInternalRow(
> prevLine.split(outputRowFormat)
> .zip(outputFieldWriters)
> .map { case (data, writer) => writer(data) })
> } else {
> // In schema-less mode, Hive's default SerDe takes the first two output columns as output;
> // if there are fewer than 2 output columns, it throws ArrayIndexOutOfBoundsException.
> // Here we change Spark's behavior to match Hive's default SerDe.
> // But in Hive, TRANSFORM in schema-less mode behaves like the original Spark; we will fix
> // this to keep Spark and Hive behavior the same in SPARK-32388.
> val kvWriter = CatalystTypeConverters.createToCatalystConverter(StringType)
> prevLine: String =>
> new GenericInternalRow(
> prevLine.split(outputRowFormat).slice(0, 2)
> .map(kvWriter))
> }
> override def hasNext: Boolean = {
> try {
> if (curLine == null) {
> // Record boundary is decided by readLine(), not by TOK_TABLEROWFORMATLINES.
> curLine = reader.readLine()
> if (curLine == null) {
> checkFailureAndPropagate(writerThread, null, proc, stderrBuffer)
> return false
> }
> }
> true
> } catch {
> case NonFatal(e) =>
> // If this exception is due to abrupt / unclean termination of `proc`,
> // then detect it and propagate a better exception message for end users
> checkFailureAndPropagate(writerThread, e, proc, stderrBuffer)
> throw e
> }
> }
> override def next(): InternalRow = {
> if (!hasNext) {
> throw new NoSuchElementException
> }
> val prevLine = curLine
> // NOTE(review): this read is outside hasNext's try/catch, so a failure here skips
> // checkFailureAndPropagate.
> curLine = reader.readLine()
> processRowWithoutSerde(prevLine)
> }
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org