Posted to user@spark.apache.org by sathyanarayanan mudhaliyar <sa...@gmail.com> on 2017/03/09 03:00:06 UTC

        code:

                directKafkaStream.foreachRDD(rdd ->
                {
                    rdd.foreach(record ->
                    {
                        messages1.add(record._2);
                    });
                    JavaRDD<String> lines = sc.parallelize(messages1);
                    JavaPairRDD<Integer, String> data = lines.mapToPair(new PairFunction<String, Integer, String>()
                    {
                        @Override
                        public Tuple2<Integer, String> call(String a)
                        {
                            String[] tokens = StringUtil.split(a, '%');
                            return new Tuple2<Integer, String>(Integer.getInteger(tokens[3]), tokens[2]);
                        }
                    });
                    Function2<String, String, String> reduceSumFunc = (accum, n) -> (accum.concat(n));
                    JavaPairRDD<Integer, String> yearCount = data.reduceByKey(reduceSumFunc);

                    javaFunctions(yearCount)
                        .writerBuilder("movie_keyspace", "movie_count", mapTupleToRow(Integer.class, String.class))
                        .withColumnSelector(someColumns("year", "list_of_movies"))
                        .saveToCassandra(); // this is the error line
                });




        ----------


        error:
        com.datastax.spark.connector.writer.NullKeyColumnException: Invalid null value for key column year
        	at com.datastax.spark.connector.writer.RoutingKeyGenerator$$anonfun$fillRoutingKey$1.apply$mcVI$sp(RoutingKeyGenerator.scala:49)
        	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        	at com.datastax.spark.connector.writer.RoutingKeyGenerator.fillRoutingKey(RoutingKeyGenerator.scala:47)
        	at com.datastax.spark.connector.writer.RoutingKeyGenerator.apply(RoutingKeyGenerator.scala:56)
        	at com.datastax.spark.connector.writer.TableWriter.batchRoutingKey(TableWriter.scala:126)
        	at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$19.apply(TableWriter.scala:151)
        	at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$19.apply(TableWriter.scala:151)
        	at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:107)
        	at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
        	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        	at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
        	at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158)
        	at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
        	at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
        	at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
        	at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
        	at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
        	at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
        	at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
        	at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
        	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        	at org.apache.spark.scheduler.Task.run(Task.scala:86)
        	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        	at java.lang.Thread.run(Thread.java:745)


    ----------

    I am trying to connect Kafka and Cassandra using Spark.
    I am able to store a JavaRDD into Cassandra, but not a JavaPairRDD.
    I have marked the line where the error occurs with a comment.
    Thank you
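
    For context on the null key: Integer.getInteger(String) does not parse its
    argument. It reads a JVM system property with that name and returns null
    when no such property is set, which would leave the year key null exactly
    as the exception reports. A minimal plain-JDK illustration (no Spark):

```java
public class GetIntegerPitfall {
    public static void main(String[] args) {
        String token = "1994";
        // Looks up a system property named "1994"; null unless -D1994=... was set.
        Integer fromProperty = Integer.getInteger(token);
        // Parses the string itself; this is the call that yields 1994.
        int parsed = Integer.parseInt(token);
        System.out.println(fromProperty + " / " + parsed);
    }
}
```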

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re:

Posted by Marco Mistroni <mm...@gmail.com>.
Try removing the Kafka code, since Kafka does not seem to be the issue here.
Create a DS and save it to Cassandra to see what happens, even in the
console. That should give you a starting point.
Hth
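
Building on that isolation advice, the pair-construction step is worth testing
on its own, since Integer.getInteger(tokens[3]) in the posted code looks up a
system property rather than parsing the token. A hedged sketch of the key
extraction using Integer.parseInt instead (the toYearPair helper and the
sample record format are illustrative, not taken from the original post):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class YearPairExtraction {
    // Mirrors the mapToPair body from the post: split on '%', take the
    // fourth token as the year key and the third as the value, but parse
    // the year with Integer.parseInt so the key is never silently null.
    static Map.Entry<Integer, String> toYearPair(String record) {
        String[] tokens = record.split("%");
        return new SimpleEntry<>(Integer.parseInt(tokens[3]), tokens[2]);
    }

    public static void main(String[] args) {
        Map.Entry<Integer, String> pair = toYearPair("1%user%Heat%1995");
        System.out.println(pair.getKey() + " -> " + pair.getValue());
    }
}
```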

On 9 Mar 2017 3:07 am, "sathyanarayanan mudhaliyar" <
sathyanarayananmudhaliyar@gmail.com> wrote:
