Posted to commits@hbase.apache.org by bu...@apache.org on 2017/11/09 04:59:55 UTC
[9/9] hbase git commit: HBASE-18817 pull the hbase-spark module out of branch-2.
HBASE-18817 pull the hbase-spark module out of branch-2.
Signed-off-by: Michael Stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/516d370b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/516d370b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/516d370b
Branch: refs/heads/branch-2
Commit: 516d370b4b41dddb6de335c9ae3dcb9205c35668
Parents: 6d88c49
Author: Sean Busbey <bu...@apache.org>
Authored: Thu Sep 21 14:41:43 2017 -0500
Committer: Sean Busbey <bu...@apache.org>
Committed: Wed Nov 8 22:55:23 2017 -0600
----------------------------------------------------------------------
dev-support/findbugs-exclude.xml | 5 -
hbase-assembly/pom.xml | 27 -
.../src/main/assembly/hadoop-two-compat.xml | 1 -
hbase-spark-it/pom.xml | 333 -----
.../spark/IntegrationTestSparkBulkLoad.java | 663 ----------
hbase-spark/README.txt | 6 -
hbase-spark/pom.xml | 702 ----------
.../hbase/spark/SparkSQLPushDownFilter.java | 275 ----
.../JavaHBaseBulkDeleteExample.java | 80 --
.../hbasecontext/JavaHBaseBulkGetExample.java | 115 --
.../hbasecontext/JavaHBaseBulkLoadExample.java | 102 --
.../hbasecontext/JavaHBaseBulkPutExample.java | 90 --
.../hbasecontext/JavaHBaseDistributedScan.java | 81 --
.../hbasecontext/JavaHBaseMapGetPutExample.java | 105 --
.../JavaHBaseStreamingBulkPutExample.java | 90 --
hbase-spark/src/main/protobuf/SparkFilter.proto | 40 -
.../hbase/spark/BulkLoadPartitioner.scala | 63 -
.../hbase/spark/ByteArrayComparable.scala | 49 -
.../hadoop/hbase/spark/ByteArrayWrapper.scala | 47 -
.../ColumnFamilyQualifierMapKeyWrapper.scala | 75 --
.../hadoop/hbase/spark/DefaultSource.scala | 1224 ------------------
.../hbase/spark/DynamicLogicExpression.scala | 260 ----
.../hbase/spark/FamiliesQualifiersValues.scala | 68 -
.../hbase/spark/FamilyHFileWriteOptions.scala | 38 -
.../hbase/spark/HBaseConnectionCache.scala | 265 ----
.../hadoop/hbase/spark/HBaseContext.scala | 1115 ----------------
.../hbase/spark/HBaseDStreamFunctions.scala | 160 ---
.../hadoop/hbase/spark/HBaseRDDFunctions.scala | 253 ----
.../hadoop/hbase/spark/JavaHBaseContext.scala | 408 ------
.../hadoop/hbase/spark/KeyFamilyQualifier.scala | 48 -
.../apache/hadoop/hbase/spark/NewHBaseRDD.scala | 38 -
.../hadoop/hbase/spark/datasources/Bound.scala | 121 --
.../spark/datasources/HBaseResources.scala | 171 ---
.../spark/datasources/HBaseSparkConf.scala | 62 -
.../spark/datasources/HBaseTableScanRDD.scala | 308 -----
.../spark/datasources/JavaBytesEncoder.scala | 116 --
.../hbase/spark/datasources/NaiveEncoder.scala | 261 ----
.../spark/datasources/SchemaConverters.scala | 430 ------
.../hadoop/hbase/spark/datasources/SerDes.scala | 47 -
.../datasources/SerializableConfiguration.scala | 47 -
.../hbase/spark/datasources/package.scala | 38 -
.../spark/example/datasources/AvroSource.scala | 158 ---
.../spark/example/datasources/DataType.scala | 165 ---
.../spark/example/datasources/HBaseSource.scala | 103 --
.../hbasecontext/HBaseBulkDeleteExample.scala | 63 -
.../hbasecontext/HBaseBulkGetExample.scala | 93 --
.../hbasecontext/HBaseBulkPutExample.scala | 75 --
.../HBaseBulkPutExampleFromFile.scala | 76 --
.../HBaseBulkPutTimestampExample.scala | 77 --
.../HBaseDistributedScanExample.scala | 59 -
.../HBaseStreamingBulkPutExample.scala | 74 --
.../example/rdd/HBaseBulkDeleteExample.scala | 64 -
.../spark/example/rdd/HBaseBulkGetExample.scala | 88 --
.../spark/example/rdd/HBaseBulkPutExample.scala | 76 --
.../rdd/HBaseForeachPartitionExample.scala | 83 --
.../example/rdd/HBaseMapPartitionExample.scala | 89 --
.../hbase/DataTypeParserWrapper.scala | 31 -
.../datasources/hbase/HBaseTableCatalog.scala | 377 ------
.../spark/sql/datasources/hbase/Utils.scala | 100 --
.../hbase/spark/TestJavaHBaseContext.java | 520 --------
hbase-spark/src/test/resources/hbase-site.xml | 157 ---
hbase-spark/src/test/resources/log4j.properties | 76 --
.../hadoop/hbase/spark/BulkLoadSuite.scala | 956 --------------
.../hadoop/hbase/spark/DefaultSourceSuite.scala | 1040 ---------------
.../spark/DynamicLogicExpressionSuite.scala | 339 -----
.../hadoop/hbase/spark/HBaseCatalogSuite.scala | 111 --
.../hbase/spark/HBaseConnectionCacheSuite.scala | 237 ----
.../hadoop/hbase/spark/HBaseContextSuite.scala | 356 -----
.../spark/HBaseDStreamFunctionsSuite.scala | 142 --
.../hbase/spark/HBaseRDDFunctionsSuite.scala | 398 ------
.../hadoop/hbase/spark/HBaseTestSource.scala | 62 -
.../hbase/spark/PartitionFilterSuite.scala | 523 --------
pom.xml | 38 +-
src/main/asciidoc/_chapters/spark.adoc | 690 ----------
src/main/asciidoc/book.adoc | 1 -
75 files changed, 2 insertions(+), 15922 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/dev-support/findbugs-exclude.xml b/dev-support/findbugs-exclude.xml
index 3162cb2..9813546 100644
--- a/dev-support/findbugs-exclude.xml
+++ b/dev-support/findbugs-exclude.xml
@@ -246,9 +246,4 @@
<Source name="~.*\.scala" />
</Match>
- <Match>
- <Package name="org.apache.hadoop.hbase.spark.protobuf.generated"/>
- </Match>
-
-
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-assembly/pom.xml b/hbase-assembly/pom.xml
index 298505b..8961b2e 100644
--- a/hbase-assembly/pom.xml
+++ b/hbase-assembly/pom.xml
@@ -37,22 +37,6 @@
</properties>
<build>
<plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-enforcer-plugin</artifactId>
- <executions>
- <!-- hbase-spark is ok in the assembly -->
- <execution>
- <id>banned-hbase-spark</id>
- <goals>
- <goal>enforce</goal>
- </goals>
- <configuration>
- <skip>true</skip>
- </configuration>
- </execution>
- </executions>
- </plugin>
<!-- licensing info from our dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -267,11 +251,6 @@
<optional>true</optional>
</dependency>
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-spark</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
@@ -281,12 +260,6 @@
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
- <artifactId>hbase-spark-it</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
<artifactId>hbase-backup</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-assembly/src/main/assembly/hadoop-two-compat.xml
----------------------------------------------------------------------
diff --git a/hbase-assembly/src/main/assembly/hadoop-two-compat.xml b/hbase-assembly/src/main/assembly/hadoop-two-compat.xml
index a66237b..820430f 100644
--- a/hbase-assembly/src/main/assembly/hadoop-two-compat.xml
+++ b/hbase-assembly/src/main/assembly/hadoop-two-compat.xml
@@ -46,7 +46,6 @@
<include>org.apache.hbase:hbase-rsgroup</include>
<include>org.apache.hbase:hbase-server</include>
<include>org.apache.hbase:hbase-shell</include>
- <include>org.apache.hbase:hbase-spark</include>
<include>org.apache.hbase:hbase-thrift</include>
<include>org.apache.hbase:hbase-external-blockcache</include>
<include>org.apache.hbase:hbase-backup</include>
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark-it/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-spark-it/pom.xml b/hbase-spark-it/pom.xml
deleted file mode 100644
index faeaf23..0000000
--- a/hbase-spark-it/pom.xml
+++ /dev/null
@@ -1,333 +0,0 @@
-<?xml version="1.0"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-<!--
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
--->
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>hbase-build-configuration</artifactId>
- <groupId>org.apache.hbase</groupId>
- <version>2.0.0-beta-1.SNAPSHOT</version>
- <relativePath>../hbase-build-configuration</relativePath>
- </parent>
-
- <artifactId>hbase-spark-it</artifactId>
- <name>Apache HBase - Spark Integration Tests</name>
- <description>Integration and System tests for HBase</description>
-
-
- <properties>
- <spark.version>1.6.0</spark.version>
- <scala.version>2.10.4</scala.version>
- <scala.binary.version>2.10</scala.binary.version>
- <!-- Test inclusion patterns used by failsafe configuration -->
- <unittest.include>**/Test*.java</unittest.include>
- <integrationtest.include>**/IntegrationTest*.java</integrationtest.include>
- <!-- To Run Tests with a particular Xmx Value use -Dfailsafe.Xmx=XXXg -->
- <failsafe.Xmx>4g</failsafe.Xmx>
- <!-- To run a single integration test, use -Dit.test=IntegrationTestXXX -->
- </properties>
- <build>
- <pluginManagement>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-site-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- <!-- Make a jar and put the sources in the jar -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- </plugin>
- <plugin>
- <!--Make it so assembly:single does nothing in here-->
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <skipAssembly>true</skipAssembly>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- <version>${surefire.version}</version>
- <dependencies>
- <dependency>
- <groupId>org.apache.maven.surefire</groupId>
- <artifactId>surefire-junit4</artifactId>
- <version>${surefire.version}</version>
- </dependency>
- </dependencies>
- <configuration>
- <includes>
- <include>${integrationtest.include}</include>
- </includes>
- <excludes>
- <exclude>${unittest.include}</exclude>
- <exclude>**/*$*</exclude>
- </excludes>
- <redirectTestOutputToFile>${test.output.tofile}</redirectTestOutputToFile>
- <failIfNoTests>false</failIfNoTests>
- <testFailureIgnore>false</testFailureIgnore>
- </configuration>
- <executions>
- <execution>
- <id>integration-test</id>
- <phase>integration-test</phase>
- <goals>
- <goal>integration-test</goal>
- </goals>
- </execution>
- <execution>
- <id>verify</id>
- <phase>verify</phase>
- <goals>
- <goal>verify</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </pluginManagement>
-
- <plugins>
- <!-- Run integration tests with mvn verify -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- <configuration>
- <skip>false</skip>
- <forkMode>always</forkMode>
- <!-- TODO: failsafe does timeout, but verify does not fail the build because of the timeout.
- I believe it is a failsafe bug, we may consider using surefire -->
- <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
- <argLine>-enableassertions -Xmx${failsafe.Xmx}
- -Djava.security.egd=file:/dev/./urandom -XX:+CMSClassUnloadingEnabled
- -verbose:gc -XX:+PrintCommandLineFlags -XX:+PrintFlagsFinal</argLine>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-enforcer-plugin</artifactId>
- <executions>
- <!-- purposefully have jsr 305 exclusion only warn in this module -->
- <execution>
- <id>banned-jsr305</id>
- <goals>
- <goal>enforce</goal>
- </goals>
- <configuration>
- <fail>false</fail>
- </configuration>
- </execution>
- <!-- hbase-spark is ok in this modules -->
- <execution>
- <id>banned-hbase-spark</id>
- <goals>
- <goal>enforce</goal>
- </goals>
- <configuration>
- <skip>true</skip>
- </configuration>
- </execution>
- <execution>
- <id>banned-scala</id>
- <goals>
- <goal>enforce</goal>
- </goals>
- <configuration>
- <skip>true</skip>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate
- the required classpath that is required in the env
- of the launch container in the mini cluster
- -->
- <outputFile>${project.build.directory}/test-classes/spark-generated-classpath</outputFile>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- </plugins>
- </build>
-
- <dependencies>
- <!-- Intra-project dependencies -->
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-common</artifactId>
- <type>jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <type>jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-spark</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-it</artifactId>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>${compat.module}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-testing-util</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
- <!-- Hadoop needs Netty 3.x at test scope for the minicluster -->
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- <version>${netty.hadoop.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <!-- make sure wrong scala version is not pulled in -->
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </exclusion>
- <exclusion>
- <!-- make sure wrong scala version is not pulled in -->
- <groupId>org.scala-lang</groupId>
- <artifactId>scalap</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <type>test-jar</type>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <profiles>
- <!-- Skip the tests in this module -->
- <profile>
- <id>skipIntegrationTests</id>
- <activation>
- <property>
- <name>skipIntegrationTests</name>
- </property>
- </activation>
- <properties>
- <skipTests>true</skipTests>
- </properties>
- </profile>
- </profiles>
-
- <reporting>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-report-plugin</artifactId>
- <version>2.7.2</version>
- <reportSets>
- <reportSet>
- <id>spark-integration-tests</id>
- <reports>
- <report>report-only</report>
- </reports>
- <configuration>
- <outputName>failsafe-report</outputName>
- <reportsDirectories>
- <reportsDirectory>${project.build.directory}/failsafe-reports</reportsDirectory>
- </reportsDirectories>
- </configuration>
- </reportSet>
- </reportSets>
- </plugin>
- </plugins>
- </reporting>
-
-</project>
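The comments in the removed POM above capture how these integration tests were driven. Assuming a checkout that still contained this module, an invocation along the lines of mvn verify -Dit.test=IntegrationTestSparkBulkLoad -Dfailsafe.Xmx=4g would have run a single integration test under failsafe, while passing -DskipIntegrationTests activates the profile above and skips the module's tests entirely; the exact command line is inferred from those comments rather than documented anywhere.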
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java b/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
deleted file mode 100644
index b22c9ca..0000000
--- a/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
+++ /dev/null
@@ -1,663 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.IntegrationTestBase;
-import org.apache.hadoop.hbase.IntegrationTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Consistency;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-import org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.RegionSplitter;
-
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.spark.SerializableWritable;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
-import org.apache.spark.Partitioner;
-
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.junit.Test;
-import scala.Tuple2;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
-/**
- * Test Bulk Load and Spark on a distributed cluster.
- * It starts a Spark job that creates linked chains.
- * This test mimics {@link IntegrationTestBulkLoad} in mapreduce.
- *
- * Usage on cluster:
- * First add the HBase-related jars and hbase-spark.jar to the Spark classpath.
- *
- * spark-submit --class org.apache.hadoop.hbase.spark.IntegrationTestSparkBulkLoad
- * HBASE_HOME/lib/hbase-spark-it-XXX-tests.jar -m slowDeterministic -Dhbase.spark.bulkload.chainlength=300
- */
-public class IntegrationTestSparkBulkLoad extends IntegrationTestBase {
-
- private static final Log LOG = LogFactory.getLog(IntegrationTestSparkBulkLoad.class);
-
- // The number of partitions for random generated data
- private static String BULKLOAD_PARTITIONS_NUM = "hbase.spark.bulkload.partitionsnum";
- private static int DEFAULT_BULKLOAD_PARTITIONS_NUM = 3;
-
- private static String BULKLOAD_CHAIN_LENGTH = "hbase.spark.bulkload.chainlength";
- private static int DEFAULT_BULKLOAD_CHAIN_LENGTH = 200000;
-
- private static String BULKLOAD_IMPORT_ROUNDS = "hbase.spark.bulkload.importround";
- private static int DEFAULT_BULKLOAD_IMPORT_ROUNDS = 1;
-
- private static String CURRENT_ROUND_NUM = "hbase.spark.bulkload.current.roundnum";
-
- private static String NUM_REPLICA_COUNT_KEY = "hbase.spark.bulkload.replica.countkey";
- private static int DEFAULT_NUM_REPLICA_COUNT = 1;
-
- private static String BULKLOAD_TABLE_NAME = "hbase.spark.bulkload.tableName";
- private static String DEFAULT_BULKLOAD_TABLE_NAME = "IntegrationTestSparkBulkLoad";
-
- private static String BULKLOAD_OUTPUT_PATH = "hbase.spark.bulkload.output.path";
-
- private static final String OPT_LOAD = "load";
- private static final String OPT_CHECK = "check";
-
- private boolean load = false;
- private boolean check = false;
-
- private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
- private static final byte[] SORT_FAM = Bytes.toBytes("S");
- private static final byte[] DATA_FAM = Bytes.toBytes("D");
-
- /**
- * Runs a Spark job to load data into the HBase table.
- */
- public void runLoad() throws Exception {
- setupTable();
- int numImportRounds = getConf().getInt(BULKLOAD_IMPORT_ROUNDS, DEFAULT_BULKLOAD_IMPORT_ROUNDS);
- LOG.info("Running load with numIterations:" + numImportRounds);
- for (int i = 0; i < numImportRounds; i++) {
- runLinkedListSparkJob(i);
- }
- }
-
- /**
- * Runs a Spark job that creates a linked list for testing.
- * @param iteration the iteration number of this job
- * @throws Exception
- */
- public void runLinkedListSparkJob(int iteration) throws Exception {
- String jobName = IntegrationTestSparkBulkLoad.class.getSimpleName() + " _load " +
- EnvironmentEdgeManager.currentTime();
-
- LOG.info("Running iteration " + iteration + "in Spark Job");
-
- Path output = null;
- if (conf.get(BULKLOAD_OUTPUT_PATH) == null) {
- output = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
- } else {
- output = new Path(conf.get(BULKLOAD_OUTPUT_PATH));
- }
-
- SparkConf sparkConf = new SparkConf().setAppName(jobName).setMaster("local");
- Configuration hbaseConf = new Configuration(getConf());
- hbaseConf.setInt(CURRENT_ROUND_NUM, iteration);
- int partitionNum = hbaseConf.getInt(BULKLOAD_PARTITIONS_NUM, DEFAULT_BULKLOAD_PARTITIONS_NUM);
-
-
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, hbaseConf);
-
-
- LOG.info("Partition RDD into " + partitionNum + " parts");
- List<String> temp = new ArrayList<>();
- JavaRDD<List<byte[]>> rdd = jsc.parallelize(temp, partitionNum).
- mapPartitionsWithIndex(new LinkedListCreationMapper(new SerializableWritable<>(hbaseConf)), false);
-
- hbaseContext.bulkLoad(rdd, getTablename(), new ListToKeyValueFunc(), output.toUri().getPath(),
- new HashMap<>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);
-
- try (Connection conn = ConnectionFactory.createConnection(conf);
- Admin admin = conn.getAdmin();
- Table table = conn.getTable(getTablename());
- RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {
- // Create a new loader.
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
-
- // Load the HFiles into table.
- loader.doBulkLoad(output, admin, table, regionLocator);
- }
-
-
- // Delete the files.
- util.getTestFileSystem().delete(output, true);
- jsc.close();
- }
-
- // See mapreduce.IntegrationTestBulkLoad#LinkedListCreationMapper
- // Used to generate test data
- public static class LinkedListCreationMapper implements
- Function2<Integer, Iterator<String>, Iterator<List<byte[]>>> {
-
- SerializableWritable swConfig = null;
- private Random rand = new Random();
-
- public LinkedListCreationMapper(SerializableWritable conf) {
- this.swConfig = conf;
- }
-
- @Override
- public Iterator<List<byte[]>> call(Integer v1, Iterator v2) throws Exception {
- Configuration config = (Configuration) swConfig.value();
- int partitionId = v1.intValue();
- LOG.info("Starting create List in Partition " + partitionId);
-
- int partitionNum = config.getInt(BULKLOAD_PARTITIONS_NUM, DEFAULT_BULKLOAD_PARTITIONS_NUM);
- int chainLength = config.getInt(BULKLOAD_CHAIN_LENGTH, DEFAULT_BULKLOAD_CHAIN_LENGTH);
- int iterationsNum = config.getInt(BULKLOAD_IMPORT_ROUNDS, DEFAULT_BULKLOAD_IMPORT_ROUNDS);
- int iterationsCur = config.getInt(CURRENT_ROUND_NUM, 0);
- List<List<byte[]>> res = new LinkedList<>();
-
-
- long tempId = partitionId + iterationsCur * partitionNum;
- long totalPartitionNum = partitionNum * iterationsNum;
- long chainId = Math.abs(rand.nextLong());
- chainId = chainId - (chainId % totalPartitionNum) + tempId;
-
- byte[] chainIdArray = Bytes.toBytes(chainId);
- long currentRow = 0;
- long nextRow = getNextRow(0, chainLength);
- for(long i = 0; i < chainLength; i++) {
- byte[] rk = Bytes.toBytes(currentRow);
- // Insert record into a list
- List<byte[]> tmp1 = Arrays.asList(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
- List<byte[]> tmp2 = Arrays.asList(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
- List<byte[]> tmp3 = Arrays.asList(rk, DATA_FAM, chainIdArray, Bytes.toBytes(
- RandomStringUtils.randomAlphabetic(50)));
- res.add(tmp1);
- res.add(tmp2);
- res.add(tmp3);
-
- currentRow = nextRow;
- nextRow = getNextRow(i+1, chainLength);
- }
- return res.iterator();
- }
-
- /** Returns a unique row id within this chain for this index */
- private long getNextRow(long index, long chainLength) {
- long nextRow = Math.abs(new Random().nextLong());
- // use significant bits from the random number, but pad with index to ensure it is unique
- // this also ensures that we do not reuse row = 0
- // row collisions from multiple mappers are fine, since we guarantee unique chainIds
- nextRow = nextRow - (nextRow % chainLength) + index;
- return nextRow;
- }
- }
-
-
-
- public static class ListToKeyValueFunc implements
- Function<List<byte[]>, Pair<KeyFamilyQualifier, byte[]>> {
- @Override
- public Pair<KeyFamilyQualifier, byte[]> call(List<byte[]> v1) throws Exception {
- if (v1 == null || v1.size() != 4) {
- return null;
- }
- KeyFamilyQualifier kfq = new KeyFamilyQualifier(v1.get(0), v1.get(1), v1.get(2));
-
- return new Pair<>(kfq, v1.get(3));
- }
- }
-
- /**
- * After adding data to the table, run a Spark job to check the linked-list chains.
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- public void runCheck() throws Exception {
- LOG.info("Running check");
- String jobName = IntegrationTestSparkBulkLoad.class.getSimpleName() + "_check" + EnvironmentEdgeManager.currentTime();
-
- SparkConf sparkConf = new SparkConf().setAppName(jobName).setMaster("local");
- Configuration hbaseConf = new Configuration(getConf());
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, hbaseConf);
-
- Scan scan = new Scan();
- scan.addFamily(CHAIN_FAM);
- scan.addFamily(SORT_FAM);
- scan.setMaxVersions(1);
- scan.setCacheBlocks(false);
- scan.setBatch(1000);
- int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, DEFAULT_NUM_REPLICA_COUNT);
- if (replicaCount != DEFAULT_NUM_REPLICA_COUNT) {
- scan.setConsistency(Consistency.TIMELINE);
- }
-
- // 1. Using TableInputFormat to get data from HBase table
- // 2. Mimic LinkedListCheckingMapper in mapreduce.IntegrationTestBulkLoad
- // 3. Sort LinkKey by its order ID
- // 4. Group LinkKey if they have same chainId, and repartition RDD by NaturalKeyPartitioner
- // 5. Check LinkList in each Partition using LinkedListCheckingFlatMapFunc
- hbaseContext.hbaseRDD(getTablename(), scan).flatMapToPair(new LinkedListCheckingFlatMapFunc())
- .sortByKey()
- .combineByKey(new createCombinerFunc(), new mergeValueFunc(), new mergeCombinersFunc(),
- new NaturalKeyPartitioner(new SerializableWritable<>(hbaseConf)))
- .foreach(new LinkedListCheckingForeachFunc(new SerializableWritable<>(hbaseConf)));
- jsc.close();
- }
-
- private void runCheckWithRetry() throws Exception {
- try {
- runCheck();
- } catch (Throwable t) {
- LOG.warn("Received " + StringUtils.stringifyException(t));
- LOG.warn("Running the check MR Job again to see whether an ephemeral problem or not");
- runCheck();
- throw t; // we should still fail the test even if second retry succeeds
- }
- // everything green
- }
-
- /**
- * PairFlatMapFunction used to transform a <Row, Result> pair into a Tuple2<SparkLinkKey, SparkLinkChain>.
- */
- public static class LinkedListCheckingFlatMapFunc implements
- PairFlatMapFunction<Tuple2<ImmutableBytesWritable, Result>, SparkLinkKey, SparkLinkChain> {
-
- @Override
- public Iterable<Tuple2<SparkLinkKey, SparkLinkChain>> call(Tuple2<ImmutableBytesWritable, Result> v)
- throws Exception {
- Result value = v._2();
- long longRk = Bytes.toLong(value.getRow());
- List<Tuple2<SparkLinkKey, SparkLinkChain>> list = new LinkedList<>();
-
- for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
- long chainId = Bytes.toLong(entry.getKey());
- long next = Bytes.toLong(entry.getValue());
- Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
- long order = Bytes.toLong(CellUtil.cloneValue(c));
- Tuple2<SparkLinkKey, SparkLinkChain> tuple2 =
- new Tuple2<>(new SparkLinkKey(chainId, order), new SparkLinkChain(longRk, next));
- list.add(tuple2);
- }
- return list;
- }
- }
-
- public static class createCombinerFunc implements
- Function<SparkLinkChain, List<SparkLinkChain>> {
- @Override
- public List<SparkLinkChain> call(SparkLinkChain v1) throws Exception {
- List<SparkLinkChain> list = new LinkedList<>();
- list.add(v1);
- return list;
- }
- }
-
- public static class mergeValueFunc implements
- Function2<List<SparkLinkChain>, SparkLinkChain, List<SparkLinkChain>> {
- @Override
- public List<SparkLinkChain> call(List<SparkLinkChain> v1, SparkLinkChain v2) throws Exception {
- if (v1 == null)
- v1 = new LinkedList<>();
- v1.add(v2);
- return v1;
- }
- }
-
- public static class mergeCombinersFunc implements
- Function2<List<SparkLinkChain>, List<SparkLinkChain>, List<SparkLinkChain>> {
- @Override
- public List<SparkLinkChain> call(List<SparkLinkChain> v1, List<SparkLinkChain> v2) throws Exception {
- v1.addAll(v2);
- return v1;
- }
- }
-
- /**
- * Class to figure out what partition to send a link in the chain to. This is based upon
- * the linkKey's ChainId.
- */
- public static class NaturalKeyPartitioner extends Partitioner {
-
- private int numPartions = 0;
- public NaturalKeyPartitioner(SerializableWritable swConf) {
- Configuration hbaseConf = (Configuration) swConf.value();
- numPartions = hbaseConf.getInt(BULKLOAD_PARTITIONS_NUM, DEFAULT_BULKLOAD_PARTITIONS_NUM);
-
- }
-
- @Override
- public int numPartitions() {
- return numPartions;
- }
-
- @Override
- public int getPartition(Object key) {
- if (!(key instanceof SparkLinkKey))
- return -1;
- int hash = ((SparkLinkKey) key).getChainId().hashCode();
- return Math.abs(hash % numPartions);
-
- }
- }
-
- /**
- * Checks the sorted List<SparkLinkChain> for one SparkLinkKey.
- */
- public static class LinkedListCheckingForeachFunc
- implements VoidFunction<Tuple2<SparkLinkKey, List<SparkLinkChain>>> {
-
- private SerializableWritable swConf = null;
-
- public LinkedListCheckingForeachFunc(SerializableWritable conf) {
- swConf = conf;
- }
-
- @Override
- public void call(Tuple2<SparkLinkKey, List<SparkLinkChain>> v1) throws Exception {
- long next = -1L;
- long prev = -1L;
- long count = 0L;
-
- SparkLinkKey key = v1._1();
- List<SparkLinkChain> values = v1._2();
-
- for (SparkLinkChain lc : values) {
-
- if (next == -1) {
- if (lc.getRk() != 0L) {
- String msg = "Chains should all start at rk 0, but read rk " + lc.getRk()
- + ". Chain:" + key.getChainId() + ", order:" + key.getOrder();
- throw new RuntimeException(msg);
- }
- next = lc.getNext();
- } else {
- if (next != lc.getRk()) {
- String msg = "Missing a link in the chain. Prev rk " + prev + " was, expecting "
- + next + " but got " + lc.getRk() + ". Chain:" + key.getChainId()
- + ", order:" + key.getOrder();
- throw new RuntimeException(msg);
- }
- prev = lc.getRk();
- next = lc.getNext();
- }
- count++;
- }
- Configuration hbaseConf = (Configuration) swConf.value();
- int expectedChainLen = hbaseConf.getInt(BULKLOAD_CHAIN_LENGTH, DEFAULT_BULKLOAD_CHAIN_LENGTH);
- if (count != expectedChainLen) {
- String msg = "Chain wasn't the correct length. Expected " + expectedChainLen + " got "
- + count + ". Chain:" + key.getChainId() + ", order:" + key.getOrder();
- throw new RuntimeException(msg);
- }
- }
- }
-
- /**
- * Serializable key used to group links in the linked list.
- *
- * Used as the key emitted from a pass over the table.
- */
- public static class SparkLinkKey implements java.io.Serializable, Comparable<SparkLinkKey> {
-
- private Long chainId;
- private Long order;
-
- public Long getOrder() {
- return order;
- }
-
- public Long getChainId() {
- return chainId;
- }
-
- public SparkLinkKey(long chainId, long order) {
- this.chainId = chainId;
- this.order = order;
- }
-
- @Override
- public int hashCode() {
- return this.getChainId().hashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof SparkLinkKey))
- return false;
- SparkLinkKey otherKey = (SparkLinkKey) other;
- return this.getChainId().equals(otherKey.getChainId());
- }
-
- @Override
- public int compareTo(SparkLinkKey other) {
- int res = getChainId().compareTo(other.getChainId());
- if (res == 0)
- res= getOrder().compareTo(other.getOrder());
- return res;
- }
- }
-
- /**
- * Serializable value emitted from a pass over the HBase table.
- */
- public static class SparkLinkChain implements java.io.Serializable, Comparable<SparkLinkChain>{
-
- public Long getNext() {
- return next;
- }
-
- public Long getRk() {
- return rk;
- }
-
-
- public SparkLinkChain(Long rk, Long next) {
- this.rk = rk;
- this.next = next;
- }
-
- private Long rk;
- private Long next;
-
- @Override
- public int compareTo(SparkLinkChain linkChain) {
- int res = getRk().compareTo(linkChain.getRk());
- if (res == 0) {
- res = getNext().compareTo(linkChain.getNext());
- }
- return res;
- }
-
- @Override
- public int hashCode() {
- return getRk().hashCode() ^ getNext().hashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof SparkLinkChain))
- return false;
- SparkLinkChain otherKey = (SparkLinkChain) other;
- return this.getRk().equals(otherKey.getRk()) && this.getNext().equals(otherKey.getNext());
- }
- }
-
-
- /**
- * Installs a slowing coprocessor so that the scan can go to replicas; this does not
- * affect runCheck(), since the data were bulk loaded from HFiles into the table.
- * @throws IOException
- * @throws InterruptedException
- */
- private void installSlowingCoproc() throws IOException, InterruptedException {
- int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, DEFAULT_NUM_REPLICA_COUNT);
- if (replicaCount == DEFAULT_NUM_REPLICA_COUNT) return;
-
- TableName t = getTablename();
- Admin admin = util.getAdmin();
- HTableDescriptor desc = admin.getTableDescriptor(t);
- desc.addCoprocessor(IntegrationTestBulkLoad.SlowMeCoproScanOperations.class.getName());
- HBaseTestingUtility.modifyTableSync(admin, desc);
- }
-
- @Test
- public void testBulkLoad() throws Exception {
- runLoad();
- installSlowingCoproc();
- runCheckWithRetry();
- }
-
-
- private byte[][] getSplits(int numRegions) {
- RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
- split.setFirstRow(Bytes.toBytes(0L));
- split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
- return split.split(numRegions);
- }
-
- private void setupTable() throws IOException, InterruptedException {
- if (util.getAdmin().tableExists(getTablename())) {
- util.deleteTable(getTablename());
- }
-
- util.createTable(
- getTablename(),
- new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
- getSplits(16)
- );
-
- int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, DEFAULT_NUM_REPLICA_COUNT);
- if (replicaCount == DEFAULT_NUM_REPLICA_COUNT) return;
-
- TableName t = getTablename();
- HBaseTestingUtility.setReplicas(util.getAdmin(), t, replicaCount);
- }
-
- @Override
- public void setUpCluster() throws Exception {
- util = getTestingUtil(getConf());
- util.initializeCluster(1);
- int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, DEFAULT_NUM_REPLICA_COUNT);
- if (LOG.isDebugEnabled() && replicaCount != DEFAULT_NUM_REPLICA_COUNT) {
- LOG.debug("Region Replicas enabled: " + replicaCount);
- }
-
- // Scale this up on a real cluster
- if (util.isDistributedCluster()) {
- util.getConfiguration().setIfUnset(BULKLOAD_PARTITIONS_NUM, String.valueOf(DEFAULT_BULKLOAD_PARTITIONS_NUM));
- util.getConfiguration().setIfUnset(BULKLOAD_IMPORT_ROUNDS, "1");
- } else {
- util.startMiniMapReduceCluster();
- }
- }
-
- @Override
- protected void addOptions() {
- super.addOptions();
- super.addOptNoArg(OPT_CHECK, "Run check only");
- super.addOptNoArg(OPT_LOAD, "Run load only");
- }
-
- @Override
- protected void processOptions(CommandLine cmd) {
- super.processOptions(cmd);
- check = cmd.hasOption(OPT_CHECK);
- load = cmd.hasOption(OPT_LOAD);
- }
-
- @Override
- public int runTestFromCommandLine() throws Exception {
- if (load) {
- runLoad();
- } else if (check) {
- installSlowingCoproc();
- runCheckWithRetry();
- } else {
- testBulkLoad();
- }
- return 0;
- }
-
- @Override
- public TableName getTablename() {
- return getTableName(getConf());
- }
-
- public static TableName getTableName(Configuration conf) {
- return TableName.valueOf(conf.get(BULKLOAD_TABLE_NAME, DEFAULT_BULKLOAD_TABLE_NAME));
- }
-
- @Override
- protected Set<String> getColumnFamilies() {
- return Sets.newHashSet(Bytes.toString(CHAIN_FAM) , Bytes.toString(DATA_FAM),
- Bytes.toString(SORT_FAM));
- }
-
- public static void main(String[] args) throws Exception {
- Configuration conf = HBaseConfiguration.create();
- IntegrationTestingUtility.setUseDistributedCluster(conf);
- int status = ToolRunner.run(conf, new IntegrationTestSparkBulkLoad(), args);
- System.exit(status);
- }
-}
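For readers skimming this removal, here is a minimal sketch, condensed from the deleted IntegrationTestSparkBulkLoad above, of the bulk-load path that test exercised: JavaHBaseContext.bulkLoad writes HFiles, and LoadIncrementalHFiles moves them into the table. Class and method names are taken from the removed sources; the table name, staging directory, and input records are hypothetical placeholders, so treat this as an illustration rather than supported usage now that the module is gone from branch-2.

    // Minimal sketch, assuming the removed hbase-spark module is still on the classpath.
    // Table name, staging directory, and records below are hypothetical placeholders.
    SparkConf sparkConf = new SparkConf().setAppName("bulkLoadSketch").setMaster("local");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    Configuration hbaseConf = HBaseConfiguration.create();
    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, hbaseConf);

    TableName tableName = TableName.valueOf("sketchTable");   // hypothetical table
    String stagingDir = "/tmp/sketch-hfiles";                  // hypothetical HFile staging path

    // Each record is (rowKey, family, qualifier, value); ListToKeyValueFunc from the
    // removed test turns it into the Pair<KeyFamilyQualifier, byte[]> that bulkLoad expects.
    List<List<byte[]>> records = new ArrayList<>();            // hypothetical input data
    JavaRDD<List<byte[]>> rdd = jsc.parallelize(records);

    // Write HFiles into the staging directory ...
    hbaseContext.bulkLoad(rdd, tableName, new ListToKeyValueFunc(), stagingDir,
        new HashMap<>(), false, HConstants.DEFAULT_MAX_FILE_SIZE);

    // ... then move them into the table, exactly as runLinkedListSparkJob() did above.
    try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
         Admin admin = conn.getAdmin();
         Table table = conn.getTable(tableName);
         RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
      new LoadIncrementalHFiles(hbaseConf).doBulkLoad(new Path(stagingDir), admin, table, regionLocator);
    }
    jsc.close();

The verification path works the other way around: runCheck() above reads the table back with hbaseContext.hbaseRDD(getTablename(), scan), regroups the chain links per key, and walks each chain, as described in steps 1-5 of the removed comments.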
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/README.txt
----------------------------------------------------------------------
diff --git a/hbase-spark/README.txt b/hbase-spark/README.txt
deleted file mode 100644
index 7fad811..0000000
--- a/hbase-spark/README.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-ON PROTOBUFS
-This maven module has the core protobuf definition files ('.protos') used by the
-hbase Spark module that ship with hbase core, including tests.
-
-Generation of java files from protobuf .proto files included here is done as
-part of the build.
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-spark/pom.xml b/hbase-spark/pom.xml
deleted file mode 100644
index b3e74ea..0000000
--- a/hbase-spark/pom.xml
+++ /dev/null
@@ -1,702 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>hbase-build-configuration</artifactId>
- <groupId>org.apache.hbase</groupId>
- <version>2.0.0-beta-1.SNAPSHOT</version>
- <relativePath>../hbase-build-configuration</relativePath>
- </parent>
- <artifactId>hbase-spark</artifactId>
- <name>Apache HBase - Spark</name>
- <properties>
- <spark.version>1.6.0</spark.version>
- <scala.version>2.10.4</scala.version>
- <scala.binary.version>2.10</scala.binary.version>
- <top.dir>${project.basedir}/..</top.dir>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.hbase.thirdparty</groupId>
- <artifactId>hbase-shaded-miscellaneous</artifactId>
- </dependency>
- <!-- Force import of Spark's servlet API for unit tests -->
- <dependency>
- <groupId>javax.servlet</groupId>
- <artifactId>javax.servlet-api</artifactId>
- <scope>test</scope>
- </dependency>
- <!-- Mark Spark / Scala as provided -->
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- <scope>provided</scope>
- </dependency>
- <!-- we exclude jsr305 below and then expressly relist it as
- provided / optional to avoid dependency resolution possibly
- bringing it back into runtime scope.
- -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <!-- make sure wrong scala version is not pulled in -->
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </exclusion>
- <exclusion>
- <!-- make sure wrong scala version is not pulled in -->
- <groupId>org.scala-lang</groupId>
- <artifactId>scalap</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- <version>1.3.9</version>
- <scope>provided</scope>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <type>test-jar</type>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest_${scala.binary.version}</artifactId>
- <version>2.2.4</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scalamock</groupId>
- <artifactId>scalamock-scalatest-support_${scala.binary.version}</artifactId>
- <version>3.1.4</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
- <version>${jackson.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop-two.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet.jsp</groupId>
- <artifactId>jsp-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jruby</groupId>
- <artifactId>jruby-complete</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop-two.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet.jsp</groupId>
- <artifactId>jsp-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jruby</groupId>
- <artifactId>jruby-complete</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop-two.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet.jsp</groupId>
- <artifactId>jsp-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jruby</groupId>
- <artifactId>jruby-complete</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hadoop-two.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet.jsp</groupId>
- <artifactId>jsp-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jruby</groupId>
- <artifactId>jruby-complete</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>xerces</groupId>
- <artifactId>xercesImpl</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.thrift</groupId>
- <artifactId>thrift</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-api-2.1</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>servlet-api-2.5</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-json</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-server</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-protocol</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-protocol-shaded</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-annotations</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-annotations</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-hadoop-compat</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.thrift</groupId>
- <artifactId>thrift</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-api-2.1</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>servlet-api-2.5</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-json</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-server</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-hadoop2-compat</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.thrift</groupId>
- <artifactId>thrift</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-api-2.1</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>servlet-api-2.5</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-json</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-server</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-mapreduce</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.2.0</version>
- <configuration>
- <charset>${project.build.sourceEncoding}</charset>
- <scalaVersion>${scala.version}</scalaVersion>
- <args>
- <arg>-feature</arg>
- </args>
- </configuration>
- <executions>
- <execution>
- <id>scala-compile-first</id>
- <phase>process-resources</phase>
- <goals>
- <goal>add-source</goal>
- <goal>compile</goal>
- </goals>
- </execution>
- <execution>
- <id>scala-test-compile</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest-maven-plugin</artifactId>
- <version>1.0</version>
- <configuration>
- <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
- <junitxml>.</junitxml>
- <filereports>WDF TestSuite.txt</filereports>
- <parallel>false</parallel>
- <systemProperties>
- <org.apache.hadoop.hbase.shaded.io.netty.packagePrefix>org.apache.hadoop.hbase.shaded.</org.apache.hadoop.hbase.shaded.io.netty.packagePrefix>
- </systemProperties>
- </configuration>
- <executions>
- <execution>
- <id>test</id>
- <phase>test</phase>
- <goals>
- <goal>test</goal>
- </goals>
- <configuration>
- <argLine>
- -Xmx1536m -XX:ReservedCodeCacheSize=512m
- </argLine>
- <parallel>false</parallel>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <!-- Clover fails due to the Scala/Java cross-compile. This guarantees that the Scala sources are
- compiled before the Java sources that code coverage will evaluate (the Scala sources will not be).
- https://confluence.atlassian.com/display/CLOVERKB/Java-+Scala+cross-compilation+error+-+cannot+find+symbol
- -->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>validate</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/scala</source>
- </sources>
- </configuration>
- </execution>
- <execution>
- <id>add-test-source</id>
- <phase>validate</phase>
- <goals>
- <goal>add-test-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/test/scala</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.xolstice.maven.plugins</groupId>
- <artifactId>protobuf-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>compile-protoc</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-enforcer-plugin</artifactId>
- <executions>
- <!-- purposefully have jsr 305 exclusion only warn in this module -->
- <execution>
- <id>banned-jsr305</id>
- <goals>
- <goal>enforce</goal>
- </goals>
- <configuration>
- <fail>false</fail>
- </configuration>
- </execution>
- <!-- scala is ok in the spark modules -->
- <execution>
- <id>banned-scala</id>
- <goals>
- <goal>enforce</goal>
- </goals>
- <configuration>
- <skip>true</skip>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <profiles>
- <!-- Skip the tests in this module -->
- <profile>
- <id>skipSparkTests</id>
- <activation>
- <property>
- <name>skipSparkTests</name>
- </property>
- </activation>
- <properties>
- <surefire.skipFirstPart>true</surefire.skipFirstPart>
- <surefire.skipSecondPart>true</surefire.skipSecondPart>
- <skipTests>true</skipTests>
- </properties>
- </profile>
- </profiles>
-</project>
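
A note on the profiles block just above: the skipSparkTests profile was property-activated, so defining the property on the Maven command line (for example, mvn test -DskipSparkTests, where the goal is only illustrative) was enough to switch it on and skip this module's tests.
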
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
deleted file mode 100644
index a94c59c..0000000
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.spark.datasources.BytesEncoder;
-import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder;
-import org.apache.hadoop.hbase.spark.protobuf.generated.SparkFilterProtos;
-import org.apache.hadoop.hbase.util.ByteStringer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.sql.datasources.hbase.Field;
-import scala.collection.mutable.MutableList;
-
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.ByteString;
-
-/**
- * This filter will push down all qualifier logic given to us
- * by SparkSQL so that we can apply the filters at the region server level
- * and avoid sending the data back to the client to be filtered.
- */
-@InterfaceAudience.Private
-public class SparkSQLPushDownFilter extends FilterBase {
- protected static final Log log = LogFactory.getLog(SparkSQLPushDownFilter.class);
-
- //The following values are populated from the protobuf
- DynamicLogicExpression dynamicLogicExpression;
- byte[][] valueFromQueryArray;
- HashMap<ByteArrayComparable, HashMap<ByteArrayComparable, String>>
- currentCellToColumnIndexMap;
-
- //The following values are transient
- HashMap<String, ByteArrayComparable> columnToCurrentRowValueMap = null;
-
- static final byte[] rowKeyFamily = new byte[0];
- static final byte[] rowKeyQualifier = Bytes.toBytes("key");
-
- String encoderClassName;
-
- public SparkSQLPushDownFilter(DynamicLogicExpression dynamicLogicExpression,
- byte[][] valueFromQueryArray,
- HashMap<ByteArrayComparable,
- HashMap<ByteArrayComparable, String>>
- currentCellToColumnIndexMap, String encoderClassName) {
- this.dynamicLogicExpression = dynamicLogicExpression;
- this.valueFromQueryArray = valueFromQueryArray;
- this.currentCellToColumnIndexMap = currentCellToColumnIndexMap;
- this.encoderClassName = encoderClassName;
- }
-
- public SparkSQLPushDownFilter(DynamicLogicExpression dynamicLogicExpression,
- byte[][] valueFromQueryArray,
- MutableList<Field> fields, String encoderClassName) {
- this.dynamicLogicExpression = dynamicLogicExpression;
- this.valueFromQueryArray = valueFromQueryArray;
- this.encoderClassName = encoderClassName;
-
- //generate family qualifier to index mapping
- this.currentCellToColumnIndexMap =
- new HashMap<>();
-
- for (int i = 0; i < fields.size(); i++) {
- Field field = fields.apply(i);
-
- byte[] cfBytes = field.cfBytes();
- ByteArrayComparable familyByteComparable =
- new ByteArrayComparable(cfBytes, 0, cfBytes.length);
-
- HashMap<ByteArrayComparable, String> qualifierIndexMap =
- currentCellToColumnIndexMap.get(familyByteComparable);
-
- if (qualifierIndexMap == null) {
- qualifierIndexMap = new HashMap<>();
- currentCellToColumnIndexMap.put(familyByteComparable, qualifierIndexMap);
- }
- byte[] qBytes = field.colBytes();
- ByteArrayComparable qualifierByteComparable =
- new ByteArrayComparable(qBytes, 0, qBytes.length);
-
- qualifierIndexMap.put(qualifierByteComparable, field.colName());
- }
- }
-
- @Override
- public ReturnCode filterCell(final Cell c) throws IOException {
-
- //If the row value map has not been populated yet, seed it
- // with the row key
- if (columnToCurrentRowValueMap == null) {
- columnToCurrentRowValueMap = new HashMap<>();
- HashMap<ByteArrayComparable, String> qualifierColumnMap =
- currentCellToColumnIndexMap.get(
- new ByteArrayComparable(rowKeyFamily, 0, rowKeyFamily.length));
-
- if (qualifierColumnMap != null) {
- String rowKeyColumnName =
- qualifierColumnMap.get(
- new ByteArrayComparable(rowKeyQualifier, 0,
- rowKeyQualifier.length));
- //Make sure that the rowKey is part of the where clause
- if (rowKeyColumnName != null) {
- columnToCurrentRowValueMap.put(rowKeyColumnName,
- new ByteArrayComparable(c.getRowArray(),
- c.getRowOffset(), c.getRowLength()));
- }
- }
- }
-
- //Always populate the column value into the RowValueMap
- ByteArrayComparable currentFamilyByteComparable =
- new ByteArrayComparable(c.getFamilyArray(),
- c.getFamilyOffset(),
- c.getFamilyLength());
-
- HashMap<ByteArrayComparable, String> qualifierColumnMap =
- currentCellToColumnIndexMap.get(
- currentFamilyByteComparable);
-
- if (qualifierColumnMap != null) {
-
- String columnName =
- qualifierColumnMap.get(
- new ByteArrayComparable(c.getQualifierArray(),
- c.getQualifierOffset(),
- c.getQualifierLength()));
-
- if (columnName != null) {
- columnToCurrentRowValueMap.put(columnName,
- new ByteArrayComparable(c.getValueArray(),
- c.getValueOffset(), c.getValueLength()));
- }
- }
-
- return ReturnCode.INCLUDE;
- }
-
-
- @Override
- public boolean filterRow() throws IOException {
-
- try {
- boolean result =
- dynamicLogicExpression.execute(columnToCurrentRowValueMap,
- valueFromQueryArray);
- columnToCurrentRowValueMap = null;
- return !result;
- } catch (Throwable e) {
- log.error("Error running dynamic logic on row", e);
- }
- return false;
- }
-
-
- /**
- * @param pbBytes A pb serialized instance
- * @return An instance of SparkSQLPushDownFilter
- * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
- */
- @SuppressWarnings("unused")
- public static SparkSQLPushDownFilter parseFrom(final byte[] pbBytes)
- throws DeserializationException {
-
- SparkFilterProtos.SQLPredicatePushDownFilter proto;
- try {
- proto = SparkFilterProtos.SQLPredicatePushDownFilter.parseFrom(pbBytes);
- } catch (InvalidProtocolBufferException e) {
- throw new DeserializationException(e);
- }
-
- String encoder = proto.getEncoderClassName();
- BytesEncoder enc = JavaBytesEncoder.create(encoder);
-
- //Load DynamicLogicExpression
- DynamicLogicExpression dynamicLogicExpression =
- DynamicLogicExpressionBuilder.build(proto.getDynamicLogicExpression(), enc);
-
- //Load valuesFromQuery
- final List<ByteString> valueFromQueryArrayList = proto.getValueFromQueryArrayList();
- byte[][] valueFromQueryArray = new byte[valueFromQueryArrayList.size()][];
- for (int i = 0; i < valueFromQueryArrayList.size(); i++) {
- valueFromQueryArray[i] = valueFromQueryArrayList.get(i).toByteArray();
- }
-
- //Load mapping from HBase family/qualifier to Spark SQL columnName
- HashMap<ByteArrayComparable, HashMap<ByteArrayComparable, String>>
- currentCellToColumnIndexMap = new HashMap<>();
-
- for (SparkFilterProtos.SQLPredicatePushDownCellToColumnMapping
- sqlPredicatePushDownCellToColumnMapping :
- proto.getCellToColumnMappingList()) {
-
- byte[] familyArray =
- sqlPredicatePushDownCellToColumnMapping.getColumnFamily().toByteArray();
- ByteArrayComparable familyByteComparable =
- new ByteArrayComparable(familyArray, 0, familyArray.length);
- HashMap<ByteArrayComparable, String> qualifierMap =
- currentCellToColumnIndexMap.get(familyByteComparable);
-
- if (qualifierMap == null) {
- qualifierMap = new HashMap<>();
- currentCellToColumnIndexMap.put(familyByteComparable, qualifierMap);
- }
- byte[] qualifierArray =
- sqlPredicatePushDownCellToColumnMapping.getQualifier().toByteArray();
-
- ByteArrayComparable qualifierByteComparable =
- new ByteArrayComparable(qualifierArray, 0, qualifierArray.length);
-
- qualifierMap.put(qualifierByteComparable,
- sqlPredicatePushDownCellToColumnMapping.getColumnName());
- }
-
- return new SparkSQLPushDownFilter(dynamicLogicExpression,
- valueFromQueryArray, currentCellToColumnIndexMap, encoder);
- }
-
- /**
- * @return The filter serialized using pb
- */
- public byte[] toByteArray() {
-
- SparkFilterProtos.SQLPredicatePushDownFilter.Builder builder =
- SparkFilterProtos.SQLPredicatePushDownFilter.newBuilder();
-
- SparkFilterProtos.SQLPredicatePushDownCellToColumnMapping.Builder columnMappingBuilder =
- SparkFilterProtos.SQLPredicatePushDownCellToColumnMapping.newBuilder();
-
- builder.setDynamicLogicExpression(dynamicLogicExpression.toExpressionString());
- for (byte[] valueFromQuery: valueFromQueryArray) {
- builder.addValueFromQueryArray(ByteStringer.wrap(valueFromQuery));
- }
-
- for (Map.Entry<ByteArrayComparable, HashMap<ByteArrayComparable, String>>
- familyEntry : currentCellToColumnIndexMap.entrySet()) {
- for (Map.Entry<ByteArrayComparable, String> qualifierEntry :
- familyEntry.getValue().entrySet()) {
- columnMappingBuilder.setColumnFamily(
- ByteStringer.wrap(familyEntry.getKey().bytes()));
- columnMappingBuilder.setQualifier(
- ByteStringer.wrap(qualifierEntry.getKey().bytes()));
- columnMappingBuilder.setColumnName(qualifierEntry.getValue());
- builder.addCellToColumnMapping(columnMappingBuilder.build());
- }
- }
- builder.setEncoderClassName(encoderClassName);
-
-
- return builder.build().toByteArray();
- }
-}
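
For context on the class just removed: the filter is built on the client from the SparkSQL predicates, protobuf-serialized via toByteArray(), and re-materialized on the region server through parseFrom(). Below is a minimal round-trip sketch, assuming the pre-removal hbase-spark artifact is on the classpath and that the filter instance comes out of the connector's query planning; HBase normally performs the encode/decode itself when the Scan is shipped, so doing it explicitly here only demonstrates the symmetry.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.spark.SparkSQLPushDownFilter;

public final class PushDownRoundTripSketch {
  private PushDownRoundTripSketch() {}

  // 'planned' stands in for a filter produced by the connector's query planning.
  static Scan attach(SparkSQLPushDownFilter planned) throws Exception {
    byte[] wire = planned.toByteArray();           // client side: protobuf-encode the filter
    SparkSQLPushDownFilter decoded =
        SparkSQLPushDownFilter.parseFrom(wire);    // region-server side: decode it again
    Scan scan = new Scan();
    scan.setFilter(decoded);                       // predicate evaluation now happens region-side
    return scan;
  }
}
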
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java
deleted file mode 100644
index 97cf140..0000000
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkDeleteExample.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.hbasecontext;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.spark.JavaHBaseContext;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This is a simple example of deleting records in HBase
- * with the bulkDelete function.
- */
-final public class JavaHBaseBulkDeleteExample {
-
- private JavaHBaseBulkDeleteExample() {}
-
- public static void main(String[] args) {
- if (args.length < 1) {
- System.out.println("JavaHBaseBulkDeleteExample {tableName}");
- return;
- }
-
- String tableName = args[0];
-
- SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkDeleteExample " + tableName);
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-
- try {
- List<byte[]> list = new ArrayList<>(5);
- list.add(Bytes.toBytes("1"));
- list.add(Bytes.toBytes("2"));
- list.add(Bytes.toBytes("3"));
- list.add(Bytes.toBytes("4"));
- list.add(Bytes.toBytes("5"));
-
- JavaRDD<byte[]> rdd = jsc.parallelize(list);
-
- Configuration conf = HBaseConfiguration.create();
-
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
- hbaseContext.bulkDelete(rdd,
- TableName.valueOf(tableName), new DeleteFunction(), 4);
- } finally {
- jsc.stop();
- }
-
- }
-
- public static class DeleteFunction implements Function<byte[], Delete> {
- private static final long serialVersionUID = 1L;
- public Delete call(byte[] v) throws Exception {
- return new Delete(v);
- }
- }
-}
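
Because org.apache.spark.api.java.function.Function is a single-method interface, the DeleteFunction in the example above can also be supplied as a lambda on Java 8+. A compact, self-contained sketch under the same assumptions as the example (the removed hbase-spark module on the classpath; the table name deleteExampleTable is illustrative only):

import java.util.Arrays;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public final class BulkDeleteLambdaSketch {
  private BulkDeleteLambdaSketch() {}

  public static void main(String[] args) {
    JavaSparkContext jsc =
        new JavaSparkContext(new SparkConf().setAppName("BulkDeleteLambdaSketch"));
    try {
      JavaRDD<byte[]> rowKeys = jsc.parallelize(
          Arrays.asList(Bytes.toBytes("1"), Bytes.toBytes("2"), Bytes.toBytes("3")));
      JavaHBaseContext hbaseContext =
          new JavaHBaseContext(jsc, HBaseConfiguration.create());
      // Same bulkDelete call as the example above, with the Delete built inline; batch size 4.
      hbaseContext.bulkDelete(rowKeys, TableName.valueOf("deleteExampleTable"),
          rowKey -> new Delete(rowKey), 4);
    } finally {
      jsc.stop();
    }
  }
}
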
http://git-wip-us.apache.org/repos/asf/hbase/blob/516d370b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java
deleted file mode 100644
index cb9e0c7..0000000
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkGetExample.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.spark.example.hbasecontext;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.spark.JavaHBaseContext;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-/**
- * This is a simple example of getting records in HBase
- * with the bulkGet function.
- */
-final public class JavaHBaseBulkGetExample {
-
- private JavaHBaseBulkGetExample() {}
-
- public static void main(String[] args) {
- if (args.length < 1) {
- System.out.println("JavaHBaseBulkGetExample {tableName}");
- return;
- }
-
- String tableName = args[0];
-
- SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkGetExample " + tableName);
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-
- try {
- List<byte[]> list = new ArrayList<>(5);
- list.add(Bytes.toBytes("1"));
- list.add(Bytes.toBytes("2"));
- list.add(Bytes.toBytes("3"));
- list.add(Bytes.toBytes("4"));
- list.add(Bytes.toBytes("5"));
-
- JavaRDD<byte[]> rdd = jsc.parallelize(list);
-
- Configuration conf = HBaseConfiguration.create();
-
- JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
-
- hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd, new GetFunction(),
- new ResultFunction());
- } finally {
- jsc.stop();
- }
- }
-
- public static class GetFunction implements Function<byte[], Get> {
-
- private static final long serialVersionUID = 1L;
-
- public Get call(byte[] v) throws Exception {
- return new Get(v);
- }
- }
-
- public static class ResultFunction implements Function<Result, String> {
-
- private static final long serialVersionUID = 1L;
-
- public String call(Result result) throws Exception {
- Iterator<Cell> it = result.listCells().iterator();
- StringBuilder b = new StringBuilder();
-
- b.append(Bytes.toString(result.getRow())).append(":");
-
- while (it.hasNext()) {
- Cell cell = it.next();
- String q = Bytes.toString(cell.getQualifierArray(),
- cell.getQualifierOffset(), cell.getQualifierLength());
- if (q.equals("counter")) {
- b.append("(")
- .append(q)
- .append(",")
- .append(Bytes.toLong(cell.getValueArray(), cell.getValueOffset()))
- .append(")");
- } else {
- b.append("(").append(q).append(",")
- .append(Bytes.toString(cell.getValueArray(),
- cell.getValueOffset(), cell.getValueLength()))
- .append(")");
- }
- }
- return b.toString();
- }
- }
-}
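
As a closing aside on the ResultFunction above: CellUtil's clone helpers are the other common way to read qualifier and value bytes from a Cell without doing offset and length arithmetic by hand. A sketch of a variant written that way (same classpath assumptions as the example; the class name is illustrative):

import java.util.Iterator;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.function.Function;

public class CloneBasedResultFunction implements Function<Result, String> {
  private static final long serialVersionUID = 1L;

  public String call(Result result) throws Exception {
    StringBuilder b = new StringBuilder();
    b.append(Bytes.toString(result.getRow())).append(":");

    Iterator<Cell> it = result.listCells().iterator();
    while (it.hasNext()) {
      Cell cell = it.next();
      // cloneQualifier/cloneValue copy out just the qualifier and value bytes,
      // so no offset or length bookkeeping is needed.
      String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
      String value = qualifier.equals("counter")
          ? String.valueOf(Bytes.toLong(CellUtil.cloneValue(cell)))
          : Bytes.toString(CellUtil.cloneValue(cell));
      b.append("(").append(qualifier).append(",").append(value).append(")");
    }
    return b.toString();
  }
}
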