Posted to user@spark.apache.org by Madabhattula Rajesh Kumar <mr...@gmail.com> on 2014/07/31 18:49:16 UTC

Hbase

Hi Team,

I'm using the code below to read a table from HBase:

Configuration conf = HBaseConfiguration.create();
conf.set(TableInputFormat.INPUT_TABLE, "table1");

JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(
        conf,
        TableInputFormat.class,
        ImmutableBytesWritable.class,
        Result.class);

I get hBaseRDD back, but I'm not able to read the column values from it.

Could you please let me know how to read the column values from hBaseRDD?

Thank you for your help.

Regards,
Rajesh

Re: Hbase

Posted by Madabhattula Rajesh Kumar <mr...@gmail.com>.
Hi Akhil,

Thank you very much for your help and support.

Regards,
Rajesh



Re: Hbase

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Here's a piece of code. In your case, you are missing the call() method
inside the map function.


import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.NewHadoopRDD;

import com.google.common.collect.Lists;

import scala.Tuple2;

public class SparkHBaseMain {

    @SuppressWarnings("deprecation")
    public static void main(String[] arg) {
        try {
            // Jars shipped to the executors.
            List<String> jars = Lists.newArrayList(
                "/home/akhld/Desktop/tools/spark-9/jars/spark-assembly-0.9.0-incubating-hadoop2.3.0-mr1-cdh5.0.0.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-server-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-protocol-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-hadoop2-compat-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-common-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-client-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/htrace-core-2.02.jar");

            SparkConf spconf = new SparkConf();
            spconf.setMaster("local");
            spconf.setAppName("SparkHBase");
            spconf.setSparkHome("/home/akhld/Desktop/tools/spark-9");
            spconf.setJars(jars.toArray(new String[jars.size()]));
            spconf.set("spark.executor.memory", "1g");

            final JavaSparkContext sc = new JavaSparkContext(spconf);

            // HBase configuration: point at hbase-site.xml and the table to scan.
            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            conf.addResource("/home/akhld/Downloads/sparkhbasecode/hbase-site.xml");
            conf.set(TableInputFormat.INPUT_TABLE, "blogposts");

            NewHadoopRDD<ImmutableBytesWritable, Result> rdd =
                new NewHadoopRDD<ImmutableBytesWritable, Result>(
                    JavaSparkContext.toSparkContext(sc),
                    TableInputFormat.class,
                    ImmutableBytesWritable.class,
                    Result.class,
                    conf);

            JavaRDD<Tuple2<ImmutableBytesWritable, Result>> jrdd = rdd.toJavaRDD();

            ForEachFunction f = new ForEachFunction();
            JavaRDD<Iterator<String>> retrdd = jrdd.map(f);

            // count() is an action: it is what forces the map above to run.
            System.out.println("Count =>" + retrdd.count());

        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Craaaashed : " + e);
        }
    }

    @SuppressWarnings("serial")
    private static class ForEachFunction
            extends Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>> {

        public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test) {
            Result tmp = (Result) test._2;
            // Read the "post:title" column and print its value(s) where the task runs.
            List<KeyValue> kvl = tmp.getColumn("post".getBytes(), "title".getBytes());
            for (KeyValue kl : kvl) {
                String sb = new String(kl.getValue());
                System.out.println("Value :" + sb);
            }
            return null;
        }
    }
}
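
As a variation on the above (a rough sketch, not part of the original program; it
assumes the same jrdd and the same "post:title" column), the values can also be
returned from call() and collected on the driver, so the printing happens in the
driver process rather than inside the tasks:

JavaRDD<String> titles = jrdd.map(
    new Function<Tuple2<ImmutableBytesWritable, Result>, String>() {
        public String call(Tuple2<ImmutableBytesWritable, Result> row) {
            List<KeyValue> kvl = row._2.getColumn("post".getBytes(), "title".getBytes());
            // Empty string if the row has no post:title cell.
            return kvl.isEmpty() ? "" : new String(kvl.get(0).getValue());
        }
    });

// collect() is the action that triggers the map and ships the values to the driver.
for (String title : titles.collect()) {
    System.out.println("Value :" + title);
}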


Hope it helps.


Thanks
Best Regards



Re: Hbase

Posted by Madabhattula Rajesh Kumar <mr...@gmail.com>.
Hi Akhil,

Thank you for your response. I'm facing the issue below.

I'm not able to print the values. Am I missing anything? Could you please
look into this issue?

    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(
            conf,
            TableInputFormat.class,
            ImmutableBytesWritable.class,
            Result.class);

    System.out.println(" ROWS COUNT = " + hBaseRDD.count());

    JavaRDD R = hBaseRDD.map(
        new Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>>() {
            public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test)
            {
                Result tmp = (Result) test._2;

                System.out.println("Inside ");

                // List<KeyValue> kvl = tmp.getColumn("post".getBytes(), "title".getBytes());
                for (KeyValue kl : tmp.raw())
                {
                    String sb = new String(kl.getValue());
                    System.out.println(sb);
                }
                return null;
            }
        });

Output:

ROWS COUNT = 8

It is not even printing the "Inside" statement. I think it is not going into
this function.
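
A likely cause: map() is a lazy transformation, so only hBaseRDD.count() actually
runs anything here; no action is ever called on R, which means call() is never
executed. A minimal check (assuming the code above) is to add an action on R:

    System.out.println(" MAPPED COUNT = " + R.count()); // forces the map; "Inside" should now print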

Could you please help me on this issue.

Thank you for your support and help.

Regards,
Rajesh




Re: Hbase

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
You can use a map function like the following and do whatever you want with
the Result.

new Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>>() {
    public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test) {
        Result tmp = (Result) test._2;
        List<KeyValue> kvl = tmp.getColumn("post".getBytes(), "title".getBytes());
        for (KeyValue kl : kvl) {
            String sb = new String(kl.getValue());
            System.out.println(sb);
        }
        return null;
    }
}
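
For reference, a rough sketch of how such a function might be attached to the
hBaseRDD from the original post (the same "post"/"title" family and qualifier are
assumed here):

JavaRDD<Iterator<String>> mapped = hBaseRDD.map(
    new Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>>() {
        public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> row) {
            for (KeyValue kv : row._2.getColumn("post".getBytes(), "title".getBytes())) {
                System.out.println(new String(kv.getValue()));
            }
            return null;
        }
    });

mapped.count(); // map() is lazy; this action is what makes the printing happen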




Thanks
Best Regards

