Posted to commits@hbase.apache.org by bu...@apache.org on 2017/11/09 04:59:50 UTC
[4/9] hbase git commit: HBASE-18817 pull the hbase-spark module out of branch-2.
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala
deleted file mode 100644
index 65a3bc7..0000000
--- a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.datasources.hbase
-
-import org.apache.avro.Schema
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.spark.SchemaConverters
-import org.apache.hadoop.hbase.spark.datasources._
-import org.apache.hadoop.hbase.spark.hbase._
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.util.DataTypeParser
-import org.apache.spark.sql.types._
-import org.json4s.jackson.JsonMethods._
-
-import scala.collection.mutable
-
-// Due to the access restrictions in Spark, we have to locate the file in this package.
-// The definition of each column cell, which may be composite type
-// TODO: add avro support
-@InterfaceAudience.Private
-case class Field(
- colName: String,
- cf: String,
- col: String,
- sType: Option[String] = None,
- avroSchema: Option[String] = None,
- serdes: Option[SerDes]= None,
- len: Int = -1) extends Logging {
- override def toString = s"$colName $cf $col"
- val isRowKey = cf == HBaseTableCatalog.rowKey
- var start: Int = _
- def schema: Option[Schema] = avroSchema.map { x =>
- logDebug(s"avro: $x")
- val p = new Schema.Parser
- p.parse(x)
- }
-
- lazy val exeSchema = schema
-
- // converter from avro to catalyst structure
- lazy val avroToCatalyst: Option[Any => Any] = {
- schema.map(SchemaConverters.createConverterToSQL(_))
- }
-
- // converter from catalyst to avro
- lazy val catalystToAvro: (Any) => Any ={
- SchemaConverters.createConverterToAvro(dt, colName, "recordNamespace")
- }
-
- def cfBytes: Array[Byte] = {
- if (isRowKey) {
- Bytes.toBytes("")
- } else {
- Bytes.toBytes(cf)
- }
- }
- def colBytes: Array[Byte] = {
- if (isRowKey) {
- Bytes.toBytes("key")
- } else {
- Bytes.toBytes(col)
- }
- }
-
- val dt = {
- sType.map(DataTypeParser.parse(_)).getOrElse{
- schema.map{ x=>
- SchemaConverters.toSqlType(x).dataType
- }.get
- }
- }
-
- var length: Int = {
- if (len == -1) {
- dt match {
- case BinaryType | StringType => -1
- case BooleanType => Bytes.SIZEOF_BOOLEAN
- case ByteType => 1
- case DoubleType => Bytes.SIZEOF_DOUBLE
- case FloatType => Bytes.SIZEOF_FLOAT
- case IntegerType => Bytes.SIZEOF_INT
- case LongType => Bytes.SIZEOF_LONG
- case ShortType => Bytes.SIZEOF_SHORT
- case _ => -1
- }
- } else {
- len
- }
-
- }
-
- override def equals(other: Any): Boolean = other match {
- case that: Field =>
- colName == that.colName && cf == that.cf && col == that.col
- case _ => false
- }
-}
-
-// The row key definition, with each key referring to a col defined in Field, e.g.,
-// key1:key2:key3
-@InterfaceAudience.Private
-case class RowKey(k: String) {
- val keys = k.split(":")
- var fields: Seq[Field] = _
- var varLength = false
- def length = {
- if (varLength) {
- -1
- } else {
- fields.foldLeft(0){case (x, y) =>
- x + y.length
- }
- }
- }
-}
-// The mapping between the columns presented to Spark and the HBase fields
-@InterfaceAudience.Private
-case class SchemaMap(map: mutable.HashMap[String, Field]) {
- def toFields = map.map { case (name, field) =>
- StructField(name, field.dt)
- }.toSeq
-
- def fields = map.values
-
- def getField(name: String) = map(name)
-}
-
-
-// The definition of the relation schema mapping the HBase table to Spark SQL
-@InterfaceAudience.Private
-case class HBaseTableCatalog(
- namespace: String,
- name: String,
- row: RowKey,
- sMap: SchemaMap,
- @transient params: Map[String, String]) extends Logging {
- def toDataType = StructType(sMap.toFields)
- def getField(name: String) = sMap.getField(name)
- def getRowKey: Seq[Field] = row.fields
- def getPrimaryKey= row.keys(0)
- def getColumnFamilies = {
- sMap.fields.map(_.cf).filter(_ != HBaseTableCatalog.rowKey).toSeq.distinct
- }
-
- def get(key: String) = params.get(key)
-
- // Set up the start and length for each dimension of the row key at runtime.
- def dynSetupRowKey(rowKey: Array[Byte]) {
- logDebug(s"length: ${rowKey.length}")
- if(row.varLength) {
- var start = 0
- row.fields.foreach { f =>
- logDebug(s"start: $start")
- f.start = start
- f.length = {
- // If the length is not defined
- if (f.length == -1) {
- f.dt match {
- case StringType =>
- var pos = rowKey.indexOf(HBaseTableCatalog.delimiter, start)
- if (pos == -1 || pos > rowKey.length) {
- // this is at the last dimension
- pos = rowKey.length
- }
- pos - start
- // We don't know the length, so assume it extends to the end of the rowkey.
- case _ => rowKey.length - start
- }
- } else {
- f.length
- }
- }
- start += f.length
- }
- }
- }
-
- def initRowKey = {
- val fields = sMap.fields.filter(_.cf == HBaseTableCatalog.rowKey)
- row.fields = row.keys.flatMap(n => fields.find(_.col == n))
- // The length is determined at run time if it is string or binary and the length is undefined.
- if (row.fields.filter(_.length == -1).isEmpty) {
- var start = 0
- row.fields.foreach { f =>
- f.start = start
- start += f.length
- }
- } else {
- row.varLength = true
- }
- }
- initRowKey
-}
-
-@InterfaceAudience.Public
-object HBaseTableCatalog {
- // If defined and larger than 3, a new table will be created with the number of regions specified.
- val newTable = "newtable"
- // The json string specifying hbase catalog information
- val regionStart = "regionStart"
- val defaultRegionStart = "aaaaaaa"
- val regionEnd = "regionEnd"
- val defaultRegionEnd = "zzzzzzz"
- val tableCatalog = "catalog"
- // The row key with format key1:key2 specifying table row key
- val rowKey = "rowkey"
- // The key for the hbase table whose value specifies the namespace and table name
- val table = "table"
- // The namespace of hbase table
- val nameSpace = "namespace"
- // The name of hbase table
- val tableName = "name"
- // The name of columns in hbase catalog
- val columns = "columns"
- val cf = "cf"
- val col = "col"
- val `type` = "type"
- // the name of avro schema json string
- val avro = "avro"
- val delimiter: Byte = 0
- val serdes = "serdes"
- val length = "length"
-
- /**
- * User-provided table schema definition, for example:
- * {"table":{"namespace":"default", "name":"htable"}, "rowkey":"key1:key2",
- * "columns":{"col1":{"cf":"cf1", "col":"col1", "type":"type1"},
- * "col2":{"cf":"cf2", "col":"col2", "type":"type2"}}}
- * Note that for any col in the rowKey there has to be a corresponding col defined in columns.
- */
- def apply(params: Map[String, String]): HBaseTableCatalog = {
- val parameters = convert(params)
- // println(jString)
- val jString = parameters(tableCatalog)
- val map = parse(jString).values.asInstanceOf[Map[String, _]]
- val tableMeta = map.get(table).get.asInstanceOf[Map[String, _]]
- val nSpace = tableMeta.get(nameSpace).getOrElse("default").asInstanceOf[String]
- val tName = tableMeta.get(tableName).get.asInstanceOf[String]
- val cIter = map.get(columns).get.asInstanceOf[Map[String, Map[String, String]]].toIterator
- val schemaMap = mutable.HashMap.empty[String, Field]
- cIter.foreach { case (name, column) =>
- val sd = {
- column.get(serdes).asInstanceOf[Option[String]].map(n =>
- Class.forName(n).newInstance().asInstanceOf[SerDes]
- )
- }
- val len = column.get(length).map(_.toInt).getOrElse(-1)
- val sAvro = column.get(avro).map(parameters(_))
- val f = Field(name, column.getOrElse(cf, rowKey),
- column.get(col).get,
- column.get(`type`),
- sAvro, sd, len)
- schemaMap.+=((name, f))
- }
- val rKey = RowKey(map.get(rowKey).get.asInstanceOf[String])
- HBaseTableCatalog(nSpace, tName, rKey, SchemaMap(schemaMap), parameters)
- }
-
- val TABLE_KEY: String = "hbase.table"
- val SCHEMA_COLUMNS_MAPPING_KEY: String = "hbase.columns.mapping"
-
- /* For backward compatibility: convert the old definition to the new json based definition, formatted as below
- val catalog = s"""{
- |"table":{"namespace":"default", "name":"htable"},
- |"rowkey":"key1:key2",
- |"columns":{
- |"col1":{"cf":"rowkey", "col":"key1", "type":"string"},
- |"col2":{"cf":"rowkey", "col":"key2", "type":"double"},
- |"col3":{"cf":"cf1", "col":"col2", "type":"binary"},
- |"col4":{"cf":"cf1", "col":"col3", "type":"timestamp"},
- |"col5":{"cf":"cf1", "col":"col4", "type":"double", "serdes":"${classOf[DoubleSerDes].getName}"},
- |"col6":{"cf":"cf1", "col":"col5", "type":"$map"},
- |"col7":{"cf":"cf1", "col":"col6", "type":"$array"},
- |"col8":{"cf":"cf1", "col":"col7", "type":"$arrayMap"}
- |}
- |}""".stripMargin
- */
- @deprecated("Please use new json format to define HBaseCatalog")
- // TODO: There is no need to deprecate since this is the first release.
- def convert(parameters: Map[String, String]): Map[String, String] = {
- val tableName = parameters.get(TABLE_KEY).getOrElse(null)
- // if the hbase.table is not defined, we assume it is json format already.
- if (tableName == null) return parameters
- val schemaMappingString = parameters.getOrElse(SCHEMA_COLUMNS_MAPPING_KEY, "")
- import scala.collection.JavaConverters._
- val schemaMap = generateSchemaMappingMap(schemaMappingString).asScala.map(_._2.asInstanceOf[SchemaQualifierDefinition])
-
- val rowkey = schemaMap.filter {
- _.columnFamily == "rowkey"
- }.map(_.columnName)
- val cols = schemaMap.map { x =>
- s""""${x.columnName}":{"cf":"${x.columnFamily}", "col":"${x.qualifier}", "type":"${x.colType}"}""".stripMargin
- }
- val jsonCatalog =
- s"""{
- |"table":{"namespace":"default", "name":"${tableName}"},
- |"rowkey":"${rowkey.mkString(":")}",
- |"columns":{
- |${cols.mkString(",")}
- |}
- |}
- """.stripMargin
- parameters ++ Map(HBaseTableCatalog.tableCatalog->jsonCatalog)
- }
-
- /**
- * Reads the SCHEMA_COLUMNS_MAPPING_KEY and converts it to a map of
- * SchemaQualifierDefinitions with the original sql column name as the key
- *
- * @param schemaMappingString The schema mapping string from the SparkSQL map
- * @return A map of definitions keyed by the SparkSQL column name
- */
- @InterfaceAudience.Private
- def generateSchemaMappingMap(schemaMappingString:String):
- java.util.HashMap[String, SchemaQualifierDefinition] = {
- println(schemaMappingString)
- try {
- val columnDefinitions = schemaMappingString.split(',')
- val resultingMap = new java.util.HashMap[String, SchemaQualifierDefinition]()
- columnDefinitions.map(cd => {
- val parts = cd.trim.split(' ')
-
- //Make sure we get three parts
- //<ColumnName> <ColumnType> <ColumnFamily:Qualifier>
- if (parts.length == 3) {
- val hbaseDefinitionParts = if (parts(2).charAt(0) == ':') {
- Array[String]("rowkey", parts(0))
- } else {
- parts(2).split(':')
- }
- resultingMap.put(parts(0), new SchemaQualifierDefinition(parts(0),
- parts(1), hbaseDefinitionParts(0), hbaseDefinitionParts(1)))
- } else {
- throw new IllegalArgumentException("Invalid value for schema mapping '" + cd +
- "' should be '<columnName> <columnType> <columnFamily>:<qualifier>' " +
- "for columns and '<columnName> <columnType> :<qualifier>' for rowKeys")
- }
- })
- resultingMap
- } catch {
- case e:Exception => throw
- new IllegalArgumentException("Invalid value for " + SCHEMA_COLUMNS_MAPPING_KEY +
- " '" +
- schemaMappingString + "'", e )
- }
- }
-}
-
-/**
- * Construct to contain column data that spans SparkSQL and HBase
- *
- * @param columnName SparkSQL column name
- * @param colType SparkSQL column type
- * @param columnFamily HBase column family
- * @param qualifier HBase qualifier name
- */
-@InterfaceAudience.Private
-case class SchemaQualifierDefinition(columnName:String,
- colType:String,
- columnFamily:String,
- qualifier:String)
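For context on the file removed above: HBaseTableCatalog.apply parses a JSON catalog supplied under the "catalog" key (HBaseTableCatalog.tableCatalog), while convert() still accepted the older "hbase.table" / "hbase.columns.mapping" style. A minimal sketch, assuming a hypothetical table "person" with columns col0/col1 (names chosen purely for illustration):

    import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog

    // New-style JSON catalog; col0 is the row key, col1 lives in family cf1.
    val catalog = """{
      |"table":{"namespace":"default", "name":"person"},
      |"rowkey":"key",
      |"columns":{
      |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
      |"col1":{"cf":"cf1", "col":"name", "type":"string"}
      |}
      |}""".stripMargin

    // apply() expects the JSON under HBaseTableCatalog.tableCatalog ("catalog").
    val cat = HBaseTableCatalog(Map(HBaseTableCatalog.tableCatalog -> catalog))
    println(cat.getRowKey.map(_.colName))   // row-key columns, here just col0
    println(cat.getColumnFamilies)          // non-rowkey column families, here cf1

    // The old style handled by convert()/generateSchemaMappingMap was roughly:
    //   "hbase.table"           -> "person",
    //   "hbase.columns.mapping" -> "col0 STRING :key, col1 STRING cf1:name"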
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/Utils.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/Utils.scala b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/Utils.scala
deleted file mode 100644
index 36b8bbf..0000000
--- a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/Utils.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.datasources.hbase
-
-import org.apache.hadoop.hbase.spark.AvroSerdes
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.sql.execution.SparkSqlSerializer
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-@InterfaceAudience.Private
-object Utils {
-
-
- /**
- * Parses the hbase field to its corresponding
- * scala type which can then be put into a Spark GenericRow
- * which is then automatically converted by Spark.
- */
- def hbaseFieldToScalaType(
- f: Field,
- src: Array[Byte],
- offset: Int,
- length: Int): Any = {
- if (f.exeSchema.isDefined) {
- // If an avro schema is defined, use it to deserialize the record and then convert it to a catalyst data type
- val m = AvroSerdes.deserialize(src, f.exeSchema.get)
- val n = f.avroToCatalyst.map(_(m))
- n.get
- } else {
- // Fall back to atomic type
- f.dt match {
- case BooleanType => toBoolean(src, offset)
- case ByteType => src(offset)
- case DoubleType => Bytes.toDouble(src, offset)
- case FloatType => Bytes.toFloat(src, offset)
- case IntegerType => Bytes.toInt(src, offset)
- case LongType|TimestampType => Bytes.toLong(src, offset)
- case ShortType => Bytes.toShort(src, offset)
- case StringType => toUTF8String(src, offset, length)
- case BinaryType =>
- val newArray = new Array[Byte](length)
- System.arraycopy(src, offset, newArray, 0, length)
- newArray
- // TODO: add more data type support
- case _ => SparkSqlSerializer.deserialize[Any](src)
- }
- }
- }
-
- // convert the input to bytes according to the field's data type
- def toBytes(input: Any, field: Field): Array[Byte] = {
- if (field.schema.isDefined) {
- // Here we assume the top level type is structType
- val record = field.catalystToAvro(input)
- AvroSerdes.serialize(record, field.schema.get)
- } else {
- input match {
- case data: Boolean => Bytes.toBytes(data)
- case data: Byte => Array(data)
- case data: Array[Byte] => data
- case data: Double => Bytes.toBytes(data)
- case data: Float => Bytes.toBytes(data)
- case data: Int => Bytes.toBytes(data)
- case data: Long => Bytes.toBytes(data)
- case data: Short => Bytes.toBytes(data)
- case data: UTF8String => data.getBytes
- case data: String => Bytes.toBytes(data)
- // TODO: add more data type support
- case _ => throw new Exception(s"unsupported data type ${field.dt}")
- }
- }
- }
-
- def toBoolean(input: Array[Byte], offset: Int): Boolean = {
- input(offset) != 0
- }
-
- def toUTF8String(input: Array[Byte], offset: Int, length: Int): UTF8String = {
- UTF8String.fromBytes(input.slice(offset, offset + length))
- }
-}
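A rough sketch of the byte round trip implemented by the removed Utils above, for an atomic (non-Avro) column. The Field here is built by hand only for illustration; in the module it came from the parsed catalog:

    import org.apache.spark.sql.datasources.hbase.{Field, Utils}

    // Hypothetical string column "name" stored in family cf1, qualifier n.
    val f = Field(colName = "name", cf = "cf1", col = "n", sType = Some("string"))

    val bytes = Utils.toBytes("hello", f)                                // String -> HBase bytes
    val back  = Utils.hbaseFieldToScalaType(f, bytes, 0, bytes.length)   // bytes -> UTF8String
    println(back)                                                        // hello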
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
deleted file mode 100644
index e383b5e..0000000
--- a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
+++ /dev/null
@@ -1,520 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import scala.Tuple2;
-import org.apache.hadoop.hbase.shaded.com.google.common.io.Files;
-
-@Category({MiscTests.class, MediumTests.class})
-public class TestJavaHBaseContext implements Serializable {
- private transient JavaSparkContext jsc;
- HBaseTestingUtility htu;
- protected static final Log LOG = LogFactory.getLog(TestJavaHBaseContext.class);
-
-
-
- byte[] tableName = Bytes.toBytes("t1");
- byte[] columnFamily = Bytes.toBytes("c");
- byte[] columnFamily1 = Bytes.toBytes("d");
- String columnFamilyStr = Bytes.toString(columnFamily);
- String columnFamilyStr1 = Bytes.toString(columnFamily1);
-
-
- @Before
- public void setUp() {
- jsc = new JavaSparkContext("local", "JavaHBaseContextSuite");
-
- File tempDir = Files.createTempDir();
- tempDir.deleteOnExit();
-
- htu = new HBaseTestingUtility();
- try {
- LOG.info("cleaning up test dir");
-
- htu.cleanupTestDir();
-
- LOG.info("starting minicluster");
-
- htu.startMiniZKCluster();
- htu.startMiniHBaseCluster(1, 1);
-
- LOG.info(" - minicluster started");
-
- try {
- htu.deleteTable(TableName.valueOf(tableName));
- } catch (Exception e) {
- LOG.info(" - no table " + Bytes.toString(tableName) + " found");
- }
-
- LOG.info(" - creating table " + Bytes.toString(tableName));
- htu.createTable(TableName.valueOf(tableName),
- new byte[][]{columnFamily, columnFamily1});
- LOG.info(" - created table");
- } catch (Exception e1) {
- throw new RuntimeException(e1);
- }
- }
-
- @After
- public void tearDown() {
- try {
- htu.deleteTable(TableName.valueOf(tableName));
- LOG.info("shuting down minicluster");
- htu.shutdownMiniHBaseCluster();
- htu.shutdownMiniZKCluster();
- LOG.info(" - minicluster shut down");
- htu.cleanupTestDir();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- jsc.stop();
- jsc = null;
- }
-
- @Test
- public void testBulkPut() throws IOException {
-
- List<String> list = new ArrayList<>(5);
- list.add("1," + columnFamilyStr + ",a,1");
- list.add("2," + columnFamilyStr + ",a,2");
- list.add("3," + columnFamilyStr + ",a,3");
- list.add("4," + columnFamilyStr + ",a,4");
- list.add("5," + columnFamilyStr + ",a,5");
-
- JavaRDD<String> rdd = jsc.parallelize(list);
-
- Configuration conf = htu.getConfiguration();
-
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
- Connection conn = ConnectionFactory.createConnection(conf);
- Table table = conn.getTable(TableName.valueOf(tableName));
-
- try {
- List<Delete> deletes = new ArrayList<>(5);
- for (int i = 1; i < 6; i++) {
- deletes.add(new Delete(Bytes.toBytes(Integer.toString(i))));
- }
- table.delete(deletes);
- } finally {
- table.close();
- }
-
- hbaseContext.bulkPut(rdd,
- TableName.valueOf(tableName),
- new PutFunction());
-
- table = conn.getTable(TableName.valueOf(tableName));
-
- try {
- Result result1 = table.get(new Get(Bytes.toBytes("1")));
- Assert.assertNotNull("Row 1 should had been deleted", result1.getRow());
-
- Result result2 = table.get(new Get(Bytes.toBytes("2")));
- Assert.assertNotNull("Row 2 should had been deleted", result2.getRow());
-
- Result result3 = table.get(new Get(Bytes.toBytes("3")));
- Assert.assertNotNull("Row 3 should had been deleted", result3.getRow());
-
- Result result4 = table.get(new Get(Bytes.toBytes("4")));
- Assert.assertNotNull("Row 4 should had been deleted", result4.getRow());
-
- Result result5 = table.get(new Get(Bytes.toBytes("5")));
- Assert.assertNotNull("Row 5 should had been deleted", result5.getRow());
- } finally {
- table.close();
- conn.close();
- }
- }
-
- public static class PutFunction implements Function<String, Put> {
-
- private static final long serialVersionUID = 1L;
-
- public Put call(String v) throws Exception {
- String[] cells = v.split(",");
- Put put = new Put(Bytes.toBytes(cells[0]));
-
- put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
- Bytes.toBytes(cells[3]));
- return put;
- }
- }
-
- @Test
- public void testBulkDelete() throws IOException {
- List<byte[]> list = new ArrayList<>(3);
- list.add(Bytes.toBytes("1"));
- list.add(Bytes.toBytes("2"));
- list.add(Bytes.toBytes("3"));
-
- JavaRDD<byte[]> rdd = jsc.parallelize(list);
-
- Configuration conf = htu.getConfiguration();
-
- populateTableWithMockData(conf, TableName.valueOf(tableName));
-
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
- hbaseContext.bulkDelete(rdd, TableName.valueOf(tableName),
- new JavaHBaseBulkDeleteExample.DeleteFunction(), 2);
-
-
-
- try (
- Connection conn = ConnectionFactory.createConnection(conf);
- Table table = conn.getTable(TableName.valueOf(tableName))
- ){
- Result result1 = table.get(new Get(Bytes.toBytes("1")));
- Assert.assertNull("Row 1 should had been deleted", result1.getRow());
-
- Result result2 = table.get(new Get(Bytes.toBytes("2")));
- Assert.assertNull("Row 2 should had been deleted", result2.getRow());
-
- Result result3 = table.get(new Get(Bytes.toBytes("3")));
- Assert.assertNull("Row 3 should had been deleted", result3.getRow());
-
- Result result4 = table.get(new Get(Bytes.toBytes("4")));
- Assert.assertNotNull("Row 4 should had been deleted", result4.getRow());
-
- Result result5 = table.get(new Get(Bytes.toBytes("5")));
- Assert.assertNotNull("Row 5 should had been deleted", result5.getRow());
- }
- }
-
- @Test
- public void testDistributedScan() throws IOException {
- Configuration conf = htu.getConfiguration();
-
- populateTableWithMockData(conf, TableName.valueOf(tableName));
-
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
- Scan scan = new Scan();
- scan.setCaching(100);
-
- JavaRDD<String> javaRdd =
- hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
- .map(new ScanConvertFunction());
-
- List<String> results = javaRdd.collect();
-
- Assert.assertEquals(results.size(), 5);
- }
-
- private static class ScanConvertFunction implements
- Function<Tuple2<ImmutableBytesWritable, Result>, String> {
- @Override
- public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
- return Bytes.toString(v1._1().copyBytes());
- }
- }
-
- @Test
- public void testBulkGet() throws IOException {
- List<byte[]> list = new ArrayList<>(5);
- list.add(Bytes.toBytes("1"));
- list.add(Bytes.toBytes("2"));
- list.add(Bytes.toBytes("3"));
- list.add(Bytes.toBytes("4"));
- list.add(Bytes.toBytes("5"));
-
- JavaRDD<byte[]> rdd = jsc.parallelize(list);
-
- Configuration conf = htu.getConfiguration();
-
- populateTableWithMockData(conf, TableName.valueOf(tableName));
-
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
- final JavaRDD<String> stringJavaRDD =
- hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd,
- new GetFunction(),
- new ResultFunction());
-
- Assert.assertEquals(stringJavaRDD.count(), 5);
- }
-
- @Test
- public void testBulkLoad() throws Exception {
-
- Path output = htu.getDataTestDir("testBulkLoad");
- // Add cells as Strings: "row,family,qualifier,value"
- List<String> list= new ArrayList<String>();
- // row1
- list.add("1," + columnFamilyStr + ",b,1");
- // row3
- list.add("3," + columnFamilyStr + ",a,2");
- list.add("3," + columnFamilyStr + ",b,1");
- list.add("3," + columnFamilyStr1 + ",a,1");
- //row2
- list.add("2," + columnFamilyStr + ",a,3");
- list.add("2," + columnFamilyStr + ",b,3");
-
- JavaRDD<String> rdd = jsc.parallelize(list);
-
- Configuration conf = htu.getConfiguration();
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-
-
- hbaseContext.bulkLoad(rdd, TableName.valueOf(tableName), new BulkLoadFunction(), output.toUri().getPath(),
- new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
-
- try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin()) {
- Table table = conn.getTable(TableName.valueOf(tableName));
- // Do bulk load
- LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
- load.doBulkLoad(output, admin, table, conn.getRegionLocator(TableName.valueOf(tableName)));
-
-
-
- // Check row1
- List<Cell> cell1 = table.get(new Get(Bytes.toBytes("1"))).listCells();
- Assert.assertEquals(cell1.size(), 1);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell1.get(0))), columnFamilyStr);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell1.get(0))), "b");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell1.get(0))), "1");
-
- // Check row3
- List<Cell> cell3 = table.get(new Get(Bytes.toBytes("3"))).listCells();
- Assert.assertEquals(cell3.size(), 3);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(0))), columnFamilyStr);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(0))), "a");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(0))), "2");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(1))), columnFamilyStr);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(1))), "b");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(1))), "1");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(2))), columnFamilyStr1);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(2))), "a");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(2))), "1");
-
- // Check row2
- List<Cell> cell2 = table.get(new Get(Bytes.toBytes("2"))).listCells();
- Assert.assertEquals(cell2.size(), 2);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell2.get(0))), columnFamilyStr);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell2.get(0))), "a");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell2.get(0))), "3");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell2.get(1))), columnFamilyStr);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell2.get(1))), "b");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell2.get(1))), "3");
- }
- }
-
- @Test
- public void testBulkLoadThinRows() throws Exception {
- Path output = htu.getDataTestDir("testBulkLoadThinRows");
- // Because of the limitation of the scala bulkLoadThinRows API,
- // we need to provide the data as <row, all cells in that row>.
- List<List<String>> list= new ArrayList<List<String>>();
- // row1
- List<String> list1 = new ArrayList<String>();
- list1.add("1," + columnFamilyStr + ",b,1");
- list.add(list1);
- // row3
- List<String> list3 = new ArrayList<String>();
- list3.add("3," + columnFamilyStr + ",a,2");
- list3.add("3," + columnFamilyStr + ",b,1");
- list3.add("3," + columnFamilyStr1 + ",a,1");
- list.add(list3);
- //row2
- List<String> list2 = new ArrayList<String>();
- list2.add("2," + columnFamilyStr + ",a,3");
- list2.add("2," + columnFamilyStr + ",b,3");
- list.add(list2);
-
- JavaRDD<List<String>> rdd = jsc.parallelize(list);
-
- Configuration conf = htu.getConfiguration();
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
- hbaseContext.bulkLoadThinRows(rdd, TableName.valueOf(tableName), new BulkLoadThinRowsFunction(), output.toString(),
- new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
-
-
- try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin()) {
- Table table = conn.getTable(TableName.valueOf(tableName));
- // Do bulk load
- LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
- load.doBulkLoad(output, admin, table, conn.getRegionLocator(TableName.valueOf(tableName)));
-
- // Check row1
- List<Cell> cell1 = table.get(new Get(Bytes.toBytes("1"))).listCells();
- Assert.assertEquals(cell1.size(), 1);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell1.get(0))), columnFamilyStr);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell1.get(0))), "b");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell1.get(0))), "1");
-
- // Check row3
- List<Cell> cell3 = table.get(new Get(Bytes.toBytes("3"))).listCells();
- Assert.assertEquals(cell3.size(), 3);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(0))), columnFamilyStr);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(0))), "a");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(0))), "2");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(1))), columnFamilyStr);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(1))), "b");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(1))), "1");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell3.get(2))), columnFamilyStr1);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell3.get(2))), "a");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell3.get(2))), "1");
-
- // Check row2
- List<Cell> cell2 = table.get(new Get(Bytes.toBytes("2"))).listCells();
- Assert.assertEquals(cell2.size(), 2);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell2.get(0))), columnFamilyStr);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell2.get(0))), "a");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell2.get(0))), "3");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneFamily(cell2.get(1))), columnFamilyStr);
- Assert.assertEquals(Bytes.toString(CellUtil.cloneQualifier(cell2.get(1))), "b");
- Assert.assertEquals(Bytes.toString(CellUtil.cloneValue(cell2.get(1))), "3");
- }
-
- }
- public static class BulkLoadFunction implements Function<String, Pair<KeyFamilyQualifier, byte[]>> {
-
- @Override public Pair<KeyFamilyQualifier, byte[]> call(String v1) throws Exception {
- if (v1 == null)
- return null;
- String[] strs = v1.split(",");
- if(strs.length != 4)
- return null;
- KeyFamilyQualifier kfq = new KeyFamilyQualifier(Bytes.toBytes(strs[0]), Bytes.toBytes(strs[1]),
- Bytes.toBytes(strs[2]));
- return new Pair(kfq, Bytes.toBytes(strs[3]));
- }
- }
-
- public static class BulkLoadThinRowsFunction implements Function<List<String>, Pair<ByteArrayWrapper, FamiliesQualifiersValues>> {
-
- @Override public Pair<ByteArrayWrapper, FamiliesQualifiersValues> call(List<String> list) throws Exception {
- if (list == null)
- return null;
- ByteArrayWrapper rowKey = null;
- FamiliesQualifiersValues fqv = new FamiliesQualifiersValues();
- for (String cell : list) {
- String[] strs = cell.split(",");
- if (rowKey == null) {
- rowKey = new ByteArrayWrapper(Bytes.toBytes(strs[0]));
- }
- fqv.add(Bytes.toBytes(strs[1]), Bytes.toBytes(strs[2]), Bytes.toBytes(strs[3]));
- }
- return new Pair(rowKey, fqv);
- }
- }
-
- public static class GetFunction implements Function<byte[], Get> {
-
- private static final long serialVersionUID = 1L;
-
- public Get call(byte[] v) throws Exception {
- return new Get(v);
- }
- }
-
- public static class ResultFunction implements Function<Result, String> {
-
- private static final long serialVersionUID = 1L;
-
- public String call(Result result) throws Exception {
- Iterator<Cell> it = result.listCells().iterator();
- StringBuilder b = new StringBuilder();
-
- b.append(Bytes.toString(result.getRow())).append(":");
-
- while (it.hasNext()) {
- Cell cell = it.next();
- String q = Bytes.toString(CellUtil.cloneQualifier(cell));
- if ("counter".equals(q)) {
- b.append("(")
- .append(q)
- .append(",")
- .append(Bytes.toLong(CellUtil.cloneValue(cell)))
- .append(")");
- } else {
- b.append("(")
- .append(q)
- .append(",")
- .append(Bytes.toString(CellUtil.cloneValue(cell)))
- .append(")");
- }
- }
- return b.toString();
- }
- }
-
- private void populateTableWithMockData(Configuration conf, TableName tableName)
- throws IOException {
- try (
- Connection conn = ConnectionFactory.createConnection(conf);
- Table table = conn.getTable(tableName)) {
-
- List<Put> puts = new ArrayList<>(5);
-
- for (int i = 1; i < 6; i++) {
- Put put = new Put(Bytes.toBytes(Integer.toString(i)));
- put.addColumn(columnFamily, columnFamily, columnFamily);
- puts.add(put);
- }
- table.put(puts);
- }
- }
-
-}
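The removed Java test above exercises JavaHBaseContext (bulkPut, bulkDelete, hbaseRDD, bulkGet, bulkLoad). For comparison, a hedged sketch of the same bulkPut flow via the Scala HBaseContext from this module, reusing the test's "row,family,qualifier,value" record shape and its table "t1"; the local Spark master and default HBaseConfiguration are assumptions for illustration only:

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("bulkPutSketch").setMaster("local"))
    val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

    // Same CSV shape as the Java test: "row,family,qualifier,value"
    val rdd = sc.parallelize(Seq("1,c,a,1", "2,c,a,2"))

    hbaseContext.bulkPut[String](rdd, TableName.valueOf("t1"), record => {
      val cells = record.split(",")
      new Put(Bytes.toBytes(cells(0)))
        .addColumn(Bytes.toBytes(cells(1)), Bytes.toBytes(cells(2)), Bytes.toBytes(cells(3)))
    })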
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/test/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/resources/hbase-site.xml b/hbase-spark/src/test/resources/hbase-site.xml
deleted file mode 100644
index b3fb0d9..0000000
--- a/hbase-spark/src/test/resources/hbase-site.xml
+++ /dev/null
@@ -1,157 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!--
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
--->
-<configuration>
- <property>
- <name>hbase.regionserver.msginterval</name>
- <value>1000</value>
- <description>Interval between messages from the RegionServer to HMaster
- in milliseconds. Default is 15. Set this value low if you want unit
- tests to be responsive.
- </description>
- </property>
- <property>
- <name>hbase.defaults.for.version.skip</name>
- <value>true</value>
- </property>
- <property>
- <name>hbase.server.thread.wakefrequency</name>
- <value>1000</value>
- <description>Time to sleep in between searches for work (in milliseconds).
- Used as sleep interval by service threads such as hbase:meta scanner and log roller.
- </description>
- </property>
- <property>
- <name>hbase.master.event.waiting.time</name>
- <value>50</value>
- <description>Time to sleep between checks to see if a table event took place.
- </description>
- </property>
- <property>
- <name>hbase.regionserver.handler.count</name>
- <value>5</value>
- </property>
- <property>
- <name>hbase.regionserver.metahandler.count</name>
- <value>5</value>
- </property>
- <property>
- <name>hbase.ipc.server.read.threadpool.size</name>
- <value>3</value>
- </property>
- <property>
- <name>hbase.master.info.port</name>
- <value>-1</value>
- <description>The port for the hbase master web UI
- Set to -1 if you do not want the info server to run.
- </description>
- </property>
- <property>
- <name>hbase.master.port</name>
- <value>0</value>
- <description>Always have masters and regionservers come up on port '0' so we don't clash over
- default ports.
- </description>
- </property>
- <property>
- <name>hbase.regionserver.port</name>
- <value>0</value>
- <description>Always have masters and regionservers come up on port '0' so we don't clash over
- default ports.
- </description>
- </property>
- <property>
- <name>hbase.ipc.client.fallback-to-simple-auth-allowed</name>
- <value>true</value>
- </property>
-
- <property>
- <name>hbase.regionserver.info.port</name>
- <value>-1</value>
- <description>The port for the hbase regionserver web UI
- Set to -1 if you do not want the info server to run.
- </description>
- </property>
- <property>
- <name>hbase.regionserver.info.port.auto</name>
- <value>true</value>
- <description>Info server auto port bind. Enables automatic port
- search if hbase.regionserver.info.port is already in use.
- Enabled for testing to run multiple tests on one machine.
- </description>
- </property>
- <property>
- <name>hbase.regionserver.safemode</name>
- <value>false</value>
- <description>
- Turn on/off safe mode in region server. Always on for production, always off
- for tests.
- </description>
- </property>
- <property>
- <name>hbase.hregion.max.filesize</name>
- <value>67108864</value>
- <description>
- Maximum desired file size for an HRegion. If filesize exceeds
- value + (value / 2), the HRegion is split in two. Default: 256M.
-
- Keep the maximum filesize small so we split more often in tests.
- </description>
- </property>
- <property>
- <name>hadoop.log.dir</name>
- <value>${user.dir}/../logs</value>
- </property>
- <property>
- <name>hbase.zookeeper.property.clientPort</name>
- <value>21818</value>
- <description>Property from ZooKeeper's config zoo.cfg.
- The port at which the clients will connect.
- </description>
- </property>
- <property>
- <name>hbase.defaults.for.version.skip</name>
- <value>true</value>
- <description>
- Set to true to skip the 'hbase.defaults.for.version'.
- Setting this to true can be useful in contexts other than
- the other side of a maven generation; i.e. running in an
- ide. You'll want to set this boolean to true to avoid
- seeing the RuntimeException complaint: "hbase-default.xml file
- seems to be for and old version of HBase (@@@VERSION@@@), this
- version is X.X.X-SNAPSHOT"
- </description>
- </property>
- <property>
- <name>hbase.table.sanity.checks</name>
- <value>false</value>
- <description>Skip sanity checks in tests
- </description>
- </property>
- <property>
- <name>hbase.procedure.fail.on.corruption</name>
- <value>true</value>
- <description>
- Enable replay sanity checks on procedure tests.
- </description>
- </property>
-</configuration>
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/resources/log4j.properties b/hbase-spark/src/test/resources/log4j.properties
deleted file mode 100644
index cd3b8e9..0000000
--- a/hbase-spark/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,76 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Define some default values that can be overridden by system properties
-hbase.root.logger=INFO,FA
-hbase.log.dir=.
-hbase.log.file=hbase.log
-
-# Define the root logger to the system property "hbase.root.logger".
-log4j.rootLogger=${hbase.root.logger}
-
-# Logging Threshold
-log4j.threshold=ALL
-
-#
-# Daily Rolling File Appender
-#
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${hbase.log.dir}/${hbase.log.file}
-
-# Rollover at midnight
-log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-
-# 30-day backup
-#log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-# Debugging Pattern format
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
-
-
-#
-# console
-# Add "console" to rootlogger above if you want to use this
-#
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
-
-#File Appender
-log4j.appender.FA=org.apache.log4j.FileAppender
-log4j.appender.FA.append=false
-log4j.appender.FA.file=target/log-output.txt
-log4j.appender.FA.layout=org.apache.log4j.PatternLayout
-log4j.appender.FA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
-log4j.appender.FA.Threshold = INFO
-
-# Custom Logging levels
-
-#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
-
-log4j.logger.org.apache.hadoop=WARN
-log4j.logger.org.apache.zookeeper=ERROR
-log4j.logger.org.apache.hadoop.hbase=DEBUG
-
-#These settings are workarounds against spurious logs from the minicluster.
-#See HBASE-4709
-log4j.logger.org.apache.hadoop.metrics2.impl.MetricsConfig=WARN
-log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSinkAdapter=WARN
-log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=WARN
-log4j.logger.org.apache.hadoop.metrics2.util.MBeans=WARN
-# Enable this to get detailed connection error/retry logging.
-# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE