You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Vladimir Vladimirov <sm...@gmail.com> on 2015/10/20 01:38:07 UTC

Problem using User Defined Predicate pushdown with core RDD and parquet - UDP class not found

Hi all

I feel like this questions is more Spark dev related that Spark user
related. Please correct me if I'm wrong.

My project's data flow involves sampling records from the data stored as
Parquet dataset.
I've checked DataFrames API and it doesn't support user defined predicates
projection pushdown - only simple filter expressions.
I want to use custom filter function predicate pushdown feature of parquet
while loading data with newAPIHadoopFile.
Simple filters constructed with org.apache.parquet.filter2 API works fine.
But User Defined Predicate works only with `--master local` mode.

When I try to run in yarn-client mode my test program that uses UDP class
to be used by parquet-mr I'm getting class not found exception.

I suspect that the issue could be related to the way how class loader works
from parquet or maybe it could be related to the fact that Spark executor
processes has my jar loaded from HTTP server and there is some security
policies (classpath shows that the jar URI is actually HTTP URL and not
local file).

I've tried to create uber jar with all dependencies and shipt it with the
spark app - no success.

PS I'm using spark 1.5.1.

Here is my command line I'm using to submit the application:

SPARK_CLASSPATH=./lib/my-jar-with-dependencies.jar spark-submit \
    --master yarn-client
    --num-executors 3 --driver-memory 3G --executor-memory 2G \
    --executor-cores 1 \
    --jars
./lib/my-jar-with-dependencies.jar,./lib/snappy-java-1.1.2.jar,./lib/parquet-hadoop-1.7.0.jar,./lib/parquet-avro-1.7.0.jar,./lib/parquet-column-1.7.0.jar,/opt/cloudera/parcels/CDH/jars/avro-1.7.6-cdh5.4.0.jar,/opt/cloudera/parcels/CDH/jars/avro-mapred-1.7.6-cdh5.4.0-hadoop2.jar,
\
    --class my.app.parquet.filters.tools.TestSparkApp \
    ./lib/my-jar-with-dependencies.jar \
    yarn-client \
    "/user/vvlad/2015/*/*/*/EVENTS"

Here is the code of my UDP class:

package my.app.parquet.filters.udp

import org.apache.parquet.filter2.predicate.Statistics
import org.apache.parquet.filter2.predicate.UserDefinedPredicate


import java.lang.{Integer => JInt}

import scala.util.Random

class SampleIntColumn(threshold: Double) extends UserDefinedPredicate[JInt]
with Serializable {
  lazy val random = { new Random() }
  val myThreshold = threshold
  override def keep(value: JInt): Boolean = {
    random.nextFloat() < myThreshold
  }

  override def canDrop(statistics: Statistics[JInt]): Boolean = false

  override def inverseCanDrop(statistics: Statistics[JInt]): Boolean = false

  override def toString: String = {
    "%s(%f)".format(getClass.getName, myThreshold)
  }
}

Spark app:

package my.app.parquet.filters.tools

import my.app.parquet.filters.udp.SampleIntColumn
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.avro.AvroReadSupport
import org.apache.parquet.filter2.dsl.Dsl.IntColumn
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.{SparkContext, SparkConf}

import org.apache.parquet.filter2.dsl.Dsl._
import org.apache.parquet.filter2.predicate.FilterPredicate


object TestSparkApp {
  def main (args: Array[String]) {
    val conf = new SparkConf()
      //"local[2]" or yarn-client etc
      .setMaster(args(0))
      .setAppName("Spark Scala App")
      .set("spark.executor.memory", "1g")
      .set("spark.rdd.compress", "true")
      .set("spark.storage.memoryFraction", "1")

    val sc = new SparkContext(conf)

    val job = new Job(sc.hadoopConfiguration)
    ParquetInputFormat.setReadSupportClass(job,
classOf[AvroReadSupport[GenericRecord]])

    val sampler = new SampleIntColumn(0.05)
    val impField = IntColumn("impression")

    val pred: FilterPredicate = impField.filterBy(sampler)

    ParquetInputFormat.setFilterPredicate(job.getConfiguration, pred)


println(job.getConfiguration.get("parquet.private.read.filter.predicate"))

println(job.getConfiguration.get("parquet.private.read.filter.predicate.human.readable"))

    val records1 = sc.newAPIHadoopFile(
    //<path to parquet>
      args(1),
      classOf[ParquetInputFormat[GenericRecord]],
      classOf[Void],
      classOf[GenericRecord],
      job.getConfiguration
    ).map(_._2).cache()

    println("result count " + records1.count().toString)

    sc.stop()
  }
}



Here are logs with exception I'm getting:


15/10/19 11:14:43 INFO TaskSetManager: Starting task 21.0 in stage 0.0 (TID
0, hdp010........, NODE_LOCAL, 2815 bytes)
15/10/19 11:14:43 INFO TaskSetManager: Starting task 14.0 in stage 0.0 (TID
1, hdp042........, NODE_LOCAL, 2816 bytes)
15/10/19 11:14:43 INFO YarnClientSchedulerBackend: Registered executor:
AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@hdp027........:43593/user/Executor#-832887318])
with ID 3
15/10/19 11:14:43 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
2, hdp027........, NODE_LOCAL, 2814 bytes)
15/10/19 11:14:44 INFO BlockManagerMasterEndpoint: Registering block
manager hdp027........:64266 with 883.8 MB RAM, BlockManagerId(3,
hdp027........, 64266)
15/10/19 11:14:44 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
on hdp010........:23967 (size: 1516.0 B, free: 883.8 MB)
15/10/19 11:14:44 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
on hdp042........:63034 (size: 1516.0 B, free: 883.8 MB)
15/10/19 11:14:45 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory
on hdp010........:23967 (size: 25.1 KB, free: 883.8 MB)
15/10/19 11:14:45 INFO TaskSetManager: Starting task 48.0 in stage 0.0 (TID
3, hdp010........, NODE_LOCAL, 2816 bytes)
15/10/19 11:14:45 WARN TaskSetManager: Lost task 21.0 in stage 0.0 (TID 0,
hdp010........): java.lang.RuntimeException: java.io.IOException: Could not
read object from config with key parquet.private.read.filter.predicate
at
org.apache.parquet.hadoop.ParquetInputFormat.getFilterPredicate(ParquetInputFormat.java:196)
at
org.apache.parquet.hadoop.ParquetInputFormat.getFilter(ParquetInputFormat.java:205)
at
org.apache.parquet.hadoop.ParquetInputFormat.createRecordReader(ParquetInputFormat.java:241)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:151)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:124)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not read object from config with key
parquet.private.read.filter.predicate
at
org.apache.parquet.hadoop.util.SerializationUtil.readObjectFromConfAsBase64(SerializationUtil.java:102)
at
org.apache.parquet.hadoop.ParquetInputFormat.getFilterPredicate(ParquetInputFormat.java:194)
... 17 more
Caused by: java.lang.ClassNotFoundException:
my.app.parquet.filters.udp.SampleIntColumn
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.parquet.hadoop.util.SerializationUtil.readObjectFromConfAsBase64(SerializationUtil.java:100)
... 18 more


Best Regards
Vladimir Vladimirov