Posted to dev@crunch.apache.org by "David Whiting (JIRA)" <ji...@apache.org> on 2013/11/20 16:45:37 UTC

[jira] [Created] (CRUNCH-301) Cogrouping tables where RHS has a Scala tuple value type causes duplicated and missing RHS values

David Whiting created CRUNCH-301:
------------------------------------

             Summary: Cogrouping tables where RHS has a Scala tuple value type causes duplicated and missing RHS values
                 Key: CRUNCH-301
                 URL: https://issues.apache.org/jira/browse/CRUNCH-301
             Project: Crunch
          Issue Type: Bug
          Components: Scrunch
    Affects Versions: 0.8.0
         Environment: Hadoop 2
            Reporter: David Whiting


Suppose you have three record types, Rec1, Rec2 and Rec3.
Rec1 references Rec2 via key1, and Rec2 references Rec3 (one-to-many) by key2. If you innerJoin Rec2 and Rec3 to make a PCollection[(Rec2,Rec3)] and then cogroup it against Rec1, then instead of surfacing the n different (Rec2,Rec3) tuples applicable to the Rec1, it surfaces just one of the (Rec2,Rec3) tuples n times.

This only happens when running with MRPipeline, and not with MemPipeline.

This is the simplest complete program I could come up with which will produce this unexpected result:

{code:scala}
package testcases

import org.apache.crunch.impl.mr.MRPipeline
import org.apache.crunch.io.{From, To}
import org.apache.crunch.scrunch.PCollection
import org.apache.crunch.types.avro.{ReflectDataFactory, Avros}
import org.apache.avro.file.DataFileWriter
import org.apache.hadoop.fs.{Path, FSDataOutputStream}
import org.apache.hadoop.conf.Configuration

object IsolatedBug {

  case class Rec1(var k: Int, var v: String) { def this() = this(0, "") }
  case class Rec2(var k: Int, var k2: String, var v: Double) { def this() = this(0, "", 0.0) }
  case class Rec3(var k2: String, var v:Int) { def this() = this("", 0)}
  def run() {
    val prefix = "/user/davw/tmp/isolation"

    val ones = Seq(Rec1(1, "hello"), Rec1(1, "tjena"), Rec1(2, "goodbye"))
    val twos = Seq(Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6), Rec2(1, "b", 0.7), Rec2(2, "c", 9.9))
    val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6))

    writeCollection(new Path(prefix + "/ones"), ones)
    writeCollection(new Path(prefix + "/twos"), twos)
    writeCollection(new Path(prefix + "/threes"), threes)

    val pipe = new MRPipeline(getClass)
    val oneF = new PCollection(pipe.read(From.avroFile(prefix + "/ones", Avros.reflects(classOf[Rec1]))))
    val twoF = new PCollection(pipe.read(From.avroFile(prefix + "/twos", Avros.reflects(classOf[Rec2]))))
    val threeF = new PCollection(pipe.read(From.avroFile(prefix + "/threes", Avros.reflects(classOf[Rec3]))))
    (oneF.by(_.k)
      cogroup
      (twoF.by(_.k2)
         innerJoin threeF.by(_.k2))
        .values()
        .by(_._1.k))
      .values()
      .map(
        {case (one, twothree) =>
          (one ++ twothree)
            .map(_.toString)
            .reduce((a,b) => a + "\t" + b)})
      .write(To.textFile(prefix + "/output"))

    pipe.done()

  }

  def writeCollection(path: Path, records: Iterable[_ <: AnyRef]) {
    writeAvroFile(path.getFileSystem(new Configuration()).create(path, true), records)
  }

  @SuppressWarnings(Array("rawtypes", "unchecked"))
  private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]) {
    val r: AnyRef = records.iterator.next()
    val schema = new ReflectDataFactory().getReflectData.getSchema(r.getClass)

    val writer = new ReflectDataFactory().getWriter[T](schema)
    val dataFileWriter = new DataFileWriter(writer)
    dataFileWriter.create(schema, outputStream)

    for (record <- records) {
      dataFileWriter.append(record)
    }
    dataFileWriter.close()
    outputStream.close()
  }

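  // The JVM entry point must take Array[String]; a Seq[String] parameter is not recognised as main.
  def main(args: Array[String]) { run() }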
}
{code}

The result that is produced is:
{code}
Rec1(1,tjena)	Rec1(1,hello)	(Rec2(1,a,0.5),Rec3(a,4))	(Rec2(1,a,0.5),Rec3(a,4))	(Rec2(1,a,0.5),Rec3(a,4))	(Rec2(1,a,0.5),Rec3(a,4))
Rec1(2,goodbye)	(Rec2(2,c,9.9),Rec3(c,6))
{code}

As you can see, for key 1 a single (Rec2, Rec3) tuple is repeated four times, instead of the four distinct tuples each appearing once. This does not happen if you join against Rec2 on its own.
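
For comparison, here is a rough sketch of what I would expect the pipeline to produce, worked out with plain Scala collections on the same test data. It does not use Crunch at all. The object name ExpectedCogroup and the helper values joined, leftByKey, rightByKey and cogrouped are made up for this illustration, and the case classes are simplified (no vars, no no-arg constructors) because no Avro reflection is involved.

```scala
object ExpectedCogroup {
  // Simplified copies of the record types from the test case above.
  case class Rec1(k: Int, v: String)
  case class Rec2(k: Int, k2: String, v: Double)
  case class Rec3(k2: String, v: Int)

  val ones = Seq(Rec1(1, "hello"), Rec1(1, "tjena"), Rec1(2, "goodbye"))
  val twos = Seq(Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6),
                 Rec2(1, "b", 0.7), Rec2(2, "c", 9.9))
  val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6))

  // Inner join of Rec2 and Rec3 on k2.
  val joined: Seq[(Rec2, Rec3)] =
    for (two <- twos; three <- threes if two.k2 == three.k2) yield (two, three)

  // Cogroup on Rec1.k / Rec2.k: for every key seen on either side,
  // collect all left-hand values and all right-hand values.
  val leftByKey: Map[Int, Seq[Rec1]] = ones.groupBy(_.k)
  val rightByKey: Map[Int, Seq[(Rec2, Rec3)]] = joined.groupBy(_._1.k)

  val cogrouped: Map[Int, (Seq[Rec1], Seq[(Rec2, Rec3)])] =
    (leftByKey.keySet ++ rightByKey.keySet).map { key =>
      (key, (leftByKey.getOrElse(key, Seq.empty[Rec1]),
             rightByKey.getOrElse(key, Seq.empty[(Rec2, Rec3)])))
    }.toMap

  def main(args: Array[String]): Unit =
    cogrouped.toSeq.sortBy(_._1).foreach { case (_, (one, twothree)) =>
      // Same formatting as the map step in the pipeline above.
      println((one ++ twothree).map(_.toString).mkString("\t"))
    }
}
```

With this data the line for key 1 should contain four distinct (Rec2, Rec3) tuples (two with Rec3(a,4) and two with Rec3(b,5)), and the line for key 2 should match what MRPipeline already produces. The ordering of values within a line is not something I would rely on in a real pipeline run.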




--
This message was sent by Atlassian JIRA
(v6.1#6144)