You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Tal Sliwowicz <ta...@taboola.com> on 2013/11/19 17:12:51 UTC
Using Cassandra as an input stream from Java
Hi,
I'm trying to use data stored in cassandra (v1.2) and need some help. I've
translated the the scala example - CassandraTest.scala - to Java, but I
keep getting the following exception:
Exception in thread "main" java.io.IOException: Could not get input splits
at
org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
at
org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
at com.taboola.test.spark_cassandra.App.calc(App.java:119)
at com.taboola.test.spark_cassandra.App.main(App.java:65)
Caused by: java.util.concurrent.ExecutionException:
java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
at java.util.concurrent.FutureTask.get(FutureTask.java:83)
at
org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
... 9 more
Caused by: java.lang.RuntimeException:
org.apache.thrift.transport.TTransportException
at
org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
at
org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
at
org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
at
org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
Caused by: org.apache.thrift.transport.TTransportException
at
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at
org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
at
org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at
org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
at
org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
at
org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
at
org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
at
org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
at
org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
... 8 more
This is the relevant code portion:
Job job = new Job();
job.setInputFormatClass(ColumnFamilyInputFormat.class);
String host = "<server>";
String port = "9160";
ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest",
"UserEvent", true);
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner
");
ConfigHelper.setOutputPartitioner(job.getConfiguration(),
"Murmur3Partitioner");
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[0]);
sliceRange.setFinish(new byte[0]);
predicate.setSlice_range(sliceRange);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer,
IColumn>();
JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class ,
ByteBuffer.class, b.getClass());
I would appreciate any input you may have.
Thanks!
Tal
Re: Using Cassandra as an input stream from Java
Posted by Pulasthi Supun Wickramasinghe <pu...@gmail.com>.
Hi Lucas,
That did the trick just had to change JavaPairRDD<ByteBuffer,
SortedMap<ByteBuffer, IColumn>> to JavaPairRDD<ByteBuffer,* ? extends
* SortedMap<ByteBuffer,
IColumn>> thanks for the help.
Regards,
Pulasthi
On Thu, Dec 5, 2013 at 10:40 AM, Lucas Fernandes Brunialti <
lbrunialti@igcorp.com.br> wrote:
> Hi all,
>
> This should work:
>
> JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
> context.newAPIHadoopRDD(job.getConfiguration(),
>
> ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
> ByteBuffer.class, SortedMap.class);
>
> I have translated the word count written in scala to java, i just can't
> send it right now...
>
> Best Regards.
>
> Lucas.
> On Dec 5, 2013 1:51 AM, "Pulasthi Supun Wickramasinghe" <
> pulasthi911@gmail.com> wrote:
>
>> Hi Tal,
>>
>> Just checking if you have added your code to github :). if you have could
>> you point me to it.
>>
>> Best Regards,
>> Pulasthi
>>
>>
>> On Thu, Nov 28, 2013 at 11:54 PM, Patrick Wendell <pw...@gmail.com>wrote:
>>
>>> Tal - that would be great to have open sourced if you can do it!
>>>
>>> On Thu, Nov 28, 2013 at 1:00 AM, Pulasthi Supun Wickramasinghe
>>> <pu...@gmail.com> wrote:
>>> > Hi Tal,
>>> >
>>> > Thanks for the info will try it out and see how it goes.
>>> >
>>> >
>>> > On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz <ta...@taboola.com>
>>> wrote:
>>> >>
>>> >> Hi Pulasthi,
>>> >>
>>> >> I couldn't make it work, so what I ended up doing was implement 3 Java
>>> >> classes - one that extends org.apache.hadoop.mapreduce.InputFormat ,
>>> another
>>> >> that extends org.apache.hadoop.mapreduce.InputSplit and a 3rd that
>>> extends
>>> >> org.apache.hadoop.mapreduce.RecordReader and used them to load data
>>> from
>>> >> Cassandra to an RDD (using the newAPIHadoopRDD() method). It works
>>> great!
>>> >> I'm cleaning up the code a bit and will upload to github as an open
>>> source
>>> >> (after the summit).
>>> >>
>>> > That's great looking forward check it out after you publish on github
>>> :).
>>> >
>>> >
>>> > Thanks,
>>> > Pulasthi
>>> >>
>>> >> I hope this helps for now,
>>> >>
>>> >> Tal
>>> >>
>>> >>
>>> >> On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe
>>> >> <pu...@gmail.com> wrote:
>>> >>>
>>> >>> Hi Tal,
>>> >>>
>>> >>> I also tried doing this by converting the scala sample into Java but
>>> i am
>>> >>> getting an compile time error below is the code
>>> >>>
>>> >>> JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
>>> >>>
>>> >>> //Build the job configuration with ConfigHelper provided by
>>> >>> Cassandra
>>> >>> Job job = null;
>>> >>> try {
>>> >>> job = new Job();
>>> >>> } catch (IOException e) {
>>> >>> e.printStackTrace(); //To change body of catch
>>> statement use
>>> >>> File | Settings | File Templates.
>>> >>> }
>>> >>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>> >>>
>>> >>> String host = args[1];
>>> >>> String port = args[2];
>>> >>>
>>> >>> ConfigHelper.setInputInitialAddress(job.getConfiguration(),
>>> >>> host);
>>> >>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>> >>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(),
>>> >>> host);
>>> >>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>> >>> ConfigHelper.setInputColumnFamily(job.getConfiguration(),
>>> >>> "casDemo", "Words");
>>> >>> ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
>>> >>> "casDemo", "WordCount");
>>> >>>
>>> >>> SlicePredicate predicate = new SlicePredicate();
>>> >>> SliceRange sliceRange = new SliceRange();
>>> >>> sliceRange.setStart(new byte[0]);
>>> >>> sliceRange.setFinish(new byte[0]);
>>> >>> predicate.setSlice_range(sliceRange);
>>> >>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
>>> >>> predicate);
>>> >>>
>>> >>> ConfigHelper.setInputPartitioner(job.getConfiguration(),
>>> >>> "Murmur3Partitioner");
>>> >>> ConfigHelper.setOutputPartitioner(job.getConfiguration(),
>>> >>> "Murmur3Partitioner");
>>> >>>
>>> >>> // Make a new Hadoop RDD
>>> >>> final SortedMap<ByteBuffer, IColumn>
>>> byteBufferIColumnSortedMap =
>>> >>> new TreeMap<ByteBuffer, IColumn>();
>>> >>> JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
>>> >>> sc.newAPIHadoopRDD(job.getConfiguration(),
>>> ColumnFamilyInputFormat.class,
>>> >>> ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
>>> >>>
>>> >>>
>>> >>> i also tried the code segment that you have provided but i keep
>>> getting
>>> >>> the following error.
>>> >>>
>>> >>> java:
>>> >>>
>>> /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
>>> >>>
>>> <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
>>> >>> in org.apache.spark.api.java.JavaSparkContext cannot be applied to
>>> >>>
>>> (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92
>>> >>> of ? extends java.util.SortedMap>)
>>> >>>
>>> >>> Did you encounter this if so any help on this would be appreciated.
>>> >>>
>>> >>> Best Regards,
>>> >>> Pulasthi
>>> >>>
>>> >>>
>>> >>> On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <ta...@taboola.com>
>>> wrote:
>>> >>>>
>>> >>>> Hi,
>>> >>>>
>>> >>>>
>>> >>>> I'm trying to use data stored in cassandra (v1.2) and need some
>>> help.
>>> >>>> I've translated the the scala example - CassandraTest.scala - to
>>> Java, but I
>>> >>>> keep getting the following exception:
>>> >>>>
>>> >>>> Exception in thread "main" java.io.IOException: Could not get input
>>> >>>> splits
>>> >>>> at
>>> >>>>
>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
>>> >>>> at
>>> >>>>
>>> org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
>>> >>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>>> >>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
>>> >>>> at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
>>> >>>> at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
>>> >>>> at
>>> >>>>
>>> org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
>>> >>>> at
>>> >>>>
>>> org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
>>> >>>> at com.taboola.test.spark_cassandra.App.calc(App.java:119)
>>> >>>> at com.taboola.test.spark_cassandra.App.main(App.java:65)
>>> >>>> Caused by: java.util.concurrent.ExecutionException:
>>> >>>> java.lang.RuntimeException:
>>> org.apache.thrift.transport.TTransportException
>>> >>>> at
>>> java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
>>> >>>> at java.util.concurrent.FutureTask.get(FutureTask.java:83)
>>> >>>> at
>>> >>>>
>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
>>> >>>> ... 9 more
>>> >>>> Caused by: java.lang.RuntimeException:
>>> >>>> org.apache.thrift.transport.TTransportException
>>> >>>> at
>>> >>>>
>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
>>> >>>> at
>>> >>>>
>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
>>> >>>> at
>>> >>>>
>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
>>> >>>> at
>>> >>>>
>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
>>> >>>> at
>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>> >>>> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>> >>>> at
>>> >>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>>> >>>> at
>>> >>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>>> >>>> at java.lang.Thread.run(Thread.java:695)
>>> >>>> Caused by: org.apache.thrift.transport.TTransportException
>>> >>>> at
>>> >>>>
>>> org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
>>> >>>> at
>>> org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>> >>>> at
>>> >>>>
>>> org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
>>> >>>> at
>>> >>>>
>>> org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
>>> >>>> at
>>> org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>> >>>> at
>>> >>>>
>>> org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
>>> >>>> at
>>> >>>>
>>> org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
>>> >>>> at
>>> >>>>
>>> org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
>>> >>>> at
>>> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
>>> >>>> at
>>> >>>>
>>> org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
>>> >>>> at
>>> >>>>
>>> org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
>>> >>>> at
>>> >>>>
>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
>>> >>>> ... 8 more
>>> >>>>
>>> >>>>
>>> >>>> This is the relevant code portion:
>>> >>>>
>>> >>>> Job job = new Job();
>>> >>>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>> >>>> String host = "<server>";
>>> >>>> String port = "9160";
>>> >>>>
>>> >>>> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>>> >>>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>> >>>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>>> >>>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>> >>>> ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest",
>>> >>>> "UserEvent", true);
>>> >>>> ConfigHelper.setInputPartitioner(job.getConfiguration(),
>>> >>>> "Murmur3Partitioner");
>>> >>>> ConfigHelper.setOutputPartitioner(job.getConfiguration(),
>>> >>>> "Murmur3Partitioner");
>>> >>>>
>>> >>>> SlicePredicate predicate = new SlicePredicate();
>>> >>>> SliceRange sliceRange = new SliceRange();
>>> >>>> sliceRange.setStart(new byte[0]);
>>> >>>> sliceRange.setFinish(new byte[0]);
>>> >>>> predicate.setSlice_range(sliceRange);
>>> >>>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
>>> >>>> predicate);
>>> >>>> final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer,
>>> >>>> IColumn>();
>>> >>>> JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
>>> >>>> ctx.newAPIHadoopRDD(job.getConfiguration(),
>>> ColumnFamilyInputFormat.class ,
>>> >>>> ByteBuffer.class, b.getClass());
>>> >>>>
>>> >>>>
>>> >>>> I would appreciate any input you may have.
>>> >>>>
>>> >>>> Thanks!
>>> >>>>
>>> >>>> Tal
>>> >>>
>>> >>>
>>> >>>
>>> >>>
>>> >>> --
>>> >>> Pulasthi Supun
>>> >>> Undergraduate
>>> >>> Dpt of Computer Science & Engineering
>>> >>> University of Moratuwa
>>> >>> Blog : http://pulasthisupun.blogspot.com/
>>> >>> Git hub profile: https://github.com/pulasthi
>>> >>
>>> >>
>>> >
>>> >
>>> >
>>> > --
>>> > Pulasthi Supun
>>> > Undergraduate
>>> > Dpt of Computer Science & Engineering
>>> > University of Moratuwa
>>> > Blog : http://pulasthisupun.blogspot.com/
>>> > Git hub profile: https://github.com/pulasthi
>>>
>>
>>
>>
>> --
>> Pulasthi Supun
>> Undergraduate
>> Dpt of Computer Science & Engineering
>> University of Moratuwa
>> Blog : http://pulasthisupun.blogspot.com/
>> Git hub profile: <http://pulasthisupun.blogspot.com/>https://github.com/pulasthi
>> <https://github.com/pulasthi>
>>
>
--
Pulasthi Supun
Undergraduate
Dpt of Computer Science & Engineering
University of Moratuwa
Blog : http://pulasthisupun.blogspot.com/
Git hub profile:
<http://pulasthisupun.blogspot.com/>https://github.com/pulasthi
<https://github.com/pulasthi>
Re: Using Cassandra as an input stream from Java
Posted by Lucas Fernandes Brunialti <lb...@igcorp.com.br>.
Hi all,
This should work:
JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
context.newAPIHadoopRDD(job.getConfiguration(),
ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
ByteBuffer.class, SortedMap.class);
I have translated the word count written in scala to java, i just can't
send it right now...
Best Regards.
Lucas.
On Dec 5, 2013 1:51 AM, "Pulasthi Supun Wickramasinghe" <
pulasthi911@gmail.com> wrote:
> Hi Tal,
>
> Just checking if you have added your code to github :). if you have could
> you point me to it.
>
> Best Regards,
> Pulasthi
>
>
> On Thu, Nov 28, 2013 at 11:54 PM, Patrick Wendell <pw...@gmail.com>wrote:
>
>> Tal - that would be great to have open sourced if you can do it!
>>
>> On Thu, Nov 28, 2013 at 1:00 AM, Pulasthi Supun Wickramasinghe
>> <pu...@gmail.com> wrote:
>> > Hi Tal,
>> >
>> > Thanks for the info will try it out and see how it goes.
>> >
>> >
>> > On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz <ta...@taboola.com>
>> wrote:
>> >>
>> >> Hi Pulasthi,
>> >>
>> >> I couldn't make it work, so what I ended up doing was implement 3 Java
>> >> classes - one that extends org.apache.hadoop.mapreduce.InputFormat ,
>> another
>> >> that extends org.apache.hadoop.mapreduce.InputSplit and a 3rd that
>> extends
>> >> org.apache.hadoop.mapreduce.RecordReader and used them to load data
>> from
>> >> Cassandra to an RDD (using the newAPIHadoopRDD() method). It works
>> great!
>> >> I'm cleaning up the code a bit and will upload to github as an open
>> source
>> >> (after the summit).
>> >>
>> > That's great looking forward check it out after you publish on github
>> :).
>> >
>> >
>> > Thanks,
>> > Pulasthi
>> >>
>> >> I hope this helps for now,
>> >>
>> >> Tal
>> >>
>> >>
>> >> On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe
>> >> <pu...@gmail.com> wrote:
>> >>>
>> >>> Hi Tal,
>> >>>
>> >>> I also tried doing this by converting the scala sample into Java but
>> i am
>> >>> getting an compile time error below is the code
>> >>>
>> >>> JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
>> >>>
>> >>> //Build the job configuration with ConfigHelper provided by
>> >>> Cassandra
>> >>> Job job = null;
>> >>> try {
>> >>> job = new Job();
>> >>> } catch (IOException e) {
>> >>> e.printStackTrace(); //To change body of catch statement
>> use
>> >>> File | Settings | File Templates.
>> >>> }
>> >>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
>> >>>
>> >>> String host = args[1];
>> >>> String port = args[2];
>> >>>
>> >>> ConfigHelper.setInputInitialAddress(job.getConfiguration(),
>> >>> host);
>> >>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>> >>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(),
>> >>> host);
>> >>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>> >>> ConfigHelper.setInputColumnFamily(job.getConfiguration(),
>> >>> "casDemo", "Words");
>> >>> ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
>> >>> "casDemo", "WordCount");
>> >>>
>> >>> SlicePredicate predicate = new SlicePredicate();
>> >>> SliceRange sliceRange = new SliceRange();
>> >>> sliceRange.setStart(new byte[0]);
>> >>> sliceRange.setFinish(new byte[0]);
>> >>> predicate.setSlice_range(sliceRange);
>> >>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
>> >>> predicate);
>> >>>
>> >>> ConfigHelper.setInputPartitioner(job.getConfiguration(),
>> >>> "Murmur3Partitioner");
>> >>> ConfigHelper.setOutputPartitioner(job.getConfiguration(),
>> >>> "Murmur3Partitioner");
>> >>>
>> >>> // Make a new Hadoop RDD
>> >>> final SortedMap<ByteBuffer, IColumn>
>> byteBufferIColumnSortedMap =
>> >>> new TreeMap<ByteBuffer, IColumn>();
>> >>> JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
>> >>> sc.newAPIHadoopRDD(job.getConfiguration(),
>> ColumnFamilyInputFormat.class,
>> >>> ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
>> >>>
>> >>>
>> >>> i also tried the code segment that you have provided but i keep
>> getting
>> >>> the following error.
>> >>>
>> >>> java:
>> >>>
>> /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
>> >>>
>> <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
>> >>> in org.apache.spark.api.java.JavaSparkContext cannot be applied to
>> >>>
>> (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92
>> >>> of ? extends java.util.SortedMap>)
>> >>>
>> >>> Did you encounter this if so any help on this would be appreciated.
>> >>>
>> >>> Best Regards,
>> >>> Pulasthi
>> >>>
>> >>>
>> >>> On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <ta...@taboola.com>
>> wrote:
>> >>>>
>> >>>> Hi,
>> >>>>
>> >>>>
>> >>>> I'm trying to use data stored in cassandra (v1.2) and need some help.
>> >>>> I've translated the the scala example - CassandraTest.scala - to
>> Java, but I
>> >>>> keep getting the following exception:
>> >>>>
>> >>>> Exception in thread "main" java.io.IOException: Could not get input
>> >>>> splits
>> >>>> at
>> >>>>
>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
>> >>>> at
>> >>>>
>> org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
>> >>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>> >>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
>> >>>> at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
>> >>>> at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
>> >>>> at
>> >>>>
>> org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
>> >>>> at
>> >>>>
>> org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
>> >>>> at com.taboola.test.spark_cassandra.App.calc(App.java:119)
>> >>>> at com.taboola.test.spark_cassandra.App.main(App.java:65)
>> >>>> Caused by: java.util.concurrent.ExecutionException:
>> >>>> java.lang.RuntimeException:
>> org.apache.thrift.transport.TTransportException
>> >>>> at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
>> >>>> at java.util.concurrent.FutureTask.get(FutureTask.java:83)
>> >>>> at
>> >>>>
>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
>> >>>> ... 9 more
>> >>>> Caused by: java.lang.RuntimeException:
>> >>>> org.apache.thrift.transport.TTransportException
>> >>>> at
>> >>>>
>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
>> >>>> at
>> >>>>
>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
>> >>>> at
>> >>>>
>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
>> >>>> at
>> >>>>
>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
>> >>>> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> >>>> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> >>>> at
>> >>>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>> >>>> at
>> >>>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>> >>>> at java.lang.Thread.run(Thread.java:695)
>> >>>> Caused by: org.apache.thrift.transport.TTransportException
>> >>>> at
>> >>>>
>> org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
>> >>>> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>> >>>> at
>> >>>>
>> org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
>> >>>> at
>> >>>>
>> org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
>> >>>> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>> >>>> at
>> >>>>
>> org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
>> >>>> at
>> >>>>
>> org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
>> >>>> at
>> >>>>
>> org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
>> >>>> at
>> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
>> >>>> at
>> >>>>
>> org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
>> >>>> at
>> >>>>
>> org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
>> >>>> at
>> >>>>
>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
>> >>>> ... 8 more
>> >>>>
>> >>>>
>> >>>> This is the relevant code portion:
>> >>>>
>> >>>> Job job = new Job();
>> >>>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
>> >>>> String host = "<server>";
>> >>>> String port = "9160";
>> >>>>
>> >>>> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>> >>>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>> >>>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>> >>>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>> >>>> ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest",
>> >>>> "UserEvent", true);
>> >>>> ConfigHelper.setInputPartitioner(job.getConfiguration(),
>> >>>> "Murmur3Partitioner");
>> >>>> ConfigHelper.setOutputPartitioner(job.getConfiguration(),
>> >>>> "Murmur3Partitioner");
>> >>>>
>> >>>> SlicePredicate predicate = new SlicePredicate();
>> >>>> SliceRange sliceRange = new SliceRange();
>> >>>> sliceRange.setStart(new byte[0]);
>> >>>> sliceRange.setFinish(new byte[0]);
>> >>>> predicate.setSlice_range(sliceRange);
>> >>>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
>> >>>> predicate);
>> >>>> final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer,
>> >>>> IColumn>();
>> >>>> JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
>> >>>> ctx.newAPIHadoopRDD(job.getConfiguration(),
>> ColumnFamilyInputFormat.class ,
>> >>>> ByteBuffer.class, b.getClass());
>> >>>>
>> >>>>
>> >>>> I would appreciate any input you may have.
>> >>>>
>> >>>> Thanks!
>> >>>>
>> >>>> Tal
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> Pulasthi Supun
>> >>> Undergraduate
>> >>> Dpt of Computer Science & Engineering
>> >>> University of Moratuwa
>> >>> Blog : http://pulasthisupun.blogspot.com/
>> >>> Git hub profile: https://github.com/pulasthi
>> >>
>> >>
>> >
>> >
>> >
>> > --
>> > Pulasthi Supun
>> > Undergraduate
>> > Dpt of Computer Science & Engineering
>> > University of Moratuwa
>> > Blog : http://pulasthisupun.blogspot.com/
>> > Git hub profile: https://github.com/pulasthi
>>
>
>
>
> --
> Pulasthi Supun
> Undergraduate
> Dpt of Computer Science & Engineering
> University of Moratuwa
> Blog : http://pulasthisupun.blogspot.com/
> Git hub profile: <http://pulasthisupun.blogspot.com/>https://github.com/pulasthi
> <https://github.com/pulasthi>
>
Re: Using Cassandra as an input stream from Java
Posted by Pulasthi Supun Wickramasinghe <pu...@gmail.com>.
Hi Tal,
Just checking if you have added your code to github :). if you have could
you point me to it.
Best Regards,
Pulasthi
On Thu, Nov 28, 2013 at 11:54 PM, Patrick Wendell <pw...@gmail.com>wrote:
> Tal - that would be great to have open sourced if you can do it!
>
> On Thu, Nov 28, 2013 at 1:00 AM, Pulasthi Supun Wickramasinghe
> <pu...@gmail.com> wrote:
> > Hi Tal,
> >
> > Thanks for the info will try it out and see how it goes.
> >
> >
> > On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz <ta...@taboola.com>
> wrote:
> >>
> >> Hi Pulasthi,
> >>
> >> I couldn't make it work, so what I ended up doing was implement 3 Java
> >> classes - one that extends org.apache.hadoop.mapreduce.InputFormat ,
> another
> >> that extends org.apache.hadoop.mapreduce.InputSplit and a 3rd that
> extends
> >> org.apache.hadoop.mapreduce.RecordReader and used them to load data from
> >> Cassandra to an RDD (using the newAPIHadoopRDD() method). It works
> great!
> >> I'm cleaning up the code a bit and will upload to github as an open
> source
> >> (after the summit).
> >>
> > That's great looking forward check it out after you publish on github :).
> >
> >
> > Thanks,
> > Pulasthi
> >>
> >> I hope this helps for now,
> >>
> >> Tal
> >>
> >>
> >> On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe
> >> <pu...@gmail.com> wrote:
> >>>
> >>> Hi Tal,
> >>>
> >>> I also tried doing this by converting the scala sample into Java but i
> am
> >>> getting an compile time error below is the code
> >>>
> >>> JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
> >>>
> >>> //Build the job configuration with ConfigHelper provided by
> >>> Cassandra
> >>> Job job = null;
> >>> try {
> >>> job = new Job();
> >>> } catch (IOException e) {
> >>> e.printStackTrace(); //To change body of catch statement
> use
> >>> File | Settings | File Templates.
> >>> }
> >>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
> >>>
> >>> String host = args[1];
> >>> String port = args[2];
> >>>
> >>> ConfigHelper.setInputInitialAddress(job.getConfiguration(),
> >>> host);
> >>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
> >>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(),
> >>> host);
> >>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
> >>> ConfigHelper.setInputColumnFamily(job.getConfiguration(),
> >>> "casDemo", "Words");
> >>> ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
> >>> "casDemo", "WordCount");
> >>>
> >>> SlicePredicate predicate = new SlicePredicate();
> >>> SliceRange sliceRange = new SliceRange();
> >>> sliceRange.setStart(new byte[0]);
> >>> sliceRange.setFinish(new byte[0]);
> >>> predicate.setSlice_range(sliceRange);
> >>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
> >>> predicate);
> >>>
> >>> ConfigHelper.setInputPartitioner(job.getConfiguration(),
> >>> "Murmur3Partitioner");
> >>> ConfigHelper.setOutputPartitioner(job.getConfiguration(),
> >>> "Murmur3Partitioner");
> >>>
> >>> // Make a new Hadoop RDD
> >>> final SortedMap<ByteBuffer, IColumn>
> byteBufferIColumnSortedMap =
> >>> new TreeMap<ByteBuffer, IColumn>();
> >>> JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
> >>> sc.newAPIHadoopRDD(job.getConfiguration(),
> ColumnFamilyInputFormat.class,
> >>> ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
> >>>
> >>>
> >>> i also tried the code segment that you have provided but i keep getting
> >>> the following error.
> >>>
> >>> java:
> >>>
> /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
> >>>
> <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
> >>> in org.apache.spark.api.java.JavaSparkContext cannot be applied to
> >>>
> (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92
> >>> of ? extends java.util.SortedMap>)
> >>>
> >>> Did you encounter this if so any help on this would be appreciated.
> >>>
> >>> Best Regards,
> >>> Pulasthi
> >>>
> >>>
> >>> On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <ta...@taboola.com>
> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>>
> >>>> I'm trying to use data stored in cassandra (v1.2) and need some help.
> >>>> I've translated the the scala example - CassandraTest.scala - to
> Java, but I
> >>>> keep getting the following exception:
> >>>>
> >>>> Exception in thread "main" java.io.IOException: Could not get input
> >>>> splits
> >>>> at
> >>>>
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
> >>>> at
> >>>> org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
> >>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
> >>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
> >>>> at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
> >>>> at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
> >>>> at
> >>>>
> org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
> >>>> at
> >>>>
> org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
> >>>> at com.taboola.test.spark_cassandra.App.calc(App.java:119)
> >>>> at com.taboola.test.spark_cassandra.App.main(App.java:65)
> >>>> Caused by: java.util.concurrent.ExecutionException:
> >>>> java.lang.RuntimeException:
> org.apache.thrift.transport.TTransportException
> >>>> at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
> >>>> at java.util.concurrent.FutureTask.get(FutureTask.java:83)
> >>>> at
> >>>>
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
> >>>> ... 9 more
> >>>> Caused by: java.lang.RuntimeException:
> >>>> org.apache.thrift.transport.TTransportException
> >>>> at
> >>>>
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
> >>>> at
> >>>>
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
> >>>> at
> >>>>
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
> >>>> at
> >>>>
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
> >>>> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>> at
> >>>>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> >>>> at
> >>>>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> >>>> at java.lang.Thread.run(Thread.java:695)
> >>>> Caused by: org.apache.thrift.transport.TTransportException
> >>>> at
> >>>>
> org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
> >>>> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
> >>>> at
> >>>>
> org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
> >>>> at
> >>>>
> org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
> >>>> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
> >>>> at
> >>>>
> org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
> >>>> at
> >>>>
> org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
> >>>> at
> >>>>
> org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
> >>>> at
> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
> >>>> at
> >>>>
> org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
> >>>> at
> >>>>
> org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
> >>>> at
> >>>>
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
> >>>> ... 8 more
> >>>>
> >>>>
> >>>> This is the relevant code portion:
> >>>>
> >>>> Job job = new Job();
> >>>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
> >>>> String host = "<server>";
> >>>> String port = "9160";
> >>>>
> >>>> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
> >>>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
> >>>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
> >>>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
> >>>> ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest",
> >>>> "UserEvent", true);
> >>>> ConfigHelper.setInputPartitioner(job.getConfiguration(),
> >>>> "Murmur3Partitioner");
> >>>> ConfigHelper.setOutputPartitioner(job.getConfiguration(),
> >>>> "Murmur3Partitioner");
> >>>>
> >>>> SlicePredicate predicate = new SlicePredicate();
> >>>> SliceRange sliceRange = new SliceRange();
> >>>> sliceRange.setStart(new byte[0]);
> >>>> sliceRange.setFinish(new byte[0]);
> >>>> predicate.setSlice_range(sliceRange);
> >>>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
> >>>> predicate);
> >>>> final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer,
> >>>> IColumn>();
> >>>> JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
> >>>> ctx.newAPIHadoopRDD(job.getConfiguration(),
> ColumnFamilyInputFormat.class ,
> >>>> ByteBuffer.class, b.getClass());
> >>>>
> >>>>
> >>>> I would appreciate any input you may have.
> >>>>
> >>>> Thanks!
> >>>>
> >>>> Tal
> >>>
> >>>
> >>>
> >>>
> >>> --
> >>> Pulasthi Supun
> >>> Undergraduate
> >>> Dpt of Computer Science & Engineering
> >>> University of Moratuwa
> >>> Blog : http://pulasthisupun.blogspot.com/
> >>> Git hub profile: https://github.com/pulasthi
> >>
> >>
> >
> >
> >
> > --
> > Pulasthi Supun
> > Undergraduate
> > Dpt of Computer Science & Engineering
> > University of Moratuwa
> > Blog : http://pulasthisupun.blogspot.com/
> > Git hub profile: https://github.com/pulasthi
>
--
Pulasthi Supun
Undergraduate
Dpt of Computer Science & Engineering
University of Moratuwa
Blog : http://pulasthisupun.blogspot.com/
Git hub profile:
<http://pulasthisupun.blogspot.com/>https://github.com/pulasthi
<https://github.com/pulasthi>
Re: Using Cassandra as an input stream from Java
Posted by Patrick Wendell <pw...@gmail.com>.
Tal - that would be great to have open sourced if you can do it!
On Thu, Nov 28, 2013 at 1:00 AM, Pulasthi Supun Wickramasinghe
<pu...@gmail.com> wrote:
> Hi Tal,
>
> Thanks for the info will try it out and see how it goes.
>
>
> On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz <ta...@taboola.com> wrote:
>>
>> Hi Pulasthi,
>>
>> I couldn't make it work, so what I ended up doing was implement 3 Java
>> classes - one that extends org.apache.hadoop.mapreduce.InputFormat , another
>> that extends org.apache.hadoop.mapreduce.InputSplit and a 3rd that extends
>> org.apache.hadoop.mapreduce.RecordReader and used them to load data from
>> Cassandra to an RDD (using the newAPIHadoopRDD() method). It works great!
>> I'm cleaning up the code a bit and will upload to github as an open source
>> (after the summit).
>>
> That's great looking forward check it out after you publish on github :).
>
>
> Thanks,
> Pulasthi
>>
>> I hope this helps for now,
>>
>> Tal
>>
>>
>> On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe
>> <pu...@gmail.com> wrote:
>>>
>>> Hi Tal,
>>>
>>> I also tried doing this by converting the scala sample into Java but i am
>>> getting an compile time error below is the code
>>>
>>> JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
>>>
>>> //Build the job configuration with ConfigHelper provided by
>>> Cassandra
>>> Job job = null;
>>> try {
>>> job = new Job();
>>> } catch (IOException e) {
>>> e.printStackTrace(); //To change body of catch statement use
>>> File | Settings | File Templates.
>>> }
>>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>>
>>> String host = args[1];
>>> String port = args[2];
>>>
>>> ConfigHelper.setInputInitialAddress(job.getConfiguration(),
>>> host);
>>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(),
>>> host);
>>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>> ConfigHelper.setInputColumnFamily(job.getConfiguration(),
>>> "casDemo", "Words");
>>> ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
>>> "casDemo", "WordCount");
>>>
>>> SlicePredicate predicate = new SlicePredicate();
>>> SliceRange sliceRange = new SliceRange();
>>> sliceRange.setStart(new byte[0]);
>>> sliceRange.setFinish(new byte[0]);
>>> predicate.setSlice_range(sliceRange);
>>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
>>> predicate);
>>>
>>> ConfigHelper.setInputPartitioner(job.getConfiguration(),
>>> "Murmur3Partitioner");
>>> ConfigHelper.setOutputPartitioner(job.getConfiguration(),
>>> "Murmur3Partitioner");
>>>
>>> // Make a new Hadoop RDD
>>> final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
>>> new TreeMap<ByteBuffer, IColumn>();
>>> JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
>>> sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>>> ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
>>>
>>>
>>> i also tried the code segment that you have provided but i keep getting
>>> the following error.
>>>
>>> java:
>>> /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
>>> <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
>>> in org.apache.spark.api.java.JavaSparkContext cannot be applied to
>>> (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92
>>> of ? extends java.util.SortedMap>)
>>>
>>> Did you encounter this if so any help on this would be appreciated.
>>>
>>> Best Regards,
>>> Pulasthi
>>>
>>>
>>> On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <ta...@taboola.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>>
>>>> I'm trying to use data stored in cassandra (v1.2) and need some help.
>>>> I've translated the the scala example - CassandraTest.scala - to Java, but I
>>>> keep getting the following exception:
>>>>
>>>> Exception in thread "main" java.io.IOException: Could not get input
>>>> splits
>>>> at
>>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
>>>> at
>>>> org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
>>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
>>>> at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
>>>> at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
>>>> at
>>>> org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
>>>> at
>>>> org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
>>>> at com.taboola.test.spark_cassandra.App.calc(App.java:119)
>>>> at com.taboola.test.spark_cassandra.App.main(App.java:65)
>>>> Caused by: java.util.concurrent.ExecutionException:
>>>> java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>>>> at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
>>>> at java.util.concurrent.FutureTask.get(FutureTask.java:83)
>>>> at
>>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
>>>> ... 9 more
>>>> Caused by: java.lang.RuntimeException:
>>>> org.apache.thrift.transport.TTransportException
>>>> at
>>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
>>>> at
>>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
>>>> at
>>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
>>>> at
>>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
>>>> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>>>> at java.lang.Thread.run(Thread.java:695)
>>>> Caused by: org.apache.thrift.transport.TTransportException
>>>> at
>>>> org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
>>>> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>>> at
>>>> org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
>>>> at
>>>> org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
>>>> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>>> at
>>>> org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
>>>> at
>>>> org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
>>>> at
>>>> org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
>>>> at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
>>>> at
>>>> org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
>>>> at
>>>> org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
>>>> at
>>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
>>>> ... 8 more
>>>>
>>>>
>>>> This is the relevant code portion:
>>>>
>>>> Job job = new Job();
>>>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>>> String host = "<server>";
>>>> String port = "9160";
>>>>
>>>> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>>>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>>>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>>> ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest",
>>>> "UserEvent", true);
>>>> ConfigHelper.setInputPartitioner(job.getConfiguration(),
>>>> "Murmur3Partitioner");
>>>> ConfigHelper.setOutputPartitioner(job.getConfiguration(),
>>>> "Murmur3Partitioner");
>>>>
>>>> SlicePredicate predicate = new SlicePredicate();
>>>> SliceRange sliceRange = new SliceRange();
>>>> sliceRange.setStart(new byte[0]);
>>>> sliceRange.setFinish(new byte[0]);
>>>> predicate.setSlice_range(sliceRange);
>>>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
>>>> predicate);
>>>> final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer,
>>>> IColumn>();
>>>> JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
>>>> ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class ,
>>>> ByteBuffer.class, b.getClass());
>>>>
>>>>
>>>> I would appreciate any input you may have.
>>>>
>>>> Thanks!
>>>>
>>>> Tal
>>>
>>>
>>>
>>>
>>> --
>>> Pulasthi Supun
>>> Undergraduate
>>> Dpt of Computer Science & Engineering
>>> University of Moratuwa
>>> Blog : http://pulasthisupun.blogspot.com/
>>> Git hub profile: https://github.com/pulasthi
>>
>>
>
>
>
> --
> Pulasthi Supun
> Undergraduate
> Dpt of Computer Science & Engineering
> University of Moratuwa
> Blog : http://pulasthisupun.blogspot.com/
> Git hub profile: https://github.com/pulasthi
Re: Using Cassandra as an input stream from Java
Posted by Pulasthi Supun Wickramasinghe <pu...@gmail.com>.
Hi Tal,
Thanks for the info will try it out and see how it goes.
On Thu, Nov 28, 2013 at 2:19 PM, Tal Sliwowicz <ta...@taboola.com> wrote:
> Hi Pulasthi,
>
> I couldn't make it work, so what I ended up doing was implement 3 Java
> classes - one that extends org.apache.hadoop.mapreduce.InputFormat ,
> another that extends org.apache.hadoop.mapreduce.InputSplit and a 3rd that
> extends org.apache.hadoop.mapreduce.RecordReader and used them to load data
> from Cassandra to an RDD (using the newAPIHadoopRDD() method). It works
> great! I'm cleaning up the code a bit and will upload to github as an open
> source (after the summit).
>
> That's great looking forward check it out after you publish on github :).
Thanks,
Pulasthi
> I hope this helps for now,
>
> Tal
>
>
> On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe <
> pulasthi911@gmail.com> wrote:
>
>> Hi Tal,
>>
>> I also tried doing this by converting the scala sample into Java but i am
>> getting an compile time error below is the code
>>
>> JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
>>
>> //Build the job configuration with ConfigHelper provided by
>> Cassandra
>> Job job = null;
>> try {
>> job = new Job();
>> } catch (IOException e) {
>> e.printStackTrace(); //To change body of catch statement use
>> File | Settings | File Templates.
>> }
>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>
>> String host = args[1];
>> String port = args[2];
>>
>> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(),
>> host);
>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>> ConfigHelper.setInputColumnFamily(job.getConfiguration(),
>> "casDemo", "Words");
>> ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
>> "casDemo", "WordCount");
>>
>> SlicePredicate predicate = new SlicePredicate();
>> SliceRange sliceRange = new SliceRange();
>> sliceRange.setStart(new byte[0]);
>> sliceRange.setFinish(new byte[0]);
>> predicate.setSlice_range(sliceRange);
>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
>> predicate);
>>
>> ConfigHelper.setInputPartitioner(job.getConfiguration(),
>> "Murmur3Partitioner");
>> ConfigHelper.setOutputPartitioner(job.getConfiguration(),
>> "Murmur3Partitioner");
>>
>> // Make a new Hadoop RDD
>> final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
>> new TreeMap<ByteBuffer, IColumn>();
>> JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
>> sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
>> ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
>>
>>
>> i also tried the code segment that you have provided but i keep getting
>> the following error.
>>
>> java:
>> /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
>> <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
>> in org.apache.spark.api.java.JavaSparkContext cannot be applied to
>> (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92
>> of ? extends java.util.SortedMap>)
>>
>> Did you encounter this if so any help on this would be appreciated.
>>
>> Best Regards,
>> Pulasthi
>>
>>
>> On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <ta...@taboola.com> wrote:
>>
>>> Hi,
>>>
>>>
>>> I'm trying to use data stored in cassandra (v1.2) and need some help.
>>> I've translated the the scala example - CassandraTest.scala - to Java, but
>>> I keep getting the following exception:
>>>
>>> Exception in thread "main" java.io.IOException: Could not get input
>>> splits
>>> at
>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
>>> at
>>> org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
>>> at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
>>> at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
>>> at
>>> org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
>>> at com.taboola.test.spark_cassandra.App.calc(App.java:119)
>>> at com.taboola.test.spark_cassandra.App.main(App.java:65)
>>> Caused by: java.util.concurrent.ExecutionException:
>>> java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>>> at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
>>> at java.util.concurrent.FutureTask.get(FutureTask.java:83)
>>> at
>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
>>> ... 9 more
>>> Caused by: java.lang.RuntimeException:
>>> org.apache.thrift.transport.TTransportException
>>> at
>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
>>> at
>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
>>> at
>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
>>> at
>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
>>> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>>> at java.lang.Thread.run(Thread.java:695)
>>> Caused by: org.apache.thrift.transport.TTransportException
>>> at
>>> org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
>>> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>> at
>>> org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
>>> at
>>> org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
>>> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>>> at
>>> org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
>>> at
>>> org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
>>> at
>>> org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
>>> at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
>>> at
>>> org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
>>> at
>>> org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
>>> at
>>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
>>> ... 8 more
>>>
>>>
>>> This is the relevant code portion:
>>>
>>> Job job = new Job();
>>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
>>> String host = "<server>";
>>> String port = "9160";
>>>
>>> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>>> ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest",
>>> "UserEvent", true);
>>> ConfigHelper.setInputPartitioner(job.getConfiguration(), "
>>> Murmur3Partitioner");
>>> ConfigHelper.setOutputPartitioner(job.getConfiguration(),
>>> "Murmur3Partitioner");
>>>
>>> SlicePredicate predicate = new SlicePredicate();
>>> SliceRange sliceRange = new SliceRange();
>>> sliceRange.setStart(new byte[0]);
>>> sliceRange.setFinish(new byte[0]);
>>> predicate.setSlice_range(sliceRange);
>>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
>>> predicate);
>>> final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer,
>>> IColumn>();
>>> JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
>>> ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class ,
>>> ByteBuffer.class, b.getClass());
>>>
>>>
>>> I would appreciate any input you may have.
>>>
>>> Thanks!
>>>
>>> Tal
>>>
>>
>>
>>
>> --
>> Pulasthi Supun
>> Undergraduate
>> Dpt of Computer Science & Engineering
>> University of Moratuwa
>> Blog : http://pulasthisupun.blogspot.com/
>> Git hub profile: <http://pulasthisupun.blogspot.com/>https://github.com/pulasthi
>> <https://github.com/pulasthi>
>>
>
>
--
Pulasthi Supun
Undergraduate
Dpt of Computer Science & Engineering
University of Moratuwa
Blog : http://pulasthisupun.blogspot.com/
Git hub profile:
<http://pulasthisupun.blogspot.com/>https://github.com/pulasthi
<https://github.com/pulasthi>
Re: Using Cassandra as an input stream from Java
Posted by Tal Sliwowicz <ta...@taboola.com>.
Hi Pulasthi,
I couldn't make it work, so what I ended up doing was implement 3 Java
classes - one that extends org.apache.hadoop.mapreduce.InputFormat ,
another that extends org.apache.hadoop.mapreduce.InputSplit and a 3rd that
extends org.apache.hadoop.mapreduce.RecordReader and used them to load data
from Cassandra to an RDD (using the newAPIHadoopRDD() method). It works
great! I'm cleaning up the code a bit and will upload to github as an open
source (after the summit).
I hope this helps for now,
Tal
On Thu, Nov 28, 2013 at 10:21 AM, Pulasthi Supun Wickramasinghe <
pulasthi911@gmail.com> wrote:
> Hi Tal,
>
> I also tried doing this by converting the scala sample into Java but i am
> getting an compile time error below is the code
>
> JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
>
> //Build the job configuration with ConfigHelper provided by
> Cassandra
> Job job = null;
> try {
> job = new Job();
> } catch (IOException e) {
> e.printStackTrace(); //To change body of catch statement use
> File | Settings | File Templates.
> }
> job.setInputFormatClass(ColumnFamilyInputFormat.class);
>
> String host = args[1];
> String port = args[2];
>
> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
> ConfigHelper.setInputColumnFamily(job.getConfiguration(),
> "casDemo", "Words");
> ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
> "casDemo", "WordCount");
>
> SlicePredicate predicate = new SlicePredicate();
> SliceRange sliceRange = new SliceRange();
> sliceRange.setStart(new byte[0]);
> sliceRange.setFinish(new byte[0]);
> predicate.setSlice_range(sliceRange);
> ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
> predicate);
>
> ConfigHelper.setInputPartitioner(job.getConfiguration(),
> "Murmur3Partitioner");
> ConfigHelper.setOutputPartitioner(job.getConfiguration(),
> "Murmur3Partitioner");
>
> // Make a new Hadoop RDD
> final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
> new TreeMap<ByteBuffer, IColumn>();
> JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
> sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
> ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
>
>
> i also tried the code segment that you have provided but i keep getting
> the following error.
>
> java:
> /home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
> <K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
> in org.apache.spark.api.java.JavaSparkContext cannot be applied to
> (org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92
> of ? extends java.util.SortedMap>)
>
> Did you encounter this if so any help on this would be appreciated.
>
> Best Regards,
> Pulasthi
>
>
> On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <ta...@taboola.com> wrote:
>
>> Hi,
>>
>>
>> I'm trying to use data stored in cassandra (v1.2) and need some help.
>> I've translated the the scala example - CassandraTest.scala - to Java, but
>> I keep getting the following exception:
>>
>> Exception in thread "main" java.io.IOException: Could not get input splits
>> at
>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
>> at
>> org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
>> at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
>> at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
>> at
>> org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
>> at
>> org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
>> at com.taboola.test.spark_cassandra.App.calc(App.java:119)
>> at com.taboola.test.spark_cassandra.App.main(App.java:65)
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
>> at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:83)
>> at
>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
>> ... 9 more
>> Caused by: java.lang.RuntimeException:
>> org.apache.thrift.transport.TTransportException
>> at
>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
>> at
>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
>> at
>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
>> at
>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
>> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>> at java.lang.Thread.run(Thread.java:695)
>> Caused by: org.apache.thrift.transport.TTransportException
>> at
>> org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
>> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>> at
>> org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
>> at
>> org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
>> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>> at
>> org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
>> at
>> org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
>> at
>> org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
>> at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
>> at
>> org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
>> at
>> org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
>> at
>> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
>> ... 8 more
>>
>>
>> This is the relevant code portion:
>>
>> Job job = new Job();
>> job.setInputFormatClass(ColumnFamilyInputFormat.class);
>> String host = "<server>";
>> String port = "9160";
>>
>> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
>> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
>> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
>> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
>> ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest",
>> "UserEvent", true);
>> ConfigHelper.setInputPartitioner(job.getConfiguration(), "
>> Murmur3Partitioner");
>> ConfigHelper.setOutputPartitioner(job.getConfiguration(),
>> "Murmur3Partitioner");
>>
>> SlicePredicate predicate = new SlicePredicate();
>> SliceRange sliceRange = new SliceRange();
>> sliceRange.setStart(new byte[0]);
>> sliceRange.setFinish(new byte[0]);
>> predicate.setSlice_range(sliceRange);
>> ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
>> predicate);
>> final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer,
>> IColumn>();
>> JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
>> ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class ,
>> ByteBuffer.class, b.getClass());
>>
>>
>> I would appreciate any input you may have.
>>
>> Thanks!
>>
>> Tal
>>
>
>
>
> --
> Pulasthi Supun
> Undergraduate
> Dpt of Computer Science & Engineering
> University of Moratuwa
> Blog : http://pulasthisupun.blogspot.com/
> Git hub profile: <http://pulasthisupun.blogspot.com/>https://github.com/pulasthi
> <https://github.com/pulasthi>
>
Re: Using Cassandra as an input stream from Java
Posted by Pulasthi Supun Wickramasinghe <pu...@gmail.com>.
Hi Tal,
I also tried doing this by converting the scala sample into Java but i am
getting an compile time error below is the code
JavaSparkContext sc = new JavaSparkContext("local[3]", "casDemo");
//Build the job configuration with ConfigHelper provided by
Cassandra
Job job = null;
try {
job = new Job();
} catch (IOException e) {
e.printStackTrace(); //To change body of catch statement use
File | Settings | File Templates.
}
job.setInputFormatClass(ColumnFamilyInputFormat.class);
String host = args[1];
String port = args[2];
ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
ConfigHelper.setInputColumnFamily(job.getConfiguration(),
"casDemo", "Words");
ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
"casDemo", "WordCount");
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[0]);
sliceRange.setFinish(new byte[0]);
predicate.setSlice_range(sliceRange);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
predicate);
ConfigHelper.setInputPartitioner(job.getConfiguration(),
"Murmur3Partitioner");
ConfigHelper.setOutputPartitioner(job.getConfiguration(),
"Murmur3Partitioner");
// Make a new Hadoop RDD
final SortedMap<ByteBuffer, IColumn> byteBufferIColumnSortedMap =
new TreeMap<ByteBuffer, IColumn>();
JavaPairRDD<ByteBuffer, ? extends SortedMap> casRdd =
sc.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class,
ByteBuffer.class, byteBufferIColumnSortedMap.getClass());
i also tried the code segment that you have provided but i keep getting the
following error.
java:
/home/pulasthi/work/spark/sparkcasstest/src/main/java/org/wso2/spark/CassSpark.java:66:
<K,V,F>newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>)
in org.apache.spark.api.java.JavaSparkContext cannot be applied to
(org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.ColumnFamilyInputFormat>,java.lang.Class<java.nio.ByteBuffer>,java.lang.Class<capture#92
of ? extends java.util.SortedMap>)
Did you encounter this if so any help on this would be appreciated.
Best Regards,
Pulasthi
On Tue, Nov 19, 2013 at 9:42 PM, Tal Sliwowicz <ta...@taboola.com> wrote:
> Hi,
>
>
> I'm trying to use data stored in cassandra (v1.2) and need some help. I've
> translated the the scala example - CassandraTest.scala - to Java, but I
> keep getting the following exception:
>
> Exception in thread "main" java.io.IOException: Could not get input splits
> at
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:191)
> at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:66)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:772)
> at org.apache.spark.rdd.RDD.collect(RDD.scala:560)
> at org.apache.spark.rdd.RDD.toArray(RDD.scala:567)
> at
> org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:378)
> at
> org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:386)
> at com.taboola.test.spark_cassandra.App.calc(App.java:119)
> at com.taboola.test.spark_cassandra.App.main(App.java:65)
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
> at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
> at java.util.concurrent.FutureTask.get(FutureTask.java:83)
> at
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:187)
> ... 9 more
> Caused by: java.lang.RuntimeException:
> org.apache.thrift.transport.TTransportException
> at
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:302)
> at
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.access$200(AbstractColumnFamilyInputFormat.java:64)
> at
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:224)
> at
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat$SplitCallable.call(AbstractColumnFamilyInputFormat.java:209)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> at java.lang.Thread.run(Thread.java:695)
> Caused by: org.apache.thrift.transport.TTransportException
> at
> org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
> at
> org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
> at
> org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
> at
> org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
> at
> org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
> at
> org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
> at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
> at
> org.apache.cassandra.thrift.Cassandra$Client.recv_describe_splits_ex(Cassandra.java:1324)
> at
> org.apache.cassandra.thrift.Cassandra$Client.describe_splits_ex(Cassandra.java:1308)
> at
> org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSubSplits(AbstractColumnFamilyInputFormat.java:279)
> ... 8 more
>
>
> This is the relevant code portion:
>
> Job job = new Job();
> job.setInputFormatClass(ColumnFamilyInputFormat.class);
> String host = "<server>";
> String port = "9160";
>
> ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
> ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
> ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
> ConfigHelper.setInputColumnFamily(job.getConfiguration(), "taltest",
> "UserEvent", true);
> ConfigHelper.setInputPartitioner(job.getConfiguration(), "
> Murmur3Partitioner");
> ConfigHelper.setOutputPartitioner(job.getConfiguration(),
> "Murmur3Partitioner");
>
> SlicePredicate predicate = new SlicePredicate();
> SliceRange sliceRange = new SliceRange();
> sliceRange.setStart(new byte[0]);
> sliceRange.setFinish(new byte[0]);
> predicate.setSlice_range(sliceRange);
> ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
> final SortedMap<ByteBuffer, IColumn> b = new TreeMap<ByteBuffer,
> IColumn>();
> JavaPairRDD<ByteBuffer, ? extends SortedMap> rdd =
> ctx.newAPIHadoopRDD(job.getConfiguration(), ColumnFamilyInputFormat.class ,
> ByteBuffer.class, b.getClass());
>
>
> I would appreciate any input you may have.
>
> Thanks!
>
> Tal
>
--
Pulasthi Supun
Undergraduate
Dpt of Computer Science & Engineering
University of Moratuwa
Blog : http://pulasthisupun.blogspot.com/
Git hub profile:
<http://pulasthisupun.blogspot.com/>https://github.com/pulasthi
<https://github.com/pulasthi>