Posted to issues@spark.apache.org by "Aydin Kocas (JIRA)" <ji...@apache.org> on 2018/02/05 12:25:00 UTC

[jira] [Created] (SPARK-23337) withWatermark raises an exception on nested objects

Aydin Kocas created SPARK-23337:
-----------------------------------

             Summary: withWatermark raises an exception on nested objects
                 Key: SPARK-23337
                 URL: https://issues.apache.org/jira/browse/SPARK-23337
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.2.1
         Environment: Linux Ubuntu, Spark on standalone mode
            Reporter: Aydin Kocas


Hi,

When using a nested object (here: _source.creationTime) from a JSON file as the parameter for the withWatermark method, I get an exception (see below). Everything else works flawlessly with the nested object.

+*{color:#14892c}works:{color}*+ 
{code:java}
Dataset<Row> jsonRow = spark.readStream()
        .schema(getJSONSchema())
        .json(file)
        .dropDuplicates("_id")
        .withWatermark("myTime", "10 seconds")
        .toDF();
{code}
 

JSON structure:
{code:java}
root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: long (nullable = true)
 |-- myTime: timestamp (nullable = true)
..{code}
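
getJSONSchema() is not included above; purely as an illustration (a sketch, not the actual helper from this report), a method producing the flat schema could look like this:
{code:java}
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical reconstruction, assembled from the printSchema() output above.
// For the nested case below, "_source" would be added as a StructType field
// containing a "creationTime" timestamp.
private static StructType getJSONSchema() {
    return new StructType()
            .add("_id", DataTypes.StringType, true)
            .add("_index", DataTypes.StringType, true)
            .add("_score", DataTypes.LongType, true)
            .add("myTime", DataTypes.TimestampType, true);
}
{code}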

+*{color:#d04437}does not work - nested JSON{color}:*+
{code:java}
Dataset<Row> jsonRow = spark.readStream()
        .schema(getJSONSchema())
        .json(file)
        .dropDuplicates("_id")
        .withWatermark("_source.creationTime", "10 seconds")
        .toDF();
{code}

JSON structure:
{code:java}
root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: long (nullable = true)
 |-- _source: struct (nullable = true)
 |    |-- creationTime: timestamp (nullable = true)
..{code}

Exception:
{code:java}
Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
'EventTimeWatermark '_source.createTime, interval 10 seconds
+- Deduplicate [_id#0], true
 +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), StructField(_index,StringType,true), StructField(_score,LongType,true), StructField(_source,StructType(StructField(additionalData,StringType,true), StructField(client,StringType,true), StructField(clientDomain,BooleanType,true), StructField(clientVersion,StringType,true), StructField(country,StringType,true), StructField(countryName,StringType,true), StructField(createTime,TimestampType,true), StructField(externalIP,StringType,true), StructField(hostname,StringType,true), StructField(internalIP,StringType,true), StructField(location,StringType,true), StructField(locationDestination,StringType,true), StructField(login,StringType,true), StructField(originalRequestString,StringType,true), StructField(password,StringType,true), StructField(peerIdent,StringType,true), StructField(peerType,StringType,true), StructField(recievedTime,TimestampType,true), StructField(sessionEnd,StringType,true), StructField(sessionStart,StringType,true), StructField(sourceEntryAS,StringType,true), StructField(sourceEntryIp,StringType,true), StructField(sourceEntryPort,StringType,true), StructField(targetCountry,StringType,true), StructField(targetCountryName,StringType,true), StructField(targetEntryAS,StringType,true), StructField(targetEntryIp,StringType,true), StructField(targetEntryPort,StringType,true), StructField(targetport,StringType,true), StructField(username,StringType,true), StructField(vulnid,StringType,true)),true), StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
 at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385)
 at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300)
 at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:854)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:796)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
 at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:796)
 at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:674)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
 at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
 at scala.collection.immutable.List.foldLeft(List.scala:84)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
 at scala.collection.immutable.List.foreach(List.scala:381)
 at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
 at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69)
 at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67)
 at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
 at org.apache.spark.sql.Dataset.<init>(Dataset.scala:165)
 at org.apache.spark.sql.Dataset.<init>(Dataset.scala:171)
 at org.apache.spark.sql.Dataset$.apply(Dataset.scala:62)
 at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2889)
 at org.apache.spark.sql.Dataset.withWatermark(Dataset.scala:569)
 at my.package.hpstatistics.Importer.importViaSparkStreaming(Importer.java:278)
 at my.package.hpstatistics.Main.main(Main.java:80)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: 
Failed to copy node.
Is otherCopyArgs specified correctly for EventTimeWatermark.
Exception message: argument type mismatch
ctor: public org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark(org.apache.spark.sql.catalyst.expressions.Attribute,org.apache.spark.unsafe.types.CalendarInterval,org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)?
types: class org.apache.spark.sql.catalyst.expressions.Alias, class org.apache.spark.unsafe.types.CalendarInterval, class org.apache.spark.sql.catalyst.plans.logical.Deduplicate
args: _source#3.createTime AS createTime#12, interval 10 seconds, Deduplicate [_id#0], true
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), StructField(_index,StringType,true), StructField(_score,LongType,true), StructField(_source,StructType(StructField(additionalData,StringType,true), StructField(client,StringType,true), StructField(clientDomain,BooleanType,true), StructField(clientVersion,StringType,true), StructField(country,StringType,true), StructField(countryName,StringType,true), StructField(createTime,TimestampType,true), StructField(externalIP,StringType,true), StructField(hostname,StringType,true), StructField(internalIP,StringType,true), StructField(location,StringType,true), StructField(locationDestination,StringType,true), StructField(login,StringType,true), StructField(originalRequestString,StringType,true), StructField(password,StringType,true), StructField(peerIdent,StringType,true), StructField(peerType,StringType,true), StructField(recievedTime,TimestampType,true), StructField(sessionEnd,StringType,true), StructField(sessionStart,StringType,true), StructField(sourceEntryAS,StringType,true), StructField(sourceEntryIp,StringType,true), StructField(sourceEntryPort,StringType,true), StructField(targetCountry,StringType,true), StructField(targetCountryName,StringType,true), StructField(targetEntryAS,StringType,true), StructField(targetEntryIp,StringType,true), StructField(targetEntryPort,StringType,true), StructField(targetport,StringType,true), StructField(username,StringType,true), StructField(vulnid,StringType,true)),true), StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
, tree:
'EventTimeWatermark '_source.createTime, interval 10 seconds
+- Deduplicate [_id#0], true
 +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), StructField(_index,StringType,true), StructField(_score,LongType,true), StructField(_source,StructType(StructField(additionalData,StringType,true), StructField(client,StringType,true), StructField(clientDomain,BooleanType,true), StructField(clientVersion,StringType,true), StructField(country,StringType,true), StructField(countryName,StringType,true), StructField(createTime,TimestampType,true), StructField(externalIP,StringType,true), StructField(hostname,StringType,true), StructField(internalIP,StringType,true), StructField(location,StringType,true), StructField(locationDestination,StringType,true), StructField(login,StringType,true), StructField(originalRequestString,StringType,true), StructField(password,StringType,true), StructField(peerIdent,StringType,true), StructField(peerType,StringType,true), StructField(recievedTime,TimestampType,true), StructField(sessionEnd,StringType,true), StructField(sessionStart,StringType,true), StructField(sourceEntryAS,StringType,true), StructField(sourceEntryIp,StringType,true), StructField(sourceEntryPort,StringType,true), StructField(targetCountry,StringType,true), StructField(targetCountryName,StringType,true), StructField(targetEntryAS,StringType,true), StructField(targetEntryIp,StringType,true), StructField(targetEntryPort,StringType,true), StructField(targetport,StringType,true), StructField(username,StringType,true), StructField(vulnid,StringType,true)),true), StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:415)
 at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:385)
 at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
 ... 29 more
{code}
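
The "ctor" vs. "types" lines in the trace suggest the root cause: the EventTimeWatermark constructor expects an Attribute as the event-time column, but the nested reference _source.creationTime resolves to an Alias, hence the "argument type mismatch" during makeCopy. As a possible workaround (a sketch only, reusing the getJSONSchema() helper from above), promoting the nested field to a top-level column before calling withWatermark avoids the nested reference entirely:
{code:java}
import static org.apache.spark.sql.functions.col;

// Workaround sketch (untested): expose the nested timestamp as a top-level
// column so that withWatermark receives a plain attribute, not a nested path.
Dataset<Row> jsonRow = spark.readStream()
        .schema(getJSONSchema())
        .json(file)
        .withColumn("creationTime", col("_source.creationTime"))
        .dropDuplicates("_id")
        .withWatermark("creationTime", "10 seconds")
        .toDF();
{code}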
 


