Posted to user@spark.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2016/09/04 14:50:59 UTC

Generating random Data using Spark and saving it to table, views appreciated

Hi All,

The following code creates an array of a given number of rows in Spark and saves the
output into a Hive ORC table. You can save it in whatever format you prefer.
I wanted to create generic test data in Spark. It is not something standard,
but I had a similar approach for Oracle.
It is cooked-up stuff and not necessarily clever. It creates columns of
data using primitive types. Anyone is welcome to take it, modify it and
let me know if it can be improved.
It also checks whether that ORC table already has data. It uses a
monotonically increasing ID for rows, and if data is already there, it starts from
MAX(ID) + 1.

It would be interesting if one could add complex columns to it as well.
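A rough, untested sketch of one possibility (the case class name and the extra
field below are only illustrative): add an ARRAY<DOUBLE> column to the case
class and to the DDL, e.g.

case class columnsWithArray (
                       id: Int
                     , randomised: Double
                     , measurements: Array[Double]   // becomes ARRAY<DOUBLE> in Hive
                   )

and populate it per row with something like Array.fill(5)(Random.nextDouble),
declaring MEASUREMENTS ARRAY<DOUBLE> in the CREATE TABLE ... STORED AS ORC
statement. ORC handles complex types, but I have not tried this end to end.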

Appreciate any comments or help

The code is attached as well.

class UsedFunctions {
  import scala.util.Random
  import scala.math._
  import org.apache.spark.sql.functions._

  // random string of the given length drawn from the supplied characters
  def randomString(chars: String, length: Int): String =
     (0 until length).map(_ => chars(Random.nextInt(chars.length))).mkString

  // block number the row falls into, i.e. floor((id - 1) / numRows)
  def clustered(id: Int, numRows: Int): Double =
     math.floor((id - 1).toDouble / numRows)

  // offset of the row within its block, i.e. (id - 1) mod numRows
  def scattered(id: Int, numRows: Int): Double =
     ((id - 1) % numRows).abs.toDouble

  // pseudo-random value in the range [0, numRows)
  def randomised(seed: Int, numRows: Int): Double =
     (Random.nextInt(seed) % numRows).abs.toDouble

  // random padding of the given length followed by the id itself
  def padString(id: Int, chars: String, length: Int): String =
     (0 until length).map(_ => chars(Random.nextInt(chars.length))).mkString + id.toString

  // string of the given length drawn from the supplied characters
  def padSingleChar(chars: String, length: Int): String =
     (0 until length).map(_ => chars(Random.nextInt(chars.length))).mkString
}
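// A quick sanity check of the helpers in the REPL (random output will vary):
//   val uf = new UsedFunctions
//   uf.randomString("abc", 5)   // 5 characters drawn from "abc"
//   uf.clustered(123, 50)       // floor(122 / 50) = 2.0
//   uf.scattered(123, 50)       // 122 mod 50      = 22.0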
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println("\nStarted at")
HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
//spark.udf.register("randomString", randomString(_:String, _:Int))
case class columns (
                       id: Int
                     , clustered: Double
                     , scattered: Double
                     , randomised: Double
                     , random_string: String
                     , small_vc: String
                     , padding: String
                     , padding2: String
                   )
//val chars = ('a' to 'z') ++ ('A' to 'Z') ++ ('0' to '9') ++ ("-!£$")
val chars = ('a' to 'z') ++ ('A' to 'Z')
//
//get the max(ID) from test.dummy2
//
val numRows = 60000   // do this in increments of 50K rows otherwise you blow up driver memory!
val rows = HiveContext.sql("SELECT COUNT(1) FROM test.dummy2").collect.apply(0).getLong(0)
var start = 0
if (rows == 0) {
    start = 1
} else {
   val maxID = HiveContext.sql("SELECT MAX(id) FROM test.dummy2").collect.apply(0).getInt(0)
   start = maxID + 1
}
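// An untested alternative: fold the two queries into one with COALESCE, so an
// empty table simply yields a start of 1
// val start = HiveContext.sql("SELECT COALESCE(MAX(id), 0) + 1 FROM test.dummy2").collect.apply(0).getInt(0)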
val end = start + numRows - 1
println (" starting at ID = " + start + " , ending on = " +  end )
val UsedFunctions = new UsedFunctions
val text = ( start to end ).map(i =>
             (
                 i.toString
               , UsedFunctions.clustered(i,numRows).toString
               , UsedFunctions.scattered(i,numRows).toString
               , UsedFunctions.randomised(i,numRows).toString
               , UsedFunctions.randomString(chars.mkString(""),50)
               , UsedFunctions.padString(i, " ", 10)
               , UsedFunctions.padSingleChar("x ", 4000)
               , UsedFunctions.padSingleChar("y ", 4000)
             )
           ).
    toArray
val df = sc.parallelize(text).
                              map(p => columns(
                                                  p._1.toString.toInt
                                                , p._2.toString.toDouble
                                                , p._3.toString.toDouble
                                                , p._4.toString.toDouble
                                                , p._5.toString
                                                , p._6.toString
                                                , p._7.toString
                                                , p._8.toString
                                              )
                                 ).
    toDF
//
// register DF as tempTable
//
df.registerTempTable("tmp")
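// (in Spark 2.x the equivalent call is df.createOrReplaceTempView("tmp"))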
// Need to create and populate target ORC table test.dummy2
//
//HiveContext.sql("""DROP TABLE IF EXISTS test.dummy2""")
  var sqltext  = ""
  sqltext = """
  CREATE TABLE if not exists test.dummy2(
     ID INT
   , CLUSTERED INT
   , SCATTERED INT
   , RANDOMISED INT
   , RANDOM_STRING VARCHAR(50)
   , SMALL_VC VARCHAR(10)
   , PADDING  VARCHAR(4000)
   , PADDING2 VARCHAR(4000)
  )
  --CLUSTERED BY (ID) INTO 256 BUCKETS
  STORED AS ORC
  TBLPROPERTIES (
  "orc.create.index"="true",
  "orc.bloom.filter.columns"="ID",
  "orc.bloom.filter.fpp"="0.05",
  "orc.compress"="SNAPPY",
  "orc.stripe.size"="16777216",
  "orc.row.index.stride"="10000" )
  """
   HiveContext.sql(sqltext)
  //
  // Put data in Hive table. Clean up is already done
  //
  sqltext = """
  INSERT INTO TABLE test.dummy2
  SELECT
          ID
        , CLUSTERED
        , SCATTERED
        , RANDOMISED
        , RANDOM_STRING
        , SMALL_VC
        , PADDING
        , PADDING2
  FROM tmp
  """
  HiveContext.sql(sqltext)
  HiveContext.sql("""select MIN(id) AS minID, MAX(id) AS maxID from
test.dummy2""").show
  println ("\nFinished at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
").collect.foreach(println)
  sys.exit()


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.