You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Stephen Boesch (JIRA)" <ji...@apache.org> on 2014/12/06 04:51:12 UTC
[jira] [Commented] (SPARK-4775) Possible problem in a simple join?
Getting duplicate rows and missing rows
[ https://issues.apache.org/jira/browse/SPARK-4775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14236567#comment-14236567 ]
Stephen Boesch commented on SPARK-4775:
---------------------------------------
How do I attached files here? Since I am uncertain and the files are small I am going to include here as comments.
Here is the standalone testing program
package org.apache.spark.sql
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.scalatest.FunSuite
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
case class JoinTable2Cols(intcol: Int, strcol: String)
/**
* SparkSQLJoinSuite
*
*/
class SparkSQLJoinSuite extends FunSuite {
test("Basic Join on vanilla SparkSql: Simple Two Way 2 cols") {
val testnm = "Basic Join on vanilla SparkSql: Simple Two Way 2 cols"
import org.apache.spark.sql._
// val hbclocal = hbc.asInstanceOf[SQLContext]
// import hbclocal._
val sc = new SparkContext("local[1]","BasicJoinTest")
val ssc = new SQLContext(sc)
import ssc._
val rdd1 = sc.parallelize((1 to 2).map { ix => JoinTable2Cols(ix, s"valA$ix")})
rdd1.registerTempTable("SparkJoinTable1")
println("Table1 Contents:")
val q1 = ssc.sql("select * from SparkJoinTable1").collect.foreach(println)
val ids = Seq((1, 1), (1, 2), (2, 3), (2, 4))
val rdd2 = sc.parallelize(ids.map { case (ix, is) => JoinTable2Cols(ix, s"valB$is")})
val table2 = rdd2.registerTempTable("SparkJoinTable2")
println("Table2 Contents:")
val q2 = ssc.sql("select * from SparkJoinTable2").collect.foreach(println)
val query = s"""select t1.intcol t1intcol, t2.intcol t2intcol, t1.strcol t1strcol,
t2.strcol t2strcol from SparkJoinTable1 t1 JOIN
SparkJoinTable2 t2 on t1.intcol = t2.intcol""".stripMargin
println(query)
val res = ssc.sql(query).sortBy(r =>
s"${r.getInt(0)} ${r.getInt(1)} ${r.getString(2)} ${r.getString(3)}")
// res.collect.foreach(println)
val exparr = Seq[Seq[Any]](
Seq(1, 1, "valA1", "valB1"),
Seq(1, 1, "valA1", "valB2"),
Seq(2, 2, "valA2", "valB3"),
Seq(2, 2, "valA2", "valB4"))
run(ssc, testnm, query, exparr)
}
def run(sqlCtx: SQLContext, testName: String, sql: String, exparr: Seq[Seq[Any]]) = {
val execQuery1 = sqlCtx.executeSql(sql)
val result1 = execQuery1.toRdd.collect()
assert(result1.size == exparr.length, s"$testName failed on size")
verify(testName,
sql,
for (rx <- 0 until exparr.size)
yield result1(rx).toSeq, exparr
)
}
def verify(testName: String, sql: String, result1: Seq[Seq[Any]], exparr: Seq[Seq[Any]]) = {
println(s"$sql came back with ${result1.size} results")
println(result1.mkString("Results\n","\n",""))
var res = {
for (rx <- 0 until exparr.size)
yield compareWithTol(result1(rx).toSeq, exparr(rx), s"Row$rx failed")
}.foldLeft(true) { case (res1, newres) => res1 && newres}
assert(res, "One or more rows did not match expected")
println(s"Test $testName completed successfully")
}
val CompareTol = 1e-6
def compareWithTol(actarr: Seq[Any], exparr: Seq[Any], emsg: String): Boolean = {
actarr.zip(exparr).forall { case (a, e) =>
val eq = (a, e) match {
case (a: Double, e: Double) =>
Math.abs(a - e) <= CompareTol
case (a: Float, e: Float) =>
Math.abs(a - e) <= CompareTol
case (a, e) =>
a == e
case _ => throw new IllegalArgumentException("Expected tuple")
}
if (!eq) {
System.err.println(s"ERROR: $emsg: Mismatch- act=$a exp=$e")
}
eq
}
}
}
> Possible problem in a simple join? Getting duplicate rows and missing rows
> ---------------------------------------------------------------------------
>
> Key: SPARK-4775
> URL: https://issues.apache.org/jira/browse/SPARK-4775
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.3.0
> Environment: Run on Mac but should be agnostic
> Reporter: Stephen Boesch
>
> I am working on testing of HBase joins. As part of this work some simple vanilla SparkSQL tests were created. Some of the results are surprising: here are the details:
> ------------------------------------
> Consider the following schema that includes two columns:
> case class JoinTable2Cols(intcol: Int, strcol: String)
> Let us register two temp tables using this schema and insert 2 rows and 4 rows respectively:
> val rdd1 = sc.parallelize((1 to 2).map { ix => JoinTable2Cols(ix, s"valA$ix")})
> rdd1.registerTempTable("SparkJoinTable1")
> val ids = Seq((1, 1), (1, 2), (2, 3), (2, 4))
> val rdd2 = sc.parallelize(ids.map { case (ix, is) => JoinTable2Cols(ix, s"valB$is")})
> val table2 = rdd2.registerTempTable("SparkJoinTable2")
> Here is the data in both tables:
> Table1 Contents:
> [1,valA1]
> [2,valA2]
> Table2 Contents:
> [1,valB1]
> [1,valB2]
> [2,valB3]
> [2,valB4]
> Now let us join the tables on the first column:
> select t1.intcol t1intcol, t2.intcol t2intcol, t1.strcol t1strcol,
> t2.strcol t2strcol from SparkJoinTable1 t1 JOIN
> SparkJoinTable2 t2 on t1.intcol = t2.intcol
> What results do we get:
> came back with 4 results
> Results
> [1,1,valA1,valB2]
> [1,1,valA1,valB2]
> [2,2,valA2,valB4]
> [2,2,valA2,valB4]
> Huh??
> Where did valB1 and valB3 go? Why do we have duplicate rows?
> Note: the expected results were:
> Seq(1, 1, "valA1", "valB1"),
> Seq(1, 1, "valA1", "valB2"),
> Seq(2, 2, "valA2", "valB3"),
> Seq(2, 2, "valA2", "valB4"))
> A standalone testing program is attached SparkSQLJoinSuite. An abridged version of the actual output is also attached.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org