You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by raaggarw <ra...@adobe.com> on 2016/06/09 09:53:34 UTC

OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

Hi,

I was trying to port my code from spark 1.5.2 to spark 2.0 however i faced
some outofMemory issues. On drilling down i could see that OOM is because of
join, because removing join fixes the issue. I then created a small
spark-app to reproduce this:

(48 cores, 300gb ram - divided among 4 workers)

line1 :df1 = Read a set a of parquet files into dataframe
line2: df1.count
line3: df2 = Read data from hbase using custom DefaultSource (implemented
using TableScan)
line4: df2.count
line5: df3 = df1.join(df2, df1("field1") === df2("field2"), "inner")
line6: df3.count -> *this is where it fails in Spark 2.0 and runs fine in
spark 1.5.2*

Problem:
First lot of WARN messages
2016-06-09 08:14:18,884 WARN  [broadcast-exchange-0]
memory.TaskMemoryManager (TaskMemoryManager.java:allocatePage(264)) - Failed
to allocate a page (1048576 bytes), try again.
And then OOM

I then tried to dump data fetched from hbase into s3 and then created df2
from s3 rather than hbase, then it worked fine in spark 2.0 as well.

Could someone please guide me through next steps?

Thanks
Ravi
Computer Scientist @ Adobe




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-when-doing-joins-in-spark-2-0-while-same-code-runs-fine-in-spark-1-5-2-tp27124.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


RE: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

Posted by Ravi Aggarwal <ra...@adobe.com>.
Hi Ian,

Thanks for the information.

I think you are referring to post http://apache-spark-user-list.1001560.n3.nabble.com/How-spark-decides-whether-to-do-BroadcastHashJoin-or-SortMergeJoin-td27369.html.

Yeah I could solve above issue of mine using spark.sql.autoBroadcastJoinThreshold=-1, so that it always results in Sort-Merge join instead of BroadcastHashJoin, Rather ideal fix for me is to calculate size of my custom default source (BaseRelation’s sizeInBytes) in right manner, to make spark planner take appropriate decision for me.

Thanks
Ravi

From: ianoconnell@gmail.com [mailto:ianoconnell@gmail.com] On Behalf Of Ian O'Connell
Sent: Wednesday, July 20, 2016 11:05 PM
To: Ravi Aggarwal <ra...@adobe.com>
Cc: Ted Yu <yu...@gmail.com>; user <us...@spark.apache.org>
Subject: Re: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

Ravi did your issue ever get solved for this?

I think i've been hitting the same thing, it looks like the spark.sql.autoBroadcastJoinThreshold stuff isn't kicking in as expected, if I set that to -1 then the computation proceeds successfully.

On Tue, Jun 14, 2016 at 12:28 AM, Ravi Aggarwal <ra...@adobe.com>> wrote:
Hi,

Is there any breakthrough here?

I had one more observation while debugging the issue
Here are the 4 types of data I had:

Da -> stored in parquet
Di -> stored in parquet
Dl1 -> parquet version of lookup
Dl2 -> hbase version of lookup

Joins performed and type of join done by spark:
Da and Di             Sort-merge         failed (OOM)
Da and Dl1           B-H                         passed
Da and Dl2           Sort-Merge        passed
Di and Dl1            B-H                         passed
Di and Dl2            Sort-Merge        failed (OOM)

From entries I can deduce that problem is with sort-merge join involving Di.
So the hbase thing is out of equation, that is not the culprit.
In physical plan I could see there are only two operations that are done additionally in sort-merge as compared to Broadcast-hash.

==> Exchange Hashpartitioning

==> Sort
And finally sort-merge join.

Can we deduce anything from this?

Thanks
Ravi
From: Ravi Aggarwal
Sent: Friday, June 10, 2016 12:31 PM
To: 'Ted Yu' <yu...@gmail.com>>
Cc: user <us...@spark.apache.org>>
Subject: RE: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

Hi Ted,
Thanks for the reply.

Here is the code
Btw – df.count is running fine on dataframe generated from this default source. I think it is something in the combination of join and hbase data source that is creating issue. But not sure entirely.
I have also dumped the physical plans of both approaches s3a/s3a join and s3a/hbase join, In case you want that let me know.

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat}
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.execution.datasources.{OutputWriterFactory, FileFormat}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.slf4j.LoggerFactory

class DefaultSource extends SchemaRelationProvider with FileFormat {

  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType) = {
    new HBaseRelation(schema, parameters)(sqlContext)
  }

  def inferSchema(sparkSession: SparkSession,
                  options: Map[String, String],
                  files: Seq[FileStatus]): Option[StructType] = ???

  def prepareWrite(sparkSession: SparkSession,
                   job: Job,
                   options: Map[String, String],
                   dataSchema: StructType): OutputWriterFactory = ???
}

object HBaseConfigurationUtil {
  lazy val logger = LoggerFactory.getLogger("HBaseConfigurationUtil")
  val hbaseConfiguration = (tableName: String, hbaseQuorum: String) => {
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    conf.set("hbase.mapred.outputtable", tableName)
    conf.set("hbase.zookeeper.quorum", hbaseQuorum)
    conf
  }
}

class HBaseRelation(val schema: StructType, parameters: Map[String, String])
                   (@transient val sqlContext: SQLContext) extends BaseRelation with TableScan {

  import sqlContext.sparkContext

  override def buildScan(): RDD[Row] = {

    val bcDataSchema = sparkContext.broadcast(schema)

    val tableName = parameters.get("path") match {
      case Some(t) => t
      case _ => throw new RuntimeException("Table name (path) not provided in parameters")
    }

    val hbaseQuorum = parameters.get("hbaseQuorum") match {
      case Some(s: String) => s
      case _ => throw new RuntimeException("hbaseQuorum not provided in options")
    }

    val rdd = sparkContext.newAPIHadoopRDD(
      HBaseConfigurationUtil.hbaseConfiguration(tableName, hbaseQuorum),
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    val rowRdd = rdd
      .map(tuple => tuple._2)
      .map { record =>

      val cells: java.util.List[Cell] = record.listCells()

      val splitRec = cells.toArray.foldLeft(Array(CellUtil.cloneRow(cells.get(0)))) {(a, b) =>
        a :+ CellUtil.cloneValue(b.asInstanceOf[Cell])
      }

      val keyFieldName = bcDataSchema.value.fields.filter(e => e.metadata.contains("isPrimary") && e.metadata.getBoolean("isPrimary"))(0).name

      val schemaArr = cells.toArray.foldLeft(Array(keyFieldName)) {(a, b) => {
        val fieldCell = b.asInstanceOf[Cell]
        a :+ new String(fieldCell.getQualifierArray).substring(fieldCell.getQualifierOffset, fieldCell.getQualifierLength + fieldCell.getQualifierOffset)
      }
      }

      val res = Map(schemaArr.zip(splitRec).toArray: _*)

      val recordFields = res.map(value => {
        val colDataType =
          try {
            bcDataSchema.value.fields.filter(_.name == value._1)(0).dataType
          } catch {
            case e: ArrayIndexOutOfBoundsException => throw new RuntimeException("Schema doesn't contain the fieldname")
          }
        CatalystTypeConverters.convertToScala(
          Cast(Literal(value._2), colDataType).eval(),
          colDataType)
      }).toArray
      Row(recordFields: _*)
    }

    rowRdd
  }
}

Thanks
Ravi

From: Ted Yu [mailto:yuzhihong@gmail.com]
Sent: Thursday, June 9, 2016 7:56 PM
To: Ravi Aggarwal <ra...@adobe.com>>
Cc: user <us...@spark.apache.org>>
Subject: Re: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

bq. Read data from hbase using custom DefaultSource (implemented using TableScan)

Did you use the DefaultSource from hbase-spark module in hbase master branch ?
If you wrote your own, mind sharing related code ?

Thanks

On Thu, Jun 9, 2016 at 2:53 AM, raaggarw <ra...@adobe.com>> wrote:
Hi,

I was trying to port my code from spark 1.5.2 to spark 2.0 however i faced
some outofMemory issues. On drilling down i could see that OOM is because of
join, because removing join fixes the issue. I then created a small
spark-app to reproduce this:

(48 cores, 300gb ram - divided among 4 workers)

line1 :df1 = Read a set a of parquet files into dataframe
line2: df1.count
line3: df2 = Read data from hbase using custom DefaultSource (implemented
using TableScan)
line4: df2.count
line5: df3 = df1.join(df2, df1("field1") === df2("field2"), "inner")
line6: df3.count -> *this is where it fails in Spark 2.0 and runs fine in
spark 1.5.2*

Problem:
First lot of WARN messages
2016-06-09 08:14:18,884 WARN  [broadcast-exchange-0]
memory.TaskMemoryManager (TaskMemoryManager.java:allocatePage(264)) - Failed
to allocate a page (1048576 bytes), try again.
And then OOM

I then tried to dump data fetched from hbase into s3 and then created df2
from s3 rather than hbase, then it worked fine in spark 2.0 as well.

Could someone please guide me through next steps?

Thanks
Ravi
Computer Scientist @ Adobe




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-when-doing-joins-in-spark-2-0-while-same-code-runs-fine-in-spark-1-5-2-tp27124.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org<ma...@spark.apache.org>
For additional commands, e-mail: user-help@spark.apache.org<ma...@spark.apache.org>



Re: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

Posted by Ian O'Connell <ia...@ianoconnell.com>.
Ravi did your issue ever get solved for this?

I think i've been hitting the same thing, it looks like
the spark.sql.autoBroadcastJoinThreshold stuff isn't kicking in as
expected, if I set that to -1 then the computation proceeds successfully.

On Tue, Jun 14, 2016 at 12:28 AM, Ravi Aggarwal <ra...@adobe.com> wrote:

> Hi,
>
>
>
> Is there any breakthrough here?
>
>
>
> I had one more observation while debugging the issue
>
> Here are the 4 types of data I had:
>
>
>
> Da -> stored in parquet
>
> Di -> stored in parquet
>
> Dl1 -> parquet version of lookup
>
> Dl2 -> hbase version of lookup
>
>
>
> Joins performed and type of join done by spark:
>
> Da and Di             Sort-merge         failed (OOM)
>
> Da and Dl1           B-H                         passed
>
> Da and Dl2           Sort-Merge        passed
>
> Di and Dl1            B-H                         passed
>
> Di and Dl2            Sort-Merge        failed (OOM)
>
>
>
> From entries I can deduce that problem is with sort-merge join involving
> Di.
>
> So the hbase thing is out of equation, that is not the culprit.
>
> In physical plan I could see there are only two operations that are done
> additionally in sort-merge as compared to Broadcast-hash.
>
> è Exchange Hashpartitioning
>
> è Sort
>
> And finally sort-merge join.
>
>
>
> Can we deduce anything from this?
>
>
>
> Thanks
>
> Ravi
>
> *From:* Ravi Aggarwal
> *Sent:* Friday, June 10, 2016 12:31 PM
> *To:* 'Ted Yu' <yu...@gmail.com>
> *Cc:* user <us...@spark.apache.org>
> *Subject:* RE: OutOfMemory when doing joins in spark 2.0 while same code
> runs fine in spark 1.5.2
>
>
>
> Hi Ted,
>
> Thanks for the reply.
>
>
>
> Here is the code
>
> Btw – df.count is running fine on dataframe generated from this default
> source. I think it is something in the combination of join and hbase data
> source that is creating issue. But not sure entirely.
>
> I have also dumped the physical plans of both approaches s3a/s3a join and
> s3a/hbase join, In case you want that let me know.
>
>
>
> import org.apache.hadoop.fs.FileStatus
>
> import org.apache.hadoop.hbase.client._
>
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>
> import org.apache.hadoop.hbase.mapreduce.{TableInputFormat}
>
> import org.apache.hadoop.hbase._
>
> import org.apache.hadoop.mapreduce.Job
>
> import org.apache.spark.rdd.RDD
>
> import org.apache.spark.sql.Row
>
> import org.apache.spark.sql.catalyst.CatalystTypeConverters
>
> import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
>
> import org.apache.spark.sql.execution.datasources.{OutputWriterFactory,
> FileFormat}
>
> import org.apache.spark.sql.sources._
>
> import org.apache.spark.sql.types._
>
> import org.apache.spark.sql._
>
> import org.slf4j.LoggerFactory
>
>
>
> class DefaultSource extends SchemaRelationProvider with FileFormat {
>
>
>
>   override def createRelation(sqlContext: SQLContext, parameters:
> Map[String, String], schema: StructType) = {
>
>     new HBaseRelation(schema, parameters)(sqlContext)
>
>   }
>
>
>
>   def inferSchema(sparkSession: SparkSession,
>
>                   options: Map[String, String],
>
>                   files: Seq[FileStatus]): Option[StructType] = ???
>
>
>
>   def prepareWrite(sparkSession: SparkSession,
>
>                    job: Job,
>
>                    options: Map[String, String],
>
>                    dataSchema: StructType): OutputWriterFactory = ???
>
> }
>
>
>
> object HBaseConfigurationUtil {
>
>   lazy val logger = LoggerFactory.getLogger("HBaseConfigurationUtil")
>
>   val hbaseConfiguration = (tableName: String, hbaseQuorum: String) => {
>
>     val conf = HBaseConfiguration.create()
>
>     conf.set(TableInputFormat.INPUT_TABLE, tableName)
>
>     conf.set("hbase.mapred.outputtable", tableName)
>
>     conf.set("hbase.zookeeper.quorum", hbaseQuorum)
>
>     conf
>
>   }
>
> }
>
>
>
> class HBaseRelation(val schema: StructType, parameters: Map[String,
> String])
>
>                    (@transient val sqlContext: SQLContext) extends
> BaseRelation with TableScan {
>
>
>
>   import sqlContext.sparkContext
>
>
>
>   override def buildScan(): RDD[Row] = {
>
>
>
>     val bcDataSchema = sparkContext.broadcast(schema)
>
>
>
>     val tableName = parameters.get("path") match {
>
>       case Some(t) => t
>
>       case _ => throw new RuntimeException("Table name (path) not provided
> in parameters")
>
>     }
>
>
>
>     val hbaseQuorum = parameters.get("hbaseQuorum") match {
>
>       case Some(s: String) => s
>
>       case _ => throw new RuntimeException("hbaseQuorum not provided in
> options")
>
>     }
>
>
>
>     val rdd = sparkContext.newAPIHadoopRDD(
>
>       HBaseConfigurationUtil.hbaseConfiguration(tableName, hbaseQuorum),
>
>       classOf[TableInputFormat],
>
>       classOf[ImmutableBytesWritable],
>
>       classOf[Result]
>
>     )
>
>
>
>     val rowRdd = rdd
>
>       .map(tuple => tuple._2)
>
>       .map { record =>
>
>
>
>       val cells: java.util.List[Cell] = record.listCells()
>
>
>
>       val splitRec =
> cells.toArray.foldLeft(Array(CellUtil.cloneRow(cells.get(0)))) {(a, b) =>
>
>         a :+ CellUtil.cloneValue(b.asInstanceOf[Cell])
>
>       }
>
>
>
>       val keyFieldName = bcDataSchema.value.fields.filter(e =>
> e.metadata.contains("isPrimary") &&
> e.metadata.getBoolean("isPrimary"))(0).name
>
>
>
>       val schemaArr = cells.toArray.foldLeft(Array(keyFieldName)) {(a, b)
> => {
>
>         val fieldCell = b.asInstanceOf[Cell]
>
>         a :+ new
> String(fieldCell.getQualifierArray).substring(fieldCell.getQualifierOffset,
> fieldCell.getQualifierLength + fieldCell.getQualifierOffset)
>
>       }
>
>       }
>
>
>
>       val res = Map(schemaArr.zip(splitRec).toArray: _*)
>
>
>
>       val recordFields = res.map(value => {
>
>         val colDataType =
>
>           try {
>
>             bcDataSchema.value.fields.filter(_.name ==
> value._1)(0).dataType
>
>           } catch {
>
>             case e: ArrayIndexOutOfBoundsException => throw new
> RuntimeException("Schema doesn't contain the fieldname")
>
>           }
>
>         CatalystTypeConverters.convertToScala(
>
>           Cast(Literal(value._2), colDataType).eval(),
>
>           colDataType)
>
>       }).toArray
>
>       Row(recordFields: _*)
>
>     }
>
>
>
>     rowRdd
>
>   }
>
> }
>
>
>
> Thanks
>
> Ravi
>
>
>
> *From:* Ted Yu [mailto:yuzhihong@gmail.com <yu...@gmail.com>]
> *Sent:* Thursday, June 9, 2016 7:56 PM
> *To:* Ravi Aggarwal <ra...@adobe.com>
> *Cc:* user <us...@spark.apache.org>
> *Subject:* Re: OutOfMemory when doing joins in spark 2.0 while same code
> runs fine in spark 1.5.2
>
>
>
> bq. Read data from hbase using custom DefaultSource (implemented using
> TableScan)
>
>
>
> Did you use the DefaultSource from hbase-spark module in hbase master
> branch ?
>
> If you wrote your own, mind sharing related code ?
>
>
>
> Thanks
>
>
>
> On Thu, Jun 9, 2016 at 2:53 AM, raaggarw <ra...@adobe.com> wrote:
>
> Hi,
>
> I was trying to port my code from spark 1.5.2 to spark 2.0 however i faced
> some outofMemory issues. On drilling down i could see that OOM is because
> of
> join, because removing join fixes the issue. I then created a small
> spark-app to reproduce this:
>
> (48 cores, 300gb ram - divided among 4 workers)
>
> line1 :df1 = Read a set a of parquet files into dataframe
> line2: df1.count
> line3: df2 = Read data from hbase using custom DefaultSource (implemented
> using TableScan)
> line4: df2.count
> line5: df3 = df1.join(df2, df1("field1") === df2("field2"), "inner")
> line6: df3.count -> *this is where it fails in Spark 2.0 and runs fine in
> spark 1.5.2*
>
> Problem:
> First lot of WARN messages
> 2016-06-09 08:14:18,884 WARN  [broadcast-exchange-0]
> memory.TaskMemoryManager (TaskMemoryManager.java:allocatePage(264)) -
> Failed
> to allocate a page (1048576 bytes), try again.
> And then OOM
>
> I then tried to dump data fetched from hbase into s3 and then created df2
> from s3 rather than hbase, then it worked fine in spark 2.0 as well.
>
> Could someone please guide me through next steps?
>
> Thanks
> Ravi
> Computer Scientist @ Adobe
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-when-doing-joins-in-spark-2-0-while-same-code-runs-fine-in-spark-1-5-2-tp27124.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>
>

RE: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

Posted by Ravi Aggarwal <ra...@adobe.com>.
Hi,

Is there any breakthrough here?

I had one more observation while debugging the issue
Here are the 4 types of data I had:

Da -> stored in parquet
Di -> stored in parquet
Dl1 -> parquet version of lookup
Dl2 -> hbase version of lookup

Joins performed and type of join done by spark:
Da and Di             Sort-merge         failed (OOM)
Da and Dl1           B-H                         passed
Da and Dl2           Sort-Merge        passed
Di and Dl1            B-H                         passed
Di and Dl2            Sort-Merge        failed (OOM)

From entries I can deduce that problem is with sort-merge join involving Di.
So the hbase thing is out of equation, that is not the culprit.
In physical plan I could see there are only two operations that are done additionally in sort-merge as compared to Broadcast-hash.

è Exchange Hashpartitioning

è Sort
And finally sort-merge join.

Can we deduce anything from this?

Thanks
Ravi
From: Ravi Aggarwal
Sent: Friday, June 10, 2016 12:31 PM
To: 'Ted Yu' <yu...@gmail.com>
Cc: user <us...@spark.apache.org>
Subject: RE: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

Hi Ted,
Thanks for the reply.

Here is the code
Btw – df.count is running fine on dataframe generated from this default source. I think it is something in the combination of join and hbase data source that is creating issue. But not sure entirely.
I have also dumped the physical plans of both approaches s3a/s3a join and s3a/hbase join, In case you want that let me know.

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat}
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.execution.datasources.{OutputWriterFactory, FileFormat}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.slf4j.LoggerFactory

class DefaultSource extends SchemaRelationProvider with FileFormat {

  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType) = {
    new HBaseRelation(schema, parameters)(sqlContext)
  }

  def inferSchema(sparkSession: SparkSession,
                  options: Map[String, String],
                  files: Seq[FileStatus]): Option[StructType] = ???

  def prepareWrite(sparkSession: SparkSession,
                   job: Job,
                   options: Map[String, String],
                   dataSchema: StructType): OutputWriterFactory = ???
}

object HBaseConfigurationUtil {
  lazy val logger = LoggerFactory.getLogger("HBaseConfigurationUtil")
  val hbaseConfiguration = (tableName: String, hbaseQuorum: String) => {
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    conf.set("hbase.mapred.outputtable", tableName)
    conf.set("hbase.zookeeper.quorum", hbaseQuorum)
    conf
  }
}

class HBaseRelation(val schema: StructType, parameters: Map[String, String])
                   (@transient val sqlContext: SQLContext) extends BaseRelation with TableScan {

  import sqlContext.sparkContext

  override def buildScan(): RDD[Row] = {

    val bcDataSchema = sparkContext.broadcast(schema)

    val tableName = parameters.get("path") match {
      case Some(t) => t
      case _ => throw new RuntimeException("Table name (path) not provided in parameters")
    }

    val hbaseQuorum = parameters.get("hbaseQuorum") match {
      case Some(s: String) => s
      case _ => throw new RuntimeException("hbaseQuorum not provided in options")
    }

    val rdd = sparkContext.newAPIHadoopRDD(
      HBaseConfigurationUtil.hbaseConfiguration(tableName, hbaseQuorum),
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    val rowRdd = rdd
      .map(tuple => tuple._2)
      .map { record =>

      val cells: java.util.List[Cell] = record.listCells()

      val splitRec = cells.toArray.foldLeft(Array(CellUtil.cloneRow(cells.get(0)))) {(a, b) =>
        a :+ CellUtil.cloneValue(b.asInstanceOf[Cell])
      }

      val keyFieldName = bcDataSchema.value.fields.filter(e => e.metadata.contains("isPrimary") && e.metadata.getBoolean("isPrimary"))(0).name

      val schemaArr = cells.toArray.foldLeft(Array(keyFieldName)) {(a, b) => {
        val fieldCell = b.asInstanceOf[Cell]
        a :+ new String(fieldCell.getQualifierArray).substring(fieldCell.getQualifierOffset, fieldCell.getQualifierLength + fieldCell.getQualifierOffset)
      }
      }

      val res = Map(schemaArr.zip(splitRec).toArray: _*)

      val recordFields = res.map(value => {
        val colDataType =
          try {
            bcDataSchema.value.fields.filter(_.name == value._1)(0).dataType
          } catch {
            case e: ArrayIndexOutOfBoundsException => throw new RuntimeException("Schema doesn't contain the fieldname")
          }
        CatalystTypeConverters.convertToScala(
          Cast(Literal(value._2), colDataType).eval(),
          colDataType)
      }).toArray
      Row(recordFields: _*)
    }

    rowRdd
  }
}

Thanks
Ravi

From: Ted Yu [mailto:yuzhihong@gmail.com]
Sent: Thursday, June 9, 2016 7:56 PM
To: Ravi Aggarwal <ra...@adobe.com>>
Cc: user <us...@spark.apache.org>>
Subject: Re: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

bq. Read data from hbase using custom DefaultSource (implemented using TableScan)

Did you use the DefaultSource from hbase-spark module in hbase master branch ?
If you wrote your own, mind sharing related code ?

Thanks

On Thu, Jun 9, 2016 at 2:53 AM, raaggarw <ra...@adobe.com>> wrote:
Hi,

I was trying to port my code from spark 1.5.2 to spark 2.0 however i faced
some outofMemory issues. On drilling down i could see that OOM is because of
join, because removing join fixes the issue. I then created a small
spark-app to reproduce this:

(48 cores, 300gb ram - divided among 4 workers)

line1 :df1 = Read a set a of parquet files into dataframe
line2: df1.count
line3: df2 = Read data from hbase using custom DefaultSource (implemented
using TableScan)
line4: df2.count
line5: df3 = df1.join(df2, df1("field1") === df2("field2"), "inner")
line6: df3.count -> *this is where it fails in Spark 2.0 and runs fine in
spark 1.5.2*

Problem:
First lot of WARN messages
2016-06-09 08:14:18,884 WARN  [broadcast-exchange-0]
memory.TaskMemoryManager (TaskMemoryManager.java:allocatePage(264)) - Failed
to allocate a page (1048576 bytes), try again.
And then OOM

I then tried to dump data fetched from hbase into s3 and then created df2
from s3 rather than hbase, then it worked fine in spark 2.0 as well.

Could someone please guide me through next steps?

Thanks
Ravi
Computer Scientist @ Adobe




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-when-doing-joins-in-spark-2-0-while-same-code-runs-fine-in-spark-1-5-2-tp27124.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org<ma...@spark.apache.org>
For additional commands, e-mail: user-help@spark.apache.org<ma...@spark.apache.org>


RE: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

Posted by Ravi Aggarwal <ra...@adobe.com>.
Hi Ted,
Thanks for the reply.

Here is the code
Btw – df.count is running fine on dataframe generated from this default source. I think it is something in the combination of join and hbase data source that is creating issue. But not sure entirely.
I have also dumped the physical plans of both approaches s3a/s3a join and s3a/hbase join, In case you want that let me know.

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat}
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.execution.datasources.{OutputWriterFactory, FileFormat}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.slf4j.LoggerFactory

class DefaultSource extends SchemaRelationProvider with FileFormat {

  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType) = {
    new HBaseRelation(schema, parameters)(sqlContext)
  }

  def inferSchema(sparkSession: SparkSession,
                  options: Map[String, String],
                  files: Seq[FileStatus]): Option[StructType] = ???

  def prepareWrite(sparkSession: SparkSession,
                   job: Job,
                   options: Map[String, String],
                   dataSchema: StructType): OutputWriterFactory = ???
}

object HBaseConfigurationUtil {
  lazy val logger = LoggerFactory.getLogger("HBaseConfigurationUtil")
  val hbaseConfiguration = (tableName: String, hbaseQuorum: String) => {
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    conf.set("hbase.mapred.outputtable", tableName)
    conf.set("hbase.zookeeper.quorum", hbaseQuorum)
    conf
  }
}

class HBaseRelation(val schema: StructType, parameters: Map[String, String])
                   (@transient val sqlContext: SQLContext) extends BaseRelation with TableScan {

  import sqlContext.sparkContext

  override def buildScan(): RDD[Row] = {

    val bcDataSchema = sparkContext.broadcast(schema)

    val tableName = parameters.get("path") match {
      case Some(t) => t
      case _ => throw new RuntimeException("Table name (path) not provided in parameters")
    }

    val hbaseQuorum = parameters.get("hbaseQuorum") match {
      case Some(s: String) => s
      case _ => throw new RuntimeException("hbaseQuorum not provided in options")
    }

    val rdd = sparkContext.newAPIHadoopRDD(
      HBaseConfigurationUtil.hbaseConfiguration(tableName, hbaseQuorum),
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    val rowRdd = rdd
      .map(tuple => tuple._2)
      .map { record =>

      val cells: java.util.List[Cell] = record.listCells()

      val splitRec = cells.toArray.foldLeft(Array(CellUtil.cloneRow(cells.get(0)))) {(a, b) =>
        a :+ CellUtil.cloneValue(b.asInstanceOf[Cell])
      }

      val keyFieldName = bcDataSchema.value.fields.filter(e => e.metadata.contains("isPrimary") && e.metadata.getBoolean("isPrimary"))(0).name

      val schemaArr = cells.toArray.foldLeft(Array(keyFieldName)) {(a, b) => {
        val fieldCell = b.asInstanceOf[Cell]
        a :+ new String(fieldCell.getQualifierArray).substring(fieldCell.getQualifierOffset, fieldCell.getQualifierLength + fieldCell.getQualifierOffset)
      }
      }

      val res = Map(schemaArr.zip(splitRec).toArray: _*)

      val recordFields = res.map(value => {
        val colDataType =
          try {
            bcDataSchema.value.fields.filter(_.name == value._1)(0).dataType
          } catch {
            case e: ArrayIndexOutOfBoundsException => throw new RuntimeException("Schema doesn't contain the fieldname")
          }
        CatalystTypeConverters.convertToScala(
          Cast(Literal(value._2), colDataType).eval(),
          colDataType)
      }).toArray
      Row(recordFields: _*)
    }

    rowRdd
  }
}

Thanks
Ravi

From: Ted Yu [mailto:yuzhihong@gmail.com]
Sent: Thursday, June 9, 2016 7:56 PM
To: Ravi Aggarwal <ra...@adobe.com>
Cc: user <us...@spark.apache.org>
Subject: Re: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

bq. Read data from hbase using custom DefaultSource (implemented using TableScan)

Did you use the DefaultSource from hbase-spark module in hbase master branch ?
If you wrote your own, mind sharing related code ?

Thanks

On Thu, Jun 9, 2016 at 2:53 AM, raaggarw <ra...@adobe.com>> wrote:
Hi,

I was trying to port my code from spark 1.5.2 to spark 2.0 however i faced
some outofMemory issues. On drilling down i could see that OOM is because of
join, because removing join fixes the issue. I then created a small
spark-app to reproduce this:

(48 cores, 300gb ram - divided among 4 workers)

line1 :df1 = Read a set a of parquet files into dataframe
line2: df1.count
line3: df2 = Read data from hbase using custom DefaultSource (implemented
using TableScan)
line4: df2.count
line5: df3 = df1.join(df2, df1("field1") === df2("field2"), "inner")
line6: df3.count -> *this is where it fails in Spark 2.0 and runs fine in
spark 1.5.2*

Problem:
First lot of WARN messages
2016-06-09 08:14:18,884 WARN  [broadcast-exchange-0]
memory.TaskMemoryManager (TaskMemoryManager.java:allocatePage(264)) - Failed
to allocate a page (1048576 bytes), try again.
And then OOM

I then tried to dump data fetched from hbase into s3 and then created df2
from s3 rather than hbase, then it worked fine in spark 2.0 as well.

Could someone please guide me through next steps?

Thanks
Ravi
Computer Scientist @ Adobe




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-when-doing-joins-in-spark-2-0-while-same-code-runs-fine-in-spark-1-5-2-tp27124.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org<ma...@spark.apache.org>
For additional commands, e-mail: user-help@spark.apache.org<ma...@spark.apache.org>


Re: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

Posted by Ted Yu <yu...@gmail.com>.
bq. Read data from hbase using custom DefaultSource (implemented using
TableScan)

Did you use the DefaultSource from hbase-spark module in hbase master
branch ?
If you wrote your own, mind sharing related code ?

Thanks

On Thu, Jun 9, 2016 at 2:53 AM, raaggarw <ra...@adobe.com> wrote:

> Hi,
>
> I was trying to port my code from spark 1.5.2 to spark 2.0 however i faced
> some outofMemory issues. On drilling down i could see that OOM is because
> of
> join, because removing join fixes the issue. I then created a small
> spark-app to reproduce this:
>
> (48 cores, 300gb ram - divided among 4 workers)
>
> line1 :df1 = Read a set a of parquet files into dataframe
> line2: df1.count
> line3: df2 = Read data from hbase using custom DefaultSource (implemented
> using TableScan)
> line4: df2.count
> line5: df3 = df1.join(df2, df1("field1") === df2("field2"), "inner")
> line6: df3.count -> *this is where it fails in Spark 2.0 and runs fine in
> spark 1.5.2*
>
> Problem:
> First lot of WARN messages
> 2016-06-09 08:14:18,884 WARN  [broadcast-exchange-0]
> memory.TaskMemoryManager (TaskMemoryManager.java:allocatePage(264)) -
> Failed
> to allocate a page (1048576 bytes), try again.
> And then OOM
>
> I then tried to dump data fetched from hbase into s3 and then created df2
> from s3 rather than hbase, then it worked fine in spark 2.0 as well.
>
> Could someone please guide me through next steps?
>
> Thanks
> Ravi
> Computer Scientist @ Adobe
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-when-doing-joins-in-spark-2-0-while-same-code-runs-fine-in-spark-1-5-2-tp27124.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>