Posted to issues@hbase.apache.org by "Shinya Yoshida (Jira)" <ji...@apache.org> on 2021/02/25 10:34:00 UTC

[jira] [Updated] (HBASE-25608) Support HFileOutputFormat locality sensitive even destination cluster is different from source cluster

     [ https://issues.apache.org/jira/browse/HBASE-25608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shinya Yoshida updated HBASE-25608:
-----------------------------------
    Description: 
Sometimes we want to run an MR job whose source cluster and destination cluster are different, for example for data migration or batch jobs, as in the following:

 
{code:java}
        // The job configuration points at the source cluster.
        Configuration conf = HBaseConfiguration.createClusterConf(HBaseConfiguration.create(), sourceClusterKey);

        final Job job = Job.getInstance(conf, jobName);
        // ...
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        Scan scan = createScanner();

        TableMapReduceUtil.initTableMapperJob(
                sourceTableName, scan,
                Mapper.class,
                ImmutableBytesWritable.class, Put.class, job);

        // The destination table is resolved through a separate configuration,
        // because ConnectionFactory.createConnection takes a Configuration, not a cluster key.
        Configuration destinationConf =
                HBaseConfiguration.createClusterConf(HBaseConfiguration.create(), destinationClusterKey);
        try (Connection con = ConnectionFactory.createConnection(destinationConf);
             Table table = con.getTable(destinationTableName);
             RegionLocator regionLocator = con.getRegionLocator(destinationTableName)) {
            HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
        }
        return job.waitForCompletion(true) ? 0 : 1;
{code}

With this setup, HFileOutputFormat2 doesn't create locality-sensitive HFiles.

We got the following exception:
{code:java}
2021-02-24 19:55:48,298 WARN [main] com.linecorp.talk.repackaged.org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2: there's something wrong when locating rowkey: xxxxxxxxxxxx
org.apache.hadoop.hbase.TableNotFoundException: Table 'table' was not found, got: XXXX.
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1302)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1181)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1165)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1122)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getRegionLocation(ConnectionManager.java:957)
        at org.apache.hadoop.hbase.client.HRegionLocator.getRegionLocation(HRegionLocator.java:74)
        at com.linecorp.talk.repackaged.org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:216)
        at com.linecorp.talk.repackaged.org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:167)
        at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
        at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
        at org.apache.hadoop.hbase.mapreduce.PutSortReducer.reduce(PutSortReducer.java:78)
        at org.apache.hadoop.hbase.mapreduce.PutSortReducer.reduce(PutSortReducer.java:43)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
{code}

This happens because HFileOutputFormat2 creates its connection from the task configuration, which is configured for the source cluster.
As a result, it connects to the source cluster and tries to get region locations for a table that should exist in the destination cluster.

{code:java}
          InetSocketAddress[] favoredNodes = null;
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            HRegionLocation loc = null;
            String tableName = Bytes.toString(tableNameBytes);
            if (tableName != null) {
              try (Connection connection = ConnectionFactory.createConnection(conf);
                  RegionLocator locator =
                      connection.getRegionLocator(TableName.valueOf(tableName))) {
                loc = locator.getRegionLocation(rowKey);
              } catch (Throwable e) {
                LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
                  tableName, e);
                loc = null;
              }
            }
            if (null == loc) {
              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
            } else {
              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
              InetSocketAddress initialIsa =
                  new InetSocketAddress(loc.getHostname(), loc.getPort());
              if (initialIsa.isUnresolved()) {
                LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
              } else {
                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                favoredNodes = new InetSocketAddress[] { initialIsa };
              }
            }
          }
          wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);
{code}
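
The fallback at the end of the snippet above can be illustrated in isolation. The following is a minimal sketch using only java.net, not HBase code: the class name FavoredNodeSketch and the method favoredNodesFor are hypothetical, and the host and port are made-up example values. It shows that an address that is missing or unresolved yields no favored nodes, which is the case where the default writer is used.

```java
import java.net.InetSocketAddress;

public class FavoredNodeSketch {
    /**
     * Returns a one-element favored-nodes array when the candidate address is
     * resolved, or null when it is missing or unresolved (default writer case).
     * Hypothetical helper that mirrors the branch structure of the snippet above.
     */
    static InetSocketAddress[] favoredNodesFor(InetSocketAddress candidate) {
        if (candidate == null || candidate.isUnresolved()) {
            return null;
        }
        return new InetSocketAddress[] { candidate };
    }

    public static void main(String[] args) {
        // An IP literal needs no DNS lookup, so this address is resolved.
        InetSocketAddress resolved = new InetSocketAddress("127.0.0.1", 16020);
        // createUnresolved never attempts resolution, so this one stays unresolved.
        InetSocketAddress unresolved =
                InetSocketAddress.createUnresolved("regionserver.example.invalid", 16020);

        System.out.println(favoredNodesFor(resolved) != null);
        System.out.println(favoredNodesFor(unresolved) != null);
    }
}
```

When the region lookup fails, as in the exception above, there is no location at all, so the writer is always created without favored nodes.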

HFileOutputFormat2 should be aware of the destination cluster when the source and destination clusters are different, so that it can generate locality-sensitive HFiles correctly.
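
One possible shape for this, sketched under assumptions rather than taken from any patch: the driver stores the destination cluster's connection settings in the job configuration under a dedicated prefix, and the task overlays them before looking up region locations. To stay self-contained, the sketch uses a plain Map in place of Hadoop's Configuration. The prefix, the class name, and both method names are hypothetical; only hbase.zookeeper.quorum is an existing HBase key.

```java
import java.util.HashMap;
import java.util.Map;

public class DestinationConfSketch {
    // Hypothetical prefix for illustration; not an HBase constant.
    static final String DEST_PREFIX = "example.hfileoutputformat.destination.";

    /** Driver side: copy destination-cluster settings into the job conf under the prefix. */
    static void storeDestination(Map<String, String> jobConf, Map<String, String> destConf) {
        for (Map.Entry<String, String> e : destConf.entrySet()) {
            jobConf.put(DEST_PREFIX + e.getKey(), e.getValue());
        }
    }

    /** Task side: build the conf for region lookups, letting prefixed keys override. */
    static Map<String, String> lookupConf(Map<String, String> taskConf) {
        Map<String, String> result = new HashMap<>(taskConf);
        for (Map.Entry<String, String> e : taskConf.entrySet()) {
            if (e.getKey().startsWith(DEST_PREFIX)) {
                result.put(e.getKey().substring(DEST_PREFIX.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> jobConf = new HashMap<>();
        jobConf.put("hbase.zookeeper.quorum", "source-zk");

        Map<String, String> destConf = new HashMap<>();
        destConf.put("hbase.zookeeper.quorum", "dest-zk");

        storeDestination(jobConf, destConf);
        // The job itself still reads from the source cluster.
        System.out.println(jobConf.get("hbase.zookeeper.quorum"));
        // Region lookups for favored nodes would use the destination cluster.
        System.out.println(lookupConf(jobConf).get("hbase.zookeeper.quorum"));
    }
}
```

Keeping the destination settings under a separate prefix leaves the task's own keys untouched, so the mapper keeps scanning the source cluster while only the locality lookup is redirected. When no prefixed keys are present, lookupConf returns a copy of the task configuration, so the single-cluster case behaves as it does today.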

> Support HFileOutputFormat locality sensitive even destination cluster is different from source cluster
> ------------------------------------------------------------------------------------------------------
>
>                 Key: HBASE-25608
>                 URL: https://issues.apache.org/jira/browse/HBASE-25608
>             Project: HBase
>          Issue Type: Improvement
>            Reporter: Shinya Yoshida
>            Assignee: Shinya Yoshida
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)