You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by saurabh3d <sa...@oracle.com> on 2016/11/03 08:14:29 UTC

How to join dstream and JDBCRDD with checkpointing enabled

Hi All,

We have a spark streaming job with checkpoint enabled, it executes correctly
first time, but throw below exception when restarted from checkpoint.

org.apache.spark.SparkException: RDD transformations and actions can only be
invoked by the driver, not inside of other transformations; for example,
rdd1.map(x => rdd2.values.count() * x) is invalid because the values
transformation and count action cannot be performed inside of the rdd1.map
transformation. For more information, see SPARK-5063.
	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:352)
	at org.apache.spark.rdd.RDD.union(RDD.scala:565)
	at
org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:23)
	at
org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:19)
	at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)

Please suggest any workaround for this issue. 

Code:
        String URL = "jdbc:oracle:thin:" + USERNAME + "/" + PWD + "@//" +
CONNECTION_STRING;

        Map<String, String> options = ImmutableMap.of(
                "driver", "oracle.jdbc.driver.OracleDriver",
                "url", URL,
                "dbtable", "READINGS_10K",
                "fetchSize", "10000");

        DataFrame OracleDB_DF = sqlContext.load("jdbc", options);
		JavaPairRDD<String, Row> OracleDB_RDD = OracleDB_DF.toJavaRDD()
                .mapToPair(x -> new Tuple2(x.getString(0), x));
	
        Dstream
                .transformToPair(
                        rdd -> rdd
                                .mapToPair(
                                        record ->
                                                new Tuple2<>(
                                                       
record.getKey().toString(),
                                                        record))
                                .join(OracleDB_RDD))
				.print();

Spark version 1.6, running in yarn cluster mode.




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-dstream-and-JDBCRDD-with-checkpointing-enabled-tp28001.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org