Posted to commits@hbase.apache.org by bu...@apache.org on 2017/11/09 04:59:51 UTC
[5/9] hbase git commit: HBASE-18817 pull the hbase-spark module out of branch-2.
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SchemaConverters.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SchemaConverters.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SchemaConverters.scala
deleted file mode 100644
index 9eeabc5..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SchemaConverters.scala
+++ /dev/null
@@ -1,430 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.io.ByteArrayInputStream
-import java.nio.ByteBuffer
-import java.sql.Timestamp
-import java.util
-import java.util.HashMap
-
-import org.apache.avro.SchemaBuilder.BaseFieldTypeBuilder
-import org.apache.avro.SchemaBuilder.BaseTypeBuilder
-import org.apache.avro.SchemaBuilder.FieldAssembler
-import org.apache.avro.SchemaBuilder.FieldDefault
-import org.apache.avro.SchemaBuilder.RecordBuilder
-import org.apache.avro.io._
-import org.apache.commons.io.output.ByteArrayOutputStream
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes
-
-import scala.collection.JavaConversions._
-
-import org.apache.avro.{SchemaBuilder, Schema}
-import org.apache.avro.Schema.Type._
-import org.apache.avro.generic.GenericData.{Record, Fixed}
-import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericData, GenericRecord}
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.types._
-
-import scala.collection.immutable.Map
-
-@InterfaceAudience.Private
-abstract class AvroException(msg: String) extends Exception(msg)
-
-@InterfaceAudience.Private
-case class SchemaConversionException(msg: String) extends AvroException(msg)
-
-/***
- * At the top level, the converters provide three high-level interfaces:
- * 1. toSqlType: takes an Avro schema and returns the corresponding Spark SQL schema.
- * 2. createConverterToSQL: returns a function that converts Avro types to their
- * corresponding Spark SQL representations.
- * 3. convertTypeToAvro: constructs a converter function for a given Spark SQL
- * datatype. This is used when writing Avro records out to disk.
- */
-@InterfaceAudience.Private
-object SchemaConverters {
-
- case class SchemaType(dataType: DataType, nullable: Boolean)
-
- /**
- * This function takes an avro schema and returns a sql schema.
- */
- def toSqlType(avroSchema: Schema): SchemaType = {
- avroSchema.getType match {
- case INT => SchemaType(IntegerType, nullable = false)
- case STRING => SchemaType(StringType, nullable = false)
- case BOOLEAN => SchemaType(BooleanType, nullable = false)
- case BYTES => SchemaType(BinaryType, nullable = false)
- case DOUBLE => SchemaType(DoubleType, nullable = false)
- case FLOAT => SchemaType(FloatType, nullable = false)
- case LONG => SchemaType(LongType, nullable = false)
- case FIXED => SchemaType(BinaryType, nullable = false)
- case ENUM => SchemaType(StringType, nullable = false)
-
- case RECORD =>
- val fields = avroSchema.getFields.map { f =>
- val schemaType = toSqlType(f.schema())
- StructField(f.name, schemaType.dataType, schemaType.nullable)
- }
-
- SchemaType(StructType(fields), nullable = false)
-
- case ARRAY =>
- val schemaType = toSqlType(avroSchema.getElementType)
- SchemaType(
- ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
- nullable = false)
-
- case MAP =>
- val schemaType = toSqlType(avroSchema.getValueType)
- SchemaType(
- MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable),
- nullable = false)
-
- case UNION =>
- if (avroSchema.getTypes.exists(_.getType == NULL)) {
- // In case of a union with null, eliminate it and make a recursive call
- val remainingUnionTypes = avroSchema.getTypes.filterNot(_.getType == NULL)
- if (remainingUnionTypes.size == 1) {
- toSqlType(remainingUnionTypes.get(0)).copy(nullable = true)
- } else {
- toSqlType(Schema.createUnion(remainingUnionTypes)).copy(nullable = true)
- }
- } else avroSchema.getTypes.map(_.getType) match {
- case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
- SchemaType(LongType, nullable = false)
- case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
- SchemaType(DoubleType, nullable = false)
- case other => throw new SchemaConversionException(
- s"This mix of union types is not supported: $other")
- }
-
- case other => throw new SchemaConversionException(s"Unsupported type $other")
- }
- }
-
- /**
- * This function converts a Spark SQL StructType into an Avro schema. It uses the two other
- * converter methods below to do the conversion.
- */
- private def convertStructToAvro[T](
- structType: StructType,
- schemaBuilder: RecordBuilder[T],
- recordNamespace: String): T = {
- val fieldsAssembler: FieldAssembler[T] = schemaBuilder.fields()
- structType.fields.foreach { field =>
- val newField = fieldsAssembler.name(field.name).`type`()
-
- if (field.nullable) {
- convertFieldTypeToAvro(field.dataType, newField.nullable(), field.name, recordNamespace)
- .noDefault
- } else {
- convertFieldTypeToAvro(field.dataType, newField, field.name, recordNamespace)
- .noDefault
- }
- }
- fieldsAssembler.endRecord()
- }
-
- /**
- * Returns a function that is used to convert avro types to their
- * corresponding sparkSQL representations.
- */
- def createConverterToSQL(schema: Schema): Any => Any = {
- schema.getType match {
- // Avro strings are in Utf8, so we have to call toString on them
- case STRING | ENUM => (item: Any) => if (item == null) null else item.toString
- case INT | BOOLEAN | DOUBLE | FLOAT | LONG => identity
- // Byte arrays are reused by avro, so we have to make a copy of them.
- case FIXED => (item: Any) => if (item == null) {
- null
- } else {
- item.asInstanceOf[Fixed].bytes().clone()
- }
- case BYTES => (item: Any) => if (item == null) {
- null
- } else {
- val bytes = item.asInstanceOf[ByteBuffer]
- val javaBytes = new Array[Byte](bytes.remaining)
- bytes.get(javaBytes)
- javaBytes
- }
- case RECORD =>
- val fieldConverters = schema.getFields.map(f => createConverterToSQL(f.schema))
- (item: Any) => if (item == null) {
- null
- } else {
- val record = item.asInstanceOf[GenericRecord]
- val converted = new Array[Any](fieldConverters.size)
- var idx = 0
- while (idx < fieldConverters.size) {
- converted(idx) = fieldConverters.apply(idx)(record.get(idx))
- idx += 1
- }
- Row.fromSeq(converted.toSeq)
- }
- case ARRAY =>
- val elementConverter = createConverterToSQL(schema.getElementType)
- (item: Any) => if (item == null) {
- null
- } else {
- try {
- item.asInstanceOf[GenericData.Array[Any]].map(elementConverter)
- } catch {
- case e: Throwable =>
- item.asInstanceOf[util.ArrayList[Any]].map(elementConverter)
- }
- }
- case MAP =>
- val valueConverter = createConverterToSQL(schema.getValueType)
- (item: Any) => if (item == null) {
- null
- } else {
- item.asInstanceOf[HashMap[Any, Any]].map(x => (x._1.toString, valueConverter(x._2))).toMap
- }
- case UNION =>
- if (schema.getTypes.exists(_.getType == NULL)) {
- val remainingUnionTypes = schema.getTypes.filterNot(_.getType == NULL)
- if (remainingUnionTypes.size == 1) {
- createConverterToSQL(remainingUnionTypes.get(0))
- } else {
- createConverterToSQL(Schema.createUnion(remainingUnionTypes))
- }
- } else schema.getTypes.map(_.getType) match {
- case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
- (item: Any) => {
- item match {
- case l: Long => l
- case i: Int => i.toLong
- case null => null
- }
- }
- case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
- (item: Any) => {
- item match {
- case d: Double => d
- case f: Float => f.toDouble
- case null => null
- }
- }
- case other => throw new SchemaConversionException(
- s"This mix of union types is not supported (see README): $other")
- }
- case other => throw new SchemaConversionException(s"invalid avro type: $other")
- }
- }
-
- /**
- * This function is used to convert a Spark SQL type to an Avro type. Note that it is not
- * used to construct fields of an Avro record (convertFieldTypeToAvro is used for that).
- */
- private def convertTypeToAvro[T](
- dataType: DataType,
- schemaBuilder: BaseTypeBuilder[T],
- structName: String,
- recordNamespace: String): T = {
- dataType match {
- case ByteType => schemaBuilder.intType()
- case ShortType => schemaBuilder.intType()
- case IntegerType => schemaBuilder.intType()
- case LongType => schemaBuilder.longType()
- case FloatType => schemaBuilder.floatType()
- case DoubleType => schemaBuilder.doubleType()
- case _: DecimalType => schemaBuilder.stringType()
- case StringType => schemaBuilder.stringType()
- case BinaryType => schemaBuilder.bytesType()
- case BooleanType => schemaBuilder.booleanType()
- case TimestampType => schemaBuilder.longType()
-
- case ArrayType(elementType, _) =>
- val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull)
- val elementSchema = convertTypeToAvro(elementType, builder, structName, recordNamespace)
- schemaBuilder.array().items(elementSchema)
-
- case MapType(StringType, valueType, _) =>
- val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull)
- val valueSchema = convertTypeToAvro(valueType, builder, structName, recordNamespace)
- schemaBuilder.map().values(valueSchema)
-
- case structType: StructType =>
- convertStructToAvro(
- structType,
- schemaBuilder.record(structName).namespace(recordNamespace),
- recordNamespace)
-
- case other => throw new IllegalArgumentException(s"Unexpected type $dataType.")
- }
- }
-
- /**
- * This function is used to construct fields of the Avro record, where the schema of each field is
- * specified by the Avro representation of dataType. Since builders for record fields are different
- * from those for everything else, we have to use a separate method.
- */
- private def convertFieldTypeToAvro[T](
- dataType: DataType,
- newFieldBuilder: BaseFieldTypeBuilder[T],
- structName: String,
- recordNamespace: String): FieldDefault[T, _] = {
- dataType match {
- case ByteType => newFieldBuilder.intType()
- case ShortType => newFieldBuilder.intType()
- case IntegerType => newFieldBuilder.intType()
- case LongType => newFieldBuilder.longType()
- case FloatType => newFieldBuilder.floatType()
- case DoubleType => newFieldBuilder.doubleType()
- case _: DecimalType => newFieldBuilder.stringType()
- case StringType => newFieldBuilder.stringType()
- case BinaryType => newFieldBuilder.bytesType()
- case BooleanType => newFieldBuilder.booleanType()
- case TimestampType => newFieldBuilder.longType()
-
- case ArrayType(elementType, _) =>
- val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull)
- val elementSchema = convertTypeToAvro(elementType, builder, structName, recordNamespace)
- newFieldBuilder.array().items(elementSchema)
-
- case MapType(StringType, valueType, _) =>
- val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull)
- val valueSchema = convertTypeToAvro(valueType, builder, structName, recordNamespace)
- newFieldBuilder.map().values(valueSchema)
-
- case structType: StructType =>
- convertStructToAvro(
- structType,
- newFieldBuilder.record(structName).namespace(recordNamespace),
- recordNamespace)
-
- case other => throw new IllegalArgumentException(s"Unexpected type $dataType.")
- }
- }
-
- private def getSchemaBuilder(isNullable: Boolean): BaseTypeBuilder[Schema] = {
- if (isNullable) {
- SchemaBuilder.builder().nullable()
- } else {
- SchemaBuilder.builder()
- }
- }
- /**
- * This function constructs a converter function for a given Spark SQL datatype. It is used when
- * writing Avro records out to disk.
- */
- def createConverterToAvro(
- dataType: DataType,
- structName: String,
- recordNamespace: String): (Any) => Any = {
- dataType match {
- case BinaryType => (item: Any) => item match {
- case null => null
- case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
- }
- case ByteType | ShortType | IntegerType | LongType |
- FloatType | DoubleType | StringType | BooleanType => identity
- case _: DecimalType => (item: Any) => if (item == null) null else item.toString
- case TimestampType => (item: Any) =>
- if (item == null) null else item.asInstanceOf[Timestamp].getTime
- case ArrayType(elementType, _) =>
- val elementConverter = createConverterToAvro(elementType, structName, recordNamespace)
- (item: Any) => {
- if (item == null) {
- null
- } else {
- val sourceArray = item.asInstanceOf[Seq[Any]]
- val sourceArraySize = sourceArray.size
- val targetArray = new util.ArrayList[Any](sourceArraySize)
- var idx = 0
- while (idx < sourceArraySize) {
- targetArray.add(elementConverter(sourceArray(idx)))
- idx += 1
- }
- targetArray
- }
- }
- case MapType(StringType, valueType, _) =>
- val valueConverter = createConverterToAvro(valueType, structName, recordNamespace)
- (item: Any) => {
- if (item == null) {
- null
- } else {
- val javaMap = new HashMap[String, Any]()
- item.asInstanceOf[Map[String, Any]].foreach { case (key, value) =>
- javaMap.put(key, valueConverter(value))
- }
- javaMap
- }
- }
- case structType: StructType =>
- val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
- val schema: Schema = SchemaConverters.convertStructToAvro(
- structType, builder, recordNamespace)
- val fieldConverters = structType.fields.map(field =>
- createConverterToAvro(field.dataType, field.name, recordNamespace))
- (item: Any) => {
- if (item == null) {
- null
- } else {
- val record = new Record(schema)
- val convertersIterator = fieldConverters.iterator
- val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
- val rowIterator = item.asInstanceOf[Row].toSeq.iterator
-
- while (convertersIterator.hasNext) {
- val converter = convertersIterator.next()
- record.put(fieldNamesIterator.next(), converter(rowIterator.next()))
- }
- record
- }
- }
- }
- }
-}
-
-@InterfaceAudience.Private
-object AvroSerdes {
- // We only handle the case where the top level is a record or a primitive type for now
- def serialize(input: Any, schema: Schema): Array[Byte]= {
- schema.getType match {
- case BOOLEAN => Bytes.toBytes(input.asInstanceOf[Boolean])
- case BYTES | FIXED=> input.asInstanceOf[Array[Byte]]
- case DOUBLE => Bytes.toBytes(input.asInstanceOf[Double])
- case FLOAT => Bytes.toBytes(input.asInstanceOf[Float])
- case INT => Bytes.toBytes(input.asInstanceOf[Int])
- case LONG => Bytes.toBytes(input.asInstanceOf[Long])
- case STRING => Bytes.toBytes(input.asInstanceOf[String])
- case RECORD =>
- val gr = input.asInstanceOf[GenericRecord]
- val writer2 = new GenericDatumWriter[GenericRecord](schema)
- val bao2 = new ByteArrayOutputStream()
- val encoder2: BinaryEncoder = EncoderFactory.get().directBinaryEncoder(bao2, null)
- writer2.write(gr, encoder2)
- bao2.toByteArray()
- case _ => throw new Exception(s"unsupported data type ${schema.getType}") //TODO
- }
- }
-
- def deserialize(input: Array[Byte], schema: Schema): GenericRecord = {
- val reader2: DatumReader[GenericRecord] = new GenericDatumReader[GenericRecord](schema)
- val bai2 = new ByteArrayInputStream(input)
- val decoder2: BinaryDecoder = DecoderFactory.get().directBinaryDecoder(bai2, null)
- val gr2: GenericRecord = reader2.read(null, decoder2)
- gr2
- }
-}
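For context on the removed SchemaConverters/AvroSerdes above, a minimal sketch (not part of this commit) of how the two entry points were typically exercised; the record schema and field names below are illustrative assumptions:

    import org.apache.avro.{Schema, SchemaBuilder}
    import org.apache.avro.generic.GenericData
    import org.apache.hadoop.hbase.spark.{AvroSerdes, SchemaConverters}

    object SchemaConvertersSketch {
      def main(args: Array[String]): Unit = {
        // Illustrative Avro record schema with a single string field.
        val avroSchema: Schema = SchemaBuilder.record("User").namespace("example.avro")
          .fields().requiredString("name").endRecord()

        // Map the Avro schema onto its Spark SQL equivalent.
        val sqlSchema = SchemaConverters.toSqlType(avroSchema)
        println(sqlSchema.dataType) // e.g. StructType(StructField(name,StringType,false))

        // Round-trip one record through the byte-level serde used for HBase cell values.
        val user = new GenericData.Record(avroSchema)
        user.put("name", "alice")
        val bytes: Array[Byte] = AvroSerdes.serialize(user, avroSchema)
        val restored = AvroSerdes.deserialize(bytes, avroSchema)
        println(restored.get("name")) // alice
      }
    }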
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerDes.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerDes.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerDes.scala
deleted file mode 100644
index 98cc871..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerDes.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.datasources
-
-import java.io.ByteArrayInputStream
-
-import org.apache.avro.Schema
-import org.apache.avro.Schema.Type._
-import org.apache.avro.generic.GenericDatumReader
-import org.apache.avro.generic.GenericDatumWriter
-import org.apache.avro.generic.GenericRecord
-import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
-import org.apache.avro.io._
-import org.apache.commons.io.output.ByteArrayOutputStream
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.sql.types._
-
-// TODO: This is not really used in code.
-trait SerDes {
- def serialize(value: Any): Array[Byte]
- def deserialize(bytes: Array[Byte], start: Int, end: Int): Any
-}
-
-// TODO: This is not really used in code.
-class DoubleSerDes extends SerDes {
- override def serialize(value: Any): Array[Byte] = Bytes.toBytes(value.asInstanceOf[Double])
- override def deserialize(bytes: Array[Byte], start: Int, end: Int): Any = {
- Bytes.toDouble(bytes, start)
- }
-}
-
-
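The SerDes trait above is flagged as unused in the code itself; for completeness, a small REPL-style sketch (illustrative only) of the DoubleSerDes it ships with:

    import org.apache.hadoop.hbase.spark.datasources.DoubleSerDes

    val serde = new DoubleSerDes
    val bytes = serde.serialize(3.14)                      // Bytes.toBytes on a Double: 8 bytes
    val value = serde.deserialize(bytes, 0, bytes.length)  // back to 3.14 (typed as Any)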
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala
deleted file mode 100644
index 0e2b6f4..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SerializableConfiguration.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.datasources
-
-import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.yetus.audience.InterfaceAudience;
-
-import scala.util.control.NonFatal
-
-@InterfaceAudience.Private
-class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
- private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException {
- out.defaultWriteObject()
- value.write(out)
- }
-
- private def readObject(in: ObjectInputStream): Unit = tryOrIOException {
- value = new Configuration(false)
- value.readFields(in)
- }
-
- def tryOrIOException(block: => Unit) {
- try {
- block
- } catch {
- case e: IOException => throw e
- case NonFatal(t) => throw new IOException(t)
- }
- }
-}
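SerializableConfiguration above exists so that a Hadoop Configuration, which is not java.io.Serializable, can be shipped to Spark executors. A minimal sketch of that pattern, assuming an existing SparkContext (the configuration key is only an example):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.spark.datasources.SerializableConfiguration
    import org.apache.spark.SparkContext

    def shipConfiguration(sc: SparkContext): Unit = {
      // Wrap the Hadoop Configuration so it survives Java serialization.
      val wrapped = new SerializableConfiguration(HBaseConfiguration.create())
      val bcast = sc.broadcast(wrapped)
      sc.parallelize(1 to 3).foreach { _ =>
        // On executors, unwrap to get a usable Configuration back.
        val conf = bcast.value.value
        println(conf.get("hbase.zookeeper.quorum"))
      }
    }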
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala
deleted file mode 100644
index ce7b55a..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import org.apache.hadoop.hbase.util.Bytes
-
-import scala.math.Ordering
-
-package object hbase {
- type HBaseType = Array[Byte]
- def bytesMin = new Array[Byte](0)
- def bytesMax = null
- val ByteMax = -1.asInstanceOf[Byte]
- val ByteMin = 0.asInstanceOf[Byte]
- val ord: Ordering[HBaseType] = new Ordering[HBaseType] {
- def compare(x: Array[Byte], y: Array[Byte]): Int = {
- return Bytes.compareTo(x, y)
- }
- }
- //Do not use BinaryType.ordering
- implicit val order: Ordering[HBaseType] = ord
-
-}
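The package object above mainly contributes an implicit byte-array Ordering; a short REPL-style sketch of what that enables (row keys are illustrative):

    import org.apache.hadoop.hbase.spark.hbase._
    import org.apache.hadoop.hbase.util.Bytes

    // The implicit Ordering[HBaseType] delegates to Bytes.compareTo, so collections of
    // row keys sort in the same lexicographic byte order HBase uses on disk.
    val keys: Seq[HBaseType] = Seq("row3", "row1", "row2").map(s => Bytes.toBytes(s))
    keys.sorted.foreach(k => println(Bytes.toString(k)))   // row1, row2, row3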
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/AvroSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/AvroSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/AvroSource.scala
deleted file mode 100644
index c09e99d..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/AvroSource.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.example.datasources
-
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericData
-import org.apache.hadoop.hbase.spark.AvroSerdes
-import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.{SparkConf, SparkContext}
-
-/**
- * @param col0 Column #0, Type is String
- * @param col1 Column #1, Type is Array[Byte]
- */
-case class AvroHBaseRecord(col0: String,
- col1: Array[Byte])
-
-object AvroHBaseRecord {
- val schemaString =
- s"""{"namespace": "example.avro",
- | "type": "record", "name": "User",
- | "fields": [
- | {"name": "name", "type": "string"},
- | {"name": "favorite_number", "type": ["int", "null"]},
- | {"name": "favorite_color", "type": ["string", "null"]},
- | {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
- | {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
- | ] }""".stripMargin
-
- val avroSchema: Schema = {
- val p = new Schema.Parser
- p.parse(schemaString)
- }
-
- def apply(i: Int): AvroHBaseRecord = {
-
- val user = new GenericData.Record(avroSchema);
- user.put("name", s"name${"%03d".format(i)}")
- user.put("favorite_number", i)
- user.put("favorite_color", s"color${"%03d".format(i)}")
- val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema())
- favoriteArray.add(s"number${i}")
- favoriteArray.add(s"number${i+1}")
- user.put("favorite_array", favoriteArray)
- import collection.JavaConverters._
- val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava
- user.put("favorite_map", favoriteMap)
- val avroByte = AvroSerdes.serialize(user, avroSchema)
- AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte)
- }
-}
-
-object AvroSource {
- def catalog = s"""{
- |"table":{"namespace":"default", "name":"ExampleAvrotable"},
- |"rowkey":"key",
- |"columns":{
- |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
- |"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
- |}
- |}""".stripMargin
-
- def avroCatalog = s"""{
- |"table":{"namespace":"default", "name":"ExampleAvrotable"},
- |"rowkey":"key",
- |"columns":{
- |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
- |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
- |}
- |}""".stripMargin
-
- def avroCatalogInsert = s"""{
- |"table":{"namespace":"default", "name":"ExampleAvrotableInsert"},
- |"rowkey":"key",
- |"columns":{
- |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
- |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
- |}
- |}""".stripMargin
-
- def main(args: Array[String]) {
- val sparkConf = new SparkConf().setAppName("AvroSourceExample")
- val sc = new SparkContext(sparkConf)
- val sqlContext = new SQLContext(sc)
-
- import sqlContext.implicits._
-
- def withCatalog(cat: String): DataFrame = {
- sqlContext
- .read
- .options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalog))
- .format("org.apache.hadoop.hbase.spark")
- .load()
- }
-
- val data = (0 to 255).map { i =>
- AvroHBaseRecord(i)
- }
-
- sc.parallelize(data).toDF.write.options(
- Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
- .format("org.apache.hadoop.hbase.spark")
- .save()
-
- val df = withCatalog(catalog)
- df.show()
- df.printSchema()
- df.registerTempTable("ExampleAvrotable")
- val c = sqlContext.sql("select count(1) from ExampleAvrotable")
- c.show()
-
- val filtered = df.select($"col0", $"col1.favorite_array").where($"col0" === "name001")
- filtered.show()
- val collected = filtered.collect()
- if (collected(0).getSeq[String](1)(0) != "number1") {
- throw new UserCustomizedSampleException("value invalid")
- }
- if (collected(0).getSeq[String](1)(1) != "number2") {
- throw new UserCustomizedSampleException("value invalid")
- }
-
- df.write.options(
- Map("avroSchema"->AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog->avroCatalogInsert,
- HBaseTableCatalog.newTable -> "5"))
- .format("org.apache.hadoop.hbase.spark")
- .save()
- val newDF = withCatalog(avroCatalogInsert)
- newDF.show()
- newDF.printSchema()
- if(newDF.count() != 256) {
- throw new UserCustomizedSampleException("value invalid")
- }
-
- df.filter($"col1.name" === "name005" || $"col1.name" <= "name005")
- .select("col0", "col1.favorite_color", "col1.favorite_number")
- .show()
-
- df.filter($"col1.name" <= "name005" || $"col1.name".contains("name007"))
- .select("col0", "col1.favorite_color", "col1.favorite_number")
- .show()
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/DataType.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/DataType.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/DataType.scala
deleted file mode 100644
index 96c6d6e..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/DataType.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.example.datasources
-
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
-
-class UserCustomizedSampleException(message: String = null, cause: Throwable = null) extends
- RuntimeException(UserCustomizedSampleException.message(message, cause), cause)
-
-object UserCustomizedSampleException {
- def message(message: String, cause: Throwable) =
- if (message != null) message
- else if (cause != null) cause.toString()
- else null
-}
-
-case class IntKeyRecord(
- col0: Integer,
- col1: Boolean,
- col2: Double,
- col3: Float,
- col4: Int,
- col5: Long,
- col6: Short,
- col7: String,
- col8: Byte)
-
-object IntKeyRecord {
- def apply(i: Int): IntKeyRecord = {
- IntKeyRecord(if (i % 2 == 0) i else -i,
- i % 2 == 0,
- i.toDouble,
- i.toFloat,
- i,
- i.toLong,
- i.toShort,
- s"String$i extra",
- i.toByte)
- }
-}
-
-object DataType {
- val cat = s"""{
- |"table":{"namespace":"default", "name":"DataTypeExampleTable"},
- |"rowkey":"key",
- |"columns":{
- |"col0":{"cf":"rowkey", "col":"key", "type":"int"},
- |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
- |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
- |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
- |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
- |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
- |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
- |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
- |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
- |}
- |}""".stripMargin
-
- def main(args: Array[String]){
- val sparkConf = new SparkConf().setAppName("DataTypeExample")
- val sc = new SparkContext(sparkConf)
- val sqlContext = new SQLContext(sc)
-
- import sqlContext.implicits._
-
- def withCatalog(cat: String): DataFrame = {
- sqlContext
- .read
- .options(Map(HBaseTableCatalog.tableCatalog->cat))
- .format("org.apache.hadoop.hbase.spark")
- .load()
- }
-
- // test populate table
- val data = (0 until 32).map { i =>
- IntKeyRecord(i)
- }
- sc.parallelize(data).toDF.write.options(
- Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5"))
- .format("org.apache.hadoop.hbase.spark")
- .save()
-
- // test less than 0
- val df = withCatalog(cat)
- val s = df.filter($"col0" < 0)
- s.show()
- if(s.count() != 16){
- throw new UserCustomizedSampleException("value invalid")
- }
-
- //test less than or equal to -10. The number of results is 11
- val num1 = df.filter($"col0" <= -10)
- num1.show()
- val c1 = num1.count()
- println(s"test result count should be 11: $c1")
-
- //test less than or equal to -9. The number of results is 12
- val num2 = df.filter($"col0" <= -9)
- num2.show()
- val c2 = num2.count()
- println(s"test result count should be 12: $c2")
-
- //test greater than or equal to -9. The number of results is 21
- val num3 = df.filter($"col0" >= -9)
- num3.show()
- val c3 = num3.count()
- println(s"test result count should be 21: $c3")
-
- //test greater than or equal to 0. The number of results is 16
- val num4 = df.filter($"col0" >= 0)
- num4.show()
- val c4 = num4.count()
- println(s"test result count should be 16: $c4")
-
- //test greater than 10. The number of results is 10
- val num5 = df.filter($"col0" > 10)
- num5.show()
- val c5 = num5.count()
- println(s"test result count should be 10: $c5")
-
- // test "and". The number of results is 11
- val num6 = df.filter($"col0" > -10 && $"col0" <= 10)
- num6.show()
- val c6 = num6.count()
- println(s"test result count should be 11: $c6")
-
- //test "or". The number of results is 21
- val num7 = df.filter($"col0" <= -10 || $"col0" > 10)
- num7.show()
- val c7 = num7.count()
- println(s"test result count should be 21: $c7")
-
- //test "all". The number of results is 32
- val num8 = df.filter($"col0" >= -100)
- num8.show()
- val c8 = num8.count()
- println(s"test result count should be 32: $c8")
-
- //test "full query"
- val df1 = withCatalog(cat)
- df1.show()
- val c_df = df1.count()
- println(s"df count should be 32: $c_df")
- if(c_df != 32){
- throw new UserCustomizedSampleException("value invalid")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/HBaseSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/HBaseSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/HBaseSource.scala
deleted file mode 100644
index 056c071..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/HBaseSource.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.example.datasources
-
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
-
-case class HBaseRecord(
- col0: String,
- col1: Boolean,
- col2: Double,
- col3: Float,
- col4: Int,
- col5: Long,
- col6: Short,
- col7: String,
- col8: Byte)
-
-object HBaseRecord {
- def apply(i: Int): HBaseRecord = {
- val s = s"""row${"%03d".format(i)}"""
- HBaseRecord(s,
- i % 2 == 0,
- i.toDouble,
- i.toFloat,
- i,
- i.toLong,
- i.toShort,
- s"String$i extra",
- i.toByte)
- }
-}
-
-object HBaseSource {
- val cat = s"""{
- |"table":{"namespace":"default", "name":"HBaseSourceExampleTable"},
- |"rowkey":"key",
- |"columns":{
- |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
- |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
- |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
- |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
- |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
- |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
- |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
- |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
- |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
- |}
- |}""".stripMargin
-
- def main(args: Array[String]) {
- val sparkConf = new SparkConf().setAppName("HBaseSourceExample")
- val sc = new SparkContext(sparkConf)
- val sqlContext = new SQLContext(sc)
-
- import sqlContext.implicits._
-
- def withCatalog(cat: String): DataFrame = {
- sqlContext
- .read
- .options(Map(HBaseTableCatalog.tableCatalog->cat))
- .format("org.apache.hadoop.hbase.spark")
- .load()
- }
-
- val data = (0 to 255).map { i =>
- HBaseRecord(i)
- }
-
- sc.parallelize(data).toDF.write.options(
- Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5"))
- .format("org.apache.hadoop.hbase.spark")
- .save()
-
- val df = withCatalog(cat)
- df.show()
- df.filter($"col0" <= "row005")
- .select($"col0", $"col1").show
- df.filter($"col0" === "row005" || $"col0" <= "row005")
- .select($"col0", $"col1").show
- df.filter($"col0" > "row250")
- .select($"col0", $"col1").show
- df.registerTempTable("table1")
- val c = sqlContext.sql("select count(col1) from table1 where col0 < 'row050'")
- c.show()
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala
deleted file mode 100644
index 46135a5..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.example.hbasecontext
-
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.spark.SparkContext
-import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.client.Delete
-import org.apache.spark.SparkConf
-
-/**
- * This is a simple example of deleting records in HBase
- * with the bulkDelete function.
- */
-object HBaseBulkDeleteExample {
- def main(args: Array[String]) {
- if (args.length < 1) {
- println("HBaseBulkDeleteExample {tableName} missing an argument")
- return
- }
-
- val tableName = args(0)
-
- val sparkConf = new SparkConf().setAppName("HBaseBulkDeleteExample " + tableName)
- val sc = new SparkContext(sparkConf)
- try {
- //[Array[Byte]]
- val rdd = sc.parallelize(Array(
- Bytes.toBytes("1"),
- Bytes.toBytes("2"),
- Bytes.toBytes("3"),
- Bytes.toBytes("4"),
- Bytes.toBytes("5")
- ))
-
- val conf = HBaseConfiguration.create()
-
- val hbaseContext = new HBaseContext(sc, conf)
- hbaseContext.bulkDelete[Array[Byte]](rdd,
- TableName.valueOf(tableName),
- putRecord => new Delete(putRecord),
- 4)
- } finally {
- sc.stop()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala
deleted file mode 100644
index 1bdc90d..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkGetExample.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.example.hbasecontext
-
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.spark.SparkContext
-import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.client.Get
-import org.apache.hadoop.hbase.client.Result
-import org.apache.spark.SparkConf
-
-/**
- * This is a simple example of getting records from HBase
- * with the bulkGet function.
- */
-object HBaseBulkGetExample {
- def main(args: Array[String]) {
- if (args.length < 1) {
- println("HBaseBulkGetExample {tableName} missing an argument")
- return
- }
-
- val tableName = args(0)
-
- val sparkConf = new SparkConf().setAppName("HBaseBulkGetExample " + tableName)
- val sc = new SparkContext(sparkConf)
-
- try {
-
- //[(Array[Byte])]
- val rdd = sc.parallelize(Array(
- Bytes.toBytes("1"),
- Bytes.toBytes("2"),
- Bytes.toBytes("3"),
- Bytes.toBytes("4"),
- Bytes.toBytes("5"),
- Bytes.toBytes("6"),
- Bytes.toBytes("7")))
-
- val conf = HBaseConfiguration.create()
-
- val hbaseContext = new HBaseContext(sc, conf)
-
- val getRdd = hbaseContext.bulkGet[Array[Byte], String](
- TableName.valueOf(tableName),
- 2,
- rdd,
- record => {
- System.out.println("making Get")
- new Get(record)
- },
- (result: Result) => {
-
- val it = result.listCells().iterator()
- val b = new StringBuilder
-
- b.append(Bytes.toString(result.getRow) + ":")
-
- while (it.hasNext) {
- val cell = it.next()
- val q = Bytes.toString(CellUtil.cloneQualifier(cell))
- if (q.equals("counter")) {
- b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
- } else {
- b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
- }
- }
- b.toString()
- })
-
- getRdd.collect().foreach(v => println(v))
-
- } finally {
- sc.stop()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala
deleted file mode 100644
index 063f2c2..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExample.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.example.hbasecontext
-
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.spark.SparkContext
-import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.client.Put
-import org.apache.spark.SparkConf
-
-/**
- * This is a simple example of putting records in HBase
- * with the bulkPut function.
- */
-object HBaseBulkPutExample {
- def main(args: Array[String]) {
- if (args.length < 2) {
- println("HBaseBulkPutExample {tableName} {columnFamily} are missing an arguments")
- return
- }
-
- val tableName = args(0)
- val columnFamily = args(1)
-
- val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " +
- tableName + " " + columnFamily)
- val sc = new SparkContext(sparkConf)
-
- try {
- //[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])]
- val rdd = sc.parallelize(Array(
- (Bytes.toBytes("1"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
- (Bytes.toBytes("2"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
- (Bytes.toBytes("3"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
- (Bytes.toBytes("4"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
- (Bytes.toBytes("5"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
- ))
-
- val conf = HBaseConfiguration.create()
-
- val hbaseContext = new HBaseContext(sc, conf)
- hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
- TableName.valueOf(tableName),
- (putRecord) => {
- val put = new Put(putRecord._1)
- putRecord._2.foreach((putValue) =>
- put.addColumn(putValue._1, putValue._2, putValue._3))
- put
- });
- } finally {
- sc.stop()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala
deleted file mode 100644
index 37a0358..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutExampleFromFile.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.example.hbasecontext
-
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.spark.SparkContext
-import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.client.Put
-import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.Text
-import org.apache.spark.SparkConf
-
-/**
- * This is a simple example of putting records in HBase
- * with the bulkPut function. In this example we are
- * getting the put information from a file
- */
-object HBaseBulkPutExampleFromFile {
- def main(args: Array[String]) {
- if (args.length < 3) {
- println("HBaseBulkPutExampleFromFile {tableName} {columnFamily} {inputFile} are missing an argument")
- return
- }
-
- val tableName = args(0)
- val columnFamily = args(1)
- val inputFile = args(2)
-
- val sparkConf = new SparkConf().setAppName("HBaseBulkPutExampleFromFile " +
- tableName + " " + columnFamily + " " + inputFile)
- val sc = new SparkContext(sparkConf)
-
- try {
- var rdd = sc.hadoopFile(
- inputFile,
- classOf[TextInputFormat],
- classOf[LongWritable],
- classOf[Text]).map(v => {
- System.out.println("reading-" + v._2.toString)
- v._2.toString
- })
-
- val conf = HBaseConfiguration.create()
-
- val hbaseContext = new HBaseContext(sc, conf)
- hbaseContext.bulkPut[String](rdd,
- TableName.valueOf(tableName),
- (putRecord) => {
- System.out.println("hbase-" + putRecord)
- val put = new Put(Bytes.toBytes("Value- " + putRecord))
- put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"),
- Bytes.toBytes(putRecord.length()))
- put
- });
- } finally {
- sc.stop()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala
deleted file mode 100644
index fa78216..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkPutTimestampExample.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.example.hbasecontext
-
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.spark.SparkContext
-import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.client.Put
-import org.apache.spark.SparkConf
-
-/**
- * This is a simple example of putting records in HBase
- * with the bulkPut function. In this example we are
- * also setting the timestamp in the put
- */
-object HBaseBulkPutTimestampExample {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.out.println("HBaseBulkPutTimestampExample {tableName} {columnFamily} are missing an argument")
- return
- }
-
- val tableName = args(0)
- val columnFamily = args(1)
-
- val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample " +
- tableName + " " + columnFamily)
- val sc = new SparkContext(sparkConf)
-
- try {
-
- val rdd = sc.parallelize(Array(
- (Bytes.toBytes("6"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
- (Bytes.toBytes("7"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
- (Bytes.toBytes("8"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
- (Bytes.toBytes("9"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
- (Bytes.toBytes("10"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))))
-
- val conf = HBaseConfiguration.create()
-
- val timeStamp = System.currentTimeMillis()
-
- val hbaseContext = new HBaseContext(sc, conf)
- hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
- TableName.valueOf(tableName),
- (putRecord) => {
- val put = new Put(putRecord._1)
- putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2,
- timeStamp, putValue._3))
- put
- })
- } finally {
- sc.stop()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala
deleted file mode 100644
index bb2e79d..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseDistributedScanExample.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.hbasecontext
-
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.spark.SparkContext
-import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.client.Scan
-import org.apache.spark.SparkConf
-/**
- * This is a simple example of scanning records from HBase
- * with the hbaseRDD function in Distributed fashion.
- */
-object HBaseDistributedScanExample {
- def main(args: Array[String]) {
- if (args.length < 1) {
- println("HBaseDistributedScanExample {tableName} missing an argument")
- return
- }
-
- val tableName = args(0)
-
- val sparkConf = new SparkConf().setAppName("HBaseDistributedScanExample " + tableName )
- val sc = new SparkContext(sparkConf)
-
- try {
- val conf = HBaseConfiguration.create()
-
- val hbaseContext = new HBaseContext(sc, conf)
-
- val scan = new Scan()
- scan.setCaching(100)
-
- val getRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
-
- getRdd.foreach(v => println(Bytes.toString(v._1.get())))
-
- println("Length: " + getRdd.map(r => r._1.copyBytes()).collect().length);
- } finally {
- sc.stop()
- }
- }
-
-}
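
HBaseDistributedScanExample drives a full table scan through HBaseContext.hbaseRDD. Below is a hedged sketch of the same call with the scan narrowed to a single column family; "sketchTable" and "cf" are placeholder names, not values from the removed example.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}

    object DistributedScanSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("DistributedScanSketch"))
        try {
          val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
          // Narrow the scan to one column family and keep scanner caching modest.
          val scan = new Scan()
          scan.addFamily(Bytes.toBytes("cf"))
          scan.setCaching(100)
          val rowKeys = hbaseContext.hbaseRDD(TableName.valueOf("sketchTable"), scan)
            .map { case (key, _) => Bytes.toString(key.get()) }
          println("Rows scanned: " + rowKeys.count())
        } finally {
          sc.stop()
        }
      }
    }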
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala
deleted file mode 100644
index 8ac93ef..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseStreamingBulkPutExample.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.example.hbasecontext
-
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.spark.SparkContext
-import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.client.Put
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.Seconds
-import org.apache.spark.SparkConf
-
-/**
- * This is a simple example of BulkPut with Spark Streaming
- */
-object HBaseStreamingBulkPutExample {
- def main(args: Array[String]) {
- if (args.length < 4) {
- println("HBaseStreamingBulkPutExample " +
- "{host} {port} {tableName} {columnFamily} are missing an argument")
- return
- }
-
- val host = args(0)
- val port = args(1)
- val tableName = args(2)
- val columnFamily = args(3)
-
- val sparkConf = new SparkConf().setAppName("HBaseStreamingBulkPutExample " +
- tableName + " " + columnFamily)
- val sc = new SparkContext(sparkConf)
- try {
- val ssc = new StreamingContext(sc, Seconds(1))
-
- val lines = ssc.socketTextStream(host, port.toInt)
-
- val conf = HBaseConfiguration.create()
-
- val hbaseContext = new HBaseContext(sc, conf)
-
- hbaseContext.streamBulkPut[String](lines,
- TableName.valueOf(tableName),
- (putRecord) => {
- if (putRecord.length() > 0) {
- val put = new Put(Bytes.toBytes(putRecord))
- put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("foo"), Bytes.toBytes("bar"))
- put
- } else {
- null
- }
- })
- ssc.start()
- ssc.awaitTerminationOrTimeout(60000)
- } finally {
- sc.stop()
- }
- }
-}
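
HBaseStreamingBulkPutExample wires a socket text stream into streamBulkPut. The sketch below assumes each incoming line looks like "rowKey,value"; the host, port, table, and column-family names are placeholders.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StreamingBulkPutSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("StreamingBulkPutSketch"))
        try {
          val ssc = new StreamingContext(sc, Seconds(1))
          // Each line is expected to look like "rowKey,value"; host and port are placeholders.
          val lines = ssc.socketTextStream("localhost", 9999)
          val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
          hbaseContext.streamBulkPut[String](lines, TableName.valueOf("sketchTable"), line => {
            val parts = line.split(",", 2)
            if (parts.length == 2) {
              new Put(Bytes.toBytes(parts(0)))
                .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("value"), Bytes.toBytes(parts(1)))
            } else {
              null // records that do not parse are skipped
            }
          })
          ssc.start()
          ssc.awaitTerminationOrTimeout(60000)
        } finally {
          sc.stop()
        }
      }
    }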
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala
deleted file mode 100644
index 83d3f9e..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkDeleteExample.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.rdd
-
-import org.apache.hadoop.hbase.client.Delete
-import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
-import org.apache.hadoop.hbase.util.Bytes
-
-import org.apache.spark.{SparkContext, SparkConf}
-
-/**
- * This is a simple example of deleting records in HBase
- * with the bulkDelete function.
- */
-object HBaseBulkDeleteExample {
- def main(args: Array[String]) {
- if (args.length < 1) {
- println("HBaseBulkDeleteExample {tableName} are missing an argument")
- return
- }
-
- val tableName = args(0)
-
- val sparkConf = new SparkConf().setAppName("HBaseBulkDeleteExample " + tableName)
- val sc = new SparkContext(sparkConf)
- try {
- //[Array[Byte]]
- val rdd = sc.parallelize(Array(
- Bytes.toBytes("1"),
- Bytes.toBytes("2"),
- Bytes.toBytes("3"),
- Bytes.toBytes("4"),
- Bytes.toBytes("5")
- ))
-
- val conf = HBaseConfiguration.create()
-
- val hbaseContext = new HBaseContext(sc, conf)
-
- rdd.hbaseBulkDelete(hbaseContext, TableName.valueOf(tableName),
- putRecord => new Delete(putRecord),
- 4)
-
- } finally {
- sc.stop()
- }
- }
-}
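
HBaseBulkDeleteExample deletes whole rows. The same hbaseBulkDelete call can target a single column by building the Delete differently; here is a sketch with placeholder table, family, and qualifier names.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.Delete
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}

    object BulkDeleteColumnSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("BulkDeleteColumnSketch"))
        try {
          val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
          val rowKeys = sc.parallelize(Seq("1", "2", "3")).map(s => Bytes.toBytes(s))
          // Remove only the cf:obsolete column from each listed row, in batches of 4.
          rowKeys.hbaseBulkDelete(hbaseContext, TableName.valueOf("sketchTable"),
            row => new Delete(row).addColumns(Bytes.toBytes("cf"), Bytes.toBytes("obsolete")),
            4)
        } finally {
          sc.stop()
        }
      }
    }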
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala
deleted file mode 100644
index eedabc3..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkGetExample.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.rdd
-
-import org.apache.hadoop.hbase.client.{Result, Get}
-import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration}
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
-import org.apache.spark.{SparkContext, SparkConf}
-
-/**
- * This is a simple example of getting records from HBase
- * with the bulkGet function.
- */
-object HBaseBulkGetExample {
- def main(args: Array[String]) {
- if (args.length < 1) {
- println("HBaseBulkGetExample {tableName} is missing an argument")
- return
- }
-
- val tableName = args(0)
-
- val sparkConf = new SparkConf().setAppName("HBaseBulkGetExample " + tableName)
- val sc = new SparkContext(sparkConf)
-
- try {
-
- //[(Array[Byte])]
- val rdd = sc.parallelize(Array(
- Bytes.toBytes("1"),
- Bytes.toBytes("2"),
- Bytes.toBytes("3"),
- Bytes.toBytes("4"),
- Bytes.toBytes("5"),
- Bytes.toBytes("6"),
- Bytes.toBytes("7")))
-
- val conf = HBaseConfiguration.create()
-
- val hbaseContext = new HBaseContext(sc, conf)
-
- val getRdd = rdd.hbaseBulkGet[String](hbaseContext, TableName.valueOf(tableName), 2,
- record => {
- System.out.println("making Get")
- new Get(record)
- },
- (result: Result) => {
-
- val it = result.listCells().iterator()
- val b = new StringBuilder
-
- b.append(Bytes.toString(result.getRow) + ":")
-
- while (it.hasNext) {
- val cell = it.next()
- val q = Bytes.toString(CellUtil.cloneQualifier(cell))
- if (q.equals("counter")) {
- b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
- } else {
- b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
- }
- }
- b.toString()
- })
-
- getRdd.collect().foreach(v => println(v))
-
- } finally {
- sc.stop()
- }
- }
-}
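
HBaseBulkGetExample converts each Result by walking its cells. A variant that asks for one column per Get and extracts it with Result.getValue is sketched below; all table, family, and qualifier names are placeholders.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.Get
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}

    object BulkGetSingleColumnSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("BulkGetSingleColumnSketch"))
        try {
          val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
          val rowKeys = sc.parallelize(Seq("1", "2", "3")).map(s => Bytes.toBytes(s))
          // Fetch only cf:name for each row and return (rowKey, Option(value)).
          val values = rowKeys.hbaseBulkGet[(String, Option[String])](
            hbaseContext, TableName.valueOf("sketchTable"), 2,
            row => new Get(row).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name")),
            result => {
              val key = Option(result.getRow).map(b => Bytes.toString(b)).getOrElse("<missing>")
              val value = Option(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name")))
                .map(b => Bytes.toString(b))
              (key, value)
            })
          values.collect().foreach(println)
        } finally {
          sc.stop()
        }
      }
    }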
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala
deleted file mode 100644
index 28711b8..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseBulkPutExample.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.example.rdd
-
-import org.apache.hadoop.hbase.client.Put
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
-import org.apache.spark.{SparkConf, SparkContext}
-
-/**
- * This is a simple example of putting records in HBase
- * with the bulkPut function.
- */
-object HBaseBulkPutExample {
- def main(args: Array[String]) {
- if (args.length < 2) {
- println("HBaseBulkPutExample {tableName} {columnFamily} are missing an arguments")
- return
- }
-
- val tableName = args(0)
- val columnFamily = args(1)
-
- val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " +
- tableName + " " + columnFamily)
- val sc = new SparkContext(sparkConf)
-
- try {
- //[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])]
- val rdd = sc.parallelize(Array(
- (Bytes.toBytes("1"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
- (Bytes.toBytes("2"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
- (Bytes.toBytes("3"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
- (Bytes.toBytes("4"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
- (Bytes.toBytes("5"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
- ))
-
- val conf = HBaseConfiguration.create()
-
- val hbaseContext = new HBaseContext(sc, conf)
-
- rdd.hbaseBulkPut(hbaseContext, TableName.valueOf(tableName),
- (putRecord) => {
- val put = new Put(putRecord._1)
- putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2,
- putValue._3))
- put
- })
-
- } finally {
- sc.stop()
- }
- }
- }
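
HBaseBulkPutExample builds its input as nested byte-array tuples. Here is a sketch of the same hbaseBulkPut call fed from plain (rowKey, value) string pairs, with placeholder table and column-family names.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}

    object BulkPutPairsSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("BulkPutPairsSketch"))
        try {
          val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
          // Plain (rowKey, value) pairs instead of nested byte-array tuples.
          val pairs = sc.parallelize(Seq(("row1", "a"), ("row2", "b"), ("row3", "c")))
          pairs.hbaseBulkPut(hbaseContext, TableName.valueOf("sketchTable"), { case (row, value) =>
            new Put(Bytes.toBytes(row))
              .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("value"), Bytes.toBytes(value))
          })
        } finally {
          sc.stop()
        }
      }
    }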
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala
deleted file mode 100644
index 8dfefc2..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseForeachPartitionExample.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.example.rdd
-
-import org.apache.hadoop.hbase.client.Put
-import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.{SparkContext, SparkConf}
-
-/**
- * This is a simple example of using the foreachPartition
- * method with an HBase connection
- */
-object HBaseForeachPartitionExample {
- def main(args: Array[String]) {
- if (args.length < 2) {
- println("HBaseForeachPartitionExample {tableName} {columnFamily} are missing an arguments")
- return
- }
-
- val tableName = args(0)
- val columnFamily = args(1)
-
- val sparkConf = new SparkConf().setAppName("HBaseForeachPartitionExample " +
- tableName + " " + columnFamily)
- val sc = new SparkContext(sparkConf)
-
- try {
- //[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])]
- val rdd = sc.parallelize(Array(
- (Bytes.toBytes("1"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
- (Bytes.toBytes("2"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
- (Bytes.toBytes("3"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
- (Bytes.toBytes("4"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
- (Bytes.toBytes("5"),
- Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
- ))
-
- val conf = HBaseConfiguration.create()
-
- val hbaseContext = new HBaseContext(sc, conf)
-
-
- rdd.hbaseForeachPartition(hbaseContext,
- (it, connection) => {
- val m = connection.getBufferedMutator(TableName.valueOf(tableName))
-
- it.foreach(r => {
- val put = new Put(r._1)
- r._2.foreach((putValue) =>
- put.addColumn(putValue._1, putValue._2, putValue._3))
- m.mutate(put)
- })
- m.flush()
- m.close()
- })
-
- } finally {
- sc.stop()
- }
- }
-}
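
HBaseForeachPartitionExample opens one BufferedMutator per partition to write Puts. The same pattern works for Deletes, as in this sketch; "sketchTable" is a placeholder name and the mutator is closed in a finally block.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.Delete
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}

    object ForeachPartitionDeleteSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("ForeachPartitionDeleteSketch"))
        try {
          val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
          val rowKeys = sc.parallelize(Seq("1", "2", "3")).map(s => Bytes.toBytes(s))
          // One BufferedMutator per partition, flushed and closed when the partition is done.
          rowKeys.hbaseForeachPartition(hbaseContext, (it, connection) => {
            val mutator = connection.getBufferedMutator(TableName.valueOf("sketchTable"))
            try {
              it.foreach(row => mutator.mutate(new Delete(row)))
              mutator.flush()
            } finally {
              mutator.close()
            }
          })
        } finally {
          sc.stop()
        }
      }
    }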
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala
deleted file mode 100644
index 0d0b314..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/rdd/HBaseMapPartitionExample.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark.example.rdd
-
-import org.apache.hadoop.hbase.client.Get
-import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseConfiguration}
-import org.apache.hadoop.hbase.spark.HBaseContext
-import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.{SparkContext, SparkConf}
-
-/**
- * This is a simple example of using the mapPartitions
- * method with an HBase connection
- */
-object HBaseMapPartitionExample {
- def main(args: Array[String]) {
- if (args.length < 1) {
- println("HBaseMapPartitionExample {tableName} is missing an argument")
- return
- }
-
- val tableName = args(0)
-
- val sparkConf = new SparkConf().setAppName("HBaseMapPartitionExample " + tableName)
- val sc = new SparkContext(sparkConf)
-
- try {
-
- //[(Array[Byte])]
- val rdd = sc.parallelize(Array(
- Bytes.toBytes("1"),
- Bytes.toBytes("2"),
- Bytes.toBytes("3"),
- Bytes.toBytes("4"),
- Bytes.toBytes("5"),
- Bytes.toBytes("6"),
- Bytes.toBytes("7")))
-
- val conf = HBaseConfiguration.create()
-
- val hbaseContext = new HBaseContext(sc, conf)
-
- val getRdd = rdd.hbaseMapPartitions[String](hbaseContext, (it, connection) => {
- val table = connection.getTable(TableName.valueOf(tableName))
- it.map{r =>
- //batching would be faster. This is just an example
- val result = table.get(new Get(r))
-
- val it = result.listCells().iterator()
- val b = new StringBuilder
-
- b.append(Bytes.toString(result.getRow) + ":")
-
- while (it.hasNext) {
- val cell = it.next()
- val q = Bytes.toString(CellUtil.cloneQualifier(cell))
- if (q.equals("counter")) {
- b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
- } else {
- b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
- }
- }
- b.toString()
- }
- })
-
- getRdd.collect().foreach(v => println(v))
-
- } finally {
- sc.stop()
- }
- }
-}
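
The removed HBaseMapPartitionExample notes that batching the gets would be faster than issuing one per row. A hedged sketch of that batched variant, grouping each partition's keys and using Table.get with a list of Gets; table, family, and row names are placeholders.

    import scala.collection.JavaConverters._

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.Get
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}

    object MapPartitionBatchedGetSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("MapPartitionBatchedGetSketch"))
        try {
          val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
          val rowKeys = sc.parallelize(Seq("1", "2", "3", "4", "5")).map(s => Bytes.toBytes(s))
          val rows = rowKeys.hbaseMapPartitions[String](hbaseContext, (it, connection) => {
            val table = connection.getTable(TableName.valueOf("sketchTable"))
            // Group the partition's keys and issue one multi-get per batch of 100.
            it.grouped(100).flatMap { batch =>
              val gets = batch.map(row => new Get(row)).asJava
              table.get(gets).map(result => Bytes.toString(result.getRow))
            }
          })
          rows.collect().foreach(println)
        } finally {
          sc.stop()
        }
      }
    }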
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala
deleted file mode 100644
index 3df23f9..0000000
--- a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/DataTypeParserWrapper.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.datasources.hbase
-
-import org.apache.spark.sql.catalyst.SqlLexical
-import org.apache.spark.sql.catalyst.util.DataTypeParser
-import org.apache.spark.sql.types.DataType
-
-// TODO: Only used in test suite.
-object DataTypeParserWrapper {
- lazy val dataTypeParser = new DataTypeParser {
- override val lexical = new SqlLexical
- }
-
- def parse(dataTypeString: String): DataType = dataTypeParser.toDataType(dataTypeString)
-}
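
DataTypeParserWrapper simply exposes catalyst's DataTypeParser. For reference, a sketch of how the removed helper was called; the type string here is an illustrative assumption, not taken from the module's tests.

    import org.apache.spark.sql.datasources.hbase.DataTypeParserWrapper
    import org.apache.spark.sql.types.StructType

    object DataTypeParserWrapperSketch {
      def main(args: Array[String]): Unit = {
        // Parse a catalyst type string into a Spark SQL DataType.
        val parsed = DataTypeParserWrapper.parse("struct<name:string,age:int>")
        parsed match {
          case s: StructType =>
            println(s.fields.map(f => f.name + ": " + f.dataType.simpleString).mkString(", "))
          case other =>
            println("Parsed: " + other.simpleString)
        }
      }
    }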