Posted to user@spark.apache.org by m....@accenture.com on 2014/12/09 11:16:41 UTC

NoSuchMethodError: writing spark-streaming data to cassandra

Hi,

I intend to save streaming data from Kafka into Cassandra using Spark Streaming, but there seems to be a problem with the line
javaFunctions(data, TestTable.class).saveToCassandra("testkeyspace", "test_table");
I am getting a NoSuchMethodError. The code, the error log, and the POM.xml dependencies are listed below.
Please help me find out why this is happening.


import java.io.Serializable;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions;

public class SparkStream {
        static int key=0;
        public static void main(String args[]) throws Exception
        {
                  if(args.length != 3)
                  {
                        System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
                        System.exit(1);
                  }

                  Logger.getLogger("org").setLevel(Level.OFF);
                  Logger.getLogger("akka").setLevel(Level.OFF);
                  Map<String,Integer> topicMap = new HashMap<String,Integer>();
                  String[] topic = args[2].split(",");
                  for(String t: topic)
                  {
                        topicMap.put(t, new Integer(3));
                  }

                  /* Connection to Spark */
                  SparkConf conf = new SparkConf();
                  JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf);
                  JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));


                  /* Receive Kafka streaming inputs */
                  JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );


                  /* Create DStream */
                  JavaDStream<TestTable> data = messages.map(new Function<Tuple2<String,String>, TestTable >()
                  {
                        public TestTable call(Tuple2<String, String> message)
                        {
                                return new TestTable(new Integer(++key), message._2() );
                        }
                  }
                  );

                  /* Write to cassandra */
                javaFunctions(data,TestTable.class).saveToCassandra("testkeyspace","test_table");
        //      data.print(); //creates console output stream.


                  jssc.start();
                  jssc.awaitTermination();

        }
}

class TestTable implements Serializable
{
        Integer key;
        String value;

        public TestTable() {}

        public TestTable(Integer k, String v)
        {
                  key=k;
                  value=v;
        }

        public Integer getKey(){
                  return key;
        }

        public void setKey(Integer k){
                  key=k;
        }

        public String getValue(){
                  return value;
        }

        public void setValue(String v){
                  value=v;
        }

        public String toString(){
                  return MessageFormat.format("TestTable'{'key={0},value={1}'}'", key, value);
        }
}
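As an aside, the quoted braces in the toString() pattern above are intentional: MessageFormat treats `{` and `}` as format-element delimiters, so literal braces must be wrapped in single quotes. A minimal, self-contained check (class name is illustrative):

```java
import java.text.MessageFormat;

public class MessageFormatBraces {
    public static void main(String[] args) {
        // '{' and '}' are quoted in the pattern so MessageFormat emits
        // literal braces instead of parsing them as format elements.
        String s = MessageFormat.format("TestTable'{'key={0},value={1}'}'", 1, "a");
        System.out.println(s); // TestTable{key=1,value=a}
    }
}
```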

The output log is:
Exception in thread "main" java.lang.NoSuchMethodError: com.datastax.spark.connector.streaming.DStreamFunctions.<init>(Lorg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;)V
        at com.datastax.spark.connector.DStreamJavaFunctions.<init>(DStreamJavaFunctions.java:17)
        at com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(CassandraJavaUtil.java:89)
        at com.spark.SparkStream.main(SparkStream.java:83)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


And the POM dependencies are:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.1.0</version>
</dependency>

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.1.0</version>
</dependency>

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <version>1.0.4</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.1.1</version>
</dependency>

<dependency>
    <groupId>com.msiops.footing</groupId>
    <artifactId>footing-tuple</artifactId>
    <version>0.2</version>
</dependency>

<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>1.0.8</version>
</dependency>


Thanks,
Aiman



________________________________

This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Where allowed by local law, electronic communications with Accenture and its affiliates, including e-mail and instant messaging (including content), may be scanned by our systems for the purposes of information security and assessment of internal compliance with Accenture policy.
______________________________________________________________________________________

www.accenture.com

Re: NoSuchMethodError: writing spark-streaming data to cassandra

Posted by m....@accenture.com.
Hi,

@Gerard - Thanks. I aligned the connector versions and added conf.set("spark.cassandra.connection.host", "localhost").

I am now able to create a connection, but the data is not getting inserted into the Cassandra table,
and the logs show it connecting and then disconnecting a second later.
The full code, logs, and dependencies are below:

import java.io.Serializable;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

import com.datastax.spark.connector.cql.CassandraConnector;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions;

public class SparkStream {
    static int key = 0;

    public static void main(String args[]) throws Exception
    {
        if (args.length != 3)
        {
            System.out.println("parameters not given properly");
            System.exit(1);
        }

        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for (String t: topic)
        {
            topicMap.put(t, new Integer(3));
        }

        /* Connection to Spark */
        SparkConf conf = new SparkConf();
        conf.set("spark.cassandra.connection.host", "localhost");
        JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000));

        /* Connection to Cassandra */
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
        System.out.println("+++++++++++cassandra connector created++++++++++++++++++++++++++++");

        /* Receive Kafka streaming inputs */
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
        System.out.println("+++++++++++++streaming Connection done!+++++++++++++++++++++++++++");

        /* Create DStream */
        JavaDStream<TestTable> data = messages.map(new Function<Tuple2<String,String>, TestTable>()
        {
            public TestTable call(Tuple2<String, String> message)
            {
                return new TestTable(new Integer(++key), message._2());
            }
        });
        System.out.println("++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++");

        /* Write to cassandra */
        javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();

        jssc.start();
        jssc.awaitTermination();
    }
}

class TestTable implements Serializable
{
    Integer key;
    String value;

    public TestTable() {}

    public TestTable(Integer k, String v)
    {
        key=k;
        value=v;
    }

    public Integer getKey(){
        return key;
    }

    public void setKey(Integer k){
        key=k;
    }

    public String getValue(){
        return value;
    }

    public void setValue(String v){
        value=v;
    }

    public String toString(){
        return MessageFormat.format("TestTable'{'key={0}, value={1}'}'", key, value);
    }
}

The log is:
+++++++++++cassandra connector created++++++++++++++++++++++++++++
+++++++++++++streaming Connection done!+++++++++++++++++++++++++++
++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++
14/12/09 12:07:33 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added
14/12/09 12:07:33 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:34 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster

14/12/09 12:07:45 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added
14/12/09 12:07:45 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:46 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster

The POM.xml dependencies are:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.1.0</version>
</dependency>

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.1.1</version>
</dependency>


<dependency>
    <groupId>com.msiops.footing</groupId>
    <artifactId>footing-tuple</artifactId>
    <version>0.2</version>
</dependency>

<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>2.1.3</version>
</dependency>



Thanks and Regards,

Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #:  (+91) - 9836112841.
________________________________
From: Gerard Maas <ge...@gmail.com>
Sent: Tuesday, December 9, 2014 4:39 PM
To: Sarosh, M.
Cc: spark users
Subject: Re: NoSuchMethodError: writing spark-streaming data to cassandra

You're using two conflicting versions of the connector: the Scala version at 1.1.0 and the Java version at 1.0.4.

I don't use Java, but I guess you only need the java dependency for your job - and with the version fixed.

<dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.1.0</version>
</dependency>
<dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.0.4</version>
</dependency>



Re: NoSuchMethodError: writing spark-streaming data to cassandra

Posted by Gerard Maas <ge...@gmail.com>.
You're using two conflicting versions of the connector: the Scala version
at 1.1.0 and the Java version at 1.0.4.

I don't use Java, but I guess you only need the java dependency for your
job - and with the version fixed.

<dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.1.0</version>
</dependency>
<dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.0.4</version>
</dependency>
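In other words, assuming the job does need the Java module, both connector artifacts should sit on the same release line; a fixed POM fragment might look like:

```xml
<!-- Both connector modules pinned to the same release line (1.1.0 here) -->
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <version>1.1.0</version>
</dependency>
```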
