Posted to user@spark.apache.org by Colin Williams <co...@gmail.com> on 2018/04/11 00:47:55 UTC

Specifying a custom Partitioner on RDD creation in Spark 2

Hi,

I'm currently creating RDDs using a pattern like follows:

val rdd: RDD[String] = session.sparkContext.parallelize(longkeys).flatMap(
  key => {
    logInfo(s"job at key: ${key}")
    Source.fromBytes(S3Util.getBytes(
      S3Util.getClient(region, S3Util.getCredentialsProvider("INSTANCE", "")),
      bucket, key)).getLines()
  }
)

We've been using this pattern, or something similar, to work around issues
with S3 and our Hadoop version. The same pattern might also be applied to
other types of data sources that don't have a connector.

This method has been working out fairly well, but I'd like more control
over how the data is partitioned from the start.
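
As far as I can tell, parallelize only lets me choose the number of
slices; there's no overload that takes a Partitioner:

  // SparkContext.parallelize in Spark 2.x: the only knob is numSlices
  // def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  val rdd = session.sparkContext.parallelize(longkeys, 8) // 8 partitions, but no say in which key lands where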



I tried to manually partition the data without a partitioner, but got
JVM exceptions about my arrays being too large for the JVM.

    val keyList = groupedKeys.keys.toList
    val rdd: RDD[String] =
      session.sparkContext.parallelize(keyList, keyList.length).flatMap { key =>
        logInfo(s"job at day: ${key}")
        val byteArrayBuffer = new ArrayBuffer[Byte]()
        val objectKeyList: List[String] = groupedKeys(key)
        objectKeyList.foreach(
          objectKey => {
            logInfo(s"working on object: ${objectKey}")
            byteArrayBuffer.appendAll(S3Util.getBytes(
              S3Util.getClient(region, S3Util.getCredentialsProvider("INSTANCE", "")),
              bucket, objectKey))
          }
        )
        Source.fromBytes(byteArrayBuffer.toArray[Byte]).getLines()
      }
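
One variation I've been considering is streaming each object's lines with
a nested flatMap instead of concatenating all of a key's bytes into one
array, which I believe is what trips the JVM's array size limit. A rough,
untested sketch using the same S3Util helpers:

    val keyList = groupedKeys.keys.toList
    val rdd: RDD[String] =
      session.sparkContext.parallelize(keyList, keyList.length).flatMap { key =>
        logInfo(s"job at day: ${key}")
        // Read one object at a time so no single byte array has to hold a whole day
        groupedKeys(key).iterator.flatMap { objectKey =>
          logInfo(s"working on object: ${objectKey}")
          Source.fromBytes(S3Util.getBytes(
            S3Util.getClient(region, S3Util.getCredentialsProvider("INSTANCE", "")),
            bucket, objectKey)).getLines()
        }
      }

That still only gives me one day per partition because of the numSlices
argument, though, which is why I'm looking at a Partitioner.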



Then I've defined a custom partitioner based on my source data:

  class dayPartitioner(keys: List[String]) extends Partitioner with Logger {

    val keyMap: Map[String, List[String]] = keys.groupBy(_.substring(0, 10))
    val partitions = keyMap.keySet.size
    val partitionMap: Map[String, Int] = keyMap.keys.zipWithIndex.toMap

    override def getPartition(key: Any): Int = {
      val keyString = key.asInstanceOf[String]
      val partitionKey = keyString.substring(0, 10)
      partitionMap(partitionKey)
    }

    override def numPartitions: Int = partitions
  }

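My understanding is that a Partitioner can only be plugged in through
partitionBy on a pair RDD, so the closest I've come is something like the
following untested sketch, where I key each S3 object key by its day
prefix, shuffle with the partitioner, and then do the reads:

    // keys: the full list of S3 object keys, the same list passed to dayPartitioner
    val keyed: RDD[(String, String)] =
      session.sparkContext.parallelize(keys).map(k => (k.substring(0, 10), k))

    val lines: RDD[String] =
      keyed.partitionBy(new dayPartitioner(keys)).flatMap { case (_, objectKey) =>
        // Reads happen after the shuffle, so each partition holds one day's objects
        Source.fromBytes(S3Util.getBytes(
          S3Util.getClient(region, S3Util.getCredentialsProvider("INSTANCE", "")),
          bucket, objectKey)).getLines()
      }

That seems to work on paper, but it feels indirect compared to
partitioning the data correctly from the start.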


I'd like to know: do I have to create a custom RDD class to control the
partitioning of an RDD built with the flatMap pattern at the top?

If so, I'd also appreciate a reference on doing this, to hopefully save
myself some headaches and gotchas from a naive approach. I've found one
such example, https://stackoverflow.com/a/25204589, but it's written
against an older version of Spark.

I'm hoping there is something more recent and more in-depth; references
to books or other resources are welcome.


Best,

Colin Williams
