Posted to issues@spark.apache.org by "guowei (JIRA)" <ji...@apache.org> on 2015/01/04 09:11:34 UTC

[jira] [Commented] (SPARK-5066) Can not get all key that has same hashcode when reading key ordered from different Streaming.

    [ https://issues.apache.org/jira/browse/SPARK-5066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14263787#comment-14263787 ] 

guowei commented on SPARK-5066:
-------------------------------

It should return k4 first in your example.
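To illustrate this point, here is a minimal Scala sketch of a k-way merge over streams sorted by key hash. It follows the same idea as Spark's ExternalAppendOnlyMap but is not copied from it. The sketch assumes explicit hash values that reproduce the ordering in the quoted example. The names HashMergeSketch, hashOf, StreamBuffer and readNextHashGroup are hypothetical. The heap yields k4 first. A stream's next hash group is read only after its buffer has been drained, so once both streams reach k1 they share the minimum hash and both k1 values are combined.

```scala
import scala.collection.mutable

// Simplified, hypothetical model of merging hash-sorted spill streams (not Spark's actual code).
object HashMergeSketch {
  // Assumed hash values chosen to match the example: k4 < k5 < k1 < k2 < k3.
  val hashOf: Map[String, Int] = Map("k4" -> 1, "k5" -> 2, "k1" -> 3, "k2" -> 4, "k3" -> 5)

  // Holds the not-yet-merged pairs of one stream's current (lowest) hash group.
  final class StreamBuffer(val it: BufferedIterator[(String, Int)],
                           val pairs: mutable.ArrayBuffer[(String, Int)]) {
    def minHash: Int = hashOf(pairs.head._1)
  }

  // Buffers the leading run of pairs that share one key hash.
  def readNextHashGroup(it: BufferedIterator[(String, Int)],
                        buf: mutable.ArrayBuffer[(String, Int)]): Unit = {
    if (it.hasNext) {
      val first = it.next()
      buf += first
      val h = hashOf(first._1)
      while (it.hasNext && hashOf(it.head._1) == h) buf += it.next()
    }
  }

  // Merges hash-sorted streams, summing the values of equal keys.
  def merge(streams: Seq[Seq[(String, Int)]]): Seq[(String, Int)] = {
    // PriorityQueue dequeues the maximum, so reverse the ordering to get the lowest hash first.
    val byMinHash = Ordering.by[StreamBuffer, Int](_.minHash).reverse
    val heap = mutable.PriorityQueue.empty[StreamBuffer](byMinHash)
    streams.foreach { s =>
      val buf = new StreamBuffer(s.iterator.buffered, mutable.ArrayBuffer.empty[(String, Int)])
      readNextHashGroup(buf.it, buf.pairs)
      if (buf.pairs.nonEmpty) heap.enqueue(buf)
    }

    val out = mutable.ArrayBuffer.empty[(String, Int)]
    while (heap.nonEmpty) {
      val minBuf = heap.dequeue()
      val minHash = minBuf.minHash
      val (key, firstValue) = minBuf.pairs.remove(0)
      var combined = firstValue
      val visited = mutable.ArrayBuffer(minBuf)
      // Any other stream whose current group has the same hash may also hold this key.
      while (heap.nonEmpty && heap.head.minHash == minHash) {
        val other = heap.dequeue()
        val idx = other.pairs.indexWhere(_._1 == key)
        if (idx >= 0) combined += other.pairs.remove(idx)._2
        visited += other
      }
      // Only now read the next hash group from each drained stream, then requeue it.
      visited.foreach { b =>
        if (b.pairs.isEmpty) readNextHashGroup(b.it, b.pairs)
        if (b.pairs.nonEmpty) heap.enqueue(b)
      }
      out += ((key, combined))
    }
    out.toSeq
  }
}
```

Calling merge on the two streams from the example, with every value set to 1, should yield k4 and then k5. Next comes k1 with a combined value of 2, followed by k2 and k3.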

> Can not get all key that has same hashcode  when reading key ordered  from different Streaming.
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5066
>                 URL: https://issues.apache.org/jira/browse/SPARK-5066
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.2.0
>            Reporter: DoingDone9
>            Priority: Critical
>
> When spilling is enabled, data sorted by hashCode is spilled to disk. When merging values, we need to get all keys that have the same hashCode from the different temporary files. However, the code only reads the keys with the minimum hashCode in each temporary file, so it cannot read all of the keys.
> Example:
> If file1 has [k1, k2, k3] and file2 has [k4, k5, k1],
> and hashCode(k4) < hashCode(k5) < hashCode(k1) < hashCode(k2) < hashCode(k3),
> then we only read k1 from file1 and k4 from file2, so we cannot read all occurrences of k1.
> Code:
> private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
>
> // Buffer the first hash group of each stream and put non-empty buffers on the merge heap
> inputStreams.foreach { it =>
>   val kcPairs = new ArrayBuffer[(K, C)]
>   readNextHashCode(it, kcPairs)
>   if (kcPairs.length > 0) {
>     mergeHeap.enqueue(new StreamBuffer(it, kcPairs))
>   }
> }
>
> // Read the next group of pairs from the stream that share the same key hash code
> private def readNextHashCode(it: BufferedIterator[(K, C)], buf: ArrayBuffer[(K, C)]): Unit = {
>   if (it.hasNext) {
>     var kc = it.next()
>     buf += kc
>     val minHash = hashKey(kc)
>     while (it.hasNext && it.head._1.hashCode() == minHash) {
>       kc = it.next()
>       buf += kc
>     }
>   }
> }
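As a self-contained check of what the quoted readNextHashCode does on its own, the sketch below makes it generic over K and C so it compiles outside Spark. It assumes that hashKey(kc) is simply kc._1.hashCode(), which may not match Spark's helper exactly. The object name ReadNextHashCodeDemo is hypothetical. The demo relies on the fact that on the JVM "Aa".hashCode and "BB".hashCode are both 2112, while "c".hashCode is 99.

```scala
import scala.collection.mutable.ArrayBuffer

object ReadNextHashCodeDemo {
  // Assumption: hashKey is taken to be the key's hashCode(); Spark's real helper may differ
  // (for example in how it treats null keys).
  def hashKey[K, C](kc: (K, C)): Int = kc._1.hashCode()

  // Same logic as the quoted readNextHashCode, made generic so it compiles on its own.
  // It consumes only the leading run of pairs sharing one key hash and leaves the rest unread.
  def readNextHashCode[K, C](it: BufferedIterator[(K, C)], buf: ArrayBuffer[(K, C)]): Unit = {
    if (it.hasNext) {
      var kc = it.next()
      buf += kc
      val minHash = hashKey(kc)
      while (it.hasNext && it.head._1.hashCode() == minHash) {
        kc = it.next()
        buf += kc
      }
    }
  }
}
```

Consider a stream sorted by hash code, such as Seq(("c", 3), ("Aa", 1), ("BB", 2)). The first call buffers only ("c", 3) and leaves the rest unread. A second call buffers both colliding keys. Each call consumes exactly one hash group, so a correct merge has to call it again each time a stream's buffer has been drained.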



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
