Posted to issues@spark.apache.org by "Rick Moritz (JIRA)" <ji...@apache.org> on 2017/05/05 11:58:04 UTC
[jira] [Commented] (SPARK-20489) Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
[ https://issues.apache.org/jira/browse/SPARK-20489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15998228#comment-15998228 ]
Rick Moritz commented on SPARK-20489:
-------------------------------------
If someone could try to replicate my observations, that would be a great help - the code in the issue description should run as-is.
> Different results in local mode and yarn mode when working with dates (race condition with SimpleDateFormat?)
> -------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-20489
> URL: https://issues.apache.org/jira/browse/SPARK-20489
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0, 2.0.1, 2.0.2
> Environment: yarn-client mode in Zeppelin, Cloudera Spark2-distribution
> Reporter: Rick Moritz
> Priority: Critical
>
> Running the following code (in Zeppelin, or spark-shell), I get different results, depending on whether I am using local[*] -mode or yarn-client mode:
> {code:title=test case|borderStyle=solid}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> import spark.implicits._
>
> val counter = 1 to 2
> val size = 1 to 3
>
> // Build a three-row DataFrame with a constant string timestamp column.
> val sampleText = spark.createDataFrame(
>   sc.parallelize(size).map(Row(_)),
>   StructType(Array(StructField("id", IntegerType, nullable = false)))
> ).withColumn("loadDTS", lit("2017-04-25T10:45:02.2"))
>
> // For each counter value, shift the date and reformat it back to a string.
> val rddList = counter.map(count =>
>   sampleText
>     .withColumn("loadDTS2", date_format(date_add(col("loadDTS"), count), "yyyy-MM-dd'T'HH:mm:ss.SSS"))
>     .drop(col("loadDTS"))
>     .withColumnRenamed("loadDTS2", "loadDTS")
>     .coalesce(4)
>     .rdd
> )
>
> val resultText = spark.createDataFrame(
>   spark.sparkContext.union(rddList),
>   sampleText.schema
> )
>
> // Keep only the rows whose timestamp equals the per-id maximum.
> val testGrouped = resultText.groupBy("id")
> val timestamps = testGrouped.agg(
>   max(unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS")) as "timestamp"
> )
> val loadDateResult = resultText.join(timestamps, "id")
> val filteredResult = loadDateResult.filter($"timestamp" === unix_timestamp($"loadDTS", "yyyy-MM-dd'T'HH:mm:ss.SSS"))
> filteredResult.count
> {code}
> The expected result, *3*, is what I obtain in local mode, but as soon as I run fully distributed, I get *0*. If I increase size to {{1 to 32000}}, I do get some results (depending on the size of counter) - none of which make any sense.
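To make the expected count concrete, here is a pure-Scala model of the max/join/filter logic from the snippet (an illustration, not from the original report): a row survives the final filter exactly when its timestamp equals the per-id maximum, so the count should equal the number of distinct ids, regardless of the counter range.

```scala
// Pure-Scala model (illustration only) of the groupBy/max + join + filter
// pipeline. Timestamps are stood in for by the day offset `c`.
val size = 1 to 3
val counter = 1 to 2

// (id, dayOffset) pairs: each id appears once per counter value.
val rows = for (id <- size; c <- counter) yield (id, c)

// Per-id maximum, playing the role of max(unix_timestamp(...)).
val maxPerId: Map[Int, Int] =
  rows.groupBy(_._1).map { case (id, rs) => id -> rs.map(_._2).max }

// The join-and-filter step: keep rows matching their group's maximum.
val filtered = rows.filter { case (id, c) => c == maxPerId(id) }

println(filtered.size) // 3 - one surviving row per id
```

Under this model the result is 3 by construction, which is why any other count from the distributed run points at the timestamp parsing rather than the relational logic.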
> Up to the application of the last filter, everything looks okay at first glance, but then something goes wrong. Potentially this is due to lingering re-use of SimpleDateFormat instances, but I can't reproduce the problem in non-distributed mode. The generated execution plan is the same in each case, as expected.
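For context on the hypothesis above: java.text.SimpleDateFormat is documented as not thread-safe, and a shared instance reused across executor threads can silently corrupt parses. The sketch below (an assumption about the failure mode, not the reporter's code or Spark's internals) shows the standard workaround of wrapping the formatter in a ThreadLocal so each thread gets its own copy.

```scala
import java.text.SimpleDateFormat

// SimpleDateFormat keeps mutable internal state, so sharing one instance
// across threads is unsafe. A ThreadLocal gives each thread its own copy.
val fmt: ThreadLocal[SimpleDateFormat] = new ThreadLocal[SimpleDateFormat] {
  override def initialValue(): SimpleDateFormat =
    new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
}

def parseMillis(s: String): Long = fmt.get().parse(s).getTime

// With per-thread formatters, concurrent parses always agree with a
// single-threaded reference value.
val input = "2017-04-25T10:45:02.200"
val reference = parseMillis(input)
val threads = (1 to 4).map { _ =>
  new Thread(() => for (_ <- 1 to 1000) assert(parseMillis(input) == reference))
}
threads.foreach(_.start())
threads.foreach(_.join())
```

Note also that the pattern letter {{SSS}} parses the fractional part numerically, so an input like "…:02.2" is read as 2 milliseconds rather than 200 - a separate pitfall worth keeping in mind when comparing round-tripped timestamps.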
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)