Posted to dev@flink.apache.org by "zhanglibing000 (JIRA)" <ji...@apache.org> on 2017/12/12 13:39:00 UTC

[jira] [Created] (FLINK-8244) Two ZooKeeper clients are created when reading data from HBase in Flink YARN session mode

zhanglibing000 created FLINK-8244:
-------------------------------------

             Summary: Two ZooKeeper clients are created when reading data from HBase in Flink YARN session mode
                 Key: FLINK-8244
                 URL: https://issues.apache.org/jira/browse/FLINK-8244
             Project: Flink
          Issue Type: Bug
          Components: Batch Connectors and Input/Output Formats
    Affects Versions: 1.3.2
            Reporter: zhanglibing000


I want to use the Flink HBaseTableInputFormat API to read data from HBase. This works when I run my code in local mode, but when I run the same code in Flink YARN session mode the job gets stuck in the task manager: there is no error message and no data is emitted from the data source. In the task manager log (see attachment) I can see that the ZooKeeper client is created two times, so I think there is a problem in Flink.
Thanks


{code:java}
import com.alibaba.fastjson.JSON
import com.google.gson.Gson
import com.mininglamp.shmetro.RuleEnginerStreamingFlinkApp.logger
import com.mininglamp.shmetro.bean.json.JsonBeanGenerator
import com.mininglamp.shmetro.model.TrainData
import com.mininglamp.shmetro.output.GlobalEntity
import com.mininglamp.shmetro.rules.{BatchRuleEngineerByScannerExecutor, RuleEngineerByRedisExecutor}
import com.mininglamp.shmetro.source.HBaseTableInputFormat
import com.mininglamp.shmetro.util.RedisUtil
import com.mininglamp.shmetro.utils.{MysqlUtil}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.operators.{DataSource, GroupReduceOperator, MapOperator}
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConversions._

object RuleEnginerBatchFlinkAppCopy {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Job parameters are read from the command-line arguments.
    val params = ParameterTool.fromArgs(args)

    env.setParallelism(1)
    env.getConfig.setGlobalJobParameters(params)

    val appName = params.get("batch.name")
    val updateTime = params.getLong("update.time")
    val tableName = params.get("hbase.table.name")
    val columnFamily = params.get("hbase.table.cf")
    val columns = params.get("hbase.table.columns").split(",")
    val startTime = params.get("start.time")
    val endTime = params.get("end.time")
    val trainNo = params.get("train.no")
    val startRow = new StringBuilder
    startRow.append(trainNo).append("\0").append(startTime)
    val endRow = new StringBuilder
    endRow.append(trainNo).append("\0").append(endTime)

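    // Scan the rows between startRow and endRow and map every HBase Result to a (rowkey, json) tuple.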
    val hBaseDataSource: DataSource[Tuple2[String, String]] = env.createInput(new HBaseTableInputFormat[Tuple2[String, String]](tableName, columnFamily, columns, null, startRow.toString(), endRow.toString()) {
      private val reuse = new Tuple2[String, String]

      override
      protected def mapResultToTuple(r: Result): Tuple2[String, String] = {
        logger.error("**********hbase row: " + reuse)
        val key = Bytes.toString(r.getRow)
        val value = getMapResult(r)
        reuse.setField(key, 0)
        val data = value.get(key)
        reuse.setField(JSON.toJSON(data).toString, 1)
        return reuse
      }
    })
    hBaseDataSource.collect()
  }

}
{code}
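
For comparison, a driver built directly on the TableInputFormat shipped with flink-hbase (imported in the helper class below) would look roughly like the following sketch. The table, column family, and column names are placeholders, and the HBase connection settings are assumed to come from an hbase-site.xml on the classpath rather than being hard-coded:

{code:java}
import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseReadSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, String>> rows = env.createInput(new TableInputFormat<Tuple2<String, String>>() {

            @Override
            protected String getTableName() {
                return "my_table"; // placeholder table name
            }

            @Override
            protected Scan getScanner() {
                // Scan a single column of one column family; names are placeholders.
                Scan scan = new Scan();
                scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"));
                return scan;
            }

            @Override
            protected Tuple2<String, String> mapResultToTuple(Result r) {
                // Emit (rowkey, value of cf:col) for every HBase row.
                String key = Bytes.toString(r.getRow());
                String value = Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col")));
                return new Tuple2<>(key, value);
            }
        });

        rows.print();
    }
}
{code}

If this variant shows only one ZooKeeper client in the task manager log, the extra client is likely tied to the custom createTable() path in the HBaseTableInputFormat shown below.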


{code:java}
    import org.apache.commons.lang.StringUtils;
    import org.apache.flink.addons.hbase.AbstractTableInputFormat;
    import org.apache.flink.addons.hbase.TableInputFormat;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.RegexStringComparator;
    import org.apache.hadoop.hbase.filter.RowFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public abstract class HBaseTableInputFormat<T extends Tuple> extends AbstractTableInputFormat<T> {

        private String tableName;
        private String columnFamily;
        private String[] columns;
        private String filterName;
        private String startRow;
        private String stopRow;

        public HBaseTableInputFormat(String tableName, String columnFamily, String[] columns, String filterName, String startRow, String stopRow) {
            this.tableName = tableName;
            this.columnFamily = columnFamily;
            this.columns = columns;
            this.filterName = filterName;
            this.startRow = startRow;
            this.stopRow = stopRow;
        }

        protected Scan getScanner() {
            Scan scan = new Scan();
            if (!StringUtils.isEmpty(columnFamily)) {
                scan.addFamily(columnFamily.getBytes());
            }
            if (!StringUtils.isEmpty(filterName)) {
                Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(filterName));
                scan.setFilter(filter);
            }
            if (columns != null && !StringUtils.isEmpty(columnFamily)) {
                for (String column : columns) {
                    scan.addColumn(columnFamily.getBytes(), column.getBytes());
                }
            }
            if (!StringUtils.isEmpty(startRow)) {
                scan.setStartRow(startRow.getBytes());
            }
            if (!StringUtils.isEmpty(stopRow)) {
                scan.setStopRow(stopRow.getBytes());
            }
            return scan;
        }

        protected String getTableName() {
            return tableName;
        }

        protected abstract T mapResultToTuple(Result r);

        @Override
        public void configure(Configuration parameters) {
            table = createTable();
            if (table != null) {
                scan = getScanner();
            }
        }

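        // Builds a standalone HBase configuration with hard-coded connection settings and opens
        // the table; the HTable constructor obtains its cluster connection (and the ZooKeeper
        // client that comes with it) internally.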
        private HTable createTable() {
            LOG.info("Initializing HBaseConfiguration");
            org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
            configuration.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1200000);
            configuration.set("hbase.zookeeper.quorum", "x.x.x.x");
            configuration.set("hbase.master.info.port", "2181");
            configuration.set("hbase.master", "172.17.1.21:60000");
            configuration.setInt("hbase.rpc.timeout", 20000);
            configuration.setInt("hbase.client.operation.timeout", 30000);

            try {
                return new HTable(configuration, getTableName());
            } catch (Exception e) {
                LOG.error("Error instantiating a new HTable instance", e);
            }
            return null;
        }

        protected T mapResultToOutType(Result r) {
            return mapResultToTuple(r);
        }

        public Map<String, Map<String, String>> getMapResult(Result result) {

            Map<String, Map<String, String>> resMap = new HashMap<>();
            Cell[] cells = result.rawCells();
            Map<String, String> cellMap = new HashMap<>();
            for (Cell cell : cells) {
                cellMap.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
            }
            resMap.put(Bytes.toString(result.getRow()), cellMap);
            return resMap;
        }

    }

{code}
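
The createTable() above builds a fresh HBaseConfiguration with hard-coded connection settings and lets the HTable constructor manage the cluster connection implicitly. A hedged sketch of the opposite approach, assuming an HBase 1.x client API (ConnectionFactory/Connection), keeps a single explicitly shared connection, and therefore a single ZooKeeper client, per JVM; since the format above assigns an HTable to the inherited table field, wiring this in would need an adapted createTable(), so the sketch only illustrates the connection-sharing idea:

{code:java}
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

import java.io.IOException;

/**
 * Hypothetical helper (not part of the job above): keeps one HBase Connection,
 * and therefore one ZooKeeper client, per JVM and hands out lightweight Table
 * handles from it. Assumes an HBase 1.x client and an hbase-site.xml on the
 * classpath.
 */
public class SharedHBaseConnection {

    private static Connection connection;

    public static synchronized Table getTable(String tableName) throws IOException {
        if (connection == null || connection.isClosed()) {
            // ConnectionFactory.createConnection() opens the (single) ZooKeeper client.
            connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
        }
        return connection.getTable(TableName.valueOf(tableName));
    }
}
{code}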

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)