Posted to user@spark.apache.org by Tal Sliwowicz <ta...@taboola.com> on 2013/11/19 17:12:51 UTC

Using Cassandra as an input stream from Java

Hi,


I'm trying to use data stored in Cassandra (v1.2) and need some help. I've
translated the Scala example - CassandraTest.scala - to Java, but I
keep getting the following exception:

Exception in thread "main" java.io.IOException: Could not get input splits
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
    at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
    at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
    at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
    at com.taboola.test.spark_cassandra.App.calc(App.java:119)
    at com.taboola.test.spark_cassandra.App.main(App.java:65)
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
    at java.util.concurrent.FutureTask.get(FutureTask.java:83)
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
    ... 9 more
Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)
Caused by: org.apache.thrift.transport.TTransportException
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
    at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
    at org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
    at org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
    at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
    ... 8 more


This is the relevant code portion:

Job job = new Job();
job.setInputFormatClass(ColumnFamilyInputFormat.class);

String host = "<server>";
String port = "9160";

// Point both input and output at the cluster's Thrift endpoint
ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest", "UserEvent", true);
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

// An empty slice range selects every column
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[0]);
sliceRange.setFinish(new byte[0]);
predicate.setSlice_range(sliceRange);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer, IColumn>();
JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
    ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
        ByteBuffer.class, b.getClass());
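
Since the trace dies inside describe_splits_ex - the Thrift call Cassandra's
input format makes while computing splits - a standalone connectivity check can
tell whether the RPC endpoint is reachable over framed transport at all,
independent of Spark. This is only a sketch, assuming the same <server>/9160
endpoint and the cassandra-thrift client jar that is already on the classpath:

import org.apache.cassandra.thrift.Cassandra;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class ThriftCheck {
    public static void main(String[] args) throws Exception {
        // Same host/port the job configuration points at
        TTransport transport = new TFramedTransport(new TSocket("<server>", 9160));
        transport.open();
        Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
        // If this also fails with TTransportException, the problem is the
        // Thrift connection itself (port, framing, or a client/server version
        // mismatch) rather than anything on the Spark side
        System.out.println(client.describe_ring("taltest"));
        transport.close();
    }
}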


I would appreciate any input you may have.

Thanks!

Tal

Re: Using Cassandra as an input stream from Java

Posted by Pulasthi Supun Wickramasinghe <pu...@gmail.com>.
Hi Lucas,

That did the trick. I just had to change JavaPairRDD<ByteBuffer,
SortedMap<ByteBuffer, IColumn>> to JavaPairRDD<ByteBuffer, ? extends
SortedMap<ByteBuffer, IColumn>>. Thanks for the help.
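
For reference, the combination reported to compile in this thread (using the
same job/context names as in the quote below) is:

JavaPairRDD<ByteBuffer, ? extends SortedMap<ByteBuffer, IColumn>> casRdd =
    context.newAPIHadoopRDD(
        job.getConfiguration(),
        ColumnFamilyInputFormat.class.asSubclass(
            org.apache.hadoop.mapreduce.InputFormat.class),
        ByteBuffer.class,
        SortedMap.class);

Both raw spots (the InputFormat cast and the plain SortedMap.class) simply opt
out of generic checking; the runtime behaviour is unchanged.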

Regards,
Pulasthi



On Thu, Dec 5, 2013 at 10:40 AM, Lucas Fernandes Brunialti <
lbrunialti@igcorp.com.br> wrote:

> Hi all,
>
> This should work:
>
> JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
> context.newAPIHadoopRDD(job.getConfiguration(),
>
> ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
> ByteBuffer.class, SortedMap.class);
>
> I have translated the word count written in scala to java, i just can't
> send it right now...
>
> Best Regards.
>
> Lucas.


-- 
Pulasthi Supun
Undergraduate
Dpt of Computer Science & Engineering
University of Moratuwa
Blog : http://pulasthisupun.blogspot.com/
Git hub profile: https://github.com/pulasthi

Re: Using Cassandra as an input stream from Java

Posted by Lucas Fernandes Brunialti <lb...@igcorp.com.br>.
Hi all,

This should work:

JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
    context.newAPIHadoopRDD(job.getConfiguration(),
        ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
        ByteBuffer.class, SortedMap.class);

I have translated the word count written in Scala to Java; I just can't
send it right now...

Best Regards.

Lucas.
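
Why this compiles where the direct call does not: per the javac message quoted
later in the thread, newAPIHadoopRDD is declared as
<K,V,F>newAPIHadoopRDD(Configuration, Class<F>, Class<K>, Class<V>) with F
constrained to InputFormat<K,V>, so passing b.getClass() supplies a
Class<capture of ? extends SortedMap> that the compiler cannot unify with the
value type fixed by ColumnFamilyInputFormat. The asSubclass(...) call makes F
a raw type, which drops the K/V constraint entirely. A fully generic
alternative - just a sketch, not something posted in this thread - is to force
the value Class token with a double cast instead:

// Sketch: keeps the pair RDD fully typed at the cost of one unchecked cast
@SuppressWarnings("unchecked")
Class<SortedMap<ByteBuffer, IColumn>> valueClass =
    (Class<SortedMap<ByteBuffer, IColumn>>) (Class<?>) SortedMap.class;

JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
    context.newAPIHadoopRDD(job.getConfiguration(),
        ColumnFamilyInputFormat.class, ByteBuffer.class, valueClass);

Either way there is one unchecked warning; the double cast keeps the value type
as SortedMap<ByteBuffer, IColumn>, so no wildcard is needed in the declaration.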

Re: Using Cassandra as an input stream from Java

Posted by Pulasthi Supun Wickramasinghe <pu...@gmail.com>.
Hi Tal,

Just checking if you have added your code to github :). If you have, could
you point me to it?

Best Regards,
Pulasthi


On Thu, Nov 28, 2013 at 11:54 PM, Patrick Wendell <pw...@gmail.com> wrote:

> Tal - that would be great to have open sourced if you can do it!
>



-- 
Pulasthi Supun
Undergraduate
Dpt of Computer Science & Engineering
University of Moratuwa
Blog : http://pulasthisupun.blogspot.com/
Git hub profile: https://github.com/pulasthi

Re: Using Cassandra as an input stream from Java

Posted by Patrick Wendell <pw...@gmail.com>.
Tal - that would be great to have open sourced if you can do it!


Re: Using Cassandra as an input stream from Java

Posted by Pulasthi Supun Wickramasinghe <pu...@gmail.com>.
Hi Tal,

Thanks for the info, will try it out and see how it goes.


On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz <ta...@taboola.com> wrote:

> Hi Pulasthi,
>
> I couldn't make it work, so what I ended up doing was implement 3 Java
> classes - one that extends org.apache.hadoop.mapreduce.InputFormat ,
> another that extends org.apache.hadoop.mapreduce.InputSplit and a 3rd that
> extends org.apache.hadoop.mapreduce.RecordReader and used them to load data
> from Cassandra to an RDD (using the newAPIHadoopRDD() method). It works
> great! I'm cleaning up the code a bit and will upload to github as an open
> source (after the summit).

That's great, looking forward to checking it out after you publish on github :).

Thanks,
Pulasthi

> I hope this helps for now,
>
> Tal
>
>
> On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe <
> pulasthi911@gmail.com> wrote:
>
>> Hi Tal,
>>
>> I also tried doing this by converting the Scala sample into Java, but I am
>> getting a compile-time error. Below is the code:
>>
>>  JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
>>
>>         //Build the job configuration with ConfigHelper provided by
>> Cassandra
>>         Job job = null;
>>         try {
>>             job = new Job();
>>         } catch (IOException e) {
>>             e.printStackTrace();  //To change body of catch statement use
>> File | Settings | File Templates.
>>         }
>>         job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>
>>         String host = args[1];
>>         String port = args[2];
>>
>>         ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>>         ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>         ConfigHelper.setOutputInitialAddress(job.getConfiguration(),
>> host);
>>         ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>         ConfigHelper.setInputColumnFamily(job.getConfiguration(),
>> "casDemo", "Words");
>>         ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
>> "casDemo", "WordCount");
>>
>>         SlicePredicate predicate = new SlicePredicate();
>>         SliceRange sliceRange = new SliceRange();
>>         sliceRange.setStart(new byte[0]);
>>         sliceRange.setFinish(new byte[0]);
>>         predicate.setSlice_range(sliceRange);
>>         ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
>> predicate);
>>
>>         ConfigHelper.setInputPartitioner(job.getConfiguration(),
>> "Murmur3Partitioner");
>>         ConfigHelper.setOutputPartitioner(job.getConfiguration(),
>> "Murmur3Partitioner");
>>
>>         // Make a new Hadoop RDD
>>         final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
>> new TreeMap<ByteBuffer, IColumn>();
>>         JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
>> sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>> ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
>>
>>
>> i also tried the code segment that you have provided but i keep getting
>> the following error.
>>
>> java:
>> /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
>> <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
>> in org.apache.spark.api.java.JavaSparkContext cannot be applied to
>> (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92
>> of ? extends java.util.SortedMap>)
>>
>> Did you encounter this if so any help on this would be appreciated.
>>
>> Best Regards,
>> Pulasthi
>>
>>


-- 
Pulasthi Supun
Undergraduate
Dpt of Computer Science & Engineering
University of Moratuwa
Blog : http://pulasthisupun.blogspot.com/
GitHub profile: https://github.com/pulasthi

Re: Using Cassandra as an input stream from Java

Posted by Tal Sliwowicz <ta...@taboola.com>.
Hi Pulasthi,

I couldn't make it work, so what I ended up doing was implementing three Java
classes - one that extends org.apache.hadoop.mapreduce.InputFormat, another
that extends org.apache.hadoop.mapreduce.InputSplit, and a third that extends
org.apache.hadoop.mapreduce.RecordReader - and using them to load data from
Cassandra into an RDD (via the newAPIHadoopRDD() method). It works great! I'm
cleaning up the code a bit and will upload it to GitHub as open source (after
the summit).
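
Until then, here is a minimal toy sketch of how the three pieces fit
together. To be clear, ToyInputFormat, ToySplit and ToyRecordReader are
made-up names serving a hard-coded list of rows, not the actual code I'll
publish - in the real classes each split holds a Cassandra token range and
the record reader pages rows over Thrift - but the skeleton is the same:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Toy InputFormat serving a hard-coded list of "key=value" rows.
public class ToyInputFormat extends InputFormat<Text, Text> {

    @Override
    public List<InputSplit> getSplits(JobContext context) {
        // One split per "token range" - hard-coded here for brevity.
        List<InputSplit> splits = new ArrayList<InputSplit>();
        splits.add(new ToySplit(new String[] { "k1=v1", "k2=v2" }));
        splits.add(new ToySplit(new String[] { "k3=v3" }));
        return splits;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) {
        return new ToyRecordReader();
    }

    // The split only describes *what* to read; it must be Writable so it
    // can be shipped to the tasks that do the reading.
    public static class ToySplit extends InputSplit implements Writable {
        private String[] rows;

        public ToySplit() { }                 // required for deserialization
        public ToySplit(String[] rows) { this.rows = rows; }

        @Override public long getLength() { return rows.length; }
        @Override public String[] getLocations() { return new String[0]; }

        public void write(DataOutput out) throws IOException {
            out.writeInt(rows.length);
            for (String r : rows) out.writeUTF(r);
        }

        public void readFields(DataInput in) throws IOException {
            rows = new String[in.readInt()];
            for (int i = 0; i < rows.length; i++) rows[i] = in.readUTF();
        }
    }

    // The reader walks the rows of a single split.
    public static class ToyRecordReader extends RecordReader<Text, Text> {
        private String[] rows;
        private int pos = -1;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) {
            rows = ((ToySplit) split).rows;
        }

        @Override public boolean nextKeyValue() { return ++pos < rows.length; }
        @Override public Text getCurrentKey() { return new Text(rows[pos].split("=")[0]); }
        @Override public Text getCurrentValue() { return new Text(rows[pos].split("=")[1]); }
        @Override public float getProgress() { return rows.length == 0 ? 1f : (pos + 1f) / rows.length; }
        @Override public void close() { }
    }
}

From Spark it then plugs in with explicit class literals - e.g.
JavaPairRDD<Text, Text> rdd = ctx.newAPIHadoopRDD(job.getConfiguration(),
ToyInputFormat.class, Text.class, Text.class); - which also sidesteps the
generics problem you hit with getClass().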

I hope this helps for now,

Tal


On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe <
pulasthi911@gmail.com> wrote:

> [quoted message trimmed]

Re: Using Cassandra as an input stream from Java

Posted by Pulasthi Supun Wickramasinghe <pu...@gmail.com>.
Hi Tal,

I also tried doing this by converting the Scala sample into Java, but I am
getting a compile-time error. Below is the code:

        JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");

        //Build the job configuration with ConfigHelper provided by
Cassandra
        Job job = null;
        try {
            job = new Job();
        } catch (IOException e) {
            e.printStackTrace();
        }
        job.setInputFormatClass(ColumnFamilyInputFormat.class);

        String host = args[1];
        String port = args[2];

        ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
        ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
        ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
        ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
        ConfigHelper.setInputColumnFamily(job.getConfiguration(),
"casDemo", "Words");
        ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
"casDemo", "WordCount");

        SlicePredicate predicate = new SlicePredicate();
        SliceRange sliceRange = new SliceRange();
        sliceRange.setStart(new byte[0]);
        sliceRange.setFinish(new byte[0]);
        predicate.setSlice_range(sliceRange);
        ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
predicate);

        ConfigHelper.setInputPartitioner(job.getConfiguration(),
"Murmur3Partitioner");
        ConfigHelper.setOutputPartitioner(job.getConfiguration(),
"Murmur3Partitioner");

        // Make a new Hadoop RDD
        final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
new TreeMap<ByteBuffer, IColumn>();
        JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
ByteBuffer.class, byteBufferIColumnSortedMap.getClass());


I also tried the code segment that you provided, but I keep getting the
following error:

java:
/home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
<K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
in org.apache.spark.api.java.JavaSparkContext cannot be applied to
(org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92
of ? extends java.util.SortedMap>)
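
If it helps, my reading of the error is that
byteBufferIColumnSortedMap.getClass() returns a Class<? extends SortedMap> (a
wildcard capture), which javac cannot unify with the Class<V> parameter of
newAPIHadoopRDD. My untested guess at a workaround is to pin the value type
with an explicit class literal and an unchecked cast, replacing the casRdd
assignment above with something like:

        @SuppressWarnings("unchecked")
        Class<SortedMap<ByteBuffer, IColumn>> valueClass =
                (Class<SortedMap<ByteBuffer, IColumn>>) (Class<?>) SortedMap.class;

        // V is now fixed to SortedMap<ByteBuffer, IColumn>, matching the
        // key/value types declared by ColumnFamilyInputFormat.
        JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
                sc.newAPIHadoopRDD(job.getConfiguration(),
                        ColumnFamilyInputFormat.class,
                        ByteBuffer.class, valueClass);

but I have not verified this.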

Did you encounter this error? If so, any help would be appreciated.

Best Regards,
Pulasthi


On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <ta...@taboola.com> wrote:

> [quoted original message trimmed]



-- 
Pulasthi Supun
Undergraduate
Dpt of Computer Science & Engineering
University of Moratuwa
Blog : http://pulasthisupun.blogspot.com/
GitHub profile: https://github.com/pulasthi