Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/11/25 17:23:57 UTC

[GitHub] [spark] xuanyuanking commented on a change in pull request #30395: [SPARK-32863][SS] Full outer stream-stream join

xuanyuanking commented on a change in pull request #30395:
URL: https://github.com/apache/spark/pull/30395#discussion_r530517789



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
##########
@@ -315,40 +321,17 @@ object UnsupportedOperationChecker extends Logging {
                 throwError(s"$joinType join with a streaming DataFrame/Dataset " +
                   "on the right and a static DataFrame/Dataset on the left is not supported")
               } else if (left.isStreaming && right.isStreaming) {
-                val watermarkInJoinKeys = StreamingJoinHelper.isWatermarkInJoinKeys(subPlan)
-
-                val hasValidWatermarkRange =
-                  StreamingJoinHelper.getStateValueWatermark(
-                    left.outputSet, right.outputSet, condition, Some(1000000)).isDefined
-
-                if (!watermarkInJoinKeys && !hasValidWatermarkRange) {
-                  throwError(
-                    s"Stream-stream $joinType join between two streaming DataFrame/Datasets " +
-                    "is not supported without a watermark in the join keys, or a watermark on " +
-                    "the nullable side and an appropriate range condition")
-                }
+                checkForStreamStreamJoinWatermark(j)
               }
 
             // We support streaming right outer joins with static on the left always, and with
             // stream on both sides under the appropriate conditions.
             case RightOuter =>
               if (left.isStreaming && !right.isStreaming) {
-                throwError("Right outer join with a streaming DataFrame/Dataset on the left and " +
+                throwError(s"$RightOuter join with a streaming DataFrame/Dataset on the left and " +

Review comment:
       ditto

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
##########
@@ -298,8 +298,14 @@ object UnsupportedOperationChecker extends Logging {
               // no further validations needed
 
             case FullOuter =>
-              if (left.isStreaming || right.isStreaming) {
-                throwError("Full outer joins with streaming DataFrames/Datasets are not supported")
+              if (left.isStreaming && !right.isStreaming) {
+                throwError(s"$FullOuter joins with streaming DataFrames/Datasets on the left " +

Review comment:
       nit: did you mean `${FullOuter.sql}` here? Otherwise, I think we can just use the literal string 'FullOuter' here.
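
(For context, a minimal, self-contained sketch of the difference being discussed, assuming the standard `JoinType` case objects from `org.apache.spark.sql.catalyst.plans`; the object name below is only for illustration.)

```scala
import org.apache.spark.sql.catalyst.plans.FullOuter

// Sketch only: shows what each variant of the error message would render as.
object JoinTypeMessageSketch extends App {
  // Interpolating the case object uses its toString, i.e. "FullOuter".
  println(s"$FullOuter joins with streaming DataFrames/Datasets ...")
  // FullOuter.sql uses the SQL-style spelling, i.e. "FULL OUTER".
  println(s"${FullOuter.sql} joins with streaming DataFrames/Datasets ...")
  // Or the join type can simply be spelled out as a literal in the message.
  println("Full outer joins with streaming DataFrames/Datasets ...")
}
```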

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -333,13 +339,23 @@ case class StreamingSymmetricHashJoinExec(
           stateFormatVersion match {
             case 1 => matchesWithLeftSideState(new UnsafeRowPair(kv.key, kv.value))
             case 2 => kv.matched
-            case _ =>
-              throw new IllegalStateException("Unexpected state format version! " +
-                s"version $stateFormatVersion")
+            case _ => throwBadStateFormatVersionException()
           }
         }.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
 
         hashJoinOutputIter ++ outerOutputIter
+      case FullOuter =>
+        lazy val isKeyToValuePairMatched = (kv: KeyToValuePair) =>
+          stateFormatVersion match {
+            case 2 => kv.matched
+            case _ => throwBadStateFormatVersionException()
+          }
+        val leftSideOutputIter = leftSideJoiner.removeOldState().filterNot(isKeyToValuePairMatched)

Review comment:
       Super nit: let's start the wrapped line at the same position for `leftSideOutputIter`/`rightSideOutputIter`, e.g.:
   ```
   val leftSideOutputIter = leftSideJoiner.removeOldState().filterNot(
     isKeyToValuePairMatched).map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
   ```
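
(A sketch of how both sides could then line up; the right-side expression below is an assumption that mirrors the left-side one in the diff, padding the left of the joined row with `nullLeft`.)

```scala
// Sketch only: wraps both FullOuter output iterators at the same position.
// The right-side expression is assumed by symmetry with the left-side one above.
val leftSideOutputIter = leftSideJoiner.removeOldState().filterNot(
  isKeyToValuePairMatched).map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
val rightSideOutputIter = rightSideJoiner.removeOldState().filterNot(
  isKeyToValuePairMatched).map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
```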

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
##########
@@ -438,4 +421,32 @@ object UnsupportedOperationChecker extends Logging {
     throw new AnalysisException(
       msg, operator.origin.line, operator.origin.startPosition, Some(operator))
   }
+
+  private def checkForStreamStreamJoinWatermark(join: Join): Unit = {
+    val watermarkInJoinKeys = StreamingJoinHelper.isWatermarkInJoinKeys(join)
+
+    val hasValidWatermarkRange = join.joinType match {

Review comment:
       Let's also keep the comment at https://github.com/apache/spark/pull/30395/files#diff-7c879c08d2f379c139d5229a88857229ae69bb48f0a138a3d64e1b2dde3502feL341 in this new method.
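
(A possible shape for the extracted helper with the comment carried over. This is only a sketch based on the removed block in the first hunk above: the real method switches on `join.joinType`, and the exact comment text lives at the linked line, so both are stubbed here.)

```scala
private def checkForStreamStreamJoinWatermark(join: Join): Unit = {
  val watermarkInJoinKeys = StreamingJoinHelper.isWatermarkInJoinKeys(join)

  // (Carry over the comment from the linked line of the original code here;
  //  its exact wording is not reproduced in this sketch.)
  val hasValidWatermarkRange = join.joinType match {
    // Placeholder: the per-join-type cases are not shown in the hunk above.
    case _ =>
      StreamingJoinHelper.getStateValueWatermark(
        join.left.outputSet, join.right.outputSet, join.condition, Some(1000000)).isDefined
  }

  if (!watermarkInJoinKeys && !hasValidWatermarkRange) {
    // throwError takes the offending operator in its second parameter list,
    // so it is passed explicitly here.
    throwError(
      s"Stream-stream ${join.joinType} join between two streaming DataFrame/Datasets " +
      "is not supported without a watermark in the join keys, or a watermark on " +
      "the nullable side and an appropriate range condition")(join)
  }
}
```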

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -382,6 +398,7 @@ case class StreamingSymmetricHashJoinExec(
             leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
           case LeftOuter => rightSideJoiner.removeOldState()
           case RightOuter => leftSideJoiner.removeOldState()
+          case FullOuter => Iterator.empty

Review comment:
       Let's also add comments for `FullOuter` at: https://github.com/apache/spark/pull/30395/files#diff-6cd66da710d8d54025c1edf658bbec5230e8b4e748f9f2f884a60b1ba1efed42R395
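
(For example, a note along these lines might be what is being asked for. This is a sketch: the variable name and the `Inner` case label are assumptions, the other cases appear in the hunk above, and the comment wording is assumed. The behaviour itself follows from the FullOuter branch in the earlier hunk, where expired state on both sides is already removed and emitted while producing the outer output.)

```scala
// Sketch only: `cleanupIter` and the `Inner` label are assumed; the FullOuter
// comment wording is an assumption about what the review asks for.
val cleanupIter = joinType match {
  case Inner => leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
  case LeftOuter => rightSideJoiner.removeOldState()
  case RightOuter => leftSideJoiner.removeOldState()
  // FullOuter: expired state on both sides has already been removed (and emitted
  // as null-padded outer rows) while producing the join output above, so there
  // is nothing left to clean up at this point.
  case FullOuter => Iterator.empty
}
```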




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org