Posted to issues@hbase.apache.org by "ZhanxiongWang (Jira)" <ji...@apache.org> on 2019/09/21 07:11:00 UTC

[jira] [Updated] (HBASE-23062) Use TableInputFormat to read data from HBase: when the size in Scan.setCaching(size) is too big, some rowkeys are lost without exceptions.

     [ https://issues.apache.org/jira/browse/HBASE-23062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ZhanxiongWang updated HBASE-23062:
----------------------------------
    Description: 
I ran the experiment in two ways: one uses Spark to read from HBase, the other uses MapReduce. In both cases, when I increase the scan caching size, some data is lost. To be more precise, when I set scan.setCaching(500) I receive 7622 rows of data, but when I set scan.setCaching(50000) I receive only 4226 rows. What makes the problem serious is that the data is lost without any exception, so the cause is difficult to find.
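
For comparison, the expected row count over the same row range can be established with a plain client-side scan that bypasses TableInputFormat; a minimal sketch, assuming the 0.98-era HTable API and the same variables (zkPort, zkMaster, zkPath, familyName, row range) as in the snippets below:
{code:java}
// Baseline check (sketch, not the job code): count rows with a plain client scan.
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.property.clientPort", zkPort);
conf.set("hbase.zookeeper.quorum", zkMaster);
conf.set("zookeeper.znode.parent", zkPath);

HTable table = new HTable(conf, hbaseTableName);    // 0.98-style table handle
try {
    Scan scan = new Scan();
    scan.addFamily(familyName);
    scan.setCaching(500);                           // small caching as the reference run
    scan.setStartRow(Bytes.toBytes(startRowkeyStr));
    scan.setStopRow(Bytes.toBytes(endRowkeyStr));

    long rows = 0;
    ResultScanner scanner = table.getScanner(scan);
    try {
        for (Result r : scanner) {
            rows++;                                 // one Result per rowkey
        }
    } finally {
        scanner.close();
    }
    System.out.println("baseline row count: " + rows);
} finally {
    table.close();
}
{code}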

My Spark code is like this:
{code:java}
Configuration hbaseConfiguration = HBaseConfiguration.create();
hbaseConfiguration.set("hbase.zookeeper.property.clientPort", zkPort);
hbaseConfiguration.set("hbase.zookeeper.quorum", zkMaster);
hbaseConfiguration.set("zookeeper.znode.parent", zkPath);
hbaseConfiguration.set(TableInputFormat.INPUT_TABLE, hbaseTableName);
hbaseConfiguration.setLong("hbase.client.scanner.timeout.period", 6000000);
hbaseConfiguration.setLong("hbase.rpc.timeout", 6000000);

final Scan hbaseScan = new Scan();
hbaseScan.addFamily(familyName);
hbaseScan.setCaching(50000); // if caching is too big, some rowkeys will be lost!
for (String[] cell : cellNames) {
    String column = cell[0];
    hbaseScan.addColumn(familyName, Bytes.toBytes(column));
}
hbaseScan.setStartRow(Bytes.toBytes(startRowkeyStr));
hbaseScan.setStopRow(Bytes.toBytes(endRowkeyStr));

try {
    ClientProtos.Scan scanProto = ProtobufUtil.toScan(hbaseScan);
    hbaseConfiguration.set(TableInputFormat.SCAN, Base64.encodeBytes(scanProto.toByteArray()));
    JavaPairRDD<ImmutableBytesWritable, Result> pairRDD =
            jsc.<ImmutableBytesWritable, Result, TableInputFormat>newAPIHadoopRDD(
                    hbaseConfiguration, TableInputFormat.class,
                    ImmutableBytesWritable.class, Result.class);
    System.out.println("pairRDD.count(): " + pairRDD.count());
} catch (IOException e) {
    System.out.println("Scan Exception!!!!!! " + e.getMessage());
}
{code}
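
A quick sanity check is to round-trip the Scan through the same Base64 string that TableInputFormat reads from the configuration, to confirm that the caching value and row range survive serialization; a minimal sketch, assuming TableMapReduceUtil's convertScanToString/convertStringToScan helpers (which use the same encoding):
{code:java}
// Sanity check (sketch): serialize the Scan the way the job does, read it back,
// and verify that setCaching() and the row range survive the round trip.
String serializedScan = TableMapReduceUtil.convertScanToString(hbaseScan);
hbaseConfiguration.set(TableInputFormat.SCAN, serializedScan);

Scan restored = TableMapReduceUtil.convertStringToScan(
        hbaseConfiguration.get(TableInputFormat.SCAN));
System.out.println("caching after round trip: " + restored.getCaching()); // expect 50000
System.out.println("start row: " + Bytes.toString(restored.getStartRow()));
System.out.println("stop row:  " + Bytes.toString(restored.getStopRow()));
{code}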
My MapReduce code is like this:
{code:java}
static class HbaseMapper extends TableMapper<ImmutableBytesWritable, Text> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        for (Cell cell : value.rawCells()) {
            context.write(new ImmutableBytesWritable("A".getBytes()), new Text("max"));
        }
    }
}

public static void main(String[] args) throws Exception {
    org.apache.hadoop.conf.Configuration hbaseConfiguration = HBaseConfiguration.create();
    hbaseConfiguration.set("hbase.zookeeper.property.clientPort", zkPort);
    hbaseConfiguration.set("hbase.zookeeper.quorum", zkMaster);
    hbaseConfiguration.set("zookeeper.znode.parent", zkPath);
    hbaseConfiguration.setLong("hbase.client.scanner.timeout.period", 6000000);
    hbaseConfiguration.setLong("hbase.rpc.timeout", 6000000);

    Job job = Job.getInstance(hbaseConfiguration);
    job.setJarByClass(App.class);

    List<Scan> list = new ArrayList<Scan>();
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes(familyName));
    scan.setCaching(50000); // if caching is too big, some rowkeys will be lost!
    for (String[] cell : cellNames) {
        String column = cell[0];
        scan.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column));
    }
    scan.setStartRow(Bytes.toBytes(startRowkeyStr));
    scan.setStopRow(Bytes.toBytes(endRowkeyStr));
    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(hbaseTableName));
    list.add(scan);
    System.out.println("size: " + list.size());

    TableMapReduceUtil.initTableMapperJob(list, HbaseMapper.class, ImmutableBytesWritable.class, Text.class, job);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path("maxTestOutput"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
{code}
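
To make the loss visible inside the job itself, the mapper can increment a custom counter once per input row, and the final counter value can be compared with the expected row count; a minimal sketch (the counter group and name are made up for illustration):
{code:java}
// Sketch of a mapper variant that counts how many rows the job actually receives.
static class CountingMapper extends TableMapper<ImmutableBytesWritable, Text> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // map() is called once per rowkey delivered by TableInputFormat.
        context.getCounter("ScanDebug", "ROWS_SEEN").increment(1);
        context.write(new ImmutableBytesWritable("A".getBytes()), new Text("max"));
    }
}

// In the driver, after job.waitForCompletion(true):
// long rowsSeen = job.getCounters().findCounter("ScanDebug", "ROWS_SEEN").getValue();
// System.out.println("rows seen by the job: " + rowsSeen);
{code}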
The pom.xml for the MapReduce code is attached:

[^pom.xml]

 


> Use TableInputFormat to read data from HBase: when the size in Scan.setCaching(size) is too big, some rowkeys are lost without exceptions.
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HBASE-23062
>                 URL: https://issues.apache.org/jira/browse/HBASE-23062
>             Project: HBase
>          Issue Type: Bug
>    Affects Versions: 0.98.6.1
>            Reporter: ZhanxiongWang
>            Priority: Major
>         Attachments: pom.xml
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)