Posted to issues@spark.apache.org by "Stephen Boesch (JIRA)" <ji...@apache.org> on 2014/12/06 04:51:12 UTC

[jira] [Commented] (SPARK-4775) Possible problem in a simple join? Getting duplicate rows and missing rows

    [ https://issues.apache.org/jira/browse/SPARK-4775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14236567#comment-14236567 ] 

Stephen Boesch commented on SPARK-4775:
---------------------------------------

How do I attach files here? Since I am uncertain and the files are small, I will include them here as comments.

Here is the standalone testing program:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql

import org.apache.spark.SparkContext
import org.scalatest.FunSuite

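// Simple two-column schema used for both temp tables in the join test below.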
case class JoinTable2Cols(intcol: Int, strcol: String)

/**
 * Standalone reproduction for SPARK-4775: a simple two-table join
 * that returns duplicate rows while dropping others.
 */
class SparkSQLJoinSuite extends FunSuite {

  test("Basic Join on vanilla SparkSql: Simple Two Way  2 cols") {
    val testnm = "Basic Join on vanilla SparkSql: Simple Two Way 2 cols"
    import org.apache.spark.sql._
    //    val hbclocal = hbc.asInstanceOf[SQLContext]
    //    import hbclocal._
    val sc = new SparkContext("local[1]","BasicJoinTest")
    val ssc = new SQLContext(sc)
    import ssc._
    val rdd1 = sc.parallelize((1 to 2).map { ix => JoinTable2Cols(ix, s"valA$ix") })
    rdd1.registerTempTable("SparkJoinTable1")
    println("Table1 Contents:")
    ssc.sql("select * from SparkJoinTable1").collect.foreach(println)
    val ids = Seq((1, 1), (1, 2), (2, 3), (2, 4))
    val rdd2 = sc.parallelize(ids.map { case (ix, is) => JoinTable2Cols(ix, s"valB$is") })
    rdd2.registerTempTable("SparkJoinTable2")
    println("Table2 Contents:")
    ssc.sql("select * from SparkJoinTable2").collect.foreach(println)
    val query = s"""select t1.intcol t1intcol, t2.intcol t2intcol, t1.strcol t1strcol,
                t2.strcol t2strcol from SparkJoinTable1 t1 JOIN
                    SparkJoinTable2 t2 on t1.intcol = t2.intcol""".stripMargin

    println(query)
    val res = ssc.sql(query).sortBy(r =>
      s"${r.getInt(0)} ${r.getInt(1)} ${r.getString(2)} ${r.getString(3)}")
    //    res.collect.foreach(println)
    val exparr = Seq[Seq[Any]](
      Seq(1, 1, "valA1", "valB1"),
      Seq(1, 1, "valA1", "valB2"),
      Seq(2, 2, "valA2", "valB3"),
      Seq(2, 2, "valA2", "valB4"))
    run(ssc, testnm, query, exparr)
    sc.stop()
  }

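  // Executes the query via the package-private executeSql/toRdd path, checks
  // the row count, then delegates row-by-row comparison to verify().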
  def run(sqlCtx: SQLContext, testName: String, sql: String, exparr: Seq[Seq[Any]]) = {
    val execQuery1 = sqlCtx.executeSql(sql)
    val result1 = execQuery1.toRdd.collect()
    assert(result1.size == exparr.length, s"$testName failed on size")
    verify(testName,
      sql,
      for (rx <- 0 until exparr.size)
        yield result1(rx).toSeq, exparr
    )
  }

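  // Prints the actual rows and asserts that every row matches the
  // corresponding expected row (numeric values within CompareTol).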
  def verify(testName: String, sql: String, result1: Seq[Seq[Any]], exparr: Seq[Seq[Any]]) = {
    println(s"$sql came back with ${result1.size} results")
    println(result1.mkString("Results\n","\n",""))
    val res = {
      for (rx <- 0 until exparr.size)
        yield compareWithTol(result1(rx).toSeq, exparr(rx), s"Row$rx failed")
    }.foldLeft(true) { case (res1, newres) => res1 && newres }

    assert(res, "One or more rows did not match expected")

    println(s"Test $testName completed successfully")
  }

  val CompareTol = 1e-6

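  // Element-wise comparison of an actual row against an expected row:
  // Double/Float values are compared within CompareTol, everything else with ==.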
  def compareWithTol(actarr: Seq[Any], exparr: Seq[Any], emsg: String): Boolean = {
    actarr.zip(exparr).forall { case (a, e) =>
      val eq = (a, e) match {
        case (a: Double, e: Double) =>
          Math.abs(a - e) <= CompareTol
        case (a: Float, e: Float) =>
          Math.abs(a - e) <= CompareTol
        case (a, e) =>
          a == e  // non-numeric values must match exactly
      }
      if (!eq) {
        System.err.println(s"ERROR: $emsg: Mismatch- act=$a exp=$e")
      }
      eq
    }
  }

}
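
As a cross-check that bypasses SparkSQL entirely, the same join can be done with the plain RDD API on the rdd1/rdd2 built in the test above. This is only a sketch (not part of the attached suite); with these inputs I would expect it to print the four distinct pairs the SQL join should also return:

    // Hypothetical cross-check via PairRDDFunctions.join on the same data.
    import org.apache.spark.SparkContext._  // pair-RDD implicits (pre-1.3)
    val pairs1 = rdd1.map(r => (r.intcol, r.strcol))  // (1,valA1), (2,valA2)
    val pairs2 = rdd2.map(r => (r.intcol, r.strcol))  // (1,valB1) .. (2,valB4)
    pairs1.join(pairs2)                               // plain inner join on key
      .collect.sortBy(_.toString).foreach(println)
    // expect: (1,(valA1,valB1)), (1,(valA1,valB2)), (2,(valA2,valB3)), (2,(valA2,valB4))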


> Possible problem in a simple join?  Getting duplicate rows and missing rows
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-4775
>                 URL: https://issues.apache.org/jira/browse/SPARK-4775
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.3.0
>         Environment: Run on Mac but should be OS-agnostic
>            Reporter: Stephen Boesch
>
> I am working on testing HBase joins. As part of this work, some simple vanilla SparkSQL tests were created. Some of the results are surprising; here are the details:
> ------------------------------------
> Consider the following schema that includes two columns:
> case class JoinTable2Cols(intcol: Int, strcol: String)
> Let us register two temp tables using this schema and insert 2 rows and 4 rows respectively:
>     val rdd1 = sc.parallelize((1 to 2).map { ix => JoinTable2Cols(ix, s"valA$ix")})
>     rdd1.registerTempTable("SparkJoinTable1")
>     val ids = Seq((1, 1), (1, 2), (2, 3), (2, 4))
>     val rdd2 = sc.parallelize(ids.map { case (ix, is) => JoinTable2Cols(ix, s"valB$is")})
>     val table2 = rdd2.registerTempTable("SparkJoinTable2")
> Here is the data in both tables:
> Table1 Contents:
> [1,valA1]
> [2,valA2]
> Table2 Contents:
> [1,valB1]
> [1,valB2]
> [2,valB3]
> [2,valB4]
> Now let us join the tables on the first column:
> select t1.intcol t1intcol, t2.intcol t2intcol, t1.strcol t1strcol,
>                 t2.strcol t2strcol from SparkJoinTable1 t1 JOIN
>                     SparkJoinTable2 t2 on t1.intcol = t2.intcol
> Since key 1 matches two rows of Table2 and key 2 matches the other two, an inner join should return 2 + 2 = 4 distinct rows. What results do we actually get?
>  came back with 4 results
> Results
> [1,1,valA1,valB2]
> [1,1,valA1,valB2]
> [2,2,valA2,valB4]
> [2,2,valA2,valB4]
> Huh??
> Where did valB1 and valB3 go? Why do we have duplicate rows?
> Note: the expected results were:
>       Seq(1, 1, "valA1", "valB1"),
>       Seq(1, 1, "valA1", "valB2"),
>       Seq(2, 2, "valA2", "valB3"),
>       Seq(2, 2, "valA2", "valB4")
> A standalone testing program, SparkSQLJoinSuite, is attached. An abridged version of the actual output is also attached.
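> For anyone reproducing this, dumping the query execution (parsed through physical plan) may help localize where the duplication creeps in. A minimal sketch, assuming the same package-private access to executeSql that the attached suite already relies on:
>
>     // Hypothetical diagnostic, not in the attached suite: print the full
>     // QueryExecution (logical, optimized, and physical plans) for the join.
>     println(ssc.executeSql(query).toString)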


