Posted to commits@hbase.apache.org by bu...@apache.org on 2017/11/09 04:59:54 UTC
[8/9] hbase git commit: HBASE-18817 pull the hbase-spark module out of branch-2.
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
deleted file mode 100644
index 54ff658..0000000
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.hbasecontext;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.spark.FamilyHFileWriteOptions;
-import org.apache.hadoop.hbase.spark.JavaHBaseContext;
-import org.apache.hadoop.hbase.spark.KeyFamilyQualifier;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * Run this example using command below:
- *
- * SPARK_HOME/bin/spark-submit --master local[2] --class org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkLoadExample
- * path/to/hbase-spark.jar {path/to/output/HFiles}
- *
- * This example will output HFiles in {path/to/output/HFiles}, and the user can run
- * 'hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles' to load those HFiles into the table to verify this example.
- */
-final public class JavaHBaseBulkLoadExample {
- private JavaHBaseBulkLoadExample() {}
-
- public static void main(String[] args) {
- if (args.length < 1) {
- System.out.println("JavaHBaseBulkLoadExample " + "{outputPath}");
- return;
- }
-
- String tableName = "bulkload-table-test";
- String columnFamily1 = "f1";
- String columnFamily2 = "f2";
-
- SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkLoadExample " + tableName);
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-
- try {
- List<String> list= new ArrayList<String>();
- // row1
- list.add("1," + columnFamily1 + ",b,1");
- // row3
- list.add("3," + columnFamily1 + ",a,2");
- list.add("3," + columnFamily1 + ",b,1");
- list.add("3," + columnFamily2 + ",a,1");
- /* row2 */
- list.add("2," + columnFamily2 + ",a,3");
- list.add("2," + columnFamily2 + ",b,3");
-
- JavaRDD<String> rdd = jsc.parallelize(list);
-
- Configuration conf = HBaseConfiguration.create();
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
-
-
- hbaseContext.bulkLoad(rdd, TableName.valueOf(tableName),new BulkLoadFunction(), args[0],
- new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
- } finally {
- jsc.stop();
- }
- }
-
- public static class BulkLoadFunction implements Function<String, Pair<KeyFamilyQualifier, byte[]>> {
-
- @Override
- public Pair<KeyFamilyQualifier, byte[]> call(String v1) throws Exception {
- if (v1 == null)
- return null;
- String[] strs = v1.split(",");
- if(strs.length != 4)
- return null;
- KeyFamilyQualifier kfq = new KeyFamilyQualifier(Bytes.toBytes(strs[0]), Bytes.toBytes(strs[1]),
- Bytes.toBytes(strs[2]));
- return new Pair(kfq, Bytes.toBytes(strs[3]));
- }
- }
-}
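For reference, the removed bulk-load example above parses each RDD record of the form "rowKey,columnFamily,qualifier,value" into a KeyFamilyQualifier/value pair before HFiles are written to the supplied output path. The following is a hedged, standalone Java sketch of that mapping, not part of the commit; it assumes the removed hbase-spark classes are still on the classpath, and the record contents and output path are illustrative only.

import org.apache.hadoop.hbase.spark.KeyFamilyQualifier;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

public class BulkLoadRecordSketch {
  public static void main(String[] args) {
    // "3,f1,b,1" -> row key "3", column family "f1", qualifier "b", cell value "1"
    String[] parts = "3,f1,b,1".split(",");
    KeyFamilyQualifier kfq = new KeyFamilyQualifier(
        Bytes.toBytes(parts[0]),   // row key
        Bytes.toBytes(parts[1]),   // column family
        Bytes.toBytes(parts[2]));  // qualifier
    Pair<KeyFamilyQualifier, byte[]> cell = new Pair<>(kfq, Bytes.toBytes(parts[3]));
    System.out.println("value for row " + parts[0] + ": " + Bytes.toString(cell.getSecond()));
    // Once the Spark job has written HFiles under {path/to/output/HFiles}, the load is
    // completed along the lines of:
    //   hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles {path/to/output/HFiles} bulkload-table-test
  }
}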
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java
deleted file mode 100644
index 5821c19..0000000
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkPutExample.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.hbasecontext;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.spark.JavaHBaseContext;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-/**
- * This is a simple example of putting records in HBase
- * with the bulkPut function.
- */
-final public class JavaHBaseBulkPutExample {
-
- private JavaHBaseBulkPutExample() {}
-
- public static void main(String[] args) {
- if (args.length < 2) {
- System.out.println("JavaHBaseBulkPutExample " +
- "{tableName} {columnFamily}");
- return;
- }
-
- String tableName = args[0];
- String columnFamily = args[1];
-
- SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkPutExample " + tableName);
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-
- try {
- List<String> list = new ArrayList<>(5);
- list.add("1," + columnFamily + ",a,1");
- list.add("2," + columnFamily + ",a,2");
- list.add("3," + columnFamily + ",a,3");
- list.add("4," + columnFamily + ",a,4");
- list.add("5," + columnFamily + ",a,5");
-
- JavaRDD<String> rdd = jsc.parallelize(list);
-
- Configuration conf = HBaseConfiguration.create();
-
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
- hbaseContext.bulkPut(rdd,
- TableName.valueOf(tableName),
- new PutFunction());
- } finally {
- jsc.stop();
- }
- }
-
- public static class PutFunction implements Function<String, Put> {
-
- private static final long serialVersionUID = 1L;
-
- public Put call(String v) throws Exception {
- String[] cells = v.split(",");
- Put put = new Put(Bytes.toBytes(cells[0]));
-
- put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
- Bytes.toBytes(cells[3]));
- return put;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java
deleted file mode 100644
index 8d4c092..0000000
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseDistributedScan.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.hbasecontext;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.spark.JavaHBaseContext;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
-import org.apache.spark.api.java.function.Function;
-import scala.Tuple2;
-
-/**
- * This is a simple example of scanning records from HBase
- * with the hbaseRDD function.
- */
-final public class JavaHBaseDistributedScan {
-
- private JavaHBaseDistributedScan() {}
-
- public static void main(String[] args) {
- if (args.length < 1) {
- System.out.println("JavaHBaseDistributedScan {tableName}");
- return;
- }
-
- String tableName = args[0];
-
- SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseDistributedScan " + tableName);
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-
- try {
- Configuration conf = HBaseConfiguration.create();
-
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
- Scan scan = new Scan();
- scan.setCaching(100);
-
- JavaRDD<Tuple2<ImmutableBytesWritable, Result>> javaRdd =
- hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan);
-
- List<String> results = javaRdd.map(new ScanConvertFunction()).collect();
-
- System.out.println("Result Size: " + results.size());
- } finally {
- jsc.stop();
- }
- }
-
- private static class ScanConvertFunction implements
- Function<Tuple2<ImmutableBytesWritable, Result>, String> {
- @Override
- public String call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
- return Bytes.toString(v1._1().copyBytes());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java
deleted file mode 100644
index 316f8a1..0000000
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseMapGetPutExample.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.hbasecontext;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.spark.JavaHBaseContext;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction;
-
-import scala.Tuple2;
-
-/**
- * This is a simple example of using the foreachPartition
- * method with an HBase connection
- */
-final public class JavaHBaseMapGetPutExample {
-
- private JavaHBaseMapGetPutExample() {}
-
- public static void main(String[] args) {
- if (args.length < 1) {
- System.out.println("JavaHBaseBulkGetExample {tableName}");
- return;
- }
-
- final String tableName = args[0];
-
- SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseMapGetPutExample " + tableName);
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-
- try {
- List<byte[]> list = new ArrayList<>(5);
- list.add(Bytes.toBytes("1"));
- list.add(Bytes.toBytes("2"));
- list.add(Bytes.toBytes("3"));
- list.add(Bytes.toBytes("4"));
- list.add(Bytes.toBytes("5"));
-
- JavaRDD<byte[]> rdd = jsc.parallelize(list);
- Configuration conf = HBaseConfiguration.create();
-
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
- hbaseContext.foreachPartition(rdd,
- new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
- public void call(Tuple2<Iterator<byte[]>, Connection> t)
- throws Exception {
- Table table = t._2().getTable(TableName.valueOf(tableName));
- BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
-
- while (t._1().hasNext()) {
- byte[] b = t._1().next();
- Result r = table.get(new Get(b));
- if (r.getExists()) {
- mutator.mutate(new Put(b));
- }
- }
-
- mutator.flush();
- mutator.close();
- table.close();
- }
- });
- } finally {
- jsc.stop();
- }
- }
-
- public static class GetFunction implements Function<byte[], Get> {
- private static final long serialVersionUID = 1L;
- public Get call(byte[] v) throws Exception {
- return new Get(v);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java
deleted file mode 100644
index cd4cf24..0000000
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseStreamingBulkPutExample.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.hbasecontext;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.spark.JavaHBaseContext;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-/**
- * This is a simple example of BulkPut with Spark Streaming
- */
-final public class JavaHBaseStreamingBulkPutExample {
-
- private JavaHBaseStreamingBulkPutExample() {}
-
- public static void main(String[] args) {
- if (args.length < 3) {
- System.out.println("JavaHBaseStreamingBulkPutExample " +
- "{host} {port} {tableName}");
- return;
- }
-
- String host = args[0];
- String port = args[1];
- String tableName = args[2];
-
- SparkConf sparkConf =
- new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " +
- tableName + ":" + port + ":" + tableName);
-
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-
- try {
- JavaStreamingContext jssc =
- new JavaStreamingContext(jsc, new Duration(1000));
-
- JavaReceiverInputDStream<String> javaDstream =
- jssc.socketTextStream(host, Integer.parseInt(port));
-
- Configuration conf = HBaseConfiguration.create();
-
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
- hbaseContext.streamBulkPut(javaDstream,
- TableName.valueOf(tableName),
- new PutFunction());
- } finally {
- jsc.stop();
- }
- }
-
- public static class PutFunction implements Function<String, Put> {
-
- private static final long serialVersionUID = 1L;
-
- public Put call(String v) throws Exception {
- String[] part = v.split(",");
- Put put = new Put(Bytes.toBytes(part[0]));
-
- put.addColumn(Bytes.toBytes(part[1]),
- Bytes.toBytes(part[2]),
- Bytes.toBytes(part[3]));
- return put;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/protobuf/SparkFilter.proto
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/protobuf/SparkFilter.proto b/hbase-spark/src/main/protobuf/SparkFilter.proto
deleted file mode 100644
index e16c551..0000000
--- a/hbase-spark/src/main/protobuf/SparkFilter.proto
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// This file contains protocol buffers that are used for Spark filters
-// over in the hbase-spark module
-package hbase.pb;
-
-option java_package = "org.apache.hadoop.hbase.spark.protobuf.generated";
-option java_outer_classname = "SparkFilterProtos";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-option optimize_for = SPEED;
-
-message SQLPredicatePushDownCellToColumnMapping {
- required bytes column_family = 1;
- required bytes qualifier = 2;
- required string column_name = 3;
-}
-
-message SQLPredicatePushDownFilter {
- required string dynamic_logic_expression = 1;
- repeated bytes value_from_query_array = 2;
- repeated SQLPredicatePushDownCellToColumnMapping cell_to_column_mapping = 3;
- optional string encoderClassName = 4;
-}
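To make the removed wire format above more concrete, here is a hedged sketch of assembling one pushdown filter message. It is not from the commit, it assumes the standard protoc-generated Java builder API for SparkFilterProtos, and the column names, expression string, and query value are invented for illustration.

import com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.spark.protobuf.generated.SparkFilterProtos;
import org.apache.hadoop.hbase.util.Bytes;

public class SparkFilterProtoSketch {
  public static void main(String[] args) {
    // Maps the HBase cell (cf:col1) to the SQL column name "col1".
    SparkFilterProtos.SQLPredicatePushDownCellToColumnMapping mapping =
        SparkFilterProtos.SQLPredicatePushDownCellToColumnMapping.newBuilder()
            .setColumnFamily(ByteString.copyFrom(Bytes.toBytes("cf")))
            .setQualifier(ByteString.copyFrom(Bytes.toBytes("col1")))
            .setColumnName("col1")
            .build();

    // The filter carries the serialized logic expression plus the query values it references.
    SparkFilterProtos.SQLPredicatePushDownFilter filter =
        SparkFilterProtos.SQLPredicatePushDownFilter.newBuilder()
            .setDynamicLogicExpression("( col1 < 0 )")                  // illustrative expression string
            .addValueFromQueryArray(ByteString.copyFrom(Bytes.toBytes(0)))
            .addCellToColumnMapping(mapping)
            .build();

    System.out.println(filter);
  }
}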
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
deleted file mode 100644
index 9442c50..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/BulkLoadPartitioner.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.util
-import java.util.Comparator
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.Partitioner
-
-/**
- * A Partitioner implementation that will separate records to different
- * HBase Regions based on region splits
- *
- * @param startKeys The start keys for the given table
- */
-@InterfaceAudience.Public
-class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
- extends Partitioner {
- // when the table does not exist, startKeys = Byte[0][]
- override def numPartitions: Int = if (startKeys.length == 0) 1 else startKeys.length
-
- override def getPartition(key: Any): Int = {
-
- val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] {
- override def compare(o1: Array[Byte], o2: Array[Byte]): Int = {
- Bytes.compareTo(o1, o2)
- }
- }
-
- val rowKey:Array[Byte] =
- key match {
- case qualifier: KeyFamilyQualifier =>
- qualifier.rowKey
- case wrapper: ByteArrayWrapper =>
- wrapper.value
- case _ =>
- key.asInstanceOf[Array[Byte]]
- }
- var partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
- if (partition < 0)
- partition = partition * -1 + -2
- if (partition < 0)
- partition = 0
- partition
- }
-}
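A quick worked example of the negative-index arithmetic in getPartition above: Arrays.binarySearch returns -(insertionPoint) - 1 when the row key is not itself a region start key, and "partition * -1 + -2" turns that into insertionPoint - 1, i.e. the index of the region whose start key precedes the row key. The following standalone Java illustration is a sketch only; the region start keys are made up.

import java.util.Arrays;
import org.apache.hadoop.hbase.util.Bytes;

public class PartitionIndexSketch {
  public static void main(String[] args) {
    // Start keys for three regions; the first region's start key is empty.
    byte[][] startKeys = { Bytes.toBytes(""), Bytes.toBytes("b"), Bytes.toBytes("d") };
    byte[] rowKey = Bytes.toBytes("c");   // sorts between "b" and "d"

    // Same arithmetic as BulkLoadPartitioner.getPartition above.
    int partition = Arrays.binarySearch(startKeys, rowKey, Bytes.BYTES_COMPARATOR);
    if (partition < 0) {
      partition = partition * -1 + -2;    // binarySearch returned -(insertionPoint) - 1
    }
    if (partition < 0) {
      partition = 0;
    }
    System.out.println(partition);        // prints 1: "c" belongs to the region starting at "b"
  }
}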
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala
deleted file mode 100644
index 2d0be38..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes
-
-@InterfaceAudience.Public
-class ByteArrayComparable(val bytes:Array[Byte], val offset:Int = 0, var length:Int = -1)
- extends Comparable[ByteArrayComparable] {
-
- if (length == -1) {
- length = bytes.length
- }
-
- override def compareTo(o: ByteArrayComparable): Int = {
- Bytes.compareTo(bytes, offset, length, o.bytes, o.offset, o.length)
- }
-
- override def hashCode(): Int = {
- Bytes.hashCode(bytes, offset, length)
- }
-
- override def equals (obj: Any): Boolean = {
- obj match {
- case b: ByteArrayComparable =>
- Bytes.equals(bytes, offset, length, b.bytes, b.offset, b.length)
- case _ =>
- false
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala
deleted file mode 100644
index 738fa45..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayWrapper.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark
-
-import java.io.Serializable
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * This is a wrapper over a byte array so it can work as
- * a key in a hashMap
- *
- * @param value The Byte Array value
- */
-@InterfaceAudience.Public
-class ByteArrayWrapper (var value:Array[Byte])
- extends Comparable[ByteArrayWrapper] with Serializable {
- override def compareTo(valueOther: ByteArrayWrapper): Int = {
- Bytes.compareTo(value,valueOther.value)
- }
- override def equals(o2: Any): Boolean = {
- o2 match {
- case wrapper: ByteArrayWrapper =>
- Bytes.equals(value, wrapper.value)
- case _ =>
- false
- }
- }
- override def hashCode():Int = {
- Bytes.hashCode(value)
- }
-}
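The reason the wrapper above is needed: Java (and Scala) arrays use reference equality and identity hash codes, so two byte[] instances with identical contents do not match as HashMap keys. The following standalone Java sketch, not part of the commit and with made-up map contents, shows the problem that ByteArrayWrapper solves by defining equals/hashCode over the array contents.

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.util.Bytes;

public class ByteArrayKeySketch {
  public static void main(String[] args) {
    Map<byte[], String> raw = new HashMap<>();
    raw.put(Bytes.toBytes("row1"), "some value");

    // A second toBytes("row1") is a different object with a different identity hashCode,
    // so the lookup misses even though the contents are identical.
    System.out.println(raw.get(Bytes.toBytes("row1")));   // prints null

    // Wrapping the keys in ByteArrayWrapper (above), which hashes and compares the
    // contents, lets content-equal keys find each other as intended.
  }
}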
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala
deleted file mode 100644
index 3037001..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ColumnFamilyQualifierMapKeyWrapper.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * A wrapper class that will allow both columnFamily and qualifier to
- * be the key of a hashMap. It also allows for finding the value in a hashMap
- * without cloning the HBase value from the HBase Cell object
- * @param columnFamily ColumnFamily byte array
- * @param columnFamilyOffSet Offset of columnFamily value in the array
- * @param columnFamilyLength Length of the columnFamily value in the columnFamily array
- * @param qualifier Qualifier byte array
- * @param qualifierOffSet Offset of qualifier value in the array
- * @param qualifierLength Length of the qualifier value with in the array
- */
-@InterfaceAudience.Public
-class ColumnFamilyQualifierMapKeyWrapper(val columnFamily:Array[Byte],
- val columnFamilyOffSet:Int,
- val columnFamilyLength:Int,
- val qualifier:Array[Byte],
- val qualifierOffSet:Int,
- val qualifierLength:Int)
- extends Serializable{
-
- override def equals(other:Any): Boolean = {
- val otherWrapper = other.asInstanceOf[ColumnFamilyQualifierMapKeyWrapper]
-
- Bytes.compareTo(columnFamily,
- columnFamilyOffSet,
- columnFamilyLength,
- otherWrapper.columnFamily,
- otherWrapper.columnFamilyOffSet,
- otherWrapper.columnFamilyLength) == 0 && Bytes.compareTo(qualifier,
- qualifierOffSet,
- qualifierLength,
- otherWrapper.qualifier,
- otherWrapper.qualifierOffSet,
- otherWrapper.qualifierLength) == 0
- }
-
- override def hashCode():Int = {
- Bytes.hashCode(columnFamily, columnFamilyOffSet, columnFamilyLength) +
- Bytes.hashCode(qualifier, qualifierOffSet, qualifierLength)
- }
-
- def cloneColumnFamily():Array[Byte] = {
- val resultArray = new Array[Byte](columnFamilyLength)
- System.arraycopy(columnFamily, columnFamilyOffSet, resultArray, 0, columnFamilyLength)
- resultArray
- }
-
- def cloneQualifier():Array[Byte] = {
- val resultArray = new Array[Byte](qualifierLength)
- System.arraycopy(qualifier, qualifierOffSet, resultArray, 0, qualifierLength)
- resultArray
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
deleted file mode 100644
index a488dd3..0000000
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ /dev/null
@@ -1,1224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.util
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.hadoop.hbase.mapred.TableOutputFormat
-import org.apache.hadoop.hbase.spark.datasources._
-import org.apache.hadoop.hbase.types._
-import org.apache.hadoop.hbase.util.{Bytes, PositionedByteRange, SimplePositionedMutableByteRange}
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.HTableDescriptor
-import org.apache.hadoop.hbase.HColumnDescriptor
-import org.apache.hadoop.hbase.TableName
-import org.apache.hadoop.hbase.CellUtil
-import org.apache.hadoop.mapred.JobConf
-import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.datasources.hbase.{Utils, Field, HBaseTableCatalog}
-import org.apache.spark.sql.{DataFrame, SaveMode, Row, SQLContext}
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types._
-
-import scala.collection.mutable
-
-/**
- * DefaultSource for integration with Spark's dataframe datasources.
- * This class will produce a relationProvider based on input given to it from spark
- *
- * This class needs to stay in the current package 'org.apache.hadoop.hbase.spark'
- * for Spark to match the hbase data source name.
- *
- * In all, this DefaultSource supports the following datasource functionality:
- * - Scan range pruning through filter push down logic based on rowKeys
- * - Filter push down logic on HBase Cells
- * - Qualifier filtering based on columns used in the SparkSQL statement
- * - Type conversions of basic SQL types. All conversions go
- * through the HBase Bytes object commands.
- */
-@InterfaceAudience.Private
-class DefaultSource extends RelationProvider with CreatableRelationProvider with Logging {
- /**
- * Is given input from SparkSQL to construct a BaseRelation
- *
- * @param sqlContext SparkSQL context
- * @param parameters Parameters given to us from SparkSQL
- * @return A BaseRelation Object
- */
- override def createRelation(sqlContext: SQLContext,
- parameters: Map[String, String]):
- BaseRelation = {
- new HBaseRelation(parameters, None)(sqlContext)
- }
-
-
- override def createRelation(
- sqlContext: SQLContext,
- mode: SaveMode,
- parameters: Map[String, String],
- data: DataFrame): BaseRelation = {
- val relation = HBaseRelation(parameters, Some(data.schema))(sqlContext)
- relation.createTable()
- relation.insert(data, false)
- relation
- }
-}
-
-/**
- * Implementation of Spark BaseRelation that will build up our scan logic,
- * do the scan pruning, filter push down, and value conversions
- *
- * @param sqlContext SparkSQL context
- */
-@InterfaceAudience.Private
-case class HBaseRelation (
- @transient parameters: Map[String, String],
- userSpecifiedSchema: Option[StructType]
- )(@transient val sqlContext: SQLContext)
- extends BaseRelation with PrunedFilteredScan with InsertableRelation with Logging {
- val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
- val minTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_START).map(_.toLong)
- val maxTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_END).map(_.toLong)
- val maxVersions = parameters.get(HBaseSparkConf.MAX_VERSIONS).map(_.toInt)
- val encoderClsName = parameters.get(HBaseSparkConf.QUERY_ENCODER).getOrElse(HBaseSparkConf.DEFAULT_QUERY_ENCODER)
-
- @transient val encoder = JavaBytesEncoder.create(encoderClsName)
-
- val catalog = HBaseTableCatalog(parameters)
- def tableName = catalog.name
- val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_LOCATION, "")
- val useHBaseContext = parameters.get(HBaseSparkConf.USE_HBASECONTEXT).map(_.toBoolean).getOrElse(HBaseSparkConf.DEFAULT_USE_HBASECONTEXT)
- val usePushDownColumnFilter = parameters.get(HBaseSparkConf.PUSHDOWN_COLUMN_FILTER)
- .map(_.toBoolean).getOrElse(HBaseSparkConf.DEFAULT_PUSHDOWN_COLUMN_FILTER)
-
- // The user supplied per table parameter will overwrite global ones in SparkConf
- val blockCacheEnable = parameters.get(HBaseSparkConf.QUERY_CACHEBLOCKS).map(_.toBoolean)
- .getOrElse(
- sqlContext.sparkContext.getConf.getBoolean(
- HBaseSparkConf.QUERY_CACHEBLOCKS, HBaseSparkConf.DEFAULT_QUERY_CACHEBLOCKS))
- val cacheSize = parameters.get(HBaseSparkConf.QUERY_CACHEDROWS).map(_.toInt)
- .getOrElse(
- sqlContext.sparkContext.getConf.getInt(
- HBaseSparkConf.QUERY_CACHEDROWS, -1))
- val batchNum = parameters.get(HBaseSparkConf.QUERY_BATCHSIZE).map(_.toInt)
- .getOrElse(sqlContext.sparkContext.getConf.getInt(
- HBaseSparkConf.QUERY_BATCHSIZE, -1))
-
- val bulkGetSize = parameters.get(HBaseSparkConf.BULKGET_SIZE).map(_.toInt)
- .getOrElse(sqlContext.sparkContext.getConf.getInt(
- HBaseSparkConf.BULKGET_SIZE, HBaseSparkConf.DEFAULT_BULKGET_SIZE))
-
- //create or get latest HBaseContext
- val hbaseContext:HBaseContext = if (useHBaseContext) {
- LatestHBaseContextCache.latest
- } else {
- val config = HBaseConfiguration.create()
- configResources.split(",").foreach( r => config.addResource(r))
- new HBaseContext(sqlContext.sparkContext, config)
- }
-
- val wrappedConf = new SerializableConfiguration(hbaseContext.config)
- def hbaseConf = wrappedConf.value
-
- /**
- * Generates a Spark SQL schema object so Spark SQL knows what is being
- * provided by this BaseRelation
- *
- * @return schema generated from the SCHEMA_COLUMNS_MAPPING_KEY value
- */
- override val schema: StructType = userSpecifiedSchema.getOrElse(catalog.toDataType)
-
-
-
- def createTable() {
- val numReg = parameters.get(HBaseTableCatalog.newTable).map(x => x.toInt).getOrElse(0)
- val startKey = Bytes.toBytes(
- parameters.get(HBaseTableCatalog.regionStart)
- .getOrElse(HBaseTableCatalog.defaultRegionStart))
- val endKey = Bytes.toBytes(
- parameters.get(HBaseTableCatalog.regionEnd)
- .getOrElse(HBaseTableCatalog.defaultRegionEnd))
- if (numReg > 3) {
- val tName = TableName.valueOf(catalog.name)
- val cfs = catalog.getColumnFamilies
-
- val connection = HBaseConnectionCache.getConnection(hbaseConf)
- // Initialize hBase table if necessary
- val admin = connection.getAdmin
- try {
- if (!admin.isTableAvailable(tName)) {
- val tableDesc = new HTableDescriptor(tName)
- cfs.foreach { x =>
- val cf = new HColumnDescriptor(x.getBytes())
- logDebug(s"add family $x to ${catalog.name}")
- tableDesc.addFamily(cf)
- }
- val splitKeys = Bytes.split(startKey, endKey, numReg);
- admin.createTable(tableDesc, splitKeys)
-
- }
- }finally {
- admin.close()
- connection.close()
- }
- } else {
- logInfo(
- s"""${HBaseTableCatalog.newTable}
- |is not defined or no larger than 3, skip the create table""".stripMargin)
- }
- }
-
- /**
- *
- * @param data
- * @param overwrite
- */
- override def insert(data: DataFrame, overwrite: Boolean): Unit = {
- val jobConfig: JobConf = new JobConf(hbaseConf, this.getClass)
- jobConfig.setOutputFormat(classOf[TableOutputFormat])
- jobConfig.set(TableOutputFormat.OUTPUT_TABLE, catalog.name)
- var count = 0
- val rkFields = catalog.getRowKey
- val rkIdxedFields = rkFields.map{ case x =>
- (schema.fieldIndex(x.colName), x)
- }
- val colsIdxedFields = schema
- .fieldNames
- .partition( x => rkFields.map(_.colName).contains(x))
- ._2.map(x => (schema.fieldIndex(x), catalog.getField(x)))
- val rdd = data.rdd
- def convertToPut(row: Row) = {
- // construct bytes for row key
- val rowBytes = rkIdxedFields.map { case (x, y) =>
- Utils.toBytes(row(x), y)
- }
- val rLen = rowBytes.foldLeft(0) { case (x, y) =>
- x + y.length
- }
- val rBytes = new Array[Byte](rLen)
- var offset = 0
- rowBytes.foreach { x =>
- System.arraycopy(x, 0, rBytes, offset, x.length)
- offset += x.length
- }
- val put = timestamp.fold(new Put(rBytes))(new Put(rBytes, _))
-
- colsIdxedFields.foreach { case (x, y) =>
- val b = Utils.toBytes(row(x), y)
- put.addColumn(Bytes.toBytes(y.cf), Bytes.toBytes(y.col), b)
- }
- count += 1
- (new ImmutableBytesWritable, put)
- }
- rdd.map(convertToPut(_)).saveAsHadoopDataset(jobConfig)
- }
-
- def getIndexedProjections(requiredColumns: Array[String]): Seq[(Field, Int)] = {
- requiredColumns.map(catalog.sMap.getField(_)).zipWithIndex
- }
-
-
- /**
- * Takes an HBase Row object and parses all of the fields from it.
- * This is independent of which fields were requested from the key;
- * because we have all the data, it's less complex to parse everything.
- *
- * @param row the retrieved row from hbase.
- * @param keyFields all of the fields in the row key, ORDERED by their order in the row key.
- */
- def parseRowKey(row: Array[Byte], keyFields: Seq[Field]): Map[Field, Any] = {
- keyFields.foldLeft((0, Seq[(Field, Any)]()))((state, field) => {
- val idx = state._1
- val parsed = state._2
- if (field.length != -1) {
- val value = Utils.hbaseFieldToScalaType(field, row, idx, field.length)
- // Return the new index and appended value
- (idx + field.length, parsed ++ Seq((field, value)))
- } else {
- field.dt match {
- case StringType =>
- val pos = row.indexOf(HBaseTableCatalog.delimiter, idx)
- if (pos == -1 || pos > row.length) {
- // this is at the last dimension
- val value = Utils.hbaseFieldToScalaType(field, row, idx, row.length)
- (row.length + 1, parsed ++ Seq((field, value)))
- } else {
- val value = Utils.hbaseFieldToScalaType(field, row, idx, pos - idx)
- (pos, parsed ++ Seq((field, value)))
- }
- // We don't know the length, assume it extends to the end of the rowkey.
- case _ => (row.length + 1, parsed ++ Seq((field, Utils.hbaseFieldToScalaType(field, row, idx, row.length))))
- }
- }
- })._2.toMap
- }
-
- def buildRow(fields: Seq[Field], result: Result): Row = {
- val r = result.getRow
- val keySeq = parseRowKey(r, catalog.getRowKey)
- val valueSeq = fields.filter(!_.isRowKey).map { x =>
- val kv = result.getColumnLatestCell(Bytes.toBytes(x.cf), Bytes.toBytes(x.col))
- if (kv == null || kv.getValueLength == 0) {
- (x, null)
- } else {
- val v = CellUtil.cloneValue(kv)
- (x, x.dt match {
- // Here, to avoid arraycopy, return v directly instead of calling hbaseFieldToScalaType
- case BinaryType => v
- case _ => Utils.hbaseFieldToScalaType(x, v, 0, v.length)
- })
- }
- }.toMap
- val unionedRow = keySeq ++ valueSeq
- // Return the row ordered by the requested order
- Row.fromSeq(fields.map(unionedRow.get(_).getOrElse(null)))
- }
-
- /**
- * Here we are building the functionality to populate the resulting RDD[Row].
- * This is where we will do the following:
- * - Filter push down
- * - Scan or GetList pruning
- * - Executing our scan(s) and/or GetList to generate the result
- *
- * @param requiredColumns The columns that are being requested by the requesting query
- * @param filters The filters that are being applied by the requesting query
- * @return RDD with all the results from HBase needed for SparkSQL to
- * execute the query on
- */
- override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
-
- val pushDownTuple = buildPushDownPredicatesResource(filters)
- val pushDownRowKeyFilter = pushDownTuple._1
- var pushDownDynamicLogicExpression = pushDownTuple._2
- val valueArray = pushDownTuple._3
-
- if (!usePushDownColumnFilter) {
- pushDownDynamicLogicExpression = null
- }
-
- logDebug("pushDownRowKeyFilter: " + pushDownRowKeyFilter.ranges)
- if (pushDownDynamicLogicExpression != null) {
- logDebug("pushDownDynamicLogicExpression: " +
- pushDownDynamicLogicExpression.toExpressionString)
- }
- logDebug("valueArray: " + valueArray.length)
-
- val requiredQualifierDefinitionList =
- new mutable.MutableList[Field]
-
- requiredColumns.foreach( c => {
- val field = catalog.getField(c)
- requiredQualifierDefinitionList += field
- })
-
- //retain the information for unit testing checks
- DefaultSourceStaticUtils.populateLatestExecutionRules(pushDownRowKeyFilter,
- pushDownDynamicLogicExpression)
-
- val getList = new util.ArrayList[Get]()
- val rddList = new util.ArrayList[RDD[Row]]()
-
- //add points to getList
- pushDownRowKeyFilter.points.foreach(p => {
- val get = new Get(p)
- requiredQualifierDefinitionList.foreach( d => {
- if (d.isRowKey)
- get.addColumn(d.cfBytes, d.colBytes)
- })
- getList.add(get)
- })
-
- val pushDownFilterJava = if (usePushDownColumnFilter && pushDownDynamicLogicExpression != null) {
- Some(new SparkSQLPushDownFilter(pushDownDynamicLogicExpression,
- valueArray, requiredQualifierDefinitionList, encoderClsName))
- } else {
- None
- }
- val hRdd = new HBaseTableScanRDD(this, hbaseContext, pushDownFilterJava, requiredQualifierDefinitionList.seq)
- pushDownRowKeyFilter.points.foreach(hRdd.addPoint(_))
- pushDownRowKeyFilter.ranges.foreach(hRdd.addRange(_))
-
- var resultRDD: RDD[Row] = {
- val tmp = hRdd.map{ r =>
- val indexedFields = getIndexedProjections(requiredColumns).map(_._1)
- buildRow(indexedFields, r)
-
- }
- if (tmp.partitions.size > 0) {
- tmp
- } else {
- null
- }
- }
-
- if (resultRDD == null) {
- val scan = new Scan()
- scan.setCacheBlocks(blockCacheEnable)
- scan.setBatch(batchNum)
- scan.setCaching(cacheSize)
- requiredQualifierDefinitionList.foreach( d =>
- scan.addColumn(d.cfBytes, d.colBytes))
-
- val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => {
- val indexedFields = getIndexedProjections(requiredColumns).map(_._1)
- buildRow(indexedFields, r._2)
- })
- resultRDD=rdd
- }
- resultRDD
- }
-
- def buildPushDownPredicatesResource(filters: Array[Filter]):
- (RowKeyFilter, DynamicLogicExpression, Array[Array[Byte]]) = {
- var superRowKeyFilter:RowKeyFilter = null
- val queryValueList = new mutable.MutableList[Array[Byte]]
- var superDynamicLogicExpression: DynamicLogicExpression = null
-
- filters.foreach( f => {
- val rowKeyFilter = new RowKeyFilter()
- val logicExpression = transverseFilterTree(rowKeyFilter, queryValueList, f)
- if (superDynamicLogicExpression == null) {
- superDynamicLogicExpression = logicExpression
- superRowKeyFilter = rowKeyFilter
- } else {
- superDynamicLogicExpression =
- new AndLogicExpression(superDynamicLogicExpression, logicExpression)
- superRowKeyFilter.mergeIntersect(rowKeyFilter)
- }
-
- })
-
- val queryValueArray = queryValueList.toArray
-
- if (superRowKeyFilter == null) {
- superRowKeyFilter = new RowKeyFilter
- }
-
- (superRowKeyFilter, superDynamicLogicExpression, queryValueArray)
- }
-
- /**
- * For some codecs, the sort order may be inconsistent between a java primitive
- * type and its byte array encoding. We may have to split a predicate on such
- * a java primitive type into multiple predicates. The encoder will take
- * care of this and return the concrete ranges.
- *
- * For example, in the naive codec, some of the java primitive types have to be split into multiple
- * predicates, and these predicates are unioned together so that the query is evaluated correctly.
- * For example, if we have "COLUMN < 2", we will transform it into
- * "0 <= COLUMN < 2 OR Integer.MIN_VALUE <= COLUMN <= -1"
- */
-
- def transverseFilterTree(parentRowKeyFilter:RowKeyFilter,
- valueArray:mutable.MutableList[Array[Byte]],
- filter:Filter): DynamicLogicExpression = {
- filter match {
- case EqualTo(attr, value) =>
- val field = catalog.getField(attr)
- if (field != null) {
- if (field.isRowKey) {
- parentRowKeyFilter.mergeIntersect(new RowKeyFilter(
- DefaultSourceStaticUtils.getByteValue(field,
- value.toString), null))
- }
- val byteValue =
- DefaultSourceStaticUtils.getByteValue(field, value.toString)
- valueArray += byteValue
- }
- new EqualLogicExpression(attr, valueArray.length - 1, false)
-
- /**
- * The encoder may split the predicate into multiple byte array boundaries.
- * Each boundary is mapped into a RowKeyFilter and then unioned by the reduce
- * operation. If the data type is not supported, b will be None, and
- * no operation happens on the parentRowKeyFilter.
- *
- * Note that because LessThan is not inclusive, the first bound should be exclusive,
- * which is controlled by inc.
- *
- * The other predicates, i.e., GreaterThan/LessThanOrEqual/GreaterThanOrEqual follows
- * the similar logic.
- */
- case LessThan(attr, value) =>
- val field = catalog.getField(attr)
- if (field != null) {
- if (field.isRowKey) {
- val b = encoder.ranges(value)
- var inc = false
- b.map(_.less.map { x =>
- val r = new RowKeyFilter(null,
- new ScanRange(x.upper, inc, x.low, true)
- )
- inc = true
- r
- }).map { x =>
- x.reduce { (i, j) =>
- i.mergeUnion(j)
- }
- }.map(parentRowKeyFilter.mergeIntersect(_))
- }
- val byteValue = encoder.encode(field.dt, value)
- valueArray += byteValue
- }
- new LessThanLogicExpression(attr, valueArray.length - 1)
- case GreaterThan(attr, value) =>
- val field = catalog.getField(attr)
- if (field != null) {
- if (field.isRowKey) {
- val b = encoder.ranges(value)
- var inc = false
- b.map(_.greater.map{x =>
- val r = new RowKeyFilter(null,
- new ScanRange(x.upper, true, x.low, inc))
- inc = true
- r
- }).map { x =>
- x.reduce { (i, j) =>
- i.mergeUnion(j)
- }
- }.map(parentRowKeyFilter.mergeIntersect(_))
- }
- val byteValue = encoder.encode(field.dt, value)
- valueArray += byteValue
- }
- new GreaterThanLogicExpression(attr, valueArray.length - 1)
- case LessThanOrEqual(attr, value) =>
- val field = catalog.getField(attr)
- if (field != null) {
- if (field.isRowKey) {
- val b = encoder.ranges(value)
- b.map(_.less.map(x =>
- new RowKeyFilter(null,
- new ScanRange(x.upper, true, x.low, true))))
- .map { x =>
- x.reduce{ (i, j) =>
- i.mergeUnion(j)
- }
- }.map(parentRowKeyFilter.mergeIntersect(_))
- }
- val byteValue = encoder.encode(field.dt, value)
- valueArray += byteValue
- }
- new LessThanOrEqualLogicExpression(attr, valueArray.length - 1)
- case GreaterThanOrEqual(attr, value) =>
- val field = catalog.getField(attr)
- if (field != null) {
- if (field.isRowKey) {
- val b = encoder.ranges(value)
- b.map(_.greater.map(x =>
- new RowKeyFilter(null,
- new ScanRange(x.upper, true, x.low, true))))
- .map { x =>
- x.reduce { (i, j) =>
- i.mergeUnion(j)
- }
- }.map(parentRowKeyFilter.mergeIntersect(_))
- }
- val byteValue = encoder.encode(field.dt, value)
- valueArray += byteValue
- }
- new GreaterThanOrEqualLogicExpression(attr, valueArray.length - 1)
- case Or(left, right) =>
- val leftExpression = transverseFilterTree(parentRowKeyFilter, valueArray, left)
- val rightSideRowKeyFilter = new RowKeyFilter
- val rightExpression = transverseFilterTree(rightSideRowKeyFilter, valueArray, right)
-
- parentRowKeyFilter.mergeUnion(rightSideRowKeyFilter)
-
- new OrLogicExpression(leftExpression, rightExpression)
- case And(left, right) =>
-
- val leftExpression = transverseFilterTree(parentRowKeyFilter, valueArray, left)
- val rightSideRowKeyFilter = new RowKeyFilter
- val rightExpression = transverseFilterTree(rightSideRowKeyFilter, valueArray, right)
- parentRowKeyFilter.mergeIntersect(rightSideRowKeyFilter)
-
- new AndLogicExpression(leftExpression, rightExpression)
- case IsNull(attr) =>
- new IsNullLogicExpression(attr, false)
- case IsNotNull(attr) =>
- new IsNullLogicExpression(attr, true)
- case _ =>
- new PassThroughLogicExpression
- }
- }
-}
-
-/**
- * Construct to contain a single scan range's information. Also
- * provide functions to merge with other scan ranges through AND
- * or OR operators
- *
- * @param upperBound Upper bound of scan
- * @param isUpperBoundEqualTo Include upper bound value in the results
- * @param lowerBound Lower bound of scan
- * @param isLowerBoundEqualTo Include lower bound value in the results
- */
-@InterfaceAudience.Private
-class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
- var lowerBound:Array[Byte], var isLowerBoundEqualTo:Boolean)
- extends Serializable {
-
- /**
- * Function to merge another scan object through an AND operation
- *
- * @param other Other scan object
- */
- def mergeIntersect(other:ScanRange): Unit = {
- val upperBoundCompare = compareRange(upperBound, other.upperBound)
- val lowerBoundCompare = compareRange(lowerBound, other.lowerBound)
-
- upperBound = if (upperBoundCompare <0) upperBound else other.upperBound
- lowerBound = if (lowerBoundCompare >0) lowerBound else other.lowerBound
-
- isLowerBoundEqualTo = if (lowerBoundCompare == 0)
- isLowerBoundEqualTo && other.isLowerBoundEqualTo
- else isLowerBoundEqualTo
-
- isUpperBoundEqualTo = if (upperBoundCompare == 0)
- isUpperBoundEqualTo && other.isUpperBoundEqualTo
- else isUpperBoundEqualTo
- }
-
- /**
- * Function to merge another scan object through an OR operation
- *
- * @param other Other scan object
- */
- def mergeUnion(other:ScanRange): Unit = {
-
- val upperBoundCompare = compareRange(upperBound, other.upperBound)
- val lowerBoundCompare = compareRange(lowerBound, other.lowerBound)
-
- upperBound = if (upperBoundCompare >0) upperBound else other.upperBound
- lowerBound = if (lowerBoundCompare <0) lowerBound else other.lowerBound
-
- isLowerBoundEqualTo = if (lowerBoundCompare == 0)
- isLowerBoundEqualTo || other.isLowerBoundEqualTo
- else if (lowerBoundCompare < 0) isLowerBoundEqualTo else other.isLowerBoundEqualTo
-
- isUpperBoundEqualTo = if (upperBoundCompare == 0)
- isUpperBoundEqualTo || other.isUpperBoundEqualTo
- else if (upperBoundCompare < 0) other.isUpperBoundEqualTo else isUpperBoundEqualTo
- }
-
- /**
- * Common function to see if this scan overlaps with another
- *
- * Reference Visual
- *
- * A B
- * |---------------------------|
- * LL--------------LU
- * RL--------------RU
- *
- * A = lowest value is byte[0]
- * B = highest value is null
- * LL = Left Lower Bound
- * LU = Left Upper Bound
- * RL = Right Lower Bound
- * RU = Right Upper Bound
- *
- * @param other Other scan object
- * @return The overlapping ScanRange if the two ranges overlap, otherwise null
- */
- def getOverLapScanRange(other:ScanRange): ScanRange = {
-
- var leftRange:ScanRange = null
- var rightRange:ScanRange = null
-
- // First identify the Left range
- // Also lower bound can't be null
- if (compareRange(lowerBound, other.lowerBound) < 0 ||
- compareRange(upperBound, other.upperBound) < 0) {
- leftRange = this
- rightRange = other
- } else {
- leftRange = other
- rightRange = this
- }
-
- if (hasOverlap(leftRange, rightRange)) {
- // Find the upper bound and lower bound
- if (compareRange(leftRange.upperBound, rightRange.upperBound) >= 0) {
- new ScanRange(rightRange.upperBound, rightRange.isUpperBoundEqualTo,
- rightRange.lowerBound, rightRange.isLowerBoundEqualTo)
- } else {
- new ScanRange(leftRange.upperBound, leftRange.isUpperBoundEqualTo,
- rightRange.lowerBound, rightRange.isLowerBoundEqualTo)
- }
- } else {
- null
- }
- }
-
- /**
- * The leftRange.upperBound has to be larger than the rightRange's lowerBound.
- * Otherwise, there is no overlap.
- *
- * @param left The range with the smaller lowerBound
- * @param right The range with the larger lowerBound
- * @return Whether two ranges have overlap.
- */
-
- def hasOverlap(left: ScanRange, right: ScanRange): Boolean = {
- compareRange(left.upperBound, right.lowerBound) >= 0
- }
-
- /**
- * Special compare logic because we can have null values
- * for left or right bound
- *
- * @param left Left byte array
- * @param right Right byte array
- * @return 0 if equal, 1 if left is greater, -1 if right is greater
- */
- def compareRange(left:Array[Byte], right:Array[Byte]): Int = {
- if (left == null && right == null) 0
- else if (left == null && right != null) 1
- else if (left != null && right == null) -1
- else Bytes.compareTo(left, right)
- }
-
- /**
- * Checks whether a given point falls within this scan range.
- *
- * @param point Point to check against the bounds
- * @return True if the point is within the range, false otherwise
- */
- def containsPoint(point:Array[Byte]): Boolean = {
- val lowerCompare = compareRange(point, lowerBound)
- val upperCompare = compareRange(point, upperBound)
-
- ((isLowerBoundEqualTo && lowerCompare >= 0) ||
- (!isLowerBoundEqualTo && lowerCompare > 0)) &&
- ((isUpperBoundEqualTo && upperCompare <= 0) ||
- (!isUpperBoundEqualTo && upperCompare < 0))
-
- }
- override def toString:String = {
- "ScanRange:(upperBound:" + Bytes.toString(upperBound) +
- ",isUpperBoundEqualTo:" + isUpperBoundEqualTo + ",lowerBound:" +
- Bytes.toString(lowerBound) + ",isLowerBoundEqualTo:" + isLowerBoundEqualTo + ")"
- }
-}
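-
-// A minimal usage sketch of how two ScanRanges combine under intersection and
-// union, assuming string row keys encoded with Bytes.toBytes; the object name
-// and key values are illustrative only.
-object ScanRangeUsageSketch {
-  def main(args: Array[String]): Unit = {
-    val a = new ScanRange(Bytes.toBytes("m"), true, Bytes.toBytes("a"), true) // [a, m]
-    val b = new ScanRange(Bytes.toBytes("z"), true, Bytes.toBytes("f"), true) // [f, z]
-
-    // Intersection keeps the overlapping window [f, m].
-    val overlap = a.getOverLapScanRange(b)
-    println(overlap.containsPoint(Bytes.toBytes("g"))) // true: "g" lies inside [f, m]
-
-    // Union widens the receiver in place; a now covers [a, z].
-    a.mergeUnion(b)
-    println(a)
-  }
-}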
-
-/**
- * Contains information related to the filters for a given column.
- * This can contain many ranges or points.
- *
- * @param currentPoint the initial point when the filter is created
- * @param currentRange the initial scanRange when the filter is created
- */
-@InterfaceAudience.Private
-class ColumnFilter (currentPoint:Array[Byte] = null,
- currentRange:ScanRange = null,
- var points:mutable.MutableList[Array[Byte]] =
- new mutable.MutableList[Array[Byte]](),
- var ranges:mutable.MutableList[ScanRange] =
- new mutable.MutableList[ScanRange]() ) extends Serializable {
- //Collection of ranges
- if (currentRange != null ) ranges.+=(currentRange)
-
- //Collection of points
- if (currentPoint != null) points.+=(currentPoint)
-
- /**
- * This will validate a given value against the filter's points and/or ranges;
- * the result indicates whether the value passes the filter.
- *
- * @param value Value to be validated
- * @param valueOffSet The offset of the value
- * @param valueLength The length of the value
- * @return True if the value passes the filter, false if not
- */
- def validate(value:Array[Byte], valueOffSet:Int, valueLength:Int):Boolean = {
- var result = false
-
- points.foreach( p => {
- if (Bytes.equals(p, 0, p.length, value, valueOffSet, valueLength)) {
- result = true
- }
- })
-
- ranges.foreach( r => {
- val upperBoundPass = r.upperBound == null ||
- (r.isUpperBoundEqualTo &&
- Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
- value, valueOffSet, valueLength) >= 0) ||
- (!r.isUpperBoundEqualTo &&
- Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
- value, valueOffSet, valueLength) > 0)
-
- val lowerBoundPass = r.lowerBound == null || r.lowerBound.length == 0 ||
- (r.isLowerBoundEqualTo &&
- Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
- value, valueOffSet, valueLength) <= 0) ||
- (!r.isLowerBoundEqualTo &&
- Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
- value, valueOffSet, valueLength) < 0)
-
- result = result || (upperBoundPass && lowerBoundPass)
- })
- result
- }
-
- /**
- * This will allow us to merge filter logic that is joined to the existing filter
- * through an OR operator
- *
- * @param other Filter to merge
- */
- def mergeUnion(other:ColumnFilter): Unit = {
- other.points.foreach( p => points += p)
-
- other.ranges.foreach( otherR => {
- var doesOverLap = false
- ranges.foreach{ r =>
- if (r.getOverLapScanRange(otherR) != null) {
- r.mergeUnion(otherR)
- doesOverLap = true
- }}
- if (!doesOverLap) ranges.+=(otherR)
- })
- }
-
- /**
- * This will allow us to merge filter logic that is joined to the existing filter
- * through an AND operator
- *
- * @param other Filter to merge
- */
- def mergeIntersect(other:ColumnFilter): Unit = {
- val survivingPoints = new mutable.MutableList[Array[Byte]]()
- points.foreach( p => {
- other.points.foreach( otherP => {
- if (Bytes.equals(p, otherP)) {
- survivingPoints.+=(p)
- }
- })
- })
- points = survivingPoints
-
- val survivingRanges = new mutable.MutableList[ScanRange]()
-
- other.ranges.foreach( otherR => {
- ranges.foreach( r => {
- if (r.getOverLapScanRange(otherR) != null) {
- r.mergeIntersect(otherR)
- survivingRanges += r
- }
- })
- })
- ranges = survivingRanges
- }
-
- override def toString:String = {
- val strBuilder = new StringBuilder
- strBuilder.append("(points:(")
- var isFirst = true
- points.foreach( p => {
- if (isFirst) isFirst = false
- else strBuilder.append(",")
- strBuilder.append(Bytes.toString(p))
- })
- strBuilder.append("),ranges:")
- isFirst = true
- ranges.foreach( r => {
- if (isFirst) isFirst = false
- else strBuilder.append(",")
- strBuilder.append(r)
- })
- strBuilder.append("))")
- strBuilder.toString()
- }
-}
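-
-// A minimal sketch of ColumnFilter validation and union, assuming Bytes-encoded
-// string cell values; the object name and filter instances are illustrative only.
-object ColumnFilterUsageSketch {
-  def main(args: Array[String]): Unit = {
-    // Point filter: passes only the value "foo".
-    val equalsFoo = new ColumnFilter(currentPoint = Bytes.toBytes("foo"))
-    // Range filter: passes any value less than or equal to "m" (no lower bound).
-    val upToM = new ColumnFilter(
-      currentRange = new ScanRange(Bytes.toBytes("m"), true, null, true))
-
-    val bar = Bytes.toBytes("bar")
-    println(equalsFoo.validate(bar, 0, bar.length)) // false: "bar" != "foo"
-    println(upToM.validate(bar, 0, bar.length))     // true: "bar" <= "m"
-
-    // OR the two filters together; equalsFoo now also accepts anything <= "m".
-    equalsFoo.mergeUnion(upToM)
-    println(equalsFoo.validate(bar, 0, bar.length)) // true
-  }
-}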
-
-/**
- * A collection of ColumnFilters indexed by column names.
- *
- * Also contains merge commands that will consolidate the filters
- * per column name
- */
-@InterfaceAudience.Private
-class ColumnFilterCollection {
- val columnFilterMap = new mutable.HashMap[String, ColumnFilter]
-
- def clear(): Unit = {
- columnFilterMap.clear()
- }
-
- /**
- * This will allow us to merge filter logic that is joined to the existing filter
- * through an OR operator. This will merge a single column's filter.
- *
- * @param column The column to be merged
- * @param other The other ColumnFilter object to merge
- */
- def mergeUnion(column:String, other:ColumnFilter): Unit = {
- val existingFilter = columnFilterMap.get(column)
- if (existingFilter.isEmpty) {
- columnFilterMap.+=((column, other))
- } else {
- existingFilter.get.mergeUnion(other)
- }
- }
-
- /**
- * This will allow us to merge all filters in the existing collection
- * to the filters in the other collection. All merges are done as a result
- * of an OR operator
- *
- * @param other The other Column Filter Collection to be merged
- */
- def mergeUnion(other:ColumnFilterCollection): Unit = {
- other.columnFilterMap.foreach( e => {
- mergeUnion(e._1, e._2)
- })
- }
-
- /**
- * This will allow us to merge all filters in the existing collection
- * to the filters in the other collection. All merges are done as a result
- * of an AND operator
- *
- * @param other The other ColumnFilterCollection to be merged
- */
- def mergeIntersect(other:ColumnFilterCollection): Unit = {
- other.columnFilterMap.foreach( e => {
- val existingColumnFilter = columnFilterMap.get(e._1)
- if (existingColumnFilter.isEmpty) {
- columnFilterMap += e
- } else {
- existingColumnFilter.get.mergeIntersect(e._2)
- }
- })
- }
-
- override def toString:String = {
- val strBuilder = new StringBuilder
- columnFilterMap.foreach( e => strBuilder.append(e))
- strBuilder.toString()
- }
-}
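-
-// A minimal sketch of merging two ColumnFilterCollections, e.g. when combining
-// per-column predicates from two OR'd branches; the object name, column names
-// and values are illustrative only.
-object ColumnFilterCollectionUsageSketch {
-  def main(args: Array[String]): Unit = {
-    val left = new ColumnFilterCollection
-    left.mergeUnion("col1", new ColumnFilter(currentPoint = Bytes.toBytes("a")))
-
-    val right = new ColumnFilterCollection
-    right.mergeUnion("col1", new ColumnFilter(currentPoint = Bytes.toBytes("b")))
-    right.mergeUnion("col2", new ColumnFilter(currentPoint = Bytes.toBytes("c")))
-
-    // After the union, col1 accepts "a" or "b"; col2 is carried over unchanged.
-    left.mergeUnion(right)
-    println(left)
-  }
-}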
-
-/**
- * Status object to store static functions but also to hold last executed
- * information that can be used for unit testing.
- */
-@InterfaceAudience.Private
-object DefaultSourceStaticUtils {
-
- val rawInteger = new RawInteger
- val rawLong = new RawLong
- val rawFloat = new RawFloat
- val rawDouble = new RawDouble
- val rawString = RawString.ASCENDING
-
- val byteRange = new ThreadLocal[PositionedByteRange] {
- override def initialValue(): PositionedByteRange = {
- val range = new SimplePositionedMutableByteRange()
- range.setOffset(0)
- range.setPosition(0)
- }
- }
-
- def getFreshByteRange(bytes: Array[Byte]): PositionedByteRange = {
- getFreshByteRange(bytes, 0, bytes.length)
- }
-
- def getFreshByteRange(bytes: Array[Byte], offset: Int = 0, length: Int):
- PositionedByteRange = {
- byteRange.get().set(bytes).setLength(length).setOffset(offset)
- }
-
- // This will contain the last 5 filters and required fields used in buildScan.
- // These values can be used in unit testing to make sure we are converting
- // the Spark SQL input correctly.
- val lastFiveExecutionRules =
- new ConcurrentLinkedQueue[ExecutionRuleForUnitTesting]()
-
- /**
- * This method populates lastFiveExecutionRules for unit testing purposes.
- * This method is not thread safe.
- *
- * @param rowKeyFilter The rowKey Filter logic used in the last query
- * @param dynamicLogicExpression The dynamicLogicExpression used in the last query
- */
- def populateLatestExecutionRules(rowKeyFilter: RowKeyFilter,
- dynamicLogicExpression: DynamicLogicExpression): Unit = {
- lastFiveExecutionRules.add(new ExecutionRuleForUnitTesting(
- rowKeyFilter, dynamicLogicExpression))
- while (lastFiveExecutionRules.size() > 5) {
- lastFiveExecutionRules.poll()
- }
- }
-
- /**
- * This method will convert the result content from HBase into the
- * SQL value type that is requested by the Spark SQL schema definition
- *
- * @param field The structure of the SparkSQL Column
- * @param r The result object from HBase
- * @return The converted object type
- */
- def getValue(field: Field,
- r: Result): Any = {
- if (field.isRowKey) {
- val row = r.getRow
-
- field.dt match {
- case IntegerType => rawInteger.decode(getFreshByteRange(row))
- case LongType => rawLong.decode(getFreshByteRange(row))
- case FloatType => rawFloat.decode(getFreshByteRange(row))
- case DoubleType => rawDouble.decode(getFreshByteRange(row))
- case StringType => rawString.decode(getFreshByteRange(row))
- case TimestampType => rawLong.decode(getFreshByteRange(row))
- case _ => Bytes.toString(row)
- }
- } else {
- val cellByteValue =
- r.getColumnLatestCell(field.cfBytes, field.colBytes)
- if (cellByteValue == null) null
- else field.dt match {
- case IntegerType => rawInteger.decode(getFreshByteRange(cellByteValue.getValueArray,
- cellByteValue.getValueOffset, cellByteValue.getValueLength))
- case LongType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
- cellByteValue.getValueOffset, cellByteValue.getValueLength))
- case FloatType => rawFloat.decode(getFreshByteRange(cellByteValue.getValueArray,
- cellByteValue.getValueOffset, cellByteValue.getValueLength))
- case DoubleType => rawDouble.decode(getFreshByteRange(cellByteValue.getValueArray,
- cellByteValue.getValueOffset, cellByteValue.getValueLength))
- case StringType => Bytes.toString(cellByteValue.getValueArray,
- cellByteValue.getValueOffset, cellByteValue.getValueLength)
- case TimestampType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
- cellByteValue.getValueOffset, cellByteValue.getValueLength))
- case _ => Bytes.toString(cellByteValue.getValueArray,
- cellByteValue.getValueOffset, cellByteValue.getValueLength)
- }
- }
- }
-
- /**
- * This will convert the value from SparkSQL to be stored into HBase using the
- * correct byte type
- *
- * @param field The structure of the SparkSQL Column
- * @param value The string value from SparkSQL
- * @return Returns the byte array to go into HBase
- */
- def getByteValue(field: Field,
- value: String): Array[Byte] = {
- field.dt match {
- case IntegerType =>
- val result = new Array[Byte](Bytes.SIZEOF_INT)
- val localDataRange = getFreshByteRange(result)
- rawInteger.encode(localDataRange, value.toInt)
- localDataRange.getBytes
- case LongType =>
- val result = new Array[Byte](Bytes.SIZEOF_LONG)
- val localDataRange = getFreshByteRange(result)
- rawLong.encode(localDataRange, value.toLong)
- localDataRange.getBytes
- case FloatType =>
- val result = new Array[Byte](Bytes.SIZEOF_FLOAT)
- val localDataRange = getFreshByteRange(result)
- rawFloat.encode(localDataRange, value.toFloat)
- localDataRange.getBytes
- case DoubleType =>
- val result = new Array[Byte](Bytes.SIZEOF_DOUBLE)
- val localDataRange = getFreshByteRange(result)
- rawDouble.encode(localDataRange, value.toDouble)
- localDataRange.getBytes
- case StringType =>
- Bytes.toBytes(value)
- case TimestampType =>
- val result = new Array[Byte](Bytes.SIZEOF_LONG)
- val localDataRange = getFreshByteRange(result)
- rawLong.encode(localDataRange, value.toLong)
- localDataRange.getBytes
-
- case _ => Bytes.toBytes(value)
- }
- }
-}
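-
-// A minimal sketch of the encode/decode round trip that getByteValue and
-// getValue rely on, assuming an IntegerType column; the object name and the
-// value 42 are illustrative only.
-object DefaultSourceStaticUtilsUsageSketch {
-  def main(args: Array[String]): Unit = {
-    import DefaultSourceStaticUtils._
-
-    // Encode an Int the same way getByteValue does for IntegerType.
-    val encoded = new Array[Byte](Bytes.SIZEOF_INT)
-    rawInteger.encode(getFreshByteRange(encoded), 42)
-
-    // Decode it back the same way getValue does for an IntegerType row key.
-    println(rawInteger.decode(getFreshByteRange(encoded))) // 42
-  }
-}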
-
-/**
- * Contains information related to the filter for the row key.
- * This can contain many ranges or points.
- *
- * @param currentPoint the initial point when the filter is created
- * @param currentRange the initial scanRange when the filter is created
- */
-@InterfaceAudience.Private
-class RowKeyFilter (currentPoint:Array[Byte] = null,
- currentRange:ScanRange =
- new ScanRange(null, true, new Array[Byte](0), true),
- var points:mutable.MutableList[Array[Byte]] =
- new mutable.MutableList[Array[Byte]](),
- var ranges:mutable.MutableList[ScanRange] =
- new mutable.MutableList[ScanRange]() ) extends Serializable {
- //Collection of ranges
- if (currentRange != null ) ranges.+=(currentRange)
-
- //Collection of points
- if (currentPoint != null) points.+=(currentPoint)
-
- /**
- * This will validate a given value against the filter's points and/or ranges;
- * the result indicates whether the value passes the filter.
- *
- * @param value Value to be validated
- * @param valueOffSet The offset of the value
- * @param valueLength The length of the value
- * @return True if the value passes the filter, false if not
- */
- def validate(value:Array[Byte], valueOffSet:Int, valueLength:Int):Boolean = {
- var result = false
-
- points.foreach( p => {
- if (Bytes.equals(p, 0, p.length, value, valueOffSet, valueLength)) {
- result = true
- }
- })
-
- ranges.foreach( r => {
- val upperBoundPass = r.upperBound == null ||
- (r.isUpperBoundEqualTo &&
- Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
- value, valueOffSet, valueLength) >= 0) ||
- (!r.isUpperBoundEqualTo &&
- Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
- value, valueOffSet, valueLength) > 0)
-
- val lowerBoundPass = r.lowerBound == null || r.lowerBound.length == 0 ||
- (r.isLowerBoundEqualTo &&
- Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
- value, valueOffSet, valueLength) <= 0) ||
- (!r.isLowerBoundEqualTo &&
- Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
- value, valueOffSet, valueLength) < 0)
-
- result = result || (upperBoundPass && lowerBoundPass)
- })
- result
- }
-
- /**
- * This will allow us to merge filter logic that is joined to the existing filter
- * through an OR operator
- *
- * @param other Filter to merge
- */
- def mergeUnion(other:RowKeyFilter): RowKeyFilter = {
- other.points.foreach( p => points += p)
-
- other.ranges.foreach( otherR => {
- var doesOverLap = false
- ranges.foreach{ r =>
- if (r.getOverLapScanRange(otherR) != null) {
- r.mergeUnion(otherR)
- doesOverLap = true
- }}
- if (!doesOverLap) ranges.+=(otherR)
- })
- this
- }
-
- /**
- * This will allow us to merge filter logic that is joined to the existing filter
- * through an AND operator
- *
- * @param other Filter to merge
- */
- def mergeIntersect(other:RowKeyFilter): RowKeyFilter = {
- val survivingPoints = new mutable.MutableList[Array[Byte]]()
- val didntSurviveFirstPassPoints = new mutable.MutableList[Array[Byte]]()
- if (points == null || points.length == 0) {
- other.points.foreach( otherP => {
- didntSurviveFirstPassPoints += otherP
- })
- } else {
- points.foreach(p => {
- if (other.points.length == 0) {
- didntSurviveFirstPassPoints += p
- } else {
- other.points.foreach(otherP => {
- if (Bytes.equals(p, otherP)) {
- survivingPoints += p
- } else {
- didntSurviveFirstPassPoints += p
- }
- })
- }
- })
- }
-
- val survivingRanges = new mutable.MutableList[ScanRange]()
-
- if (ranges.length == 0) {
- didntSurviveFirstPassPoints.foreach(p => {
- survivingPoints += p
- })
- } else {
- ranges.foreach(r => {
- other.ranges.foreach(otherR => {
- val overLapScanRange = r.getOverLapScanRange(otherR)
- if (overLapScanRange != null) {
- survivingRanges += overLapScanRange
- }
- })
- didntSurviveFirstPassPoints.foreach(p => {
- if (r.containsPoint(p)) {
- survivingPoints += p
- }
- })
- })
- }
- points = survivingPoints
- ranges = survivingRanges
- this
- }
-
- override def toString:String = {
- val strBuilder = new StringBuilder
- strBuilder.append("(points:(")
- var isFirst = true
- points.foreach( p => {
- if (isFirst) isFirst = false
- else strBuilder.append(",")
- strBuilder.append(Bytes.toString(p))
- })
- strBuilder.append("),ranges:")
- isFirst = true
- ranges.foreach( r => {
- if (isFirst) isFirst = false
- else strBuilder.append(",")
- strBuilder.append(r)
- })
- strBuilder.append("))")
- strBuilder.toString()
- }
-}
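-
-// A minimal sketch of intersecting two RowKeyFilters (an AND of row-key
-// predicates), assuming Bytes-encoded string row keys; the object name and
-// key values are illustrative only.
-object RowKeyFilterUsageSketch {
-  def main(args: Array[String]): Unit = {
-    val lowHalf = new RowKeyFilter(
-      currentRange = new ScanRange(Bytes.toBytes("row5"), true, Bytes.toBytes("row1"), true))
-    val highHalf = new RowKeyFilter(
-      currentRange = new ScanRange(Bytes.toBytes("row9"), true, Bytes.toBytes("row3"), true))
-
-    // The surviving range is the overlap of [row1, row5] and [row3, row9].
-    lowHalf.mergeIntersect(highHalf)
-    println(lowHalf) // ranges: [row3, row5]
-  }
-}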
-
-@InterfaceAudience.Private
-class ExecutionRuleForUnitTesting(val rowKeyFilter: RowKeyFilter,
- val dynamicLogicExpression: DynamicLogicExpression)